""" Library of classes modeling Ansible nodes or their types. """ from pathlib import Path, PurePath from typing import TypeAlias as Neotype from typing import TypedDict as Dict from typing import Never, Union, Literal, Required, Self from collections.abc import Callable from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes from enum import Enum from softman import Software, SoftPathGroup, SoftScope, Apps from whereami import USER_PATH, PROJ_ROOT from ansible_vault import Vault from cerberus import Validator as constrain_by from random import choice import secrets from abc import ABC, abstractmethod class ControlNode: __user_path: ExecutedPath = USER_PATH __proj_root: ExecutedPath = PROJ_ROOT __conf_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath("/etc"), PurePath("/usr", "local", "etc"), PurePath(str(__user_path), ".config") ) __data_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath("/usr", "local", "share"), PurePath(str(__user_path), ".local", "share") ) def __init__(self, ansible_proj_root: ExecutedPath | None = None): if ansible_proj_root is not None: self.__proj_root = ansible_proj_root role_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath(str(self.__proj_root), "roles"), PurePath(str(self.__proj_root), ".ansible/roles") ) roledata_paths: tuple[IdlePath] | list[IdlePath] = ( PurePath(str(self.proj_roles[0]), "files"), PurePath(str(self.proj_roles[0]), "templates"), PurePath(str(self.proj_roles[1]), "files"), PurePath(str(self.proj_roles[1]), "templates") ) setattr(self, AnsibleScopes.ROLE.name, { "data": roledata_paths, "vars": role_paths }) setattr(self, AnsibleScopes.GROUPVARS.name, { "vars": (PurePath(str(self.__proj_root), "group_vars"),) }) setattr(self, AnsibleScopes.HOSTVARS.name, { "vars": (PurePath(str(self.__proj_root), "host_vars"),) }) setattr(self, AnsibleScopes.INVENTORY.name, { "vars": (PurePath(str(self.__proj_root), "hosts.yml"),) }) def get_scope(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY.name, index = 0): return getattr(self, scope)[index] @property def home(self) -> ExecutedPath: return self.__user_path @property def root(self) -> ExecutedPath: return self.__proj_root @property def sys_confs(self) -> ExecutedPath: return self.__conf_paths @property def sys_data(self) -> ExecutedPath: return self.__data_paths class Softs(Enum): ssh = 0 class RootFate(Enum): disposal = 0 retention = 1 _userSSHSubParams = { "available": Required[tuple[ExecutedPath]], "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]], "authorized": ExecutedPath | list[ExecutedPath] | int | list[int], "used": ExecutedPath | list[ExecutedPath] | int | list[int] } __user_ssh_keys = { "available": tuple(), "selected": [0, 1], "authorized": 1, "used": 0 } userSSHParams = Dict("userSSHParams", { "username": Required[str], "paths": _Apps, "keys": __user_ssh_keys, "password": Required[str], "fate": Literal["disposal", "retention"] }, total=False) __user_ssh_input = { "username": "", "paths": None, "keys": { "available": tuple(), "selected": [0, 1], "authorized": 1, "used": 0 }, "password": "password123", "fate": "disposal",\ "__init__": __userSSHInit } #userSSH = type("userSSH", (), __user_ssh_input) class SSHKey: def __init__(self, *path: ExecutedPath): if len(path) > 2 or len(path) < 1: raise ValueError self.kind: Literal["public_key","private_key", "dual"] | str | None = None self.__idx: int = 0 self.__prev: Self | None = None self.__next: Self | None = None if len(path) < 2: self.__value: ExecutedPath | tuple[ExecutedPath] = path[0] else: self.__value: ExecutedPath | tuple[ExecutedPath] = path def __int__(self) -> int: return self.__idx def __str__(self) -> str: return str(self.__value) def __repr__(self) -> ExecutedPath | tuple[ExecutedPath]: return self.__value def __nonzero__(self) -> bool: return True def __format__(self, formatstr) -> str: match formatstr: case "item": return str(self.__idx) + ": " + str(self.__value) case "int": return str(self.__idx) case _: return str(self.__value) def __next__(self) -> ExecutedPath | tuple[ExecutedPath]: return self.__next def __prev__(self) -> ExecutedPath | tuple[ExecutedPath]: return self.__prev def __call__(self) -> ExecutedPath | tuple[ExecutedPath]: return self.__value def update(self, *path: ExecutedPath | str) -> None | Never: if len(path) > 2 or len(path) < 1: raise ValueError path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path)) if len(path) < 2: self.__value = path[0] else: self.__value = path def replace(self, old: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str], new: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str]) -> None | Never: if isinstance(old, str): old = Path(old) if isinstance(new, str): new = Path(new) if isinstance(old, (list, tuple)): if len(old) > 2 or len(old) < 1: raise ValueError old = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, old)) if isinstance(new, (list, tuple)): if len(new) > 2 or len(new) < 1: raise ValueError new = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, new)) if isinstance(self.__value, (tuple, list)): if isinstance(old, tuple): remaining_value = list(filter(lambda p: p not in old, self.__value)) if isinstance(new, tuple): self.__value = (*remaining_value, *new) else: self.__value = (*remaining_value, new) else: remaining_value = list(filter(lambda p: p != old, self.__value)) if isinstance(new, tuple): self.__value = (*remaining_value, *new) else: self.__value = (*remaining_value, new) if len(self.__value) > 2: self.__value = self.__value[0] elif isinstance(self.__value, ExecutedPath): if isinstance(old, tuple): remaining_value = None if self.__value in old else self.__value else: remaining_value = None if self.__value == old else self.__value if remaining_value is None: self.__value = new else: raise ValueError def publish(self, idx: int | None = None) -> str | tuple[str]: if idx is not None: result = self.__value[idx] else: result = self.__value if isinstance(result, tuple): result = tuple(map(lambda p: p.read_text(), result)) else: result = result.read_text() return result @property def status(self) -> Never: # @TODO this method should return string or Enum value after analyzing whether this key is public or private raise NotImplementedError class SSHKeyCollection: def __init__(self): self.__current: SSHKey | None = None self.__first: SSHKey | None = None self.__last: SSHKey | None = None self.__indices: range | None = None def __setitem__(self, key: int, *value: ExecutedPath | str) -> None | Never: if len(value) < 1 or len(value) > 2: raise ValueError value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) if self.__current is None: self.__current = SSHKey(*value) self.__current._SSHKey__idx = key elif int(self.__current) == key: if self.__current() is None or len(self.__current()) < 1: self.__current.update(*value) else: while int(self.__current) != key: if next(self.__current) is not None: self.__current = next(self.__current) else: break self.__current.update(*value) def __getitem__(self, key: int) -> SSHKey: if self.__current is None: raise KeyError elif int(self.__current) == key: return self.__current else: while int(self.__current) != key: if next(self.__current) is not None: self.__current = next(self.__current) else: break return self.__current def __delitem__(self, key: int) -> Never: raise NotImplementedError def append(self, *value: ExecutedPath | str) -> None | Never: if len(value) < 1 or len(value) > 2: raise ValueError value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) ssh_key = SSHKey(*value) if self.__first is None: ssh_key._SSHKey__idx = 0 self.__indices = range(ssh_key._SSHKey__idx + 1) self.__first = ssh_key if self.__last is None: self.__last = ssh_key self.__first._SSHKey__next = self.__last self.__last._SSHKey__prev = self.__first self.__current = self.__first else: ssh_key._SSHKey__idx = len(self.__indices) ssh_key._SSHKey__prev = self.__last self.__last = ssh_key self.__indices = range(ssh_key._SSHKey__idx + 1) def import_keys(self) -> Never: raise NotImplementedError class UserSSH: def __init__(self, username: str = "root", paths: _Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: RootFate = RootFate.disposal.name): self.username = username self.paths = paths self.keys = keys self.password = password self.fate = fate vpsSchema = Dict("vpsSchema", { "fqdn": Required[str], "vps_service": { "exists": Required[bool], "password": Required[str], "api_key": Required[str], "type": Required[Literal["linode"]], "region": Literal["us-east"], "ssh_authorized_keys": list[IdlePath | ExecutedPath | str], "ssh_private_key_paths": list[IdlePath | ExecutedPath | str], "ssh_private_key_path_pref": int, "root_fate": Required[Literal["disposal","retention"]], "ssh_motd_script_basenames": list[str] }, "keywords": list[str] }, total=False) class RemoteNode: # __user_path = _fqdn: str | None = None def __init__(self, cnode: ControlNode, api_key: str = secrets.token_urlsafe(32), password: str = "password123", name: str = ".test", service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = "us-east", keywords: list[str] | None = ["server"]): self.root: dict = dict() self.root["username"]: str = "root" vault = Vault(password) self.root["password"]: str = vault.dump(password) self.root["software"]: Software = Software() self.owner = cnode app_input = { SoftPathGroup.CONFIG.name: { SoftScope.PERSONAL.name: PurePath(str(cnode.home), ("." + Softs.ssh.name)) }, SoftPathGroup.DATA.name: { SoftScope.GLOBAL.name: PurePath(str(cnode.sys_confs[0]), "update-motd.d") } } self.root["software"].declare(Softs.ssh.name, **app_input) self._fqdn = name self.root["software"]._fqdn = name root_ssh_input: userSSHParams = { "username": self.root["username"], "password": self.root["password"] } self.ssh: UserSSH = UserSSH(**root_ssh_input) self.apps: list = self.root["software"].list(contents = True) self.keywords = keywords self._api_key: str | None = api_key self.service = service self.region = region self.__authkeys_selected = False self.__usedkeys_selected = False self.__keys_selected = False self.__finalized_keys = False self.model: dict | None = None self.__key_accumulator: tuple | list | None = [] def set_region(self, name: Literal["us-east"] = "us-east") -> None: self.region = name def set_password(self, password: str) -> None: vault = Vault(self.root["password"]) self.root["password"] = vault.dump(self.root["password"]) def set_api(self, key: str) -> None: vault = Vault(self._api_key) self._api_key = vault.dump(self._api_key) def add_tags(self, *name): self.keywords: list = [] self.keywords += list(name) def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e.stem, False)): keyfiles: list[ExecutedPath] | list[None] | None = list() if isinstance(key_basenames, tuple): for basename in key_basenames: keyfiles += Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(basename) elif isinstance(key_basenames, str): keyfiles = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(key_basenames) else: raise ValueError updated_keyfiles: list[ExecutedPath] = [] for filename in keyfiles: new_keyfile = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])) / str(filename) updated_keyfiles.append(new_keyfile.resolve()) keyfiles = updated_keyfiles self.ssh.paths = self.root["software"].ssh self.ssh.keys["available"] = tuple(sorted(keyfiles, key=match_sort[0], reverse=match_sort[1])) def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available", kformat = object) -> tuple[tuple, str]: delimiters: str | tuple = "[]" gap: Callable[[bool], str | None] = lambda b: " " if b else "" sep_format: str = "{0}" sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b) label_format: str = "{0}:" label: Callable[[str, bool], str] = lambda s, b: label_format.format(s) + gap(b) def render(kfs, source: str): if isinstance(kfs, list): member_ints: list = list() for f in kfs: if isinstance(f, int): members_ints.append(f) if len(member_ints) > 0: kfs = [self.ssh.keys[source][i] for i in kfs] elif isinstance(kfs, int): kfs = [self.ssh.keys[source][kfs]] elif isinstance(kfs, ExecutedPath): kfs = [self.ssh.keys[source]] return kfs keyfiles = self.ssh.keys[which] if which == "selected": keyfiles = render(keyfiles, "available") if which == "authorized" or which == "used": keyfiles = render(keyfiles, "selected") stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles))) stringified_keyfiles = sep(",", True).join(stringified_keyfiles) stringified_keyfiles = delimiters[0] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:] if kformat == str: result = stringified_keyfiles elif kformat == list: result = keyfiles elif kformat == tuple: result = tuple(keyfiles) elif kformat == object: result = (tuple(keyfiles), stringified_keyfiles) print(result) return result def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: keyfiles = self.ssh.keys[source] # print(keyfiles) if keyfiles is None: raise TypeError elif isinstance(keyfiles, (tuple, list)) and len(keyfiles) < 1: raise ValueError authlist = [] if source == "available": for s in selections: if isinstance(s, int): # print(s) authlist.append(keyfiles[s]) elif isinstance(s, ExecutedPath): path_set = set([s]) kf_set = set(keyfiles) overlap = kf_set & path_set if overlap is not None and len(overlap) > 0: authlist.append(list(overlap)[0]) else: continue elif isinstance(s, str): kf_strs = list(map(lambda p: str(p), keyfiles)) if s in kf_strs: authlist.append(Path(s)) self.ssh.keys["selected"] = authlist self.__keys_selected = True result = self.ssh.keys["selected"] if source == "selected": privkeys = list() pubkeys = list() count = 1 for s in selections: if isinstance(s, int): # print(s) if count % 2 == 0: pubkeys.append(keyfiles[s]) else: privkeys.append(keyfiles[s]) elif isinstance(s, ExecutedPath): path_set = set([s]) kf_set = set(keyfiles) overlap = kf_set & path_set if overlap is not None and len(overlap) > 0: if count % 2 == 0: pubkeys.append(list(overlap)[0]) else: privkeys.append(list(overlap)[0]) else: continue elif isinstance(s, str): kf_strs = list(map(lambda p: str(p), keyfiles)) if s in kf_strs: if count % 2 == 0: pubkeys.append(Path(s)) else: privkeys.append(Path(s)) count += 1 self.ssh.keys["authorized"] = pubkeys self.ssh.keys["used"] = privkeys self.__authkeys_selected = True self.__usedkeys_selected = True result = (self.ssh.keys["authorized"], self.ssh.keys["used"]) elif source == "authorized": for s in selections: if isinstance(s, int): if self.ssh.keys["selected"][s] in keyfiles: authlist.append(self.ssh.keys["selected"][s]) elif isinstance(s, ExecutedPath): for p in self.ssh.keys["selected"]: if str(s) in str(p) and str(s) in list(map(lambda p: str(p), keyfiles)): authlist.append(p) else: continue elif isinstance(s, str): for p in self.ssh.keys["selected"]: if s in str(p) and s in list(map(lambda p: str(p), keyfiles)): authlist.append(p) else: continue self.ssh.keys["used"] = authlist self.__usedkeys_selected = True result = self.ssh.keys["used"] self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected return result # @TODO continue writing below method def remove_keys(self, target: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | str | ExecutedPath): keyfiles = self.ssh.keys[target] key_accumulator_populated = (slf.__key_accumulator is not None and isinstance(slf.__key_accumulator, (tuple, list)) and len(slf.__key_accumulator) > 0) for s in selections: if isinstance(s, int): if target == "available": self.__key_accumulator.append(list(keyfiles).pop(s)) else: self.__key_accumulator.append(keyfiles.pop(s)) elif isinstance(s, (str, ExecutedPath)): if isinstance(s, str): removed_keyfiles = list(filter(lambda p: str(p) == s, keyfiles)) else: removed_keyfiles = list(filter(lambda p: p == s, keyfiles)) keyfiles = filter(lambda p: str(p) != s, keyfiles) self.__key_accumulator += removed_keyfiles self.ssh.keys[target] = keyfiles if target == "available": selected_diff = list(set(self.ssh.keys["selected"]) - set(self.__key_accumulator)) self.ssh.keys["selected"] = selected_diff if len(selected_diff) >= 2 else [0, 1] auth_diff = list(set(self.ssh.keys["authorized"]) - set(self.__key_accumulator)) self.ssh.keys["authorized"] = auth_diff used_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator)) self.ssh.keys["used"] = used_diff elif target == "selected": available_diff = list(set(self.ssh.keys["available"]) - set(self.__key_accumulator)) self.ssh.keys["available"] = available_diff if len(available_diff) >= 2 else [0, 1] auth_diff = list(set(self.ssh.keys["authorized"]) - set(self.__key_accumulator)) self.ssh.keys["authorized"] = auth_diff used_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator)) self.ssh.keys["used"] = used_diff elif target == "authorized": available_diff = list(set(self.ssh.keys["available"]) - set(self.__key_accumulator)) self.ssh.keys["available"] = available_diff if len(available_diff) >= 2 else [0, 1] selected_diff = list(set(self.ssh.keys["selected"]) - set(self.__key_accumulator)) self.ssh.keys["selected"] = selected_diff used_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator)) self.ssh.keys["used"] = used_diff elif target == "used": available_diff = list(set(self.ssh.keys["available"]) - set(self.__key_accumulator)) self.ssh.keys["available"] = available_diff if len(available_diff) >= 2 else [0, 1] selected_diff = list(set(self.ssh.keys["selected"]) - set(self.__key_accumulator)) self.ssh.keys["selected"] = selected_diff auth_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator)) self.ssh.keys["authorized"] = auth_diff def itemize(self): model: dict | vpsSchema = dict() vault_api = Vault(self._api_key) vault_pass = Vault(self.ssh.password) # print(self.ssh.keys["selected"]) if not self.__keys_selected: self.pick_keys("available", *self.ssh.keys["selected"]) self.pick_keys("selected", self.ssh.keys["authorized"], self.ssh.keys["used"]) # self.pick_keys("selected", self.ssh.keys["used"]) authorized_keys = list(map(lambda p: p.read_text(), self.ssh.keys["authorized"])) used_keys = list(map(lambda p: str(p), self.ssh.keys["used"])) model["fqdn"] = self._fqdn model["vps_service"] = { "exists": True, "password": vault_pass.dump(self.ssh.password), "api_key": vault_api.dump(self._api_key), "type": self.service.lower(), "region": self.region, "ssh_authorized_keys": authorized_keys, "ssh_private_key_paths": used_keys, "ssh_private_key_path_pref": choice(list(range(len(used_keys)))), "root_fate": self.ssh.fate, "ssh_motd_script_basenames": [] } model["keywords"] = self.keywords return model