from re import Pattern as RegEx from re import fullmatch as Match from pathlib import Path, PurePath from custtypes import ExecutedPath, IdlePath from enum import StrEnum, auto from random import choice as gamble # from collections.abc import Sequence, Iterable from typing import Never, Self, Callable, Iterable, Sequence from whereami import USER_PATH from itertools import chain # import os class SSHKeyType(StrEnum): pubkey = auto() privkey = auto() dual = auto() # @TODO create unit tests for below class class SSHKey: def __init__(self, *path: ExecutedPath | str): if len(path) > 2 or len(path) < 1: raise ValueError path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path)) self.__idx: int = 0 self.__prev: Self | None = None self.__next: Self | None = None self.category: SSHKeyType | None = None if len(path) < 2: self.__value: ExecutedPath | tuple[ExecutedPath] = path[0] else: self.category = SSHKeyType.dual.name.lower() self.__value: ExecutedPath | tuple[ExecutedPath] = path def __int__(self) -> int: return self.__idx def update_status(self) -> None: if isinstance(self.__value, tuple): privkey_present = False pubkey_present = False for p in self.__value: if "-----BEGIN OPENSSH PRIVATE KEY-----" in p.read_text(): privkey_present = True else: pubkey_present = True if pubkey_present and privkey_present: self.category = SSHKeyType.dual.name.lower() elif pubkey_present or privkey_present: if pubkey_present: self.category = SSHKeyType.pubkey.name.lower() if privkey_present: self.category = SSHKeyType.privkey.name.lower() elif isinstance(self.__value, ExecutedPath): if "-----BEGIN OPENSSH PRIVATE KEY-----" in self.__value.read_text(): self.category = SSHKeyType.privkey.name.lower() else: self.category = SSHKeyType.pubkey.name.lower() def __str__(self) -> str: if isinstance(self.__value, tuple): key_basename = Path(str(self.__value[0])).stem + "." + self.category else: key_basename = Path(str(self.__value)).name return "🔑" + key_basename def __repr__(self) -> str: return "%s(%r)" % (self.__class__.__name__, self.__value) def __nonzero__(self) -> bool: return True def __format__(self, formatstr) -> str: match formatstr: case "item": return str(self.__idx) + ": " + str(self.__value) case "int": return str(self.__idx) case _: return str(self) def __next__(self) -> ExecutedPath | tuple[ExecutedPath]: return self.__next def __prev__(self) -> ExecutedPath | tuple[ExecutedPath]: return self.__prev def __call__(self, *path: ExecutedPath | str | None) -> ExecutedPath | tuple[ExecutedPath]: if path is not None or len(path) > 0: if len(path) > 2: raise ValueError path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path)) if len(path) < 2: self.__value = path[0] else: self.__value = path return self.__value def __eq__(self, other: Self) -> bool: return self.__value == other._SSHKey__value def __ne__(self, other: Self) -> bool | Never: return self.__value != other._SSHKey__value def __eqcontent__(self, other: Self) -> bool: return self.__value.read_text() == other._SSHKey__value.read_text() def __neqcontent__(self, other: Self) -> bool: return self.__value.read_text() != other._SSHKey__value.read_text() def __len__(self): if isinstance(self.__value, tuple): return len(self.__value) else: return 1 def update(self, *path: ExecutedPath | str) -> Self | Never: if len(path) > 2 or len(path) < 1: raise ValueError path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path)) if len(path) < 2: self.__value = path[0] else: self.__value = path return self def __add__(self, other: Self | ExecutedPath | str) -> Self: if isinstance(other, (str, ExecutedPath)): if isinstance(self.__value, tuple): self.update(*self.__value, other) else: self.update(*self.__value, other) else: if isinstance(other._SSHKey__value, tuple): if isinstance(self.__value, tuple): self.update(*self.__value, *other._SSHKey__value) else: self.update(self.__value, *other._SSHKey__value) else: if isinstance(self.__value, tuple): self.update(*self.__value, other._SSHKey__value) else: self.update(self.__value, other._SSHKey__value) self.update_status() return self def __radd__(self, other: Self | ExecutedPath | str) -> Self: if isinstance(self.__value, tuple): if isinstance(other, (ExecutedPath, str)): other.update(other, *self.__value) else: if isinstance(other._SSHKey__value, tuple): other.update(*other._SSHKey__value, *self.__value) else: other.update(other._SSHKey__value, *self.__value) else: if isinstance(other, (ExecutedPath, str)): other.update(other, self.__value) else: if isinstance(other._SSHKey__value, tuple): other.update(*other._SSHKey__value, self.__value) else: other.update(other._SSHKey__value, self.__value) other.update_status() return self # @TODO write following 2 subtraction algorithms using 'set' data type conversion and methods def __sub__(self, other: Self | ExecutedPath | str) -> Never: raise NotImplementedError def __rsub__(self, other: Self | ExecutedPath | str) -> Never: raise NotImplementedError def __getitem__(self, key: int) -> ExecutedPath | str | Never: if isinstance(self.__value, tuple): return self.__value[key] else: raise KeyError def __setitem__(self, key: int, value: ExecutedPath | str) -> None: if isinstance(self.__value, tuple): new_entry = list(self.__value) if isinstance(value, str): value = Path(value) new_entry[key] = value self.__value = tuple(new_entry) else: raise KeyError def replace(self, old: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str], new: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str]) -> Self | Never: if isinstance(old, str): old = Path(old) if isinstance(new, str): new = Path(new) if isinstance(old, (list, tuple)): if len(old) > 2 or len(old) < 1: raise ValueError old = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, old)) if isinstance(new, (list, tuple)): if len(new) > 2 or len(new) < 1: raise ValueError new = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, new)) if isinstance(self.__value, (tuple, list)): if isinstance(old, tuple): remaining_value = list(filter(lambda p: p not in old, self.__value)) if isinstance(new, tuple): self.__value = (*remaining_value, *new) else: self.__value = (*remaining_value, new) else: remaining_value = list(filter(lambda p: p != old, self.__value)) if isinstance(new, tuple): self.__value = (*remaining_value, *new) else: self.__value = (*remaining_value, new) if len(self.__value) > 2: self.__value = self.__value[0] elif isinstance(self.__value, ExecutedPath): if isinstance(old, tuple): remaining_value = None if self.__value in old else self.__value else: remaining_value = None if self.__value == old else self.__value if remaining_value is None: self.__value = new else: raise ValueError return self def read(self, idx: int | None = None) -> str | tuple[str]: if idx is not None and isinstance(self.__value, tuple): result = self.__value[idx] else: if idx is not None: raise KeyError result = self.__value if isinstance(result, tuple): result = tuple(map(lambda p: p.read_text(), result)) else: result = result.read_text() return result @property def status(self) -> str: self.update_status() return self.category def reverse(self) -> Never: if isinstance(self.__value, tuple): v1 = self.__value[0] v2 = self.__value[1] result = self.update(v2, v1) else: result = self return result def prev(arg): if isinstance(arg, SSHKey): return arg._SSHKey__prev__() else: raise TypeError def stream_equal(s1, s2) -> bool | Never: if isinstance(s1, SSHKey) and isinstance(s2, SSHKey): return s1._SSHKey__eqcontent__(s2) else: raise TypeError def stream_unequal(s1, s2) -> bool | Never: if isinstance(s1, SSHKey) and isinstance(s2, SSHKey): return s1._SSHKey__neqcontent__(s2) else: raise TypeError # @TODO create unit tests for below class class SSHKeyCollection(Sequence): __user_path: ExecutedPath = USER_PATH() __ssh_path: ExecutedPath = __user_path / ".ssh" def __init__(self): self.__current: SSHKey | None = None self.__first: SSHKey | None = None self.__last: SSHKey | None = None self.__indices: range | None = None # @TODO allow initialization with unpacked parameter or sequence/iterable argument def __getitem__(self, key: int | slice) -> SSHKey | Never: self.__current = self.__first if self.__current is None: raise KeyError if isinstance(key, int): if int(self.__current) == key: return self.__current else: while int(self.__current) != key: if self.__current is None: raise KeyError self.__current = next(self.__current) result = self.__current elif isinstance(key, slice): step = key.step sshkcoll = SSHKeyCollection() if hasattr(key, "start"): if getattr(key, "start") is None: start = 0 else: start = key.start if hasattr(key, "stop"): if getattr(key, "stop") is None: stop = len(self.__indices) else: stop = key.stop if hasattr(key, "step"): if getattr(key, "step") is None: step = 1 else: step = key.step indices = range(start, stop, step) # test_coll = [] while int(self.__current) < stop: if int(self.__current) < start: continue elif int(self.__current) >= start: if int(self.__current) in indices: sshkcoll.append(self.__current) # test_coll.append(self.__current) else: continue self.__current = next(self.__current) if self.__current is None: break # print(test_coll) result = sshkcoll return result def __len__(self) -> int: if self.__indices is None: return 0 return len(self.__indices) def pop(self, key: int = -1) -> Never: self.__current = self.__first if self.__current is None: raise KeyError if key == -1: if self.__last is not None: past = self.__last self.__last._SSHKey__prev._SSHKey__next = None self.__last = self.__last._SSHKey__prev self.__current = self.__last else: past = self.__first self.__first = None return past elif key <= -2: raise NotImplementedError else: while int(self.__current) != key: self.__current = next(self.__current) if self.__current is None: raise KeyError past = self.__current count = self.__current._SSHKey__idx prior = self.__current._SSHKey__prev posterior = self.__current._SSHKey__next posterior._SSHKey__prev = prior posterior._SSHKey__prev._SSHKey__next = posterior self.__current = posterior while self.__current is not None: self.__current._SSHKey__idx = count self.__current = next(self.__current) count += 1 return past def remove(self) -> Never: raise NotImplementedError def append(self, *value: ExecutedPath | str) -> SSHKey: if len(value) < 1 or len(value) > 2: raise ValueError value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) ssh_key = SSHKey(*value) if self.__first is None: # print("branch1") ssh_key._SSHKey__idx = 0 # print(ssh_key._SSHKey__idx) ssh_key.update_status() self.__indices = range(ssh_key._SSHKey__idx + 1) self.__first = ssh_key self.__current = self.__first else: # print("branch2") if self.__last is not None: # print("branch2.1") ssh_key._SSHKey__idx = self.__last._SSHKey__idx + 1 # print(ssh_key._SSHKey__idx) ssh_key.update_status() self.__last._SSHKey__next = ssh_key self.__last._SSHKey__next._SSHKey__prev = self.__last self.__last = next(self.__last) else: # print("branch2.2") ssh_key._SSHKey__idx = self.__first._SSHKey__idx + 1 # print(ssh_key._SSHKey__idx) ssh_key.update_status() self.__first._SSHKey__next = ssh_key self.__first._SSHKey__next._SSHKey__prev = self.__first self.__last = self.__first._SSHKey__next self.__indices = range(ssh_key._SSHKey__idx + 1) self.__current = self.__last #print(self.__current) return self.__current def __setitem__(self, key: int | slice, *value: ExecutedPath | str) -> None | Never: if len(value) < 1 or len(value) > 2: raise ValueError value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value)) self.__current = self.__first if self.__current is None: raise KeyError if isinstance(key, int): if int(self.__current) == key: if self.__current() is None or len(self.__current()) < 1: self.__current(*value) else: if int(self.__current) == key: return self.__current(*value) while int(self.__current) != key: if self.__current is None: raise KeyError self.__current = next(self.__current) self.__current(*value) elif isinstance(key, slice): raise NotImplementedError def __delitem__(self, key: int | slice) -> None | Never: self.__current = self.__first if self.__current is None: raise KeyError if isinstance(key, int): if key == -1: if self.__last is not None: self.__last._SSHKey__prev._SSHKey__next = None self.__last = self.__last._SSHKey__prev self.__current = self.__last else: self.__first = None return past elif key <= -2: raise NotImplementedError else: while int(self.__current) != key: self.__current = next(self.__current) if self.__current is None: raise KeyError count = self.__current._SSHKey__idx prior = self.__current._SSHKey__prev posterior = self.__current._SSHKey__next posterior._SSHKey__prev = prior posterior._SSHKey__prev._SSHKey__next = posterior self.__current = posterior while self.__current is not None: self.__current._SSHKey__idx = count self.__current = next(self.__current) count += 1 elif isinstance(key, slice): raise NotImplementedError @property def head(self) -> SSHKey | None: return self.__first @property def tail(self) -> SSHKey | None: if self.__last is None: return self.__first return self.__last def __contains__(self, value: ExecutedPath | str) -> bool: self.__current = self.__first if isinstance(value, ExecutedPath): value = str(value) is_contained = False while self.__current is not None: if str(self.__current._SSHKey__value) == value: is_contained = True break self.__current = next(self.__current) return is_contained def __missing__(self, value: ExecutedPath | str) -> Never: raise NotImplementedError def __next__(self): self.__current = next(self.__current) if self.__current is not None: return self.__current else: raise StopIteration def __iter__(self) -> Self | Never: self.__current = self.__first # return self.__current return self def count(self, query: RegEx | str) -> int | Never: raise NotImplementedError def pull(self, query: RegEx | str = "*") -> None: if isinstance(query, RegEx): keypaths = self.__ssh_path.glob("*") for p in keypaths: if query.fullmatch(p.name): if not Match("(known_hosts|authorized_keys|config).*", p.name): self.append(p) else: continue else: keypaths = self.__ssh_path.glob(query) for p in keypaths: if not Match("(known_hosts|authorized_keys|config).*", p.name): self.append(p) def reverse(self) -> None | Never: raise NotImplementedError def sort(self, key: Callable = (lambda e: e), reverse: bool = False) -> None | Never: raise NotImplementedError def __str__(self) -> str: prefix = "[(" postfix = "|]" self.__current = self.__first concat = lambda s: str(s) + ", " content = str() count = 0 while self.__current is not None: content += str(count) + ">" + concat(self.__current) self.__current = next(self.__current) count += 1 content = content[0:len(content)-2] return prefix + content + postfix def index(self, item: str | ExecutedPath) -> Never: raise NotImplementedError def publish(self, category: SSHKeyType | str | None = SSHKeyType.pubkey, pref: int | None = None, datatype = list): privkey = list() pubkey = list() self.__current = self.__first # @TODO create conditional case that publishes all keys if datatype == list: while self.__current is not None: # print(self.__current) if self.__current.category == SSHKeyType.privkey.name.lower(): privkey.append(self.__current._SSHKey__value) elif self.__current.category == SSHKeyType.pubkey.name.lower(): pubkey.append(self.__current._SSHKey__value) elif self.__current.category == SSHKeyType.dual.name.lower(): privkey.append(self.__current._SSHKey__value[0]) pubkey.append(self.__current._SSHKey__value[1]) self.__current = next(self.__current) # print("publish running...") if pref is None: preference = gamble(range(len(privkey))) else: preference = pref # print(category) if category.name.lower() == SSHKeyType.pubkey.name.lower(): return pubkey elif category.name.lower() == SSHKeyType.privkey.name.lower(): return (privkey, preference) else: return (privkey, pubkey, preference) elif datatype == dict: # @TODO have result var equal to instance of a yaml.YAMLObject class from parse module raise NotImplementedError return result def __repr__(self) -> str: return "%s()" % (self.__class__.__name__)