Files
skato-ansible/anodes.py
2026-01-02 17:07:04 -05:00

345 lines
12 KiB
Python

"""
Library of classes modeling Ansible nodes or their types.
"""
from collections.abc import Callable
from copy import deepcopy
from enum import Enum
from pathlib import Path, PurePath
from typing import Literal, Never, Required, Union
from typing import TypeAlias as Neotype
from typing import TypedDict as Dict

from ansible_vault import Vault
from cerberus import Validator as constrain_by

from custtypes import ExecutedPath, IdlePath
from softman import Software, SoftPathGroup, SoftScope
from whereami import USER_PATH, PROJ_ROOT
class ControlNode:
    """Model the Ansible control node and the filesystem paths it uses.

    Class-level defaults come from ``whereami`` (``USER_PATH`` and
    ``PROJ_ROOT``).  A project root passed to the constructor overrides the
    default for that instance only.

    Instance attributes set by ``__init__``:
        proj_roles: role directories under the project root.
        role_data: ``files`` and ``templates`` under the first role directory.
        invvar_data: ``group_vars`` and ``host_vars`` under the project root.
    """

    __user_path: ExecutedPath = USER_PATH
    __proj_root: ExecutedPath = PROJ_ROOT
    # System-wide, site-local and per-user configuration directories.
    __conf_paths: tuple[IdlePath, ...] | list[IdlePath] = (
        PurePath("/etc"),
        PurePath("/usr", "local", "etc"),
        PurePath(str(__user_path), ".config"),
    )
    # Site-local and per-user data directories.
    __data_paths: tuple[IdlePath, ...] | list[IdlePath] = (
        PurePath("/usr", "local", "share"),
        PurePath(str(__user_path), ".local", "share"),
    )

    def __init__(self, ansible_proj_root: ExecutedPath | None = None):
        """Build the project-relative paths.

        Args:
            ansible_proj_root: root of the Ansible project; when ``None`` the
                class-level default (``PROJ_ROOT``) is used.
        """
        if ansible_proj_root is not None:
            self.__proj_root = ansible_proj_root

        project_root = str(self.__proj_root)
        self.proj_roles: tuple[IdlePath, ...] | list[IdlePath] = (
            PurePath(project_root, "roles"),
            PurePath(project_root, ".ansible/roles"),
        )
        # Role payloads are looked up under the first role directory only.
        primary_roles = str(self.proj_roles[0])
        self.role_data: tuple[IdlePath, ...] | list[IdlePath] = (
            PurePath(primary_roles, "files"),
            PurePath(primary_roles, "templates"),
        )
        self.invvar_data: tuple[IdlePath, ...] | list[IdlePath] = (
            PurePath(project_root, "group_vars"),
            PurePath(project_root, "host_vars"),
        )

    @property
    def user_path(self) -> ExecutedPath:
        """Home directory of the user running Ansible."""
        return self.__user_path

    @property
    def proj_root(self) -> ExecutedPath:
        """Root directory of the Ansible project."""
        return self.__proj_root

    @property
    def conf_paths(self) -> tuple[IdlePath, ...] | list[IdlePath]:
        """Configuration directories, system-wide first."""
        return self.__conf_paths

    @property
    def data_paths(self) -> tuple[IdlePath, ...] | list[IdlePath]:
        """Data directories, site-local first."""
        # Must not be named ``conf_paths``: a second property with that name
        # would replace the one above and hand out data paths instead.
        return self.__data_paths
class Softs(Enum):
    """Software known to this module; each value is the program's short name."""

    # The value is combined with "." elsewhere to form the per-user
    # configuration directory name.
    OpenSSH = "ssh"
# Shape of the "keys" entry of userSSHParams.  Every list after "available"
# may hold paths or integer indexes (or a single one of either).
userSSHKeys = Dict("userSSHKeys", {
    "available": Required[tuple[ExecutedPath, ...]],
    "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
    "authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
    "used": ExecutedPath | list[ExecutedPath] | int | list[int],
}, total=False)

# Keyword parameters accepted when building a userSSH object.  Only the
# fields wrapped in Required[...] must be present (total=False).
userSSHParams = Dict("userSSHParams", {
    "username": Required[str],
    "paths": Software._Apps,
    # A nested TypedDict rather than a dict literal: a plain dict is not a type.
    "keys": userSSHKeys,
    "password": Required[str],
    "fate": Literal["disposal", "retention"],
}, total=False)
# Default value of every userSSH field.
# NOTE(review): the default password is a hard-coded placeholder; callers are
# expected to override it -- confirm it never reaches a real host.
__user_ssh_input = {
    "username": "",
    "paths": None,
    "keys": {
        "available": tuple(),
        "selected": [0, 1],
        "authorized": 1,
        "used": 0
    },
    "password": "password123",
    "fate": "disposal"
}


