Files
skato-ansible/sshkey_man.py

342 lines
11 KiB
Python

from re import Pattern as RegEx
from pathlib import Path, PurePath
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from enum import Enum
from softman import Apps
from random import gamble
from collections.abc import Sequence
from typing import Never, Union, Self, Callable
from glob import glob as globbify
from whereami import USER_PATH
from softman import Softs
import os
class RootFate(Enum):
disposal = 0
retention = 1
class SSHKeyType(Enum):
pubkey = 0
privkey = 1
dual = 2
class SSHKey:
def __init__(self, *path: ExecutedPath):
if len(path) > 2 or len(path) < 1:
raise ValueError
self.__idx: int = 0
self.__prev: Self | None = None
self.__next: Self | None = None
self.category: SSHKeyType | None = None
if len(path) < 2:
self.__value: ExecutedPath | tuple[ExecutedPath] = path[0]
else:
self.category = SSHKeyType.dual.name
self.__value: ExecutedPath | tuple[ExecutedPath] = path
def __int__(self) -> int:
return self.__idx
def __str__(self) -> str:
return str(self.__value)
def __repr__(self) -> ExecutedPath | tuple[ExecutedPath]:
return "SSHKey(" + str(self.__value) + ")"
def __nonzero__(self) -> bool:
return True
def __format__(self, formatstr) -> str:
match formatstr:
case "item":
return str(self.__idx) + ": " + str(self.__value)
case "int":
return str(self.__idx)
case _:
return str(self.__value)
def __next__(self) -> ExecutedPath | tuple[ExecutedPath]:
return self.__next
def __prev__(self) -> ExecutedPath | tuple[ExecutedPath]:
return self.__prev
def __call__(self, *path: ExecutedPath | str | None) -> ExecutedPath | tuple[ExecutedPath]:
if path is not None or len(path) > 0:
if len(path) > 2:
raise ValueError
path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path))
if len(path) < 2:
self.__value = path[0]
else:
self.__value = path
return self.__value
def __eq__(self, other: Self) -> bool:
return self.__value == other._SSHKey__value
def __eqcontent__(self, other: Self) -> bool:
return self.__value.read_text() == other._SSHKey__value.read_text()
def update(self, *path: ExecutedPath | str) -> None | Never:
if len(path) > 2 or len(path) < 1:
raise ValueError
path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path))
if len(path) < 2:
self.__value = path[0]
else:
self.__value = path
def replace(self, old: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str], new: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str]) -> None | Never:
if isinstance(old, str):
old = Path(old)
if isinstance(new, str):
new = Path(new)
if isinstance(old, (list, tuple)):
if len(old) > 2 or len(old) < 1:
raise ValueError
old = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, old))
if isinstance(new, (list, tuple)):
if len(new) > 2 or len(new) < 1:
raise ValueError
new = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, new))
if isinstance(self.__value, (tuple, list)):
if isinstance(old, tuple):
remaining_value = list(filter(lambda p: p not in old, self.__value))
if isinstance(new, tuple):
self.__value = (*remaining_value, *new)
else:
self.__value = (*remaining_value, new)
else:
remaining_value = list(filter(lambda p: p != old, self.__value))
if isinstance(new, tuple):
self.__value = (*remaining_value, *new)
else:
self.__value = (*remaining_value, new)
if len(self.__value) > 2:
self.__value = self.__value[0]
elif isinstance(self.__value, ExecutedPath):
if isinstance(old, tuple):
remaining_value = None if self.__value in old else self.__value
else:
remaining_value = None if self.__value == old else self.__value
if remaining_value is None:
self.__value = new
else:
raise ValueError
def read(self, idx: int | None = None) -> str | tuple[str]:
if idx is not None and isinstance(self.__value, tuple):
result = self.__value[idx]
else:
if idx is not None:
raise KeyError
result = self.__value
if isinstance(result, tuple):
result = tuple(map(lambda p: p.read_text(), result))
else:
result = result.read_text()
return result
@property
def status(self) -> Never:
# @TODO this method should return string or Enum value after analyzing whether this key is public or private
raise NotImplementedError
def prev(arg):
if isinstance(arg, SSHKey):
return arg._SSHKey__prev__()
else:
raise TypeError
def stream_equal(s1, s2) -> bool | Never:
if isinstance(s1, SSHKey) and isinstance(s2, SSHKey):
return s1._SSHKey__eqcontent__(s2)
else:
raise TypeError
class SSHKeyCollection(Sequence):
__user_path: ExecutedPath = USER_PATH
def __init__(self):
self.__current: SSHKey | None = None
self.__first: SSHKey | None = None
self.__last: SSHKey | None = None
self.__indices: range | None = None
def __getitem__(self, key: int) -> SSHKey | Never:
if self.__current is None:
raise KeyError
elif int(self.__current) == key:
return self.__current
else:
self.__current = self.__first
while int(self.__current) != key:
if next(self.__current) is not None:
self.__current = next(self.__current)
else:
# raise StopIteration
break
return self.__current
def __len__(self) -> int:
if self.__indices is None:
return 0
return len(self.__indices)
def pop(self) -> Never:
raise NotImplementedError
def remove(self) -> Never:
raise NotImplementedError
def append(self, *value: ExecutedPath | str) -> None | Never:
if len(value) < 1 or len(value) > 2:
raise ValueError
value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value))
ssh_key = SSHKey(*value)
if self.__first is None:
ssh_key._SSHKey__idx = 0
self.__indices = range(ssh_key._SSHKey__idx + 1)
self.__first = ssh_key
self.__current = self.__first
else:
ssh_key._SSHKey__idx = len(self.__indices)
if self.__last is not None:
self.__last._SSHKey__next = ssh_key
self.__last._SSHKey__next._SSHKey__prev = self.__last
self.__last = next(self.__last)
else:
self.__first._SSHKey__next = ssh_key
self.__first._SSHKey__next._SSHKey__prev = self.__first
self.__last = self.__first._SSHKey__next
self.__indices = range(ssh_key._SSHKey__idx + 1)
self.__current = self.__last
def __setitem__(self, key: int, *value: ExecutedPath | str) -> None | Never:
if len(value) < 1 or len(value) > 2:
raise ValueError
value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value))
if self.__current is None:
raise KeyError
elif int(self.__current) == key:
if self.__current() is None or len(self.__current()) < 1:
self.__current(*value)
else:
self.__current = self.__first
while int(self.__current) != key:
if next(self.__current) is not None:
self.__current = next(self.__current)
else:
# raise StopIteration
break
self.__current(*value)
def __delitem__(self, key: int) -> None | Never:
if self.__current is None:
raise KeyError
elif int(self.__current) == key:
prior = self.__current._SSHKey__prev
posterior = self.__current._SSHKey__next
posterior._SSHKey__idx = self.__current._SSHKey__idx
prior._SSHKey__next = posterior
prior._SSHKey__next._SSHKey__prev = prior
self.__current = prior._SSHKey__next
while next(self.__current) is not None:
self.__current = next(self.__current)
self.__current._SSHKey__idx += 1
else:
self.__current = self.__first
while int(self.__current) != key:
if next(self.__current) is not None:
self.__current = next(self.__current)
else:
# raise StopIteration
break
prior = self.__current._SSHKey__prev
posterior = self.__current._SSHKey__next
posterior._SSHKey__idx = self.__current._SSHKey__idx
prior._SSHKey__next = posterior
prior._SSHKey__next._SSHKey__prev = prior
self.__current = prior._SSHKey__next
while next(self.__current) is not None:
self.__current = next(self.__current)
self.__current._SSHKey__idx += 1
@property
def head(self) -> SSHKey | None:
return self.__first
@property
def tail(self) -> SSHKey | None:
if self.__last is None:
return self.__first
return self.__last
# @TODO make sure to implement below method
def __contains__(self) -> bool | Never:
raise NotImplementedError
def __missing__(self) -> Never:
raise NotImplementedError
# @TODO make sure to implement below method
def __iter__(self) -> Self | Never:
raise NotImplementedError
def count(self, query: RegEx | str) -> int | Never:
raise NotImplementedError
def pull(self, query: RegEx | str = "*") -> Never:
raise NotImplementedError
def reverse(self) -> None | Never:
raise NotImplementedError
def sort(self, key: Callable = (lambda e: e), reverse: bool = False) -> None | Never:
raise NotImplementedError
class UserSSH:
def __init__(self, username: str = "root", paths: Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: RootFate = RootFate.disposal.name):
self.username = username
self.paths = paths
self.keys = keys
self.password = password
self.fate = fate