from re import Pattern as RegEx from pathlib import Path, PurePath from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes from enum import Enum from softman import Apps from random import choice as gamble from collections.abc import Sequence from typing import Never, Union, Self, Callable, Required, Literal from typing import TypedDict as Dict from glob import glob as globbify from whereami import USER_PATH from softman import Softs # import os class RootFate(Enum): disposal = 0 retention = 1 class SSHKeyType(Enum): pubkey = 0 privkey = 1 dual = 2 # @TODO create unit tests for below class class SSHKey: def __init__(self, *path: ExecutedPath | str): if len(path) > 2 or len(path) < 1: raise ValueError path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path)) self.__idx: int = 0 self.__prev: Self | None = None self.__next: Self | None = None self.category: SSHKeyType | None = None if len(path) < 2: self.__value: ExecutedPath | tuple[ExecutedPath] = path[0] else: self.category = SSHKeyType.dual.name self.__value: ExecutedPath | tuple[ExecutedPath] = path def __int__(self) -> int: return self.__idx def __str__(self) -> str: return str(self.__value) def __repr__(self) -> str: return "SSHKey(" + str(self.__value) + ")" def __nonzero__(self) -> bool: return True def __format__(self, formatstr) -> str: match formatstr: case "item": return str(self.__idx) + ": " + str(self.__value) case "int": return str(self.__idx) case _: return str(self.__value) def __next__(self) -> ExecutedPath | tuple[ExecutedPath]: return self.__next def __prev__(self) -> ExecutedPath | tuple[ExecutedPath]: return self.__prev def __call__(self, *path: ExecutedPath | str | None) -> ExecutedPath | tuple[ExecutedPath]: if path is not None or len(path) > 0: if len(path) > 2: raise ValueError path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path)) if len(path) < 2: self.__value = path[0] else: self.__value = path return self.__value def __eq__(self, other: Self) -> bool: return self.__value == other._SSHKey__value def __ne__(self, other: Self) -> bool | Never: return self.__value != other._SSHKey__value def __eqcontent__(self, other: Self) -> bool: return self.__value.read_text() == other._SSHKey__value.read_text() def __neqcontent__(self, other: Self) -> bool: return self.__value.read_text() != other._SSHKey__value.read_text() def __len__(self): if isinstance(self.__value, tuple): return len(self.__value) else: return 1 def update(self, *path: ExecutedPath | str) -> Self | Never: if len(path) > 2 or len(path) < 1: raise ValueError path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path)) if len(path) < 2: self.__value = path[0] else: self.__value = path return self def __add__(self, other: Self | ExecutedPath | str): if isinstance(self.__value, tuple): raise ValueError if isinstance(other, Self): if isinstance(other.__SSHKey__value, tuple): raise ValueError result = self.update(self.__value, other._SSHKey__value) elif isinstance(other, (str, ExecutedPath)): result = self.update(self.__value, other) else: raise TypeError return result def __radd__(self, other: Self | ExecutedPath | str): if isinstance(self.__value, tuple): raise ValueError if isinstance(other, Self): if isinstance(other.__SSHKey__value, tuple): raise ValueError result = self.update(other._SSHKey__value, self.__value) elif isinstance(other, (str, ExecutedPath)): result = self.update(other, self.__value) else: raise TypeError return result # @TODO write following 2 subtraction algorithms using 'set' data type conversion and methods def __sub__(self, other: Self | ExecutedPath | str): raise NotImplementedError def __rsub__(self, other: Self | ExecutedPath | str): raise NotImplementedError def replace(self, old: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str], new: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str]) -> Self | Never: if isinstance(old, str): old = Path(old) if isinstance(new, str): new = Path(new) if isinstance(old, (list, tuple)): if len(old) > 2 or len(old) < 1: raise ValueError old = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, old)) if isinstance(new, (list, tuple)): if len(new) > 2 or len(new) < 1: raise ValueError new = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, new)) if isinstance(self.__value, (tuple, list)): if isinstance(old, tuple): remaining_value = list(filter(lambda p: p not in old, self.__value)) if isinstance(new, tuple): self.__value = (*remaining_value, *new) else: self.__value = (*remaining_value, new) else: remaining_value = list(filter(lambda p: p != old, self.__value)) if isinstance(new, tuple): self.__value = (*remaining_value, *new) else: self.__value = (*remaining_value, new) if len(self.__value) > 2: self.__value = self.__value[0] elif isinstance(self.__value, ExecutedPath): if isinstance(old, tuple): remaining_value = None if self.__value in old else self.__value else: remaining_value = None if self.__value == old else self.__value if remaining_value is None: self.__value = new else: raise ValueError return self def read(self, idx: int | None = None) -> str | tuple[str]: if idx is not None and isinstance(self.__value, tuple): result = self.__value[idx] else: if idx is not None: raise KeyError result = self.__value if isinstance(result, tuple): result = tuple(map(lambda p: p.read_text(), result)) else: result = result.read_text() return result @property def status(self) -> str | Never: # @TODO analyze 'read' return value of this class's instance to check whether it is a private key, # unless '__value' of this class's instance is a tuple, in which case it is a dual key raise NotImplementedError def reverse(self) -> Never: if isinstance(self.__value, tuple): v1 = self.__value[0] v2 = self.__value[1] result = self.update(v2, v1) else: result = self return result def prev(arg): if isinstance(arg, SSHKey): return arg._SSHKey__prev__() else: raise TypeError def stream_equal(s1, s2) -> bool | Never: if isinstance(s1, SSHKey) and isinstance(s2, SSHKey): return s1._SSHKey__eqcontent__(s2) else: raise TypeError def stream_unequal(s1, s2) -> bool | Never: if isinstance(s1, SSHKey) and isinstance(s2, SSHKey): return s1._SSHKey__neqcontent__(s2) else: raise TypeError # @TODO create unit tests for below class class SSHKeyCollection(Sequence): __user_path: ExecutedPath = USER_PATH __ssh_path: ExecutedPath = __user_path / ".ssh" def __init__(self): self.__current: SSHKey | None = None self.__first: SSHKey | None = None self.__last: SSHKey | None = None self.__indices: range | None = None # @TODO adjust item magic methods for slicing purposes def __getitem__(self, key: int | slice) -> SSHKey | Never: self.__current = self.__first if self.__current is None: raise KeyError if isinstance(key, int): if int(self.__current) == key: return self.__current else: while int(self.__current) != key: if self.__current is None: raise KeyError self.__current = next(self.__current) result = self.__current elif isinstance(key, slice): step = key.step sshkcoll = SSHKeyCollection() range_args = [] if hasattr(key, "start") and getattr(key, "start") is not None: range_args.append(key.start) if hasattr(key, "stop") and getattr(key, "stop") is not None: range_args.append(key.stop) if hasattr(key, "step") and getattr(key, "step") is not None: range_args.append(key.step) indices = range(*range_args) # test_coll = [] while int(self.__current) < key.stop: if self.__current is None: raise KeyError elif int(self.__current) < key.start: continue elif int(self.__current) >= key.start: if int(self.__current) in indices: sshkcoll.append(self.__current) test_coll.append(self.__current) else: continue self.__current = next(self.__current) # print(test_coll) result = sshkcoll return result def __len__(self) -> int: if self.__indices is None: return 0 return len(self.__indices) def pop(self, key: int = -1) -> Never: self.__current = self.__first if self.__current is None: raise KeyError if key == -1: if self.__last is not None: past = self.__last self.__last._SSHKey__prev._SSHKey__next = None self.__last = self.__last._SSHKey__prev self.__current = self.__last else: past = self.__first self.__first = None return past elif key <= -2: raise NotImplementedError else: while int(self.__current) != key: self.__current = next(self.__current) if self.__current is None: raise KeyError past = self.__current count = self.__current._SSHKey__idx prior = self.__current._SSHKey__prev posterior = self.__current._SSHKey__next posterior._SSHKey__prev = prior posterior._SSHKey__prev._SSHKey__next = posterior self.__current = posterior while self.__current is not None: self.__current._SSHKey__idx = count self.__current = next(self.__current) count += 1 return past def remove(self) -> Never: raise NotImplementedError def append(self, *value: ExecutedPath | str) -> SSHKey: if len(value) < 1 or len(value) > 2: raise ValueError value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) ssh_key = SSHKey(*value) if self.__first is None: # print("branch1") ssh_key._SSHKey__idx = 0 # print(ssh_key._SSHKey__idx) self.__indices = range(ssh_key._SSHKey__idx + 1) self.__first = ssh_key self.__current = self.__first else: # print("branch2") if self.__last is not None: # print("branch2.1") ssh_key._SSHKey__idx = self.__last._SSHKey__idx + 1 # print(ssh_key._SSHKey__idx) self.__last._SSHKey__next = ssh_key self.__last._SSHKey__next._SSHKey__prev = self.__last self.__last = next(self.__last) else: # print("branch2.2") ssh_key._SSHKey__idx = self.__first._SSHKey__idx + 1 # print(ssh_key._SSHKey__idx) self.__first._SSHKey__next = ssh_key self.__first._SSHKey__next._SSHKey__prev = self.__first self.__last = self.__first._SSHKey__next self.__indices = range(ssh_key._SSHKey__idx + 1) self.__current = self.__last #print(self.__current) return self.__current def __setitem__(self, key: int | slice, *value: ExecutedPath | str) -> None | Never: if len(value) < 1 or len(value) > 2: raise ValueError value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) self.__current = self.__first if self.__current is None: raise KeyError if isinstance(key, int): if int(self.__current) == key: if self.__current() is None or len(self.__current()) < 1: self.__current(*value) else: if int(self.__current) == key: return self.__current(*value) while int(self.__current) != key: if self.__current is None: raise KeyError self.__current = next(self.__current) self.__current(*value) elif isinstance(key, slice): raise NotImplementedError def __delitem__(self, key: int | slice) -> None | Never: self.__current = self.__first if self.__current is None: raise KeyError if isinstance(key, int): if key == -1: if self.__last is not None: self.__last._SSHKey__prev._SSHKey__next = None self.__last = self.__last._SSHKey__prev self.__current = self.__last else: self.__first = None return past elif key <= -2: raise NotImplementedError else: while int(self.__current) != key: self.__current = next(self.__current) if self.__current is None: raise KeyError count = self.__current._SSHKey__idx prior = self.__current._SSHKey__prev posterior = self.__current._SSHKey__next posterior._SSHKey__prev = prior posterior._SSHKey__prev._SSHKey__next = posterior self.__current = posterior while self.__current is not None: self.__current._SSHKey__idx = count self.__current = next(self.__current) count += 1 elif isinstance(key, slice): raise NotImplementedError @property def head(self) -> SSHKey | None: return self.__first @property def tail(self) -> SSHKey | None: if self.__last is None: return self.__first return self.__last def __contains__(self) -> bool | Never: raise NotImplementedError def __missing__(self) -> Never: raise NotImplementedError def __next__(self): raise NotImplementedError def __iter__(self) -> Self | Never: self.__Current = self.__first return self.__current def count(self, query: RegEx | str) -> int | Never: raise NotImplementedError # @TODO make sure to implement below method def pull(self, query: RegEx | str = "*") -> Never: if isinstance(query, RegEx): keypaths = self.__ssh_path.glob("*") for p in keypaths: if query.fullmatch(p.name): self.append(p) else: continue else: keypaths = self.__ssh_path.glob(query) for p in keypaths: self.append(p) def reverse(self) -> None | Never: raise NotImplementedError def sort(self, key: Callable = (lambda e: e), reverse: bool = False) -> None | Never: raise NotImplementedError # @TODO maybe move to separate module for classes for handling users and groups class UserSSH: def __init__(self, username: str = "root", paths: Apps | None = None, keys: dict = dict(), password: str = "password123", fate: RootFate = RootFate.disposal.name): self.username = username self.paths = paths self.keys = keys self.password = password self.fate = fate sshkey_coll = SSHKeyCollection() sshkey_coll.pull() # print(len(sshkey_coll) + 2) print(sshkey_coll[0:4][3])