def _init_user_ssh(self, **params):
    """Initialise a userSSH instance from keyword overrides.

    Every field listed in ``__user_ssh_input`` becomes an instance attribute.
    Fields not passed in ``params`` receive a private deep copy of the default
    so that mutable defaults (the ``keys`` dict) are never shared.

    Raises:
        TypeError: if ``params`` names a field that does not exist.
    """
    unknown = sorted(set(params) - set(__user_ssh_input))
    if unknown:
        raise TypeError(f"userSSH got unexpected field(s): {', '.join(unknown)}")
    for field, default in __user_ssh_input.items():
        setattr(self, field, params[field] if field in params else deepcopy(default))


# type() needs the namespace as its third positional argument; unpacking the
# mapping as keyword arguments raises TypeError.  The defaults stay visible as
# class attributes and __init__ makes ``userSSH(**fields)`` work.
userSSH = type("userSSH", (), {**__user_ssh_input, "__init__": _init_user_ssh})
# Cerberus rules describing the document produced by RemoteNode.itemize().
# Closed sets of values use the "allowed" rule; "anyof" expects a list of
# rule sets (dicts), so a list of plain strings there is rejected as an
# invalid schema.
vps_schema = {
    "fqdn": {"type": "string"},
    "vps_service": {
        "type": "dict",
        "schema": {
            "exists": {"type": "boolean"},
            "password": {"type": "string"},
            "api_key": {"type": "string"},
            "type": {
                "type": "string",
                "allowed": ["linode"]
            },
            "region": {
                "type": "string",
                "allowed": ["us-east"]
            },
            "ssh_authorized_keys": {"type": "list"},
            "root_fate": {
                "type": "string",
                "allowed": ["disposal", "retention"]
            }
        }
    },
    "keywords": {"type": "list"}
}
# From here on the name refers to the validator built from the rules above.
vps_schema = constrain_by(vps_schema)
class VirtualPrivateServers(Enum):
    """Supported VPS providers; the member name identifies the service."""

    Linode = 0
