diff --git a/anodes.py b/anodes.py new file mode 100644 index 0000000..b8bc352 --- /dev/null +++ b/anodes.py @@ -0,0 +1,207 @@ +""" +Library of classes modeling Ansible nodes or their types. +""" + +from pathlib import Path, PurePath +from typing import TypeAlias as Neotype +from typing import TypedDict as Dict +from typing import Never, Union, Literal +from collections.abc import Callable +from custtypes import ExecutedPath, IdlePath +from enum import Enum +from softman import Software, SoftPathGroup, SoftScope +from whereami import USER_PATH, PROJ_ROOT +# @TODO use below 2 imports in 'get_head_user' function to grab username of highest-privileged login user +from os import name as shell_type +# from os import environ as env_vars + +class ControlNode: + __user_path: ExecutedPath = USER_PATH + __proj_root: ExecutedPath = PROJ_ROOT + __conf_paths: tuple[IdlePath] | list[IdlePath] = ( + PurePath("/etc"), + PurePath("/usr", "local", "etc"), + PurePath(str(__user_path), ".config") + ) + __data_paths: tuple[IdlePath] | list[IdlePath] = ( + PurePath("/usr", "local", "share"), + PurePath(str(__user_path), ".local", "share") + ) + + def __init__(self, ansible__proj_root: ExecutedPath | None): + if ansible_root is not None: + self.__proj_root = ansible_proj_root + + @property + def user_path(self) -> ExecutedPath: + return self.__user_path + + @property + def proj_root(self) -> ExecutedPath: + return self.__proj_root + + @property + def conf_paths(self) -> ExecutedPath: + return self.__conf_paths + + @property + def conf_paths(self) -> ExecutedPath: + return self.__data_paths + +class Softs(Enum): + OpenSSH = "ssh" + +userSSHParams = Dict("userSSHParams", { + "username": Required[str], + "paths": Required[Software._Apps], + "keys": { + "available": Required[tuple[ExecutedPath]], + "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]], + "authorized": ExecutedPath | list[ExecutedPath] | int | list[int], + "used": ExecutedPath | list[ExecutedPath] | int | list[int] + }, + "fate": Literal["disposal", "retention"] +}, total=False) +__user_ssh_input = { + "username": "", + "paths": None, + "keys": { + "available": tuple(), + "selected": [0, 1], + "authorized": 1, + "used": 0 + }, + "fate": "disposal" +} +userSSH = type("userSSH", (), **__user_ssh_input) + +# @TODO continue to write below function +def get_head_user(): + if shell_type == "nt": + raise NotImplementedError + else: + raise NotImplementedError + +class RemoteNode: + # __user_path = + _fqdn: str | None = None + + def __init__(self, cnode: ControlNode, name: str | None, keywords: list[str] | None = None): + self.root: dict = dict() + self.root["username"]: str = "root" + self.root["software"]: Software = Software() + + app_input = { + SoftPathGroup.CONFIG.name: { + SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.OpenSSH)) + }, + SoftPathGroup.DATA.name: { + SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d") + } + } + self.root["software"].declare(Softs.OpenSSH, **app_input) + + if name is not None: + self._fqdn = name + self.root["software"]._fqdn = name + + self.apps: list = self.root["software"].list(contents = True) + self.keywords = keywords + + def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e, False)): + keyfiles: list[ExecutedPath] | list[None] | None = list() + if isinstance(key_basenames, tuple): + for basename in key_basenames: + keyfiles += Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(basename) + elif isinstance(key_basenames, str): + keyfiles = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(key_basenames) + else: + raise ValueError + + updated_keyfiles: list[ExecutedPath] = [] + for filename in keyfiles: + new_keyfile = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])) / str(filename) + updated_keyfiles.append(new_keyfile.resolve()) + keyfiles = updated_keyfiles + + root_ssh_input: userSSHParams = { + "username": "root", + "paths": self.root["software"].ssh, + "keys": { + "available": tuple(keyfiles.sort(reverse = match_sort[1], key = match_sort[0])) + } + } + self.ssh: userSSH = userSSH(**root_ssh_input) + + def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available") -> tuple[tuple, str]: + delimiters: str | tuple = "{}" + gap: Callable[[bool], str | None] = lambda b: " " if b else "" + sep_format: str = "{0}" + sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b) + label_format: str = "{0}:" + label: Callable[[str, bool], str] = lambda s, b: label_format.format(s) + gap(b) + + def render(kfs, source: str): + if isinstance(kfs, list): + member_ints: list = list() + for f in kfs: + if isinstance(f, int): + members_ints.append(f) + + if len(member_ints) > 0: + kfs = [self.ssh.keys[source][i] for i in kfs] + elif isinstance(kfs, int): + kfs = [self.ssh.keys[source][kfs]] + elif isinstance(kfs, ExecutedPath): + kfs = [self.ssh.keys[source]] + + return kfs + + keyfiles = self.ssh.keys[which] + + if which == "selected": + keyfiles = render(keyfiles, "available") + + if which == "authorized": + keyfiles = render(keyfiles, "selected") + + if which == "used": + keyfiles = render(keyfiles, "selected") + + stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles))) + stringified_keyfiles = sep(",", True).join(stringified_keyfiles) + stringified_keyfiles = delimiters[0:len(delimiters)] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:] + + return (tuple(keyfiles), stringified_keyfiles) + + # @TODO continue to work on two below methods + def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: + keyfiles = self.ssh.keys[source] + + authlist = [] + if source == "available": + for s in selections: + if isinstance(s, int): + authlist.append(keyfiles[s]) + elif isinstance(s, ExecutedPath): + path_set = set([ExecutedPath]) + kf_set = set(keyfiles) + overlap = kf_set & path_set + + if overlap is not None and len(overlap) > 0: + authlist.append(list(overlap)[0]) + else: + continue + elif isinstance(s, str): + kf_strs = list(map(lambda p: str(p), keyfiles)) + + if s in kf_strs: + authlist.append(Path(s)) + self.ssh.keys["selected"] = authlist + elif source == "selected": + raise NotImplementedError + + + def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: + raise NotImplementedError +