Files
skato-ansible/sshkey_man.py

561 lines
18 KiB
Python

from re import Pattern as RegEx
from pathlib import Path, PurePath
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from enum import Enum
from softman import Apps
from random import choice as gamble
from collections.abc import Sequence
from typing import Never, Union, Self, Callable, Required, Literal
from typing import TypedDict as Dict
from glob import glob as globbify
from whereami import USER_PATH
from softman import Softs
# import os
class RootFate(Enum):
    """Two-state flag; its member names are used as the ``fate`` of a user.

    NOTE(review): presumably selects whether the root account is removed
    or kept after provisioning -- confirm against the callers.
    """

    # Numeric values are part of the public enum and must stay stable.
    disposal = 0
    retention = 1
class SSHKeyType(Enum):
    """Kinds of SSH key an ``SSHKey`` can represent.

    ``dual`` is the category recorded for a key built from two paths;
    ``pubkey`` and ``privkey`` name the two single-file kinds.
    """

    # Numeric values are part of the public enum and must stay stable.
    pubkey = 0
    privkey = 1
    dual = 2
# @TODO create unit tests for below class
class SSHKey:
def __init__(self, *path: ExecutedPath | str):
if len(path) > 2 or len(path) < 1:
raise ValueError
path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path))
self.__idx: int = 0
self.__prev: Self | None = None
self.__next: Self | None = None
self.category: SSHKeyType | None = None
if len(path) < 2:
self.__value: ExecutedPath | tuple[ExecutedPath] = path[0]
else:
self.category = SSHKeyType.dual.name
self.__value: ExecutedPath | tuple[ExecutedPath] = path
def __int__(self) -> int:
return self.__idx
def __str__(self) -> str:
key_basename = Path(str(self.__value)).name
return "🔑" + key_basename
def __repr__(self) -> str:
return "SSHKey(" + str(self.__value) + ")"
def __nonzero__(self) -> bool:
return True
def __format__(self, formatstr) -> str:
match formatstr:
case "item":
return str(self.__idx) + ": " + str(self.__value)
case "int":
return str(self.__idx)
case _:
return str(self)
def __next__(self) -> ExecutedPath | tuple[ExecutedPath]:
return self.__next
def __prev__(self) -> ExecutedPath | tuple[ExecutedPath]:
return self.__prev
def __call__(self, *path: ExecutedPath | str | None) -> ExecutedPath | tuple[ExecutedPath]:
if path is not None or len(path) > 0:
if len(path) > 2:
raise ValueError
path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path))
if len(path) < 2:
self.__value = path[0]
else:
self.__value = path
return self.__value
def __eq__(self, other: Self) -> bool:
return self.__value == other._SSHKey__value
def __ne__(self, other: Self) -> bool | Never:
return self.__value != other._SSHKey__value
def __eqcontent__(self, other: Self) -> bool:
return self.__value.read_text() == other._SSHKey__value.read_text()
def __neqcontent__(self, other: Self) -> bool:
return self.__value.read_text() != other._SSHKey__value.read_text()
def __len__(self):
if isinstance(self.__value, tuple):
return len(self.__value)
else:
return 1
def update(self, *path: ExecutedPath | str) -> Self | Never:
if len(path) > 2 or len(path) < 1:
raise ValueError
path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path))
if len(path) < 2:
self.__value = path[0]
else:
self.__value = path
return self
def __add__(self, other: Self | ExecutedPath | str):
if isinstance(self.__value, tuple):
raise ValueError
if isinstance(other, (str, ExecutedPath)):
result = self.update(self.__value, other)
else:
if isinstance(other.__SSHKey__value, tuple):
raise ValueError
result = self.update(self.__value, other._SSHKey__value)
return result
def __radd__(self, other: Self | ExecutedPath | str):
if isinstance(self.__value, tuple):
raise ValueError
if isinstance(other, (str, ExecutedPath)):
result = self.update(other, self.__value)
else:
if isinstance(other.__SSHKey__value, tuple):
raise ValueError
result = self.update(other._SSHKey__value, self.__value)
return result
# @TODO write following 2 subtraction algorithms using 'set' data type conversion and methods
def __sub__(self, other: Self | ExecutedPath | str):
raise NotImplementedError
def __rsub__(self, other: Self | ExecutedPath | str):
raise NotImplementedError
def replace(self, old: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str], new: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str]) -> Self | Never:
if isinstance(old, str):
old = Path(old)
if isinstance(new, str):
new = Path(new)
if isinstance(old, (list, tuple)):
if len(old) > 2 or len(old) < 1:
raise ValueError
old = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, old))
if isinstance(new, (list, tuple)):
if len(new) > 2 or len(new) < 1:
raise ValueError
new = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, new))
if isinstance(self.__value, (tuple, list)):
if isinstance(old, tuple):
remaining_value = list(filter(lambda p: p not in old, self.__value))
if isinstance(new, tuple):
self.__value = (*remaining_value, *new)
else:
self.__value = (*remaining_value, new)
else:
remaining_value = list(filter(lambda p: p != old, self.__value))
if isinstance(new, tuple):
self.__value = (*remaining_value, *new)
else:
self.__value = (*remaining_value, new)
if len(self.__value) > 2:
self.__value = self.__value[0]
elif isinstance(self.__value, ExecutedPath):
if isinstance(old, tuple):
remaining_value = None if self.__value in old else self.__value
else:
remaining_value = None if self.__value == old else self.__value
if remaining_value is None:
self.__value = new
else:
raise ValueError
return self
def read(self, idx: int | None = None) -> str | tuple[str]:
if idx is not None and isinstance(self.__value, tuple):
result = self.__value[idx]
else:
if idx is not None:
raise KeyError
result = self.__value
if isinstance(result, tuple):
result = tuple(map(lambda p: p.read_text(), result))
else:
result = result.read_text()
return result
@property
def status(self) -> str | Never:
# @TODO analyze 'read' return value of this class's instance to check whether it is a private key,
# unless '__value' of this class's instance is a tuple, in which case it is a dual key
raise NotImplementedError
def reverse(self) -> Never:
if isinstance(self.__value, tuple):
v1 = self.__value[0]
v2 = self.__value[1]
result = self.update(v2, v1)
else:
result = self
return result
def prev(arg):
    """Return the key preceding *arg* in its chain (counterpart of ``next``).

    Raises:
        TypeError: if *arg* is not an ``SSHKey``.
    """
    if not isinstance(arg, SSHKey):
        raise TypeError
    # ``__prev__`` ends in two underscores, so it is NOT name-mangled; the
    # former ``_SSHKey__prev__`` lookup could never succeed.
    return arg.__prev__()
