From d46a10191b23c4900bc532e32f2c3bc3ad9122f7 Mon Sep 17 00:00:00 2001 From: Alex Tavarez Date: Sat, 3 Jan 2026 15:53:10 -0500 Subject: [PATCH] fix, refactor: debugged circular class ref via refactor & renaming --- anodes.py | 218 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 151 insertions(+), 67 deletions(-) diff --git a/anodes.py b/anodes.py index 36c37ef..8e17eba 100644 --- a/anodes.py +++ b/anodes.py @@ -5,14 +5,15 @@ Library of classes modeling Ansible nodes or their types. from pathlib import Path, PurePath from typing import TypeAlias as Neotype from typing import TypedDict as Dict -from typing import Never, Union, Literal +from typing import Never, Union, Literal, Required from collections.abc import Callable from custtypes import ExecutedPath, IdlePath from enum import Enum -from softman import Software, SoftPathGroup, SoftScope +from softman import Software, SoftPathGroup, SoftScope, _Apps from whereami import USER_PATH, PROJ_ROOT from ansible_vault import Vault from cerberus import Validator as constrain_by +from random import choice class ControlNode: __user_path: ExecutedPath = USER_PATH @@ -27,7 +28,7 @@ class ControlNode: PurePath(str(__user_path), ".local", "share") ) - def __init__(self, ansible_proj_root: ExecutedPath | None): + def __init__(self, ansible_proj_root: ExecutedPath | None = None): if ansible_proj_root is not None: self.__proj_root = ansible_proj_root self.proj_roles: tuple[IdlePath] | list[IdlePath] = ( @@ -36,7 +37,9 @@ class ControlNode: ) self.role_data: tuple[IdlePath] | list[IdlePath] = ( PurePath(str(self.proj_roles[0]), "files"), - PurePath(str(self.proj_roles[0]), "templates") + PurePath(str(self.proj_roles[0]), "templates"), + PurePath(str(self.proj_roles[1]), "files"), + PurePath(str(self.proj_roles[1]), "templates") ) self.invvar_data: tuple[IdlePath] | list[IdlePath] = ( PurePath(str(self.__proj_root), "group_vars"), @@ -56,21 +59,34 @@ class ControlNode: return self.__conf_paths @property - def conf_paths(self) -> ExecutedPath: + def data_paths(self) -> ExecutedPath: return self.__data_paths class Softs(Enum): - OpenSSH = "ssh" + ssh = 0 +userSSHSubParams = { + "available": Required[tuple[ExecutedPath]], + "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]], + "authorized": ExecutedPath | list[ExecutedPath] | int | list[int], + "used": ExecutedPath | list[ExecutedPath] | int | list[int] +} +__user_ssh_keys = { + "available": tuple(), + "selected": [0, 1], + "authorized": 1, + "used": 0 +} +def userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"): + self.username = username + self.paths = paths + self.keys = keys + self.password = password + self.fate = fate userSSHParams = Dict("userSSHParams", { "username": Required[str], - "paths": Software._Apps, - "keys": { - "available": Required[tuple[ExecutedPath]], - "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]], - "authorized": ExecutedPath | list[ExecutedPath] | int | list[int], - "used": ExecutedPath | list[ExecutedPath] | int | list[int] - }, + "paths": _Apps, + "keys": __user_ssh_keys, "password": Required[str], "fate": Literal["disposal", "retention"] }, total=False) @@ -84,37 +100,27 @@ __user_ssh_input = { "used": 0 }, "password": "password123", - "fate": "disposal" + "fate": "disposal",\ + "__init__": userSSHInit } -# @TODO remove unpacking of '__user_ssh_input' -userSSH = type("userSSH", (), **__user_ssh_input) +userSSH = type("userSSH", (), __user_ssh_input) -vps_schema = { - "fqdn": {"type": "string"}, +vpsSchema = Dict("vpsSchema", { + "fqdn": Required[str], "vps_service": { - "type": "dict", - "schema": { - "exists": {"type": "boolean"}, - "password": {"type": "string"}, - "api_key": {"type": "string"}, - "type": { - "type": "string", - "anyof": ["linode"] - }, - "region": { - "type": "string", - "anyof": ["us-east"] - }, - "ssh_authorized_keys": {"type": "list"}, - "root_fate": { - "type": "string", - "anyof": ["disposal", "retention"] - } - } + "exists": Required[bool], + "password": Required[str], + "api_key": Required[str], + "type": Required[Literal["linode"]], + "region": Literal["us-east"], + "ssh_authorized_keys": list[IdlePath | ExecutedPath | str], + "ssh_private_key_paths": list[IdlePath | ExecutedPath | str], + "ssh_private_key_path_pref": int, + "root_fate": Required[Literal["disposal","retention"]], + "ssh_motd_script_basenames": list[str] }, - "keywords": {"type": "list"} -} -vps_schema = constrain_by(vps_schema) + "keywords": list[str] +}, total=False) class VirtualPrivateServers(Enum): Linode = 0 @@ -123,7 +129,7 @@ class RemoteNode: # __user_path = _fqdn: str | None = None - def __init__(self, cnode: ControlNode, api_key: str, password: str, name: str, service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, keywords: list[str] | None = None): + def __init__(self, cnode: ControlNode, api_key: str, password: str, name: str, service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = None, keywords: list[str] | None = None): self.root: dict = dict() self.root["username"]: str = "root" vault = Vault(password) @@ -132,13 +138,13 @@ class RemoteNode: app_input = { SoftPathGroup.CONFIG.name: { - SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.OpenSSH)) + SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.ssh.name)) }, SoftPathGroup.DATA.name: { SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d") } } - self.root["software"].declare(Softs.OpenSSH, **app_input) + self.root["software"].declare(Softs.ssh.name, **app_input) self._fqdn = name self.root["software"]._fqdn = name @@ -153,6 +159,11 @@ class RemoteNode: self.keywords = keywords self._api_key: str | None = api_key self.service = service + self.region = region + self.__authkeys_selected = False + self.__usedkeys_selected = False + self.__keys_selected = False + self.__finalized_keys = False def set_region(self, name: Literal["us-east"] = "us-east") -> None: self.region = name @@ -179,7 +190,7 @@ class RemoteNode: self.keywords: list = [] self.keywords += list(name) - def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e, False)): + def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e.stem, False)): keyfiles: list[ExecutedPath] | list[None] | None = list() if isinstance(key_basenames, tuple): for basename in key_basenames: @@ -196,10 +207,10 @@ class RemoteNode: keyfiles = updated_keyfiles self.ssh.paths = self.root["software"].ssh - self.ssh.keys["available"] = tuple(keyfiles.sort(reverse = match_sort[1], key = match_sort[0])) + self.ssh.keys["available"] = tuple(sorted(keyfiles, key=match_sort[0], reverse=match_sort[1])) - def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available") -> tuple[tuple, str]: - delimiters: str | tuple = "{}" + def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available", kformat = object) -> tuple[tuple, str]: + delimiters: str | tuple = "[]" gap: Callable[[bool], str | None] = lambda b: " " if b else "" sep_format: str = "{0}" sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b) @@ -235,17 +246,34 @@ class RemoteNode: stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles))) stringified_keyfiles = sep(",", True).join(stringified_keyfiles) - stringified_keyfiles = delimiters[0:len(delimiters)] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:] + stringified_keyfiles = delimiters[0] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:] - return (tuple(keyfiles), stringified_keyfiles) + if kformat == str: + result = stringified_keyfiles + elif kformat == list: + result = keyfiles + elif kformat == tuple: + result = tuple(keyfiles) + elif kformat == object: + result = (tuple(keyfiles), stringified_keyfiles) + + print(result) + return result def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: keyfiles = self.ssh.keys[source] + # print(keyfiles) + + if keyfiles is None: + raise TypeError + elif isinstance(keyfiles, (tuple, list)) and len(keyfiles) < 1: + raise ValueError authlist = [] - if source == "available" or source == "selected": + if source == "available": for s in selections: if isinstance(s, int): + # print(s) authlist.append(keyfiles[s]) elif isinstance(s, ExecutedPath): path_set = set([s]) @@ -262,32 +290,73 @@ class RemoteNode: if s in kf_strs: authlist.append(Path(s)) - if source == "available": - self.ssh.keys["selected"] = authlist - return self.ssh.keys["selected"] - elif source == "selected": - self.ssh.keys["authorized"] = authlist - return self.ssh.keys["authorized"] + self.ssh.keys["selected"] = authlist + self.__keys_selected = True + result = self.ssh.keys["selected"] + if source == "selected": + privkeys = list() + pubkeys = list() + + count = 1 + for s in selections: + if isinstance(s, int): + # print(s) + if count % 2 == 0: + pubkeys.append(keyfiles[s]) + else: + privkeys.append(keyfiles[s]) + elif isinstance(s, ExecutedPath): + path_set = set([s]) + kf_set = set(keyfiles) + overlap = kf_set & path_set + + if overlap is not None and len(overlap) > 0: + if count % 2 == 0: + pubkeys.append(list(overlap)[0]) + else: + privkeys.append(list(overlap)[0]) + else: + continue + elif isinstance(s, str): + kf_strs = list(map(lambda p: str(p), keyfiles)) + + if s in kf_strs: + if count % 2 == 0: + pubkeys.append(Path(s)) + else: + privkeys.append(Path(s)) + count += 1 + + self.ssh.keys["authorized"] = pubkeys + self.ssh.keys["used"] = privkeys + self.__authkeys_selected = True + self.__usedkeys_selected = True + result = (self.ssh.keys["authorized"], self.ssh.keys["used"]) elif source == "authorized": for s in selections: if isinstance(s, int): - authlist.append(keyfiles[s]) + if self.ssh.keys["selected"][s] in keyfiles: + authlist.append(self.ssh.keys["selected"][s]) elif isinstance(s, ExecutedPath): - for p in keyfiles: - if str(s) in str(p): + for p in self.ssh.keys["selected"]: + if str(s) in str(p) and str(s) in list(map(lambda p: str(p), keyfiles)): authlist.append(p) else: continue elif isinstance(s, str): - for p in keyfiles: - if s in str(p): + for p in self.ssh.keys["selected"]: + if s in str(p) and s in list(map(lambda p: str(p), keyfiles)): authlist.append(p) else: continue self.ssh.keys["used"] = authlist - return self.ssh.keys["used"] + self.__usedkeys_selected = True + result = self.ssh.keys["used"] + self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected + return result + # @TODO test and debug below 'RemoteNode' method def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: keyfiles = self.ssh.keys[source] @@ -324,9 +393,18 @@ class RemoteNode: return self.ssh.keys def itemize(self): - model = dict() + model: dict | vpsSchema = dict() vault_api = Vault(self._api_key) vault_pass = Vault(self.ssh.password) + # print(self.ssh.keys["selected"]) + + if not self.__keys_selected: + self.pick_keys("available", *self.ssh.keys["selected"]) + self.pick_keys("selected", self.ssh.keys["authorized"], self.ssh.keys["used"]) + # self.pick_keys("selected", self.ssh.keys["used"]) + + authorized_keys = list(map(lambda p: p.read_text(), self.ssh.keys["authorized"])) + used_keys = list(map(lambda p: str(p), self.ssh.keys["used"])) model["fqdn"] = self._fqdn model["vps_service"] = { @@ -335,10 +413,16 @@ class RemoteNode: "api_key": vault_api.dump(self._api_key), "type": self.service.lower(), "region": self.region, - "ssh_authorized_keys": self.ssh.keys["authorized"], - "root_fate": self.ssh.fate + "ssh_authorized_keys": authorized_keys, + "ssh_private_key_paths": used_keys, + "ssh_private_key_path_pref": choice(list(range(len(used_keys)))), + "root_fate": self.ssh.fate, + "ssh_motd_script_basenames": [] } model["keywords"] = self.keywords + return model - if vps_schema.validate(model): - return model +rnode = RemoteNode(ControlNode(), "dfsfsd", "sdgsadh", "test.io", VirtualPrivateServers.Linode.name, "us-east", ["meh"]) +rnode.import_keys("*ed25519*sukaato_*yubikey.p[up][bk]", (lambda e: e.stem, True)) +rnode.show_keys(kformat=tuple) +print(rnode.itemize()) \ No newline at end of file