Compare commits

...

2 Commits

3 changed files with 177 additions and 65 deletions

View File

@@ -4,10 +4,10 @@ Library of classes modeling Ansible nodes or their types.
from enum import Enum
from pathlib import Path, PurePath
from typing import TypeAlias as Neotype, TypedDict as Dict
from typing import Never, Union, Literal, Required, Self
from typing import TypedDict as Dict
from typing import Union, Literal, Required, Self
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from softman import Software, SoftPathGroup, SoftScope, Apps
from softman import Software, SoftPathGroup, SoftScope, Apps, Softs
from whereami import USER_PATH, PROJ_ROOT
from ansible_vault import Vault
import secrets
@@ -71,9 +71,6 @@ class ControlNode:
def sys_data(self) -> ExecutedPath:
return self.__data_paths
class Softs(Enum):
ssh = 0
# userSSHParams = Dict("userSSHParams", {
# "username": Required[str],
# "paths": Apps,
@@ -119,7 +116,7 @@ class RemoteNode:
SoftScope.GLOBAL.name: PurePath(str(cnode.sys_confs[0]), "update-motd.d")
}
}
self.root["software"].declare(Softs.ssh.name, **app_input)
self.root["software"].append(Softs.ssh.name, **app_input)
self._fqdn = name
self.root["software"]._fqdn = name
@@ -136,12 +133,7 @@ class RemoteNode:
self._api_key: str | None = api_key
self.service = service
self.region = region
self.__authkeys_selected = False
self.__usedkeys_selected = False
self.__keys_selected = False
self.__finalized_keys = False
self.model: dict | None = None
self.__key_accumulator: tuple | list | None = []
def set_region(self, name: Literal["us-east"] = "us-east") -> None:
self.region = name

View File

@@ -10,6 +10,7 @@ from custtypes import ExecutedPath, IdlePath
from enum import Enum
from pathlib import Path, PurePath
from whereami import USER_PATH, PROJ_ROOT
from collections.abc import Sequence
AppPath: Neotype = Union[ExecutedPath, IdlePath]
@@ -68,14 +69,14 @@ Apps = type("Apps", (), __app_input)
# @TODO continue adding magic methods to below class
# @NOTE https://rszalski.github.io/magicmethods/#sequence
class Software:
class Software (Sequence):
__user_path: ExecutedPath = USER_PATH
def __init__(self):
self._fqdn: str | None = None
# @TODO fix NameError for 'Software' in parameter type check
def declare(self, name: str, **kwpaths: _SubAppParams) -> AppParams:
def append(self, name: str, **kwpaths: _SubAppParams) -> AppParams:
keyword_args: AppParams = kwpaths
app = Apps(**keyword_args)
@@ -153,11 +154,29 @@ class Software:
)
return len(apps)
def __contains__(self):
def pop(self) -> Never:
raise NotImplementedError
def __missing__(self):
def remove(self) -> Never:
raise NotImplementedError
def __iter__(self):
def __contains__(self) -> Never:
raise NotImplementedError
def count(self) -> Never:
return NotImplementedError
def __missing__(self) -> Never:
raise NotImplementedError
def __iter__(self) -> Never:
raise NotImplementedError
def reverse(self) -> Never:
raise NotImplementedError
def sort(self, key: Callable = (lambda e: e), reverse: bool = False) -> Never:
raise NotImplementedError
class Softs(Enum):
ssh = 0

View File

