"""
Library of classes modeling Ansible nodes or their types.
"""
from collections.abc import Callable
from copy import deepcopy
from enum import Enum, StrEnum
from pathlib import Path, PurePath
from typing import TypeAlias as Neotype
from typing import TypedDict as Dict
from typing import Never, Union, Literal, Required

from custtypes import ExecutedPath, IdlePath
from softman import Software, SoftPathGroup, SoftScope
from whereami import USER_PATH, PROJ_ROOT

# @TODO use below 2 imports in 'get_head_user' function to grab username of highest-privileged login user
from os import name as shell_type
# from os import environ as env_vars


class ControlNode:
    """Filesystem locations known to the Ansible control node.

    The class-level values act as defaults; passing a project root to the
    constructor overrides ``proj_root`` for that instance only.
    """

    __user_path: ExecutedPath = USER_PATH
    __proj_root: ExecutedPath = PROJ_ROOT
    __conf_paths: tuple[IdlePath, ...] | list[IdlePath] = (
        PurePath("/etc"),
        PurePath("/usr", "local", "etc"),
        PurePath(str(__user_path), ".config"),
    )
    __data_paths: tuple[IdlePath, ...] | list[IdlePath] = (
        PurePath("/usr", "local", "share"),
        PurePath(str(__user_path), ".local", "share"),
    )

    def __init__(self, ansible_proj_root: ExecutedPath | None = None):
        """Create a control node.

        Args:
            ansible_proj_root: Project root to use instead of the module-wide
                ``PROJ_ROOT`` default; ``None`` keeps the default.
        """
        if ansible_proj_root is not None:
            self.__proj_root = ansible_proj_root

    @property
    def user_path(self) -> ExecutedPath:
        """Return the user path taken from ``USER_PATH``."""
        return self.__user_path

    @property
    def proj_root(self) -> ExecutedPath:
        """Return the project root (constructor override or ``PROJ_ROOT``)."""
        return self.__proj_root

    @property
    def conf_paths(self) -> tuple[IdlePath, ...] | list[IdlePath]:
        """Return the configuration directories, ``/etc`` first."""
        return self.__conf_paths

    @property
    def data_paths(self) -> tuple[IdlePath, ...] | list[IdlePath]:
        """Return the data directories."""
        return self.__data_paths


class Softs(StrEnum):
    """Programs this module declares, keyed to their short names.

    A ``StrEnum`` so that a member can be handed to any API expecting either
    an enum member or the plain program name.
    """

    OpenSSH = "ssh"


# Shape of the "keys" entry of ``userSSHParams``.  Entries other than
# "available" hold either paths or integer indices into another entry.
userSSHKeys = Dict("userSSHKeys", {
    "available": Required[tuple[ExecutedPath, ...]],
    "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
    "authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
    "used": ExecutedPath | list[ExecutedPath] | int | list[int],
}, total=False)

# Keyword arguments accepted by ``userSSH``.
userSSHParams = Dict("userSSHParams", {
    "username": Required[str],
    "paths": Required[Software._Apps],
    "keys": userSSHKeys,
    "fate": Literal["disposal", "retention"],
}, total=False)

# Default values for every ``userSSH`` attribute.
__user_ssh_input = {
    "username": "",
    "paths": None,
    "keys": {
        "available": tuple(),
        "selected": [0, 1],
        "authorized": 1,
        "used": 0,
    },
    "fate": "disposal",
}


def _init_user_ssh(self, **params) -> None:
    """Initialise a ``userSSH`` instance from the defaults overlaid by *params*.

    Raises:
        TypeError: If *params* contains a name that is not a known attribute.
    """
    unknown = params.keys() - __user_ssh_input.keys()
    if unknown:
        raise TypeError(
            f"userSSH() got unexpected keyword argument(s): {sorted(unknown)}"
        )
    # Deep copy so instances never share (or mutate) the default "keys" dict.
    settings = deepcopy(__user_ssh_input)
    # Merge "keys" one level deep: a partial mapping keeps the other defaults.
    settings["keys"].update(params.pop("keys", None) or {})
    settings.update(params)
    for field, value in settings.items():
        setattr(self, field, value)


# ``type`` takes the class namespace as its third positional argument; the
# defaults double as class attributes and ``__init__`` accepts overrides.
userSSH = type("userSSH", (), {**__user_ssh_input, "__init__": _init_user_ssh})


# @TODO continue to write below function
def get_head_user():
    """Return the highest-privileged login user (not implemented yet).

    Raises:
        NotImplementedError: Always, on every platform, for now.
    """
    if shell_type == "nt":
        # Windows branch, still to be written.
        raise NotImplementedError
    else:
        # Branch for every other platform, still to be written.
        raise NotImplementedError