class RemoteNode:
    """Model a remote node that is provisioned through a VPS service.

    The constructor records the root account (user name, vault-dumped
    password and declared software), builds the SSH parameter object for
    root and stores the service API key.

    SSH key files are tracked in ``self.ssh.keys`` under four names:
    "available", "selected", "authorized" and "used".  ``pick_keys`` fills
    each list from the previous one and ``remove_keys`` cascades removals in
    the same order.

    ``self.region`` only exists after ``set_region`` has been called.
    """

    # __user_path =
    _fqdn: str | None = None

    def __init__(
        self,
        cnode: ControlNode,
        api_key: str,
        password: str,
        name: str,
        service: VirtualPrivateServers | str = VirtualPrivateServers.Linode.name,
        keywords: list[str] | None = None,
    ):
        """Set up the root account, its SSH parameters and service details.

        Args:
            cnode: control node supplying the user and configuration paths.
            api_key: API key of the VPS service.
            password: root password; stored only as a vault dump.
            name: fully qualified domain name of the node.
            service: VPS provider, as a member name or an enum member.
            keywords: optional tags attached to the node.
        """
        self.root: dict = {}
        self.root["username"] = "root"
        vault = Vault(password)
        self.root["password"] = vault.dump(password)
        self.root["software"] = Software()
        app_input = {
            SoftPathGroup.CONFIG.name: {
                # ``.value`` is required: a str cannot be added to an Enum member.
                SoftScope.PERSONAL.name: PurePath(
                    str(cnode.user_path), "." + Softs.OpenSSH.value
                )
            },
            SoftPathGroup.DATA.name: {
                SoftScope.GLOBAL.name: PurePath(
                    str(cnode.conf_paths[0]), "update-motd.d"
                )
            },
        }
        # NOTE(review): later code reads the declaration back as ``.ssh``;
        # confirm that ``declare`` derives that name from the enum member.
        self.root["software"].declare(Softs.OpenSSH, **app_input)
        self._fqdn = name
        self.root["software"]._fqdn = name
        root_ssh_input: userSSHParams = {
            "username": self.root["username"],
            "password": self.root["password"],
        }
        self.ssh: userSSH = userSSH(**root_ssh_input)
        self.apps: list = self.root["software"].list(contents=True)
        self.keywords = keywords
        self._api_key: str | None = api_key
        # Store the member name so that ``itemize`` can lower-case it even
        # when an enum member was passed.
        self.service = (
            service.name if isinstance(service, VirtualPrivateServers) else service
        )

    @staticmethod
    def _as_list(entries) -> list:
        """Return ``entries`` as a new list; a single entry becomes ``[entry]``."""
        if isinstance(entries, (list, tuple)):
            return list(entries)
        return [entries]

    def set_region(self, name: Literal["us-east"] = "us-east") -> None:
        """Record the service region of the node."""
        self.region = name

    def get_region(self) -> str:
        """Return the region recorded by ``set_region``."""
        return self.region

    def set_password(self, password: str) -> None:
        """Replace the root password, storing only its vault dump."""
        vault = Vault(password)
        self.root["password"] = vault.dump(password)
        self.ssh.password = self.root["password"]

    def get_password(self) -> Vault | str:
        """Return the stored (vault-dumped) root password."""
        return self.root["password"]

    def set_api(self, key: str) -> None:
        """Replace the service API key."""
        self._api_key = key

    def get_api(self) -> Vault | str:
        """Return the API key as a vault dump."""
        # NOTE(review): the key is used as its own vault password -- confirm
        # that this is intended.
        vault = Vault(self._api_key)
        return vault.dump(self._api_key)

    def add_tags(self, *name):
        """Append the given tags to the node keywords."""
        # Keep tags added earlier; only start a new list when none exist yet.
        if self.keywords is None:
            self.keywords = []
        self.keywords += list(name)

    def import_keys(
        self,
        key_basenames: str | tuple[str],
        match_sort: tuple[Callable, bool] = (lambda e: e, False),
    ):
        """Collect key files from the personal SSH configuration directory.

        Args:
            key_basenames: one glob pattern or a tuple of glob patterns.
            match_sort: ``(key function, reverse flag)`` used to order the
                matches.

        Raises:
            ValueError: if ``key_basenames`` is neither a str nor a tuple.

        The sorted matches are stored as the "available" keys.
        """
        if isinstance(key_basenames, tuple):
            patterns = key_basenames
        elif isinstance(key_basenames, str):
            patterns = (key_basenames,)
        else:
            raise ValueError("key_basenames must be a str or a tuple of str")

        ssh_dir = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name]))
        # glob() already yields paths that include ``ssh_dir``; joining them
        # onto it again would duplicate the prefix for a relative directory.
        keyfiles = [
            match.resolve() for pattern in patterns for match in ssh_dir.glob(pattern)
        ]
        self.ssh.paths = self.root["software"].ssh
        # sorted() returns the ordered list; list.sort() returns None.
        self.ssh.keys["available"] = tuple(
            sorted(keyfiles, key=match_sort[0], reverse=match_sort[1])
        )

    def show_keys(
        self,
        which: Literal["authorized", "used", "available", "selected"] = "available",
    ) -> tuple[tuple, str]:
        """Return the requested keys and a printable rendering of them.

        Integer entries of "selected" are looked up in "available"; integer
        entries of "authorized" and "used" are looked up in "selected".

        Returns:
            ``(keys, text)`` where ``text`` looks like ``{0: a, 1: b}``.
        """
        delimiters: str | tuple = "{}"
        lookup_source = {
            "selected": "available",
            "authorized": "selected",
            "used": "selected",
        }

        def render(kfs, source: str):
            """Turn ``kfs`` into a list, resolving indexes against ``source``."""
            pool = self._as_list(self.ssh.keys[source])
            if isinstance(kfs, list):
                # Only integers are indexes; paths in the list are kept as is.
                return [pool[k] if isinstance(k, int) else k for k in kfs]
            if isinstance(kfs, int):
                return [pool[kfs]]
            if isinstance(kfs, ExecutedPath):
                return [kfs]
            return kfs

        keyfiles = self.ssh.keys[which]
        if which in lookup_source:
            keyfiles = render(keyfiles, lookup_source[which])

        entries = [f"{index}: {keyfile}" for index, keyfile in enumerate(keyfiles)]
        # join() leaves no trailing separator, so nothing has to be cut off.
        stringified_keyfiles = delimiters[0] + ", ".join(entries) + delimiters[1]
        return (tuple(keyfiles), stringified_keyfiles)

    def pick_keys(
        self,
        source: Literal["authorized", "used", "available", "selected"] = "available",
        *selections: int | ExecutedPath | str,
    ) -> list[ExecutedPath] | Never:
        """Copy the chosen keys from ``source`` into the next list.

        "available" fills "selected", "selected" fills "authorized" and
        "authorized" fills "used".  A selection is an index into ``source``,
        a path, or a string.  For "available" and "selected" paths and
        strings must match an entry exactly; for "authorized" they match any
        entry that contains them.

        Returns:
            The list that was filled.

        Raises:
            ValueError: if ``source`` has no following list (e.g. "used").
            IndexError: if an integer selection is out of range.
        """
        keyfiles = self._as_list(self.ssh.keys[source])
        authlist = []
        if source in ("available", "selected"):
            target = "selected" if source == "available" else "authorized"
            known_names = {str(p) for p in keyfiles}
            for s in selections:
                if isinstance(s, int):
                    authlist.append(keyfiles[s])
                elif isinstance(s, ExecutedPath):
                    if s in keyfiles:
                        authlist.append(s)
                elif isinstance(s, str):
                    if s in known_names:
                        authlist.append(Path(s))
        elif source == "authorized":
            target = "used"
            for s in selections:
                if isinstance(s, int):
                    authlist.append(keyfiles[s])
                elif isinstance(s, (ExecutedPath, str)):
                    fragment = str(s)
                    authlist.extend(p for p in keyfiles if fragment in str(p))
        else:
            raise ValueError(f"keys cannot be picked from {source!r}")
        self.ssh.keys[target] = authlist
        return self.ssh.keys[target]

    def remove_keys(
        self,
        source: Literal["authorized", "used", "available", "selected"] = "available",
        *selections: int | ExecutedPath | str,
    ) -> dict | Never:
        """Remove the chosen keys from ``source`` and every following list.

        Integer selections are indexes into the list as it was before the
        call; paths and strings must match an entry exactly.

        NOTE(review): the same selections are passed on unchanged, so an
        integer is applied as an index to each following list as well --
        confirm that this is the intended meaning.

        Returns:
            The complete ``self.ssh.keys`` mapping after removal.

        Raises:
            IndexError: if an integer selection is out of range for a list.
        """
        stored = self.ssh.keys[source]
        keyfiles = self._as_list(stored)
        size = len(keyfiles)

        # Collect positions first so that removing one entry cannot shift the
        # index of another selection.
        doomed: set[int] = set()
        for s in selections:
            if isinstance(s, int):
                if not -size <= s < size:
                    raise IndexError(f"key index {s} out of range for {source!r}")
                doomed.add(s % size)
            elif isinstance(s, ExecutedPath):
                doomed.update(i for i, p in enumerate(keyfiles) if p == s)
            elif isinstance(s, str):
                doomed.update(i for i, p in enumerate(keyfiles) if str(p) == s)

        remaining = [p for i, p in enumerate(keyfiles) if i not in doomed]
        # "available" is stored as a tuple; keep the container type.
        self.ssh.keys[source] = tuple(remaining) if isinstance(stored, tuple) else remaining

        following = {
            "available": "selected",
            "selected": "authorized",
            "authorized": "used",
        }
        if source in following:
            self.remove_keys(following[source], *selections)
        return self.ssh.keys

    def itemize(self) -> dict | None:
        """Build the inventory document for this node.

        ``set_region`` must have been called beforehand.

        Returns:
            The document when it satisfies ``vps_schema``, otherwise ``None``.
        """
        # NOTE(review): ``self.ssh.password`` already holds a vault dump and is
        # dumped again here with itself as the vault password -- confirm.
        vault_api = Vault(self._api_key)
        vault_pass = Vault(self.ssh.password)
        model = {
            "fqdn": self._fqdn,
            "vps_service": {
                "exists": True,
                "password": vault_pass.dump(self.ssh.password),
                "api_key": vault_api.dump(self._api_key),
                "type": self.service.lower(),
                "region": self.region,
                "ssh_authorized_keys": self.ssh.keys["authorized"],
                "root_fate": self.ssh.fate,
            },
            "keywords": self.keywords,
        }
        if vps_schema.validate(model):
            return model
        return None