""" Library of classes modeling Ansible nodes or their types. """
from copy import deepcopy
from pathlib import Path, PurePath
from typing import TypeAlias as Neotype
from typing import TypedDict as Dict
from typing import Never, Union, Literal, Required
from collections.abc import Callable
from custtypes import ExecutedPath, IdlePath
from enum import Enum
from softman import Software, SoftPathGroup, SoftScope, _Apps
from whereami import USER_PATH, PROJ_ROOT
from ansible_vault import Vault
from cerberus import Validator as constrain_by
from random import choice


class ControlNode:
    """Filesystem layout of the Ansible control node.

    Holds the user home, the project root, and the configuration / data /
    role / inventory-variable directories derived from them.
    """

    __user_path: ExecutedPath = USER_PATH
    __proj_root: ExecutedPath = PROJ_ROOT
    __conf_paths: tuple[IdlePath, ...] | list[IdlePath] = (
        PurePath("/etc"),
        PurePath("/usr", "local", "etc"),
        PurePath(str(__user_path), ".config"),
    )
    __data_paths: tuple[IdlePath, ...] | list[IdlePath] = (
        PurePath("/usr", "local", "share"),
        PurePath(str(__user_path), ".local", "share"),
    )

    def __init__(self, ansible_proj_root: ExecutedPath | None = None):
        """Build the project-relative paths.

        Args:
            ansible_proj_root: Overrides the default project root
                (``PROJ_ROOT``) for this instance when given.
        """
        if ansible_proj_root is not None:
            self.__proj_root = ansible_proj_root

        root = str(self.__proj_root)
        self.proj_roles: tuple[IdlePath, ...] | list[IdlePath] = (
            PurePath(root, "roles"),
            PurePath(root, ".ansible/roles"),
        )
        self.role_data: tuple[IdlePath, ...] | list[IdlePath] = (
            PurePath(str(self.proj_roles[0]), "files"),
            PurePath(str(self.proj_roles[0]), "templates"),
            PurePath(str(self.proj_roles[1]), "files"),
            PurePath(str(self.proj_roles[1]), "templates"),
        )
        self.invvar_data: tuple[IdlePath, ...] | list[IdlePath] = (
            PurePath(root, "group_vars"),
            PurePath(root, "host_vars"),
        )

    @property
    def user_path(self) -> ExecutedPath:
        """Home directory of the controlling user."""
        return self.__user_path

    @property
    def proj_root(self) -> ExecutedPath:
        """Root directory of the Ansible project."""
        return self.__proj_root

    @property
    def conf_paths(self) -> tuple[IdlePath, ...] | list[IdlePath]:
        """Configuration directories (system-wide first, per-user last)."""
        return self.__conf_paths

    @property
    def data_paths(self) -> tuple[IdlePath, ...] | list[IdlePath]:
        """Data directories (system-wide first, per-user last)."""
        return self.__data_paths


class Softs(Enum):
    """Software known to this module."""

    ssh = 0


# Schema of the ``keys`` mapping carried by a ``userSSH`` instance.
userSSHSubParams = {
    "available": Required[tuple[ExecutedPath]],
    "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
    "authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
    "used": ExecutedPath | list[ExecutedPath] | int | list[int]
}

# Default ``keys`` mapping; integers are indexes into the preceding stage.
__user_ssh_keys = {
    "available": tuple(),
    "selected": [0, 1],
    "authorized": 1,
    "used": 0
}


def userSSHInit(self,
                username: str = "root",
                paths: _Apps | None = None,
                keys: userSSHSubParams | None = None,
                password: str = "password123",
                fate: Literal["disposal", "retention"] = "disposal"):
    """Initializer installed as ``userSSH.__init__``.

    Args:
        username: Account name.
        paths: Software path record for SSH, if already known.
        keys: Key mapping shaped like ``userSSHSubParams``. When omitted,
            a private copy of the module defaults is used so instances
            never share (and mutate) one dict.
        password: Account password.
        fate: Whether the account is disposed of or retained.
    """
    self.username = username
    self.paths = paths
    self.keys = deepcopy(__user_ssh_keys) if keys is None else keys
    self.password = password
    self.fate = fate


userSSHParams = Dict("userSSHParams", {
    "username": Required[str],
    "paths": _Apps,
    "keys": userSSHSubParams,
    "password": Required[str],
    "fate": Literal["disposal", "retention"]
}, total=False)

# Class namespace for the dynamically created ``userSSH`` type.
__user_ssh_input = {
    "username": "",
    "paths": None,
    "keys": {
        "available": tuple(),
        "selected": [0, 1],
        "authorized": 1,
        "used": 0
    },
    "password": "password123",
    "fate": "disposal",
    "__init__": userSSHInit
}