def stream_equal(s1, s2) -> bool:
    """Return True when two ``SSHKey`` objects have identical file contents.

    Raises:
        TypeError: if either argument is not an ``SSHKey``.
    """
    if not (isinstance(s1, SSHKey) and isinstance(s2, SSHKey)):
        raise TypeError
    # Dunder-suffixed names are not mangled, so the method is reached by its
    # plain name rather than ``_SSHKey__eqcontent__``.
    return s1.__eqcontent__(s2)
def stream_unequal(s1, s2) -> bool:
    """Return True when two ``SSHKey`` objects differ in file contents.

    Raises:
        TypeError: if either argument is not an ``SSHKey``.
    """
    if not (isinstance(s1, SSHKey) and isinstance(s2, SSHKey)):
        raise TypeError
    # Dunder-suffixed names are not mangled, so the method is reached by its
    # plain name rather than ``_SSHKey__neqcontent__``.
    return s1.__neqcontent__(s2)
# @TODO create unit tests for below class
class SSHKeyCollection(Sequence):
    """Ordered collection of ``SSHKey`` objects kept as a doubly linked list.

    Every key stores its own position (``_SSHKey__idx``) and links to its
    neighbours; the collection tracks the head, the tail, a movable cursor
    and the valid index range.  Lookups raise ``KeyError`` (not
    ``IndexError``) for missing positions, as the original implementation
    did.
    """

    __user_path: ExecutedPath = USER_PATH
    __ssh_path: ExecutedPath = __user_path / ".ssh"

    def __init__(self):
        self.__current: SSHKey | None = None  # cursor used by next(collection)
        self.__first: SSHKey | None = None
        self.__last: SSHKey | None = None
        self.__indices: range | None = None  # valid positions; None when empty

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    def _nodes(self):
        """Yield every key from head to tail without moving the cursor."""
        node = self.__first
        while node is not None:
            yield node
            node = node._SSHKey__next

    @staticmethod
    def _paths_of(node: SSHKey) -> tuple:
        """Return the path(s) stored in *node* as a tuple."""
        value = node._SSHKey__value
        return value if isinstance(value, tuple) else (value,)

    def _locate(self, key: int) -> SSHKey:
        """Return the key at position *key*; negative values count from the end.

        Raises:
            KeyError: if the collection is empty or *key* is out of range.
        """
        position = key + len(self) if key < 0 else key
        for node in self._nodes():
            if node._SSHKey__idx == position:
                return node
        raise KeyError(key)

    def _renumber(self) -> None:
        """Re-assign consecutive indices and refresh the length bookkeeping."""
        total = 0
        for total, node in enumerate(self._nodes(), start=1):
            node._SSHKey__idx = total - 1
        self.__indices = range(total) if total else None

    def _unlink(self, node: SSHKey) -> None:
        """Detach *node* from the chain, fixing head/tail and later indices."""
        before = node._SSHKey__prev
        after = node._SSHKey__next
        if before is None:
            self.__first = after
        else:
            before._SSHKey__next = after
        if after is None:
            self.__last = before
        else:
            after._SSHKey__prev = before
        # A removed key must not keep pointing into the list it left.
        node._SSHKey__prev = None
        node._SSHKey__next = None
        self._renumber()
        self.__current = self.__last

    # ------------------------------------------------------------------
    # item access
    # ------------------------------------------------------------------
    # @TODO have other item magic methods mimic this one for slicing purposes
    def __getitem__(self, key: int | slice):
        """Return the key at an integer position, or a new collection for a slice.

        Integer lookups also move the cursor to the returned key.  A slice
        yields a fresh ``SSHKeyCollection`` whose keys are re-created from
        the selected keys' paths, so the two collections share no nodes.

        Raises:
            KeyError: if an integer position does not exist.
            TypeError: if *key* is neither ``int`` nor ``slice``.
        """
        if isinstance(key, int):
            found = self._locate(key)
            self.__current = found
            return found
        if isinstance(key, slice):
            subset = SSHKeyCollection()
            # list slicing handles None bounds, negative values and steps.
            for node in list(self._nodes())[key]:
                subset.append(*self._paths_of(node))
            return subset
        raise TypeError("indices must be integers or slices")

    def __len__(self) -> int:
        if self.__indices is None:
            return 0
        return len(self.__indices)

    def pop(self, key: int = -1) -> SSHKey:
        """Remove and return the key at position *key* (default: the last one).

        Raises:
            KeyError: if the collection is empty or *key* is out of range.
        """
        removed = self._locate(key)
        self._unlink(removed)
        return removed

    def remove(self) -> Never:
        raise NotImplementedError

    def append(self, *value: ExecutedPath | str) -> SSHKey:
        """Create an ``SSHKey`` from one or two paths and add it at the end.

        A single ``SSHKey`` argument is accepted as well; its paths are
        copied into a new node so that no node is shared between lists.

        Returns:
            The newly added key (which also becomes the cursor).

        Raises:
            ValueError: if zero or more than two paths are given.
        """
        if len(value) == 1 and isinstance(value[0], SSHKey):
            value = self._paths_of(value[0])
        if not 1 <= len(value) <= 2:
            raise ValueError("expected one or two paths")
        node = SSHKey(*value)  # SSHKey converts str entries to Path itself
        tail = self.tail
        if tail is None:
            node._SSHKey__idx = 0
            self.__first = node
        else:
            node._SSHKey__idx = tail._SSHKey__idx + 1
            node._SSHKey__prev = tail
            tail._SSHKey__next = node
        self.__last = node
        self.__indices = range(node._SSHKey__idx + 1)
        self.__current = node
        return node

    def __setitem__(self, key: int | slice, *value: ExecutedPath | str) -> None:
        """Replace the path(s) of the key at integer position *key*.

        ``collection[i] = x`` always delivers ``x`` as one argument, so a
        tuple/list of paths is unpacked before validation.

        Raises:
            ValueError: if the new value is not one or two paths.
            NotImplementedError: for slice keys.
            KeyError: if the position does not exist.
            TypeError: if *key* is neither ``int`` nor ``slice``.
        """
        if len(value) == 1 and isinstance(value[0], (tuple, list)):
            value = tuple(value[0])
        if not 1 <= len(value) <= 2:
            raise ValueError("expected one or two paths")
        if isinstance(key, slice):
            raise NotImplementedError
        if not isinstance(key, int):
            raise TypeError("indices must be integers or slices")
        target = self._locate(key)
        target.update(*value)
        self.__current = target

    def __delitem__(self, key: int | slice) -> None:
        """Remove the key at integer position *key*.

        Raises:
            NotImplementedError: for slice keys.
            KeyError: if the position does not exist.
            TypeError: if *key* is neither ``int`` nor ``slice``.
        """
        if isinstance(key, slice):
            raise NotImplementedError
        if not isinstance(key, int):
            raise TypeError("indices must be integers or slices")
        self._unlink(self._locate(key))

    @property
    def head(self) -> SSHKey | None:
        """First key, or ``None`` when the collection is empty."""
        return self.__first

    @property
    def tail(self) -> SSHKey | None:
        """Last key, or ``None`` when the collection is empty."""
        if self.__last is None:
            return self.__first
        return self.__last

    def __contains__(self, item: object = None) -> Never:
        # Membership is not implemented yet.  The parameter exists so that
        # ``x in collection`` reaches this NotImplementedError instead of
        # failing with an arity TypeError.
        raise NotImplementedError

    def __missing__(self) -> Never:
        raise NotImplementedError

    # ------------------------------------------------------------------
    # iteration
    # ------------------------------------------------------------------
    def __next__(self) -> SSHKey:
        """Advance the cursor by one key and return the key it lands on.

        Raises:
            StopIteration: if the cursor is unset or already on the last key.
        """
        if self.__current is not None:
            self.__current = self.__current._SSHKey__next
        if self.__current is None:
            raise StopIteration
        return self.__current

    def __iter__(self):
        """Return an iterator over all keys, head to tail.

        The cursor is reset to the head (as before), but the returned
        iterator walks the chain independently of it.  This means the first
        key is no longer skipped, and lookups made during a loop cannot
        derail the iteration.
        """
        self.__current = self.__first
        return self._nodes()

    def count(self, query: RegEx | str) -> Never:
        raise NotImplementedError

    # @TODO make sure to implement below method
    def pull(self, query: RegEx | str = "*") -> None:
        """Append every entry of the user's ``.ssh`` directory matching *query*.

        Args:
            query: a glob pattern (``str``), or a compiled regular expression
                that must fully match the entry's file name.

        Entries are added in sorted order so that indices are reproducible.
        """
        if isinstance(query, RegEx):
            matches = (p for p in self.__ssh_path.glob("*") if query.fullmatch(p.name))
        else:
            matches = self.__ssh_path.glob(query)
        for keypath in sorted(matches):
            self.append(keypath)

    def reverse(self) -> Never:
        raise NotImplementedError

    def sort(self, key: Callable = (lambda e: e), reverse: bool = False) -> Never:
        raise NotImplementedError

    def __str__(self) -> str:
        """Render as ``[(0>name, 1>name|]``.

        Each entry is the running position, ``>``, and the key's string form
        with its leading emoji character removed.
        """
        entries = (
            f"{position}>{str(node)[1:]}"
            for position, node in enumerate(self._nodes())
        )
        return "[(" + ", ".join(entries) + "|]"
