""" Library of classes modeling Ansible nodes or their types. """ from pathlib import Path, PurePath from typing import TypeAlias as Neotype from typing import TypedDict as Dict from typing import Never, Union, Literal from collections.abc import Callable from custtypes import ExecutedPath, IdlePath from enum import Enum from softman import Software, SoftPathGroup, SoftScope from whereami import USER_PATH, PROJ_ROOT from ansible_vault import Vault from cerberus import Validator as constrain_by class ControlNode: __user_path: ExecutedPath = USER_PATH __proj_root: ExecutedPath = PROJ_ROOT __conf_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath("/etc"), PurePath("/usr", "local", "etc"), PurePath(str(__user_path), ".config") ) __data_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath("/usr", "local", "share"), PurePath(str(__user_path), ".local", "share") ) def __init__(self, ansible_proj_root: ExecutedPath | None): if ansible_proj_root is not None: self.__proj_root = ansible_proj_root self.proj_roles: tuple[IdlePath] | list[IdlePath] = ( PurePath(str(self.__proj_root), "roles"), PurePath(str(self.__proj_root), ".ansible/roles") ) self.role_data: tuple[IdlePath] | list[IdlePath] = ( PurePath(str(self.proj_roles[0]), "files"), PurePath(str(self.proj_roles[0]), "templates") ) self.invvar_data: tuple[IdlePath] | list[IdlePath] = ( PurePath(str(self.__proj_root), "group_vars"), PurePath(str(self.__proj_root), "host_vars") ) @property def user_path(self) -> ExecutedPath: return self.__user_path @property def proj_root(self) -> ExecutedPath: return self.__proj_root @property def conf_paths(self) -> ExecutedPath: return self.__conf_paths @property def conf_paths(self) -> ExecutedPath: return self.__data_paths class Softs(Enum): OpenSSH = "ssh" userSSHParams = Dict("userSSHParams", { "username": Required[str], "paths": Software._Apps, "keys": { "available": Required[tuple[ExecutedPath]], "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]], "authorized": ExecutedPath | list[ExecutedPath] | int | list[int], "used": ExecutedPath | list[ExecutedPath] | int | list[int] }, "password": Required[str], "fate": Literal["disposal", "retention"] }, total=False) __user_ssh_input = { "username": "", "paths": None, "keys": { "available": tuple(), "selected": [0, 1], "authorized": 1, "used": 0 }, "password": "password123", "fate": "disposal" } # @TODO remove unpacking of '__user_ssh_input' userSSH = type("userSSH", (), **__user_ssh_input) vps_schema = { "fqdn": {"type": "string"}, "vps_service": { "type": "dict", "schema": { "exists": {"type": "boolean"}, "password": {"type": "string"}, "api_key": {"type": "string"}, "type": { "type": "string", "anyof": ["linode"] }, "region": { "type": "string", "anyof": ["us-east"] }, "ssh_authorized_keys": {"type": "list"}, "root_fate": { "type": "string", "anyof": ["disposal", "retention"] } } }, "keywords": {"type": "list"} } vps_schema = constrain_by(vps_schema) class VirtualPrivateServers(Enum): Linode = 0 class RemoteNode: # __user_path = _fqdn: str | None = None def __init__(self, cnode: ControlNode, api_key: str, password: str, name: str, service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, keywords: list[str] | None = None): self.root: dict = dict() self.root["username"]: str = "root" vault = Vault(password) self.root["password"]: str = vault.dump(password) self.root["software"]: Software = Software() app_input = { SoftPathGroup.CONFIG.name: { SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.OpenSSH)) }, SoftPathGroup.DATA.name: { SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d") } } self.root["software"].declare(Softs.OpenSSH, **app_input) self._fqdn = name self.root["software"]._fqdn = name root_ssh_input: userSSHParams = { "username": self.root["username"], "password": self.root["password"] } self.ssh: userSSH = userSSH(**root_ssh_input) self.apps: list = self.root["software"].list(contents = True) self.keywords = keywords self._api_key: str | None = api_key self.service = service def set_region(self, name: Literal["us-east"] = "us-east") -> None: self.region = name def get_region(self) -> str: return self.region def set_password(self, password: str) -> None: vault = Vault(value) self.root["password"] = vault.dump(value) self.ssh.password: str = self.root["password"] def get_password(self) -> Vault | str: return self.root["password"] def set_api(self, key: str) -> None: self._api_key = key def get_api(self) -> Vault | str: vault = Vault(self._api_key) return vault.dump(self._api_key) def add_tags(self, *name): self.keywords: list = [] self.keywords += list(name) def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e, False)): keyfiles: list[ExecutedPath] | list[None] | None = list() if isinstance(key_basenames, tuple): for basename in key_basenames: keyfiles += Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(basename) elif isinstance(key_basenames, str): keyfiles = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(key_basenames) else: raise ValueError updated_keyfiles: list[ExecutedPath] = [] for filename in keyfiles: new_keyfile = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])) / str(filename) updated_keyfiles.append(new_keyfile.resolve()) keyfiles = updated_keyfiles self.ssh.paths = self.root["software"].ssh self.ssh.keys["available"] = tuple(keyfiles.sort(reverse = match_sort[1], key = match_sort[0])) def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available") -> tuple[tuple, str]: delimiters: str | tuple = "{}" gap: Callable[[bool], str | None] = lambda b: " " if b else "" sep_format: str = "{0}" sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b) label_format: str = "{0}:" label: Callable[[str, bool], str] = lambda s, b: label_format.format(s) + gap(b) def render(kfs, source: str): if isinstance(kfs, list): member_ints: list = list() for f in kfs: if isinstance(f, int): members_ints.append(f) if len(member_ints) > 0: kfs = [self.ssh.keys[source][i] for i in kfs] elif isinstance(kfs, int): kfs = [self.ssh.keys[source][kfs]] elif isinstance(kfs, ExecutedPath): kfs = [self.ssh.keys[source]] return kfs keyfiles = self.ssh.keys[which] if which == "selected": keyfiles = render(keyfiles, "available") if which == "authorized": keyfiles = render(keyfiles, "selected") if which == "used": keyfiles = render(keyfiles, "selected") stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles))) stringified_keyfiles = sep(",", True).join(stringified_keyfiles) stringified_keyfiles = delimiters[0:len(delimiters)] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:] return (tuple(keyfiles), stringified_keyfiles) def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: keyfiles = self.ssh.keys[source] authlist = [] if source == "available" or source == "selected": for s in selections: if isinstance(s, int): authlist.append(keyfiles[s]) elif isinstance(s, ExecutedPath): path_set = set([s]) kf_set = set(keyfiles) overlap = kf_set & path_set if overlap is not None and len(overlap) > 0: authlist.append(list(overlap)[0]) else: continue elif isinstance(s, str): kf_strs = list(map(lambda p: str(p), keyfiles)) if s in kf_strs: authlist.append(Path(s)) if source == "available": self.ssh.keys["selected"] = authlist return self.ssh.keys["selected"] elif source == "selected": self.ssh.keys["authorized"] = authlist return self.ssh.keys["authorized"] elif source == "authorized": for s in selections: if isinstance(s, int): authlist.append(keyfiles[s]) elif isinstance(s, ExecutedPath): for p in keyfiles: if str(s) in str(p): authlist.append(p) else: continue elif isinstance(s, str): for p in keyfiles: if s in str(p): authlist.append(p) else: continue self.ssh.keys["used"] = authlist return self.ssh.keys["used"] def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: keyfiles = self.ssh.keys[source] for s in selections: if isinstance(s, int): removed_elem = keyfiles.pop(s) elif isinstance(s, ExecutedPath): path_set = set([s]) kf_set = set(keyfiles) overlap = kf_set & path_set if overlap is not None and len(overlap) > 0: item = list(overlap)[0] keyfiles = [p for p in keyfiles if p != item] self.ssh.keys[source] = keyfiles else: continue elif isinstance(s, str): kf_strs = list(map(lambda p: str(p), keyfiles)) if s in kf_strs: keyfiles = [p for p in keyfiles if str(p) != S] self.ssh.keys[source] = keyfiles self.ssh.keys[source] = keyfiles if source == "available": self.remove_keys("selected", *selections) elif source == "selected": self.remove_keys("authorized", *selections) elif source == "authorized": self.remove_keys("used", *selections) return self.ssh.keys def itemize(self): model = dict() vault_api = Vault(self._api_key) vault_pass = Vault(self.ssh.password) model["fqdn"] = self._fqdn model["vps_service"] = { "exists": True, "password": vault_pass.dump(self.ssh.password), "api_key": vault_api.dump(self._api_key), "type": self.service.lower(), "region": self.region, "ssh_authorized_keys": self.ssh.keys["authorized"], "root_fate": self.ssh.fate } model["keywords"] = self.keywords if vps_schema.validate(model): return model