userSSH = type("userSSH", (), __user_ssh_input)

vpsSchema = Dict("vpsSchema", {
    "fqdn": Required[str],
    "vps_service": {
        "exists": Required[bool],
        "password": Required[str],
        "api_key": Required[str],
        "type": Required[Literal["linode"]],
        "region": Literal["us-east"],
        "ssh_authorized_keys": list[IdlePath | ExecutedPath | str],
        "ssh_private_key_paths": list[IdlePath | ExecutedPath | str],
        "ssh_private_key_path_pref": int,
        "root_fate": Required[Literal["disposal", "retention"]],
        "ssh_motd_script_basenames": list[str]
    },
    "keywords": list[str]
}, total=False)


class VirtualPrivateServers(Enum):
    """Supported VPS providers."""

    Linode = 0


class RemoteNode:
    """A remote (VPS-hosted) node and the SSH keys used to reach it."""

    _fqdn: str | None = None

    def __init__(self,
                 cnode: ControlNode,
                 api_key: str,
                 password: str,
                 name: str,
                 service: VirtualPrivateServers | str = VirtualPrivateServers.Linode.name,
                 region: Literal["us-east"] | None = None,
                 keywords: list[str] | None = None):
        """Declare the node's root account, SSH software paths and metadata.

        Args:
            cnode: Control node supplying the user and configuration paths.
            api_key: API key of the VPS service.
            password: Root password; stored after ``Vault.dump``.
            name: Fully qualified domain name of the node.
            service: VPS provider, as an enum member or its name.
            region: Provider region.
            keywords: Tags attached to the node.
        """
        self.root: dict = dict()
        self.root["username"] = "root"
        vault = Vault(password)
        self.root["password"] = vault.dump(password)
        self.root["software"] = Software()

        app_input = {
            SoftPathGroup.CONFIG.name: {
                SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.ssh.name))
            },
            SoftPathGroup.DATA.name: {
                SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d")
            }
        }
        self.root["software"].declare(Softs.ssh.name, **app_input)

        self._fqdn = name
        self.root["software"]._fqdn = name

        root_ssh_input: userSSHParams = {
            "username": self.root["username"],
            "password": self.root["password"]
        }
        self.ssh: userSSH = userSSH(**root_ssh_input)
        self.apps: list = self.root["software"].list(contents=True)
        self.keywords = keywords
        self._api_key: str | None = api_key
        # itemize() lower-cases the service name, so store the name even
        # when the caller passes the enum member the annotation suggests.
        self.service = service.name if isinstance(service, VirtualPrivateServers) else service
        self.region = region

        # Progress flags of the key-picking pipeline (see pick_keys).
        self.__authkeys_selected = False
        self.__usedkeys_selected = False
        self.__keys_selected = False
        self.__finalized_keys = False

    def set_region(self, name: Literal["us-east"] = "us-east") -> None:
        """Set the provider region."""
        self.region = name

    def get_region(self) -> str:
        """Return the provider region."""
        return self.region

    def set_password(self, password: str) -> None:
        """Replace the root password, storing its ``Vault.dump`` output."""
        vault = Vault(password)
        self.root["password"] = vault.dump(password)
        self.ssh.password = self.root["password"]

    def get_password(self) -> Vault | str:
        """Return the stored (vault-dumped) root password."""
        return self.root["password"]

    def set_api(self, key: str) -> None:
        """Replace the VPS service API key."""
        self._api_key = key

    def get_api(self) -> Vault | str:
        """Return the API key passed through ``Vault.dump``."""
        vault = Vault(self._api_key)
        return vault.dump(self._api_key)

    def add_tags(self, *name):
        """Append keywords to the node, keeping the existing ones."""
        if self.keywords is None:
            self.keywords = []
        self.keywords += list(name)

    def import_keys(self,
                    key_basenames: str | tuple[str],
                    match_sort: tuple[Callable, bool] = (lambda e: e.stem, False)):
        """Collect key files from the personal SSH configuration directory.

        Args:
            key_basenames: One glob pattern, or a tuple of glob patterns,
                matched against the directory's entries.
            match_sort: ``(key_function, reverse)`` pair handed to
                ``sorted`` to order the matches.

        Raises:
            ValueError: If ``key_basenames`` is neither a str nor a tuple.
        """
        if isinstance(key_basenames, tuple):
            patterns = key_basenames
        elif isinstance(key_basenames, str):
            patterns = (key_basenames,)
        else:
            raise ValueError("key_basenames must be a str or a tuple of str")

        ssh_dir = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name]))
        keyfiles: list[ExecutedPath] = []
        for pattern in patterns:
            # glob() already yields paths prefixed with ssh_dir; re-joining
            # them would double the prefix for a relative directory.
            keyfiles.extend(match.resolve() for match in ssh_dir.glob(pattern))

        self.ssh.paths = self.root["software"].ssh
        self.ssh.keys["available"] = tuple(
            sorted(keyfiles, key=match_sort[0], reverse=match_sort[1])
        )

    def show_keys(self,
                  which: Literal["authorized", "used", "available", "selected"] = "available",
                  kformat=object) -> tuple[tuple, str] | tuple | list | str:
        """Print and return one stage of the key pipeline.

        Integer entries are resolved against the preceding stage
        ("selected" against "available"; "authorized" and "used" against
        "selected").

        Args:
            which: Stage to show.
            kformat: ``str`` for the rendered text, ``list`` or ``tuple``
                for the keys, ``object`` for ``(tuple_of_keys, text)``.

        Returns:
            The value in the requested format (also printed).

        Raises:
            ValueError: If ``kformat`` is not one of the supported types.
        """
        def render(kfs, source: str):
            # Replace index references by the entries they point to.
            pool = self.ssh.keys[source]
            if isinstance(kfs, list):
                return [pool[k] if isinstance(k, int) else k for k in kfs]
            if isinstance(kfs, int):
                return [pool[kfs]]
            if isinstance(kfs, ExecutedPath):
                return [kfs]
            return kfs

        keyfiles = self.ssh.keys[which]
        if which == "selected":
            keyfiles = render(keyfiles, "available")
        elif which in ("authorized", "used"):
            keyfiles = render(keyfiles, "selected")

        # join() leaves no trailing separator, so nothing is trimmed.
        body = ", ".join(f"{index}: {keyfile}" for index, keyfile in enumerate(keyfiles))
        stringified_keyfiles = f"[{body}]"

        if kformat == str:
            result = stringified_keyfiles
        elif kformat == list:
            result = list(keyfiles)
        elif kformat == tuple:
            result = tuple(keyfiles)
        elif kformat == object:
            result = (tuple(keyfiles), stringified_keyfiles)
        else:
            raise ValueError(f"unsupported kformat: {kformat!r}")
        print(result)
        return result

    @staticmethod
    def _resolve_selection(keyfiles, selection):
        """Return the entry of *keyfiles* designated by *selection*, or None."""
        if isinstance(selection, int):
            return keyfiles[selection]
        if isinstance(selection, ExecutedPath):
            return next((k for k in keyfiles if k == selection), None)
        if isinstance(selection, str):
            return Path(selection) if selection in map(str, keyfiles) else None
        return None

    def pick_keys(self,
                  source: Literal["authorized", "used", "available", "selected"] = "available",
                  *selections: int | ExecutedPath | str
                  ) -> list[ExecutedPath] | tuple[list[ExecutedPath], list[ExecutedPath]]:
        """Promote keys from one pipeline stage to the next.

        * ``"available"``: matched selections become ``"selected"``.
        * ``"selected"``: selections alternate by position; odd positions
          (1st, 3rd, ...) go to ``"used"``, even positions to
          ``"authorized"``.
        * ``"authorized"``: matching entries of ``"selected"`` become
          ``"used"``.

        Args:
            source: Stage the selections refer to.
            *selections: Indexes, paths or path strings.

        Returns:
            The updated stage; for ``"selected"`` the pair
            ``(authorized, used)``.

        Raises:
            TypeError: If the source stage is ``None``.
            ValueError: If the source stage is empty or unsupported.
        """
        keyfiles = self.ssh.keys[source]
        if keyfiles is None:
            raise TypeError(f"no keys recorded for {source!r}")
        if isinstance(keyfiles, (tuple, list)) and len(keyfiles) < 1:
            raise ValueError(f"no keys to pick from in {source!r}")

        if source == "available":
            picked = []
            for s in selections:
                match = self._resolve_selection(keyfiles, s)
                if match is not None:
                    picked.append(match)
            self.ssh.keys["selected"] = picked
            self.__keys_selected = True
            result = self.ssh.keys["selected"]
        elif source == "selected":
            privkeys = []
            pubkeys = []
            # The role depends on the argument position, so an unmatched
            # selection must not shift the roles of the following ones.
            for position, s in enumerate(selections, start=1):
                match = self._resolve_selection(keyfiles, s)
                if match is None:
                    continue
                if position % 2 == 0:
                    pubkeys.append(match)
                else:
                    privkeys.append(match)
            self.ssh.keys["authorized"] = pubkeys
            self.ssh.keys["used"] = privkeys
            self.__authkeys_selected = True
            self.__usedkeys_selected = True
            result = (self.ssh.keys["authorized"], self.ssh.keys["used"])
        elif source == "authorized":
            picked = []
            selected = self.ssh.keys["selected"]
            for s in selections:
                if isinstance(s, int):
                    if selected[s] in keyfiles:
                        picked.append(selected[s])
                elif isinstance(s, (ExecutedPath, str)):
                    wanted = str(s)
                    if wanted in map(str, keyfiles):
                        picked.extend(p for p in selected if wanted in str(p))
            self.ssh.keys["used"] = picked
            self.__usedkeys_selected = True
            result = self.ssh.keys["used"]
        else:
            raise ValueError(f"cannot pick keys from source {source!r}")

        self.__finalized_keys = (
            self.__keys_selected
            and self.__authkeys_selected
            and self.__usedkeys_selected
        )
        return result

    # TODO: test and debug this method
    def remove_keys(self,
                    source: Literal["authorized", "used", "available", "selected"] = "available",
                    *selections: int | ExecutedPath | str) -> dict:
        """Remove keys from a stage and cascade to the following stages.

        The same selections are applied to every later stage
        (available -> selected -> authorized -> used).
        NOTE(review): integer selections are therefore reused as indexes
        in each later stage - confirm this is intended.

        Args:
            source: Stage to start removing from.
            *selections: Indexes, paths or path strings to remove.

        Returns:
            The whole key mapping after removal.
        """
        current = self.ssh.keys[source]
        is_sequence = isinstance(current, (list, tuple))
        # Work on a list: "available" is a tuple and a stage may still
        # hold a single scalar default.
        remaining = list(current) if is_sequence else [current]

        for s in selections:
            if isinstance(s, int):
                remaining.pop(s)
            elif isinstance(s, ExecutedPath):
                remaining = [p for p in remaining if p != s]
            elif isinstance(s, str):
                remaining = [p for p in remaining if str(p) != s]

        if isinstance(current, tuple):
            self.ssh.keys[source] = tuple(remaining)
        elif is_sequence or remaining != [current]:
            # An untouched scalar stage keeps its scalar form.
            self.ssh.keys[source] = remaining

        next_stage = {
            "available": "selected",
            "selected": "authorized",
            "authorized": "used",
        }.get(source)
        if next_stage is not None:
            self.remove_keys(next_stage, *selections)
        return self.ssh.keys

    def itemize(self) -> dict:
        """Return the node as a mapping shaped like ``vpsSchema``.

        Runs any key-picking stage that has not been performed yet, using
        the indexes currently stored in ``self.ssh.keys``, then reads the
        authorized key files.
        """
        vault_api = Vault(self._api_key)
        vault_pass = Vault(self.ssh.password)

        if not self.__keys_selected:
            self.pick_keys("available", *self.ssh.keys["selected"])
        # Only resolve authorized/used keys that were not picked already,
        # so an explicit earlier choice is not overwritten.
        if not (self.__authkeys_selected and self.__usedkeys_selected):
            self.pick_keys("selected", self.ssh.keys["authorized"], self.ssh.keys["used"])

        authorized_keys = [p.read_text() for p in self.ssh.keys["authorized"]]
        used_keys = [str(p) for p in self.ssh.keys["used"]]

        model: dict | vpsSchema = {
            "fqdn": self._fqdn,
            "vps_service": {
                "exists": True,
                "password": vault_pass.dump(self.ssh.password),
                "api_key": vault_api.dump(self._api_key),
                "type": self.service.lower(),
                "region": self.region,
                "ssh_authorized_keys": authorized_keys,
                "ssh_private_key_paths": used_keys,
                # Random index into ssh_private_key_paths.
                "ssh_private_key_path_pref": choice(range(len(used_keys))),
                "root_fate": self.ssh.fate,
                "ssh_motd_script_basenames": []
            },
            "keywords": self.keywords,
        }
        return model


if __name__ == "__main__":
    # Manual smoke test; guarded so importing the library has no side effects.
    rnode = RemoteNode(ControlNode(), "dfsfsd", "sdgsadh", "test.io",
                       VirtualPrivateServers.Linode.name, "us-east", ["meh"])
    rnode.import_keys("*ed25519*sukaato_*yubikey.p[up][bk]", (lambda e: e.stem, True))
    rnode.show_keys(kformat=tuple)
    print(rnode.itemize())