""" Library of classes modeling Ansible nodes or their types. """ from pathlib import Path, PurePath from typing import TypeAlias as Neotype from typing import TypedDict as Dict from typing import Never, Union, Literal, Required from collections.abc import Callable from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes from enum import Enum from softman import Software, SoftPathGroup, SoftScope, Apps from whereami import USER_PATH, PROJ_ROOT from ansible_vault import Vault from cerberus import Validator as constrain_by from random import choice import secrets class ControlNode: __user_path: ExecutedPath = USER_PATH __proj_root: ExecutedPath = PROJ_ROOT __conf_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath("/etc"), PurePath("/usr", "local", "etc"), PurePath(str(__user_path), ".config") ) __data_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath("/usr", "local", "share"), PurePath(str(__user_path), ".local", "share") ) def __init__(self, ansible_proj_root: ExecutedPath | None = None): if ansible_proj_root is not None: self.__proj_root = ansible_proj_root role_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath(str(self.__proj_root), "roles"), PurePath(str(self.__proj_root), ".ansible/roles") ) roledata_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath(str(self.proj_roles[0]), "files"), PurePath(str(self.proj_roles[0]), "templates"), PurePath(str(self.proj_roles[1]), "files"), PurePath(str(self.proj_roles[1]), "templates") ) setattr(self, AnsibleScopes.ROLE.name, { "data": roledata_paths, "vars": role_paths }) setattr(self, AnsibleScopes.GROUPVARS.name, { "vars": (PurePath(str(self.__proj_root), "group_vars"),) }) setattr(self, AnsibleScopes.HOSTVARS.name, { "vars": (PurePath(str(self.__proj_root), "host_vars"),) }) setattr(self, AnsibleScopes.INVENTORY.name, { "vars": (PurePath(str(self.__proj_root), "hosts.yml"),) }) def get_scope(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY.name, index = 0): return getattr(self, scope)[index] @property def home(self) -> ExecutedPath: return self.__user_path @property def root(self) -> ExecutedPath: return self.__proj_root @property def sys_confs(self) -> ExecutedPath: return self.__conf_paths @property def sys_data(self) -> ExecutedPath: return self.__data_paths class Softs(Enum): ssh = 0 _userSSHSubParams = { "available": Required[tuple[ExecutedPath]], "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]], "authorized": ExecutedPath | list[ExecutedPath] | int | list[int], "used": ExecutedPath | list[ExecutedPath] | int | list[int] } __user_ssh_keys = { "available": tuple(), "selected": [0, 1], "authorized": 1, "used": 0 } def __userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"): self.username = username self.paths = paths self.keys = keys self.password = password self.fate = fate userSSHParams = Dict("userSSHParams", { "username": Required[str], "paths": _Apps, "keys": __user_ssh_keys, "password": Required[str], "fate": Literal["disposal", "retention"] }, total=False) __user_ssh_input = { "username": "", "paths": None, "keys": { "available": tuple(), "selected": [0, 1], "authorized": 1, "used": 0 }, "password": "password123", "fate": "disposal",\ "__init__": __userSSHInit } userSSH = type("userSSH", (), __user_ssh_input) vpsSchema = Dict("vpsSchema", { "fqdn": Required[str], "vps_service": { "exists": Required[bool], "password": Required[str], "api_key": Required[str], "type": Required[Literal["linode"]], "region": Literal["us-east"], "ssh_authorized_keys": list[IdlePath | ExecutedPath | str], "ssh_private_key_paths": list[IdlePath | ExecutedPath | str], "ssh_private_key_path_pref": int, "root_fate": Required[Literal["disposal","retention"]], "ssh_motd_script_basenames": list[str] }, "keywords": list[str] }, total=False) class RemoteNode: # __user_path = _fqdn: str | None = None def __init__(self, cnode: ControlNode, api_key: str = secrets.token_urlsafe(32), password: str = "password123", name: str = ".test", service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = "us-east", keywords: list[str] | None = ["server"]): self.root: dict = dict() self.root["username"]: str = "root" vault = Vault(password) self.root["password"]: str = vault.dump(password) self.root["software"]: Software = Software() self.owner = cnode app_input = { SoftPathGroup.CONFIG.name: { SoftScope.PERSONAL.name: PurePath(str(cnode.home), ("." + Softs.ssh.name)) }, SoftPathGroup.DATA.name: { SoftScope.GLOBAL.name: PurePath(str(cnode.sys_confs[0]), "update-motd.d") } } self.root["software"].declare(Softs.ssh.name, **app_input) self._fqdn = name self.root["software"]._fqdn = name root_ssh_input: userSSHParams = { "username": self.root["username"], "password": self.root["password"] } self.ssh: userSSH = userSSH(**root_ssh_input) self.apps: list = self.root["software"].list(contents = True) self.keywords = keywords self._api_key: str | None = api_key self.service = service self.region = region self.__authkeys_selected = False self.__usedkeys_selected = False self.__keys_selected = False self.__finalized_keys = False self.model: dict | None = None self.__key_accumulator: tuple | list | None = [] def set_region(self, name: Literal["us-east"] = "us-east") -> None: self.region = name def set_password(self, password: str) -> None: vault = Vault(self.root["password"]) self.root["password"] = vault.dump(self.root["password"]) def set_api(self, key: str) -> None: vault = Vault(self._api_key) self._api_key = vault.dump(self._api_key) def add_tags(self, *name): self.keywords: list = [] self.keywords += list(name) def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e.stem, False)): keyfiles: list[ExecutedPath] | list[None] | None = list() if isinstance(key_basenames, tuple): for basename in key_basenames: keyfiles += Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(basename) elif isinstance(key_basenames, str): keyfiles = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(key_basenames) else: raise ValueError updated_keyfiles: list[ExecutedPath] = [] for filename in keyfiles: new_keyfile = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])) / str(filename) updated_keyfiles.append(new_keyfile.resolve()) keyfiles = updated_keyfiles self.ssh.paths = self.root["software"].ssh self.ssh.keys["available"] = tuple(sorted(keyfiles, key=match_sort[0], reverse=match_sort[1])) def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available", kformat = object) -> tuple[tuple, str]: delimiters: str | tuple = "[]" gap: Callable[[bool], str | None] = lambda b: " " if b else "" sep_format: str = "{0}" sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b) label_format: str = "{0}:" label: Callable[[str, bool], str] = lambda s, b: label_format.format(s) + gap(b) def render(kfs, source: str): if isinstance(kfs, list): member_ints: list = list() for f in kfs: if isinstance(f, int): members_ints.append(f) if len(member_ints) > 0: kfs = [self.ssh.keys[source][i] for i in kfs] elif isinstance(kfs, int): kfs = [self.ssh.keys[source][kfs]] elif isinstance(kfs, ExecutedPath): kfs = [self.ssh.keys[source]] return kfs keyfiles = self.ssh.keys[which] if which == "selected": keyfiles = render(keyfiles, "available") if which == "authorized": keyfiles = render(keyfiles, "selected") if which == "used": keyfiles = render(keyfiles, "selected") stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles))) stringified_keyfiles = sep(",", True).join(stringified_keyfiles) stringified_keyfiles = delimiters[0] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:] if kformat == str: result = stringified_keyfiles elif kformat == list: result = keyfiles elif kformat == tuple: result = tuple(keyfiles) elif kformat == object: result = (tuple(keyfiles), stringified_keyfiles) print(result) return result def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: keyfiles = self.ssh.keys[source] # print(keyfiles) if keyfiles is None: raise TypeError elif isinstance(keyfiles, (tuple, list)) and len(keyfiles) < 1: raise ValueError authlist = [] if source == "available": for s in selections: if isinstance(s, int): # print(s) authlist.append(keyfiles[s]) elif isinstance(s, ExecutedPath): path_set = set([s]) kf_set = set(keyfiles) overlap = kf_set & path_set if overlap is not None and len(overlap) > 0: authlist.append(list(overlap)[0]) else: continue elif isinstance(s, str): kf_strs = list(map(lambda p: str(p), keyfiles)) if s in kf_strs: authlist.append(Path(s)) self.ssh.keys["selected"] = authlist self.__keys_selected = True result = self.ssh.keys["selected"] if source == "selected": privkeys = list() pubkeys = list() count = 1 for s in selections: if isinstance(s, int): # print(s) if count % 2 == 0: pubkeys.append(keyfiles[s]) else: privkeys.append(keyfiles[s]) elif isinstance(s, ExecutedPath): path_set = set([s]) kf_set = set(keyfiles) overlap = kf_set & path_set if overlap is not None and len(overlap) > 0: if count % 2 == 0: pubkeys.append(list(overlap)[0]) else: privkeys.append(list(overlap)[0]) else: continue elif isinstance(s, str): kf_strs = list(map(lambda p: str(p), keyfiles)) if s in kf_strs: if count % 2 == 0: pubkeys.append(Path(s)) else: privkeys.append(Path(s)) count += 1 self.ssh.keys["authorized"] = pubkeys self.ssh.keys["used"] = privkeys self.__authkeys_selected = True self.__usedkeys_selected = True result = (self.ssh.keys["authorized"], self.ssh.keys["used"]) elif source == "authorized": for s in selections: if isinstance(s, int): if self.ssh.keys["selected"][s] in keyfiles: authlist.append(self.ssh.keys["selected"][s]) elif isinstance(s, ExecutedPath): for p in self.ssh.keys["selected"]: if str(s) in str(p) and str(s) in list(map(lambda p: str(p), keyfiles)): authlist.append(p) else: continue elif isinstance(s, str): for p in self.ssh.keys["selected"]: if s in str(p) and s in list(map(lambda p: str(p), keyfiles)): authlist.append(p) else: continue self.ssh.keys["used"] = authlist self.__usedkeys_selected = True result = self.ssh.keys["used"] self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected return result # @TODO continue writing below method def remove_keys(self, target: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | str | ExecutedPath): keyfiles = self.ssh.keys[target] key_accumulator_populated = (slf.__key_accumulator is not None and isinstance(slf.__key_accumulator, (tuple, list)) and len(slf.__key_accumulator) > 0) for s in selections: if isinstance(s, int): if target == "available": self.__key_accumulator.append(list(keyfiles).pop(s)) else: self.__key_accumulator.append(keyfiles.pop(s)) elif isinstance(s, (str, ExecutedPath)): if isinstance(s, str): removed_keyfiles = list(filter(lambda p: str(p) == s, keyfiles)) else: removed_keyfiles = list(filter(lambda p: p == s, keyfiles)) keyfiles = filter(lambda p: str(p) != s, keyfiles) self.__key_accumulator += removed_keyfiles self.ssh.keys[target] = keyfiles if target == "available": selected_diff = list(set(self.ssh.keys["selected"]) - set(self.__key_accumulator)) self.ssh.keys["selected"] = selected_diff if len(selected_diff) >= 2 else [0, 1] auth_diff = list(set(self.ssh.keys["authorized"]) - set(self.__key_accumulator)) self.ssh.keys["authorized"] = auth_diff used_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator)) self.ssh.keys["used"] = used_diff elif target == "selected": available_diff = list(set(self.ssh.keys["available"]) - set(self.__key_accumulator)) self.ssh.keys["available"] = available_diff if len(available_diff) >= 2 else [0, 1] auth_diff = list(set(self.ssh.keys["authorized"]) - set(self.__key_accumulator)) self.ssh.keys["authorized"] = auth_diff used_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator)) self.ssh.keys["used"] = used_diff elif target == "authorized": available_diff = list(set(self.ssh.keys["available"]) - set(self.__key_accumulator)) self.ssh.keys["available"] = available_diff if len(available_diff) >= 2 else [0, 1] selected_diff = list(set(self.ssh.keys["selected"]) - set(self.__key_accumulator)) self.ssh.keys["selected"] = selected_diff used_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator)) self.ssh.keys["used"] = used_diff elif target == "used": available_diff = list(set(self.ssh.keys["available"]) - set(self.__key_accumulator)) self.ssh.keys["available"] = available_diff if len(available_diff) >= 2 else [0, 1] selected_diff = list(set(self.ssh.keys["selected"]) - set(self.__key_accumulator)) self.ssh.keys["selected"] = selected_diff auth_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator)) self.ssh.keys["authorized"] = auth_diff def itemize(self): model: dict | vpsSchema = dict() vault_api = Vault(self._api_key) vault_pass = Vault(self.ssh.password) # print(self.ssh.keys["selected"]) if not self.__keys_selected: self.pick_keys("available", *self.ssh.keys["selected"]) self.pick_keys("selected", self.ssh.keys["authorized"], self.ssh.keys["used"]) # self.pick_keys("selected", self.ssh.keys["used"]) authorized_keys = list(map(lambda p: p.read_text(), self.ssh.keys["authorized"])) used_keys = list(map(lambda p: str(p), self.ssh.keys["used"])) model["fqdn"] = self._fqdn model["vps_service"] = { "exists": True, "password": vault_pass.dump(self.ssh.password), "api_key": vault_api.dump(self._api_key), "type": self.service.lower(), "region": self.region, "ssh_authorized_keys": authorized_keys, "ssh_private_key_paths": used_keys, "ssh_private_key_path_pref": choice(list(range(len(used_keys)))), "root_fate": self.ssh.fate, "ssh_motd_script_basenames": [] } model["keywords"] = self.keywords return model