@@ -4,6 +4,12 @@ from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScop
from enum import Enum
from softman import Apps
from random import gamble
from collections.abc import Sequence
from typing import Never, Union, Self, Callable
from glob import glob as globbify
from whereami import USER_PATH
from softman import Softs
import os
class RootFate(Enum):
disposal = 0
@@ -38,7 +44,7 @@ class SSHKey:
return str(self.__value)
def __repr__(self) -> ExecutedPath | tuple[ExecutedPath]:
return self.__value
return "SSHKey(" + str(self.__value) + ")"
def __nonzero__(self) -> bool:
return True
@@ -58,9 +64,26 @@ class SSHKey:
def __prev__(self) -> ExecutedPath | tuple[ExecutedPath]:
return self.__prev
def __call__(self) -> ExecutedPath | tuple[ExecutedPath]:
def __call__(self, *path: ExecutedPath | str | None) -> ExecutedPath | tuple[ExecutedPath]:
if path is not None or len(path) > 0:
if len(path) > 2:
raise ValueError
path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path))
if len(path) < 2:
self.__value = path[0]
else:
self.__value = path
return self.__value
def __eq__(self, other: Self) -> bool:
return self.__value == other._SSHKey__value
def __eqcontent__(self, other: Self) -> bool:
return self.__value.read_text() == other._SSHKey__value.read_text()
def update(self, *path: ExecutedPath | str) -> None | Never:
if len(path) > 2 or len(path) < 1:
raise ValueError
@@ -118,10 +141,13 @@ class SSHKey:
else:
raise ValueError
def publish(self, idx: int | None = None) -> str | tuple[str]:
if idx is not None:
def read(self, idx: int | None = None) -> str | tuple[str]:
if idx is not None and isinstance(self.__value, tuple):
result = self.__value[idx]
else:
if idx is not None:
raise KeyError
result = self.__value
if isinstance(result, tuple):
@@ -136,49 +162,54 @@ class SSHKey:
# @TODO this method should return string or Enum value after analyzing whether this key is public or private
raise NotImplementedError
class SSHKeyCollection:
def prev(arg):
if isinstance(arg, SSHKey):
return arg._SSHKey__prev__()
else:
raise TypeError
def stream_equal(s1, s2) -> bool | Never:
if isinstance(s1, SSHKey) and isinstance(s2, SSHKey):
return s1._SSHKey__eqcontent__(s2)
else:
raise TypeError
class SSHKeyCollection(Sequence):
__user_path: ExecutedPath = USER_PATH
def __init__(self):
self.__current: SSHKey | None = None
self.__first: SSHKey | None = None
self.__last: SSHKey | None = None
self.__indices: range | None = None
def __setitem__(self, key: int, *value: ExecutedPath | str) -> None | Never:
if len(value) < 1 or len(value) > 2:
raise ValueError
value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value))
if self.__current is None:
self.__current = SSHKey(*value)
self.__current._SSHKey__idx = key
elif int(self.__current) == key:
if self.__current() is None or len(self.__current()) < 1:
self.__current.update(*value)
else:
while int(self.__current) != key:
if next(self.__current) is not None:
self.__current = next(self.__current)
else:
break
self.__current.update(*value)
def __getitem__(self, key: int) -> SSHKey:
def __getitem__(self, key: int) -> SSHKey | Never:
if self.__current is None:
raise KeyError
elif int(self.__current) == key:
return self.__current
else:
self.__current = self.__first
while int(self.__current) != key:
if next(self.__current) is not None:
self.__current = next(self.__current)
else:
# raise StopIteration
break
return self.__current
def __delitem__(self, key: int) -> Never:
def __len__(self) -> int:
if self.__indices is None:
return 0
return len(self.__indices)
def pop(self) -> Never:
raise NotImplementedError
def remove(self) -> Never:
raise NotImplementedError
def append(self, *value: ExecutedPath | str) -> None | Never:
@@ -194,44 +225,114 @@ class SSHKeyCollection:
self.__indices = range(ssh_key._SSHKey__idx + 1)
self.__first = ssh_key
if self.__last is None:
self.__last = ssh_key
self.__first._SSHKey__next = self.__last
self.__last._SSHKey__prev = self.__first
self.__current = self.__first
else:
ssh_key._SSHKey__idx = len(self.__indices)
ssh_key._SSHKey__prev = self.__last
self.__last = ssh_key
if self.__last is not None:
self.__last._SSHKey__next = ssh_key
self.__last._SSHKey__next._SSHKey__prev = self.__last
self.__last = next(self.__last)
else:
self.__first._SSHKey__next = ssh_key
self.__first._SSHKey__next._SSHKey__prev = self.__first
self.__last = self.__first._SSHKey__next
self.__indices = range(ssh_key._SSHKey__idx + 1)
self.__current = self.__last
def pop(self) -> Never:
def __setitem__(self, key: int, *value: ExecutedPath | str) -> None | Never:
if len(value) < 1 or len(value) > 2:
raise ValueError
value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value))
if self.__current is None:
raise KeyError
elif int(self.__current) == key:
if self.__current() is None or len(self.__current()) < 1:
self.__current(*value)
else:
self.__current = self.__first
while int(self.__current) != key:
if next(self.__current) is not None:
self.__current = next(self.__current)
else:
# raise StopIteration
break
self.__current(*value)
def __delitem__(self, key: int) -> None | Never:
if self.__current is None:
raise KeyError
elif int(self.__current) == key:
prior = self.__current._SSHKey__prev
posterior = self.__current._SSHKey__next
posterior._SSHKey__idx = self.__current._SSHKey__idx
prior._SSHKey__next = posterior
prior._SSHKey__next._SSHKey__prev = prior
self.__current = prior._SSHKey__next
while next(self.__current) is not None:
self.__current = next(self.__current)
self.__current._SSHKey__idx += 1
else:
self.__current = self.__first
while int(self.__current) != key:
if next(self.__current) is not None:
self.__current = next(self.__current)
else:
# raise StopIteration
break
prior = self.__current._SSHKey__prev
posterior = self.__current._SSHKey__next
posterior._SSHKey__idx = self.__current._SSHKey__idx
prior._SSHKey__next = posterior
prior._SSHKey__next._SSHKey__prev = prior
self.__current = prior._SSHKey__next
while next(self.__current) is not None:
self.__current = next(self.__current)
self.__current._SSHKey__idx += 1
@property
def head(self) -> SSHKey | None:
return self.__first
@property
def tail(self) -> SSHKey | None:
if self.__last is None:
return self.__first
return self.__last
# @TODO make sure to implement below method
def __contains__(self) -> bool | Never:
raise NotImplementedError
def remove(self) -> Never:
def __missing__(self) -> Never:
raise NotImplementedError
# @TODO make sure to implement below method
def __iter__(self) -> Self | Never:
raise NotImplementedError
def count(self, query: RegEx | str) -> int | Never:
raise NotImplementedError
def pull(self, query: RegEx | str = "*") -> Never:
raise NotImplementedError
def __len__(self):
return len(self.__indices)
def __contains__(self):
def reverse(self) -> None | Never:
raise NotImplementedError
def __missing__(self):
def sort(self, key: Callable = (lambda e: e), reverse: bool = False) -> None | Never:
raise NotImplementedError
def __iter__(self):
raise NotImplementedError
class UserSSH:
def __init__(self, username: str = "root", paths: Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: RootFate = RootFate.disposal.name):
self.username = username