class RemoteNode:
    """A managed node together with the root user's software and SSH keys."""

    # __user_path =
    _fqdn: str | None = None

    def __init__(self, cnode: ControlNode, name: str | None, keywords: list[str] | None = None):
        """Declare OpenSSH for the root user and record the node's identity.

        Args:
            cnode: Control node supplying the user and configuration paths.
            name: Name stored as ``_fqdn`` on this node and on its software
                registry; ``None`` leaves both untouched.
            keywords: Stored as-is on ``self.keywords``.
        """
        software = Software()
        app_input = {
            SoftPathGroup.CONFIG.name: {
                # "<user path>/.ssh"
                SoftScope.PERSONAL.name: PurePath(
                    str(cnode.user_path), "." + Softs.OpenSSH.value
                ),
            },
            SoftPathGroup.DATA.name: {
                SoftScope.GLOBAL.name: PurePath(
                    str(cnode.conf_paths[0]), "update-motd.d"
                ),
            },
        }
        software.declare(Softs.OpenSSH, **app_input)
        if name is not None:
            self._fqdn = name
            software._fqdn = name
        self.root: dict = {"username": "root", "software": software}
        self.apps: list = software.list(contents=True)
        self.keywords = keywords

    def import_keys(self, key_basenames: str | tuple[str, ...], match_sort: tuple[Callable, bool] = (lambda e: e, False)):
        """Collect key files from the personal SSH config directory.

        The matches become the "available" keys of a fresh ``self.ssh``.

        Args:
            key_basenames: One glob pattern, or a tuple of patterns, matched
                against the personal SSH configuration directory.
            match_sort: ``(key_function, reverse)`` used to order the matches.

        Raises:
            ValueError: If *key_basenames* is neither a ``str`` nor a ``tuple``.
        """
        if isinstance(key_basenames, str):
            patterns: tuple[str, ...] = (key_basenames,)
        elif isinstance(key_basenames, tuple):
            patterns = key_basenames
        else:
            raise ValueError("key_basenames must be a str or a tuple of str")

        ssh_paths = self.root["software"].ssh
        key_dir = Path(str(ssh_paths.CONFIG[SoftScope.PERSONAL.name]))
        # glob() already yields paths rooted at key_dir, so resolve them as-is.
        keyfiles = [
            match.resolve()
            for pattern in patterns
            for match in key_dir.glob(pattern)
        ]

        sort_key, descending = match_sort
        # sorted() returns the ordered list; list.sort() would return None.
        available = tuple(sorted(keyfiles, key=sort_key, reverse=descending))
        self.ssh: userSSH = userSSH(
            username="root",
            paths=ssh_paths,
            keys={"available": available},
        )

    def _resolve_keys(self, which: Literal["authorized", "used", "available", "selected"]) -> list:
        """Turn the *which* entry of ``self.ssh.keys`` into a list of key files.

        Integer entries are indices: "selected" indexes into "available",
        while "authorized" and "used" index into the resolved "selected" list.
        """
        entry = self.ssh.keys[which]
        if which == "available":
            return list(entry)
        source = self._resolve_keys(
            "available" if which == "selected" else "selected"
        )
        if isinstance(entry, (list, tuple)):
            return [source[e] if isinstance(e, int) else e for e in entry]
        if isinstance(entry, int):
            return [source[entry]]
        return [entry]

    def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available") -> tuple[tuple, str]:
        """Return one group of key files and a printable rendering of it.

        Args:
            which: The group of keys to show.

        Returns:
            ``(keyfiles, text)`` where *text* looks like ``{0: a, 1: b}``
            (``{}`` when the group is empty).

        Raises:
            IndexError: If an index stored in the group does not exist in the
                collection it refers to.
        """
        keyfiles = self._resolve_keys(which)
        listing = ", ".join(
            f"{position}: {keyfile}"
            for position, keyfile in enumerate(keyfiles)
        )
        return (tuple(keyfiles), "{" + listing + "}")

    # @TODO continue to work on two below methods
    def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
        """Select keys from *source* and store them as the "selected" keys.

        Args:
            source: Group to pick from; only "available" is implemented.
            *selections: Each one is an index into the group, a path, or the
                string form of a path.  Paths and strings that match nothing
                in the group are skipped.

        Returns:
            The list of picked key files, also stored in
            ``self.ssh.keys["selected"]``.

        Raises:
            NotImplementedError: For any *source* other than "available".
            IndexError: If an integer selection is out of range.
        """
        if source != "available":
            raise NotImplementedError

        keyfiles = self.ssh.keys[source]
        by_text = {str(keyfile): keyfile for keyfile in keyfiles}
        picked: list = []
        for choice in selections:
            if isinstance(choice, int):
                picked.append(keyfiles[choice])
            elif isinstance(choice, str):
                if choice in by_text:
                    picked.append(by_text[choice])
            elif choice in keyfiles:
                picked.append(choice)
        self.ssh.keys["selected"] = picked
        return picked

    def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
        """Remove keys from *source* (not implemented yet).

        Raises:
            NotImplementedError: Always, for now.
        """
        raise NotImplementedError