# @TODO maybe move to separate module for classes for handling users and groups
class UserSSH:
    """Plain record describing a user account and its SSH-related settings.

    Attributes:
        username: account name; defaults to ``"root"``.
        paths: optional ``Apps`` value supplied by the caller.
        keys: mapping owned by this instance (a fresh ``dict`` when omitted).
        password: account password.
        fate: defaults to the *name* of ``RootFate.disposal`` (a ``str``).
    """

    # NOTE(review): the hard-coded default password is kept only for
    # backward compatibility; callers should always pass a real secret.
    def __init__(self, username: str = "root", paths: Apps | None = None, keys: dict | None = None, password: str = "password123", fate: RootFate | str = RootFate.disposal.name):
        self.username = username
        self.paths = paths
        # A default of ``dict()`` in the signature would be created once and
        # shared by every instance; build a new one per instance instead.
        self.keys = {} if keys is None else keys
        self.password = password
        self.fate = fate
# Ad-hoc smoke check, executed whenever this module is loaded: collect the
# entries of the user's .ssh directory, then print how many were found, a
# full-range slice of the collection, and finally each key on its own line.
sshkey_coll = SSHKeyCollection()
sshkey_coll.pull()
print(len(sshkey_coll))
print(sshkey_coll[slice(0, len(sshkey_coll))])
for k in sshkey_coll:
    print(k)