From e412e3d5ab905a4abbae64e42cf232d5a6cc8598 Mon Sep 17 00:00:00 2001 From: Alex Tavarez Date: Wed, 7 Jan 2026 12:07:07 -0500 Subject: [PATCH] changed method name and added inheritance for 'Software' class, moved 'SSHKey' related classes to separate file and fixed 'Software' instance method call --- anodes.py | 16 ++--- sshkey_man.py | 195 ++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 152 insertions(+), 59 deletions(-) diff --git a/anodes.py b/anodes.py index e377a63..30f67b0 100644 --- a/anodes.py +++ b/anodes.py @@ -4,10 +4,10 @@ Library of classes modeling Ansible nodes or their types. from enum import Enum from pathlib import Path, PurePath -from typing import TypeAlias as Neotype, TypedDict as Dict -from typing import Never, Union, Literal, Required, Self +from typing import TypedDict as Dict +from typing import Union, Literal, Required, Self from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes -from softman import Software, SoftPathGroup, SoftScope, Apps +from softman import Software, SoftPathGroup, SoftScope, Apps, Softs from whereami import USER_PATH, PROJ_ROOT from ansible_vault import Vault import secrets @@ -71,9 +71,6 @@ class ControlNode: def sys_data(self) -> ExecutedPath: return self.__data_paths -class Softs(Enum): - ssh = 0 - # userSSHParams = Dict("userSSHParams", { # "username": Required[str], # "paths": Apps, @@ -119,7 +116,7 @@ class RemoteNode: SoftScope.GLOBAL.name: PurePath(str(cnode.sys_confs[0]), "update-motd.d") } } - self.root["software"].declare(Softs.ssh.name, **app_input) + self.root["software"].append(Softs.ssh.name, **app_input) self._fqdn = name self.root["software"]._fqdn = name @@ -136,12 +133,7 @@ class RemoteNode: self._api_key: str | None = api_key self.service = service self.region = region - self.__authkeys_selected = False - self.__usedkeys_selected = False - self.__keys_selected = False - self.__finalized_keys = False self.model: dict | None = None - self.__key_accumulator: tuple | list | None = [] def set_region(self, name: Literal["us-east"] = "us-east") -> None: self.region = name diff --git a/sshkey_man.py b/sshkey_man.py index 943c551..badacf1 100644 --- a/sshkey_man.py +++ b/sshkey_man.py @@ -4,6 +4,12 @@ from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScop from enum import Enum from softman import Apps from random import gamble +from collections.abc import Sequence +from typing import Never, Union, Self, Callable +from glob import glob as globbify +from whereami import USER_PATH +from softman import Softs +import os class RootFate(Enum): disposal = 0 @@ -38,7 +44,7 @@ class SSHKey: return str(self.__value) def __repr__(self) -> ExecutedPath | tuple[ExecutedPath]: - return self.__value + return "SSHKey(" + str(self.__value) + ")" def __nonzero__(self) -> bool: return True @@ -58,9 +64,26 @@ class SSHKey: def __prev__(self) -> ExecutedPath | tuple[ExecutedPath]: return self.__prev - def __call__(self) -> ExecutedPath | tuple[ExecutedPath]: + def __call__(self, *path: ExecutedPath | str | None) -> ExecutedPath | tuple[ExecutedPath]: + if path is not None or len(path) > 0: + if len(path) > 2: + raise ValueError + + path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path)) + + if len(path) < 2: + self.__value = path[0] + else: + self.__value = path + return self.__value + def __eq__(self, other: Self) -> bool: + return self.__value == other._SSHKey__value + + def __eqcontent__(self, other: Self) -> bool: + return self.__value.read_text() == other._SSHKey__value.read_text() + def update(self, *path: ExecutedPath | str) -> None | Never: if len(path) > 2 or len(path) < 1: raise ValueError @@ -118,10 +141,13 @@ class SSHKey: else: raise ValueError - def publish(self, idx: int | None = None) -> str | tuple[str]: - if idx is not None: + def read(self, idx: int | None = None) -> str | tuple[str]: + if idx is not None and isinstance(self.__value, tuple): result = self.__value[idx] else: + if idx is not None: + raise KeyError + result = self.__value if isinstance(result, tuple): @@ -136,49 +162,54 @@ class SSHKey: # @TODO this method should return string or Enum value after analyzing whether this key is public or private raise NotImplementedError -class SSHKeyCollection: +def prev(arg): + if isinstance(arg, SSHKey): + return arg._SSHKey__prev__() + else: + raise TypeError + +def stream_equal(s1, s2) -> bool | Never: + if isinstance(s1, SSHKey) and isinstance(s2, SSHKey): + return s1._SSHKey__eqcontent__(s2) + else: + raise TypeError + +class SSHKeyCollection(Sequence): + __user_path: ExecutedPath = USER_PATH + def __init__(self): self.__current: SSHKey | None = None self.__first: SSHKey | None = None self.__last: SSHKey | None = None self.__indices: range | None = None - def __setitem__(self, key: int, *value: ExecutedPath | str) -> None | Never: - if len(value) < 1 or len(value) > 2: - raise ValueError - - value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) - - if self.__current is None: - self.__current = SSHKey(*value) - self.__current._SSHKey__idx = key - elif int(self.__current) == key: - if self.__current() is None or len(self.__current()) < 1: - self.__current.update(*value) - else: - while int(self.__current) != key: - if next(self.__current) is not None: - self.__current = next(self.__current) - else: - break - - self.__current.update(*value) - - def __getitem__(self, key: int) -> SSHKey: + def __getitem__(self, key: int) -> SSHKey | Never: if self.__current is None: raise KeyError elif int(self.__current) == key: return self.__current else: + self.__current = self.__first + while int(self.__current) != key: if next(self.__current) is not None: self.__current = next(self.__current) else: + # raise StopIteration break return self.__current - def __delitem__(self, key: int) -> Never: + def __len__(self) -> int: + if self.__indices is None: + return 0 + + return len(self.__indices) + + def pop(self) -> Never: + raise NotImplementedError + + def remove(self) -> Never: raise NotImplementedError def append(self, *value: ExecutedPath | str) -> None | Never: @@ -194,44 +225,114 @@ class SSHKeyCollection: self.__indices = range(ssh_key._SSHKey__idx + 1) self.__first = ssh_key - if self.__last is None: - self.__last = ssh_key - - self.__first._SSHKey__next = self.__last - self.__last._SSHKey__prev = self.__first - self.__current = self.__first else: ssh_key._SSHKey__idx = len(self.__indices) - ssh_key._SSHKey__prev = self.__last - self.__last = ssh_key + if self.__last is not None: + self.__last._SSHKey__next = ssh_key + self.__last._SSHKey__next._SSHKey__prev = self.__last + self.__last = next(self.__last) + else: + self.__first._SSHKey__next = ssh_key + self.__first._SSHKey__next._SSHKey__prev = self.__first + self.__last = self.__first._SSHKey__next self.__indices = range(ssh_key._SSHKey__idx + 1) self.__current = self.__last - def pop(self) -> Never: + def __setitem__(self, key: int, *value: ExecutedPath | str) -> None | Never: + if len(value) < 1 or len(value) > 2: + raise ValueError + + value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) + + if self.__current is None: + raise KeyError + elif int(self.__current) == key: + if self.__current() is None or len(self.__current()) < 1: + self.__current(*value) + else: + self.__current = self.__first + + while int(self.__current) != key: + if next(self.__current) is not None: + self.__current = next(self.__current) + else: + # raise StopIteration + break + + self.__current(*value) + + def __delitem__(self, key: int) -> None | Never: + if self.__current is None: + raise KeyError + elif int(self.__current) == key: + prior = self.__current._SSHKey__prev + posterior = self.__current._SSHKey__next + posterior._SSHKey__idx = self.__current._SSHKey__idx + prior._SSHKey__next = posterior + prior._SSHKey__next._SSHKey__prev = prior + self.__current = prior._SSHKey__next + + while next(self.__current) is not None: + self.__current = next(self.__current) + self.__current._SSHKey__idx += 1 + else: + self.__current = self.__first + + while int(self.__current) != key: + if next(self.__current) is not None: + self.__current = next(self.__current) + else: + # raise StopIteration + break + + prior = self.__current._SSHKey__prev + posterior = self.__current._SSHKey__next + posterior._SSHKey__idx = self.__current._SSHKey__idx + prior._SSHKey__next = posterior + prior._SSHKey__next._SSHKey__prev = prior + self.__current = prior._SSHKey__next + + while next(self.__current) is not None: + self.__current = next(self.__current) + self.__current._SSHKey__idx += 1 + + @property + def head(self) -> SSHKey | None: + return self.__first + + @property + def tail(self) -> SSHKey | None: + if self.__last is None: + return self.__first + + return self.__last + + # @TODO make sure to implement below method + def __contains__(self) -> bool | Never: raise NotImplementedError - def remove(self) -> Never: + def __missing__(self) -> Never: + raise NotImplementedError + + # @TODO make sure to implement below method + def __iter__(self) -> Self | Never: + raise NotImplementedError + + def count(self, query: RegEx | str) -> int | Never: raise NotImplementedError def pull(self, query: RegEx | str = "*") -> Never: raise NotImplementedError - def __len__(self): - return len(self.__indices) - - def __contains__(self): + def reverse(self) -> None | Never: raise NotImplementedError - def __missing__(self): + def sort(self, key: Callable = (lambda e: e), reverse: bool = False) -> None | Never: raise NotImplementedError - def __iter__(self): - raise NotImplementedError - - class UserSSH: def __init__(self, username: str = "root", paths: Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: RootFate = RootFate.disposal.name): self.username = username