diff --git a/anodes.py b/anodes.py index ef8c771..0c5ed76 100644 --- a/anodes.py +++ b/anodes.py @@ -5,7 +5,7 @@ Library of classes modeling Ansible nodes or their types. from pathlib import Path, PurePath from typing import TypeAlias as Neotype from typing import TypedDict as Dict -from typing import Never, Union, Literal, Required +from typing import Never, Union, Literal, Required, Self from collections.abc import Callable from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes from enum import Enum @@ -78,6 +78,10 @@ class ControlNode: class Softs(Enum): ssh = 0 +class RootFate(Enum): + disposal = 0 + retention = 1 + _userSSHSubParams = { "available": Required[tuple[ExecutedPath]], "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]], @@ -90,12 +94,6 @@ __user_ssh_keys = { "authorized": 1, "used": 0 } -def __userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"): - self.username = username - self.paths = paths - self.keys = keys - self.password = password - self.fate = fate userSSHParams = Dict("userSSHParams", { "username": Required[str], "paths": _Apps, @@ -116,7 +114,212 @@ __user_ssh_input = { "fate": "disposal",\ "__init__": __userSSHInit } -userSSH = type("userSSH", (), __user_ssh_input) +#userSSH = type("userSSH", (), __user_ssh_input) + +class SSHKey: + def __init__(self, *path: ExecutedPath): + if len(path) > 2 or len(path) < 1: + raise ValueError + + self.kind: Literal["public_key","private_key", "dual"] | str | None = None + + self.__idx: int = 0 + self.__prev: Self | None = None + self.__next: Self | None = None + + if len(path) < 2: + self.__value: ExecutedPath | tuple[ExecutedPath] = path[0] + else: + self.__value: ExecutedPath | tuple[ExecutedPath] = path + + def __int__(self) -> int: + return self.__idx + + def __str__(self) -> str: + return str(self.__value) + + def __repr__(self) -> ExecutedPath | tuple[ExecutedPath]: + return self.__value + + def __nonzero__(self) -> bool: + return True + + def __format__(self, formatstr) -> str: + match formatstr: + case "item": + return str(self.__idx) + ": " + str(self.__value) + case "int": + return str(self.__idx) + case _: + return str(self.__value) + + def __next__(self) -> ExecutedPath | tuple[ExecutedPath]: + return self.__next + + def __prev__(self) -> ExecutedPath | tuple[ExecutedPath]: + return self.__prev + + def __call__(self) -> ExecutedPath | tuple[ExecutedPath]: + return self.__value + + def update(self, *path: ExecutedPath | str) -> None | Never: + if len(path) > 2 or len(path) < 1: + raise ValueError + + path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path)) + + if len(path) < 2: + self.__value = path[0] + else: + self.__value = path + + def replace(self, old: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str], new: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str]) -> None | Never: + if isinstance(old, str): + old = Path(old) + if isinstance(new, str): + new = Path(new) + + if isinstance(old, (list, tuple)): + if len(old) > 2 or len(old) < 1: + raise ValueError + + old = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, old)) + if isinstance(new, (list, tuple)): + if len(new) > 2 or len(new) < 1: + raise ValueError + + new = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, new)) + + if isinstance(self.__value, (tuple, list)): + if isinstance(old, tuple): + remaining_value = list(filter(lambda p: p not in old, self.__value)) + + if isinstance(new, tuple): + self.__value = (*remaining_value, *new) + else: + self.__value = (*remaining_value, new) + else: + remaining_value = list(filter(lambda p: p != old, self.__value)) + + if isinstance(new, tuple): + self.__value = (*remaining_value, *new) + else: + self.__value = (*remaining_value, new) + + if len(self.__value) > 2: + self.__value = self.__value[0] + elif isinstance(self.__value, ExecutedPath): + if isinstance(old, tuple): + remaining_value = None if self.__value in old else self.__value + else: + remaining_value = None if self.__value == old else self.__value + + if remaining_value is None: + self.__value = new + else: + raise ValueError + + def publish(self, idx: int | None = None) -> str | tuple[str]: + if idx is not None: + result = self.__value[idx] + else: + result = self.__value + + if isinstance(result, tuple): + result = tuple(map(lambda p: p.read_text(), result)) + else: + result = result.read_text() + + return result + + @property + def status(self) -> Never: + # @TODO this method should return string or Enum value after analyzing whether this key is public or private + raise NotImplementedError + +class SSHKeyCollection: + def __init__(self): + self.__current: SSHKey | None = None + self.__first: SSHKey | None = None + self.__last: SSHKey | None = None + self.__indices: range | None = None + + def __setitem__(self, key: int, *value: ExecutedPath | str) -> None | Never: + if len(value) < 1 or len(value) > 2: + raise ValueError + + value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) + + if self.__current is None: + self.__current = SSHKey(*value) + self.__current._SSHKey__idx = key + elif int(self.__current) == key: + if self.__current() is None or len(self.__current()) < 1: + self.__current.update(*value) + else: + while int(self.__current) != key: + if next(self.__current) is not None: + self.__current = next(self.__current) + else: + break + + self.__current.update(*value) + + def __getitem__(self, key: int) -> SSHKey: + if self.__current is None: + raise KeyError + elif int(self.__current) == key: + return self.__current + else: + while int(self.__current) != key: + if next(self.__current) is not None: + self.__current = next(self.__current) + else: + break + + return self.__current + + def __delitem__(self, key: int) -> Never: + raise NotImplementedError + + def append(self, *value: ExecutedPath | str) -> None | Never: + if len(value) < 1 or len(value) > 2: + raise ValueError + + value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) + + ssh_key = SSHKey(*value) + + if self.__first is None: + ssh_key._SSHKey__idx = 0 + self.__indices = range(ssh_key._SSHKey__idx + 1) + self.__first = ssh_key + + if self.__last is None: + self.__last = ssh_key + + self.__first._SSHKey__next = self.__last + self.__last._SSHKey__prev = self.__first + + self.__current = self.__first + else: + ssh_key._SSHKey__idx = len(self.__indices) + + ssh_key._SSHKey__prev = self.__last + self.__last = ssh_key + + self.__indices = range(ssh_key._SSHKey__idx + 1) + + def import_keys(self) -> Never: + raise NotImplementedError + +class UserSSH: + def __init__(self, username: str = "root", paths: _Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: RootFate = RootFate.disposal.name): + self.username = username + self.paths = paths + self.keys = keys + self.password = password + self.fate = fate vpsSchema = Dict("vpsSchema", { "fqdn": Required[str], @@ -164,7 +367,7 @@ class RemoteNode: "username": self.root["username"], "password": self.root["password"] } - self.ssh: userSSH = userSSH(**root_ssh_input) + self.ssh: UserSSH = UserSSH(**root_ssh_input) self.apps: list = self.root["software"].list(contents = True) self.keywords = keywords