diff --git a/entities.py b/entities.py new file mode 100644 index 0000000..d24f57c --- /dev/null +++ b/entities.py @@ -0,0 +1,247 @@ +from typing import Self, Literal, Never, Callable, Sequence +from custtypes import Roles, Scopes, PathCollection, PathRoles, Software, SoftwareRoles, ExecutedPath, PacMans, UserName, GroupName, IdlePath, ExecutedPath +from pathlib import Path, PurePath +from sshkey import SSHKeyCollection, SSHKeyType, SSHKey +from random import choice as gamble +from re import Pattern as RegEx +from softman import sshd + +class Group: + def __init__(self, group_name: GroupName = GroupName.sudo, gid: int = 27): + self.group_name = group_name + self.id = gid + self.category: Literal["system", "regular"] = "system" + +class User: + def __init__(self, username: UserName | str = UserName.root.name.lower(), password: str = "test", services: list = [Software.openssh_server.name.lower()], uid: int = 0): + self.exists = True + self.username = username + self.id = uid + self.password = password + self.services: tuple = tuple(services) + self.shell = "/bin/bash" + self.home = "/" + self.category: Literal["system", "regular"] = "regular" + group = Group(username, self.id) + self.primary_group = group + self.supp_groups = None + + if self.supp_groups is None: + self.is_admin = True + elif isinstance(self.supp_groups, Sequence) and GroupName.sudo in self.supp_groups: + self.is_admin = True + else: + self.is_admin = False + + ssh_keys = SSHKeyCollection() + ssh_keys.pull() + self.__ssh_keys = ssh_keys + self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list) + self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0] + # print("here") + pubkeys = ssh_keys.publish(SSHKeyType.pubkey, datatype=list) + self.__auth_keys: SSHKeyCollection = SSHKeyCollection() + for p in pubkeys: + self.__auth_keys.append(p) + privkeys = ssh_keys.publish(SSHKeyType.privkey, datatype=list) + self.__priv_keys: SSHKeyCollection = SSHKeyCollection() + for p in privkeys[0]: + self.__priv_keys.append(p) + self.__priv_key_pref: int = privkeys[1] + self.__apps = (sshd,) + self.__ssh_keypairs: tuple[SSHKey | tuple[SSHKey]] | SSHKey = tuple() + self.__ssh_keypair_chosen = False + + + def get_app(self, name: Software) -> Never: + raise NotImplementedError + + def update_app(self, name: Software, attr: str, method = None) -> Never: + raise NotImplementedError + + def __update_sshd(self, app: Software = Software.openssh_server): + for a in self.__apps: + if a.alt_names[PacMans.APT.name.lower()] == app.name.lower(): + if hasattr(a, "users"): + users = getattr(a, "users") + + if self.username not in users: + users[self.username] = dict() + + users[self.username]["auth_keys"] = self.__auth_keys + users[self.username]["priv_keys"] = self.__priv_keys + users[self.username]["keys"] = self.__ssh_keys + users[self.username]["keypairs"] = self.__ssh_keypairs + users[self.username]["preferred_priv_key"] = self.__priv_key_pref + setattr(self, "users", users) + else: + users = { + self.username: { + "auth_keys": self.__auth_keys, + "priv_keys": self.__priv_keys, + "keypairs": self.__ssh_keypairs, + "keys": self.__ssh_keys + } + } + a.declare(users = users) + else: + continue + + def add_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True): + if not self.__ssh_keypair_chosen: + self.__priv_keys = SSHKeyCollection() + self.__auth_keys = SSHKeyCollection() + + if from_host: + pubkeys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list) + + # print(pubkeys) + if isinstance(public_key, int): + public_key = pubkeys[public_key] + elif isinstance(public_key, SSHKey): + public_key = tuple(filter(lambda k: str(k) == str(public_key()), pubkeys)) + if len(public_key) > 0: + public_key = public_key[0] + else: + public_key = None + elif isinstance(public_key, str): + public_key = tuple(filter(lambda k: str(k) == public_key or public_key in str(k), pubkeys)) + if len(public_key) > 0: + public_key = public_key[0] + else: + public_key = None + elif isinstance(public_key, RegEx): + public_key = tuple(filter(lambda k: public_key.search(str(k)), pubkeys)) + if len(public_key) > 0: + public_key = public_key[0] + else: + public_key = None + else: + public_key = tuple(filter(lambda k: str(k) == str(public_key), pubkeys)) + if len(public_key) > 0: + public_key = public_key[0] + else: + public_key = None + + privkeys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0] + + if isinstance(private_key, int): + private_key = privkeys[private_key] + elif isinstance(private_key, SSHKey): + private_key = tuple(filter(lambda k: str(k) == str(private_key()), privkeys)) + if len(private_key) > 0: + private_key = private_key[0] + else: + private_key = None + elif isinstance(private_key, str): + private_key = tuple(filter(lambda k: str(k) == private_key or private_key in str(k), privkeys)) + if len(private_key) > 0: + private_key = private_key[0] + else: + private_key = None + elif isinstance(private_key, RegEx): + private_key = tuple(filter(lambda k: private_key.search(str(k)), privkeys)) + if len(private_key) > 0: + private_key = private_key[0] + else: + private_key = None + else: + private_key = tuple(filter(lambda k: str(k) == str(private_key), privkeys)) + if len(private_key) > 0: + private_key = private_key[0] + else: + private_key = None + else: + if isinstance(public_key, SSHKey): + public_key = public_key() + elif isinstance(public_key, str): + public_key = Path(public_key) + + self.__ssh_keys.append(public_key) + + if isinstance(private_key, SSHKey): + private_key = private_key() + elif isinstance(private_key, str): + private_key = Path(private_key) + + self.__ssh_keys.append(private_key) + + if private_key is None or public_key is None: + raise KeyError + + self.__auth_keys.append(public_key) + self.__priv_keys.append(private_key) + self.__ssh_keypairs = (*self.__ssh_keypairs, (self.__priv_keys.tail, self.__auth_keys.tail),) + self.__priv_key_pref = len(self.__ssh_keypairs) - 1 + self.__update_sshd() + + self.__ssh_keypair_chosen = True + + + def get_keypair(self, preference: int): + if not self.__ssh_keypair_chosen: + raise Exception + + if isinstance(self.__ssh_keypairs, SSHKey) and not isinstance(self.__ssh_keypairs(), tuple): + raise ValueError + + if isinstance(self.__ssh_keypairs, SSHKey): + if isinstance(self.__ssh_keypairs(), tuple): + return self.__ssh_keypairs[preference] + else: + return self.__ssh_keypairs + else: + return self.__ssh_keypairs[preference] + + @property + def available_keys(self) -> SSHKeyCollection: + return self.__ssh_keys + + @property + def public_keys(self) -> SSHKeyCollection: + return self.__public_keys + + @property + def private_keys(self) -> SSHKeyCollection: + return self.__private_keys + + @property + def keypair_preference(self) -> int: + return self.__priv_key_pref + + def prefer_keypair(self, preference: int | RegEx | str): + if isinstance(preference, int): + if preference < len(self.__ssh_keypairs): + self.__priv_key_pref = preference + else: + raise KeyError + elif isinstance(preference, RegEx): + count = 0 + for keypair in self.__ssh_keypairs: + if preference.search(keypair[0]) or preference.search(keypair[1]): + self.__priv_key_pref = count + count += 1 + else: + count = 0 + for keypair in self.__ssh_keypairs: + if preference in str(keypair[0]()) or preference in str(keypair[1]()): + self.__priv_key_pref = count + count += 1 + + @property + def available_keypairs(self) -> tuple[SSHKey] | SSHKey | None: + return self.__ssh_keypairs + + @property + def keypair(self): + kp = self.__ssh_keypairs[self.__priv_key_pref] + return kp[0] + kp[1] + + @property + def authorized_keys(self) -> SSHKeyCollection: + return self.__auth_keys + + @property + def credential_keys(self) -> SSHKeyCollection: + return self.__priv_keys + diff --git a/servs.py b/softman.py similarity index 61% rename from servs.py rename to softman.py index 8533269..516ebd9 100644 --- a/servs.py +++ b/softman.py @@ -1,18 +1,18 @@ -from typing import TypedDict as Dict -from typing import Self, Literal, Never -from collections.abc import Sequence -from custtypes import Roles, Scopes, PathCollection, PathRoles, Software, SoftwareRoles, ExecutedPath, PacMans, UserName, GroupName, IdlePath, ExecutedPath -from pathlib import Path, PurePath +from typing import Self, Never, Callable, Sequence +from custtypes import Roles, Scopes, PathCollection, PathRoles +from custtypes import Software, SoftwareRoles, ExecutedPath +from custtypes import PacMans, UserName, GroupName, IdlePath +from custtypes import ExecutedPath, AnsibleRoles from sshkey import SSHKeyCollection, SSHKeyType, SSHKey from whereami import PROJ_ROLES class App: def __init__(self, name: Software, role: SoftwareRoles = SoftwareRoles.client, paths: PathRoles | None = None): - self.__name = name + self.__name = name.name.lower().replace("_", "-") # @TODO create dict type hint for below data struct self.alt_names = dict() - self.alt_names[PacMans.APT] = self.__name - self.role = role + self.alt_names[PacMans.APT.name.lower()] = self.__name + self.role = role.name.lower() if paths is not None: if Roles.EXE in paths: setattr(self, "_" + Roles.EXE.name.lower(), paths[Roles.EXE.name.lower()]) @@ -20,6 +20,8 @@ class App: setattr(self, "_" + Roles.CONF.name.lower(), paths[Roles.CONF.name.lower()]) if Roles.DATA in paths: setattr(self, "_" + Roles.DATA.name.lower(), paths[Roles.DATA.name.lower()]) + if Roles.MEM in paths: + setattr(self, "_" + Roles.MEM.name.lower(), paths[Roles.MEM.name.lower()]) self.__parents: tuple[Self] | None = None self.__children: tuple[Self| None] = [] self.__api: str | None = None @@ -47,6 +49,19 @@ class App: else: raise Exception + @property + def mem_paths(self): + if hasattr(self, "_" + Roles.MEM.name.lower()): + return self._mem + else: + raise Exception + + def get_paths(self, role: Roles = Roles.CONF): + if hasattr(self, "_" + role.name.lower()): + return getattr(self, "_" + role.name.lower()) + else: + raise Exception + def append(self, datatype: Roles = Roles.CONF, scope: Scopes | None = None, path: IdlePath | ExecutedPath | str | PathCollection | None = None): if path is None: raise TypeError @@ -57,6 +72,7 @@ class App: paths = getattr(self, "_" + datatype) else: setattr(self, "_" + datatype, dict()) + paths = getattr(self, "_" + datatype) if scope is not None: if isinstance(path, str): @@ -99,11 +115,13 @@ class App: # del txt self.__file.close() - def __call__(self, path = str, mode = "r+", scope: Scopes = Scopes.PROJ, index: int = 0) -> None: - if not hasattr(self, "_" + Roles.CONF.name.lower()): + def __call__(self, path = str, mode = "r+", scope: Scopes = Scopes.PROJ, role: Roles = Roles.CONF, index: int = 0) -> Callable: + if not hasattr(self, "_" + role.name.lower()): raise Exception + else: + config = getattr(self, "_" + role.name.lower()) - conf_coll = self._conf[scope.name.lower()] + conf_coll = config[scope.name.lower()] if isinstance(conf_coll, Sequence): conf_coll = conf_coll[index] @@ -116,58 +134,24 @@ class App: self.__current_filepath = filepath self.__file = open(str(filepath), mode) + return __enter__ + # @TODO write below method to duplicate file or template in local to project role file/template def clone(self, source_scope: Scopes = Scopes.SYS, target_scope: Scopes = Scopes.PROJ, index: int = 0) -> Never: raise NotImplementedError + def declare(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + +sshd_proj_files = map(lambda r: r / AnsibleRoles.bootstrap.name.lower() / "files" / "sshd_config.d", PROJ_ROLES) +sshd_proj_files = list(filter(lambda p: p.exists(), sshd_proj_files)) +sshd_proj_templates = map(lambda r: r / AnsibleRoles.bootstrap.name.lower() / "templates" / "sshd_config.d", PROJ_ROLES) +sshd_proj_templates = list(filter(lambda p: p.exists(), sshd_proj_templates)) # @TODO rewrite below using DIR_ROOTS var from whereami module sshd_paths: PathRoles = { Roles.CONF.name.lower(): { - Scopes.PROJ.name.lower(): [ - PROJ_ROLES / "bootstrap" / "files" / "sshd_config.d", - PROJ_ROLES / "bootstrap" / "templates" / "sshd_config.d" - ], + Scopes.PROJ.name.lower(): sshd_proj_files + sshd_proj_templates } } -sshd = App(Software.sshd, SoftwareRoles.server, sshd_paths) - -class Group: - def __init__(self, group_name: GroupName = GroupName.sudo, gid: int = 27): - self.group_name = group_name - self.id = gid - self.category: Literal["system", "regular"] = "system" - -class User: - def __init__(self, username: UserName = UserName.root.name, password: str = "test", services: list = [Software.sshd.name.lower()], uid: int = 0): - self.exists = True - self.username = username - self.id = uid - self.password = password - self.services: tuple = tuple(services) - self.shell = "/bin/bash" - self.home = "/" - self.category: Literal["system", "regular"] = "regular" - group = Group(username, self.id) - self.primary_group = group - self.supp_groups = None - - if self.supp_groups is None: - self.is_admin = True - elif isinstance(self.supp_groups, Sequence) and GroupName.sudo in self.supp_groups: - self.is_admin = True - else: - self.is_admin = False - - ssh_keys = SSHKeyCollection() - ssh_keys.pull() - self.ssh_keys = ssh_keys - pubkeys = ssh_keys.publish(SSHKeyType.pubkey.name.lower(), datatype=list) - self.__auth_keys: list[str] = list(map(lambda k: k.read_text(), pubkeys)) - privkeys = ssh_keys.publish(SSHKeyType.privkey.name.lower(), datatype=list) - self.__priv_keys: list[str] = list(map(lambda k: str(k), privkeys[0])) - self.__priv_key_pref: int = privkeys[1] - self.__apps = (sshd,) - - def add_keypair(self, private_key, public_key): - raise NotImplementedError - +sshd = App(Software.openssh_server, SoftwareRoles.server, sshd_paths) \ No newline at end of file