from typing import Self, Literal, Never, Callable, Sequence from custtypes import Roles, Scopes, PathCollection, PathRoles from custtypes import Software, SoftwareRoles, PacMans from custtypes import ExecutedPath, IdlePath from custtypes import UserName, GroupName from pathlib import Path, PurePath from sshkey import SSHKeyCollection, SSHKeyType, SSHKey from random import choice as gamble from re import Pattern as RegEx from softman import sshd from yaml import YAMLObject class Group(YAMLObject): yaml_tag = u"!Group" # @TODO create Enum class child for category parameter type hinting in below method def __init__(self, group_name: GroupName = GroupName.sudo, category: Literal["system", "regular"] = "system", gid: int | str = 27): if isinstance(group_name, GroupName): self.group_name = group_name.name.lower() else: self.group_name = group_name self.id = str(gid) self.type = category def __repr__(self): return "%s(group_name=%r,category=%r,gid=%r)" % ( self.__class__.__name__, self.group_name, self.category, self.id ) class User(YAMLObject): yaml_tag = u"!User" def __init__(self, username: UserName | str = UserName.root, password: str = "test", services: list[str | Software] = [Software.openssh_server], uid: int | str = 0): self.exists = True if isinstance(username, UserName): self.username = username.name.lower() else: self.username = username self.id = str(uid) self.password = password new_services = [] for s in services: if isinstance(s, Software): new_services.append(s.name.lower()) else: new_services.append(s) self.services: tuple = tuple(new_services) self.shell = "/bin/bash" self.home = "/" self.category: Literal["system", "regular"] = "regular" group = Group(username, self.id) self.group = group self.groups: list[str | GroupName] | None = None if self.groups is None: self.admin = True elif isinstance(self.groups, Sequence) and GroupName.sudo in self.groups: self.admin = True else: self.admin = False ssh_keys = SSHKeyCollection() ssh_keys.pull() self.__ssh_keys = ssh_keys # print("here") self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list) pubkeys = ssh_keys.publish(SSHKeyType.pubkey, datatype=list) self.__auth_keys: SSHKeyCollection = SSHKeyCollection() for p in pubkeys: self.__auth_keys.append(p) self.ssh_authorized_keys: list[str | None] = [] self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0] privkeys = ssh_keys.publish(SSHKeyType.privkey, datatype=list) self.__priv_keys: SSHKeyCollection = SSHKeyCollection() for p in privkeys[0]: self.__priv_keys.append(p) self.ssh_private_key_paths: list[str | None] = [] self.__priv_key_pref: int = privkeys[1] self.ssh_private_key_path_pref: int = privkeys[1] self.__apps = (sshd,) self.__ssh_keypairs: tuple[SSHKey | tuple[SSHKey]] | SSHKey = tuple() self.__ssh_keypair_chosen = False def get_app(self, name: Software) -> Never: raise NotImplementedError def update_app(self, name: Software, attr: str, method = None) -> Never: raise NotImplementedError def __update_sshd(self, app: Software = Software.openssh_server): for a in self.__apps: if a.alt_names[PacMans.APT.name.lower()] == app.name.lower(): if hasattr(a, "users"): users = getattr(a, "users") if self.username not in users: users[self.username] = dict() users[self.username]["authorized_keys"] = self.__auth_keys users[self.username]["credential_keys"] = self.__priv_keys users[self.username]["keys"] = self.__ssh_keys users[self.username]["keypairs"] = self.__ssh_keypairs users[self.username]["preferred_priv_key"] = self.__priv_key_pref setattr(self, "users", users) else: users = { self.username: { "auth_keys": self.__auth_keys, "priv_keys": self.__priv_keys, "keypairs": self.__ssh_keypairs, "keys": self.__ssh_keys } } a.declare(users = users) else: continue def add_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True): if not self.__ssh_keypair_chosen: self.__priv_keys = SSHKeyCollection() self.__auth_keys = SSHKeyCollection() if from_host: pubkeys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list) # print(pubkeys) if isinstance(public_key, int): public_key = pubkeys[public_key] elif isinstance(public_key, SSHKey): public_key = tuple(filter(lambda k: str(k) == str(public_key()), pubkeys)) if len(public_key) > 0: public_key = public_key[0] else: public_key = None elif isinstance(public_key, str): public_key = tuple(filter(lambda k: str(k) == public_key or public_key in str(k), pubkeys)) if len(public_key) > 0: public_key = public_key[0] else: public_key = None elif isinstance(public_key, RegEx): public_key = tuple(filter(lambda k: public_key.search(str(k)), pubkeys)) if len(public_key) > 0: public_key = public_key[0] else: public_key = None else: public_key = tuple(filter(lambda k: str(k) == str(public_key), pubkeys)) if len(public_key) > 0: public_key = public_key[0] else: public_key = None privkeys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0] if isinstance(private_key, int): private_key = privkeys[private_key] elif isinstance(private_key, SSHKey): private_key = tuple(filter(lambda k: str(k) == str(private_key()), privkeys)) if len(private_key) > 0: private_key = private_key[0] else: private_key = None elif isinstance(private_key, str): private_key = tuple(filter(lambda k: str(k) == private_key or private_key in str(k), privkeys)) if len(private_key) > 0: private_key = private_key[0] else: private_key = None elif isinstance(private_key, RegEx): private_key = tuple(filter(lambda k: private_key.search(str(k)), privkeys)) if len(private_key) > 0: private_key = private_key[0] else: private_key = None else: private_key = tuple(filter(lambda k: str(k) == str(private_key), privkeys)) if len(private_key) > 0: private_key = private_key[0] else: private_key = None else: if isinstance(public_key, SSHKey): public_key = public_key() elif isinstance(public_key, str): public_key = Path(public_key) self.__ssh_keys.append(public_key) if isinstance(private_key, SSHKey): private_key = private_key() elif isinstance(private_key, str): private_key = Path(private_key) self.__ssh_keys.append(private_key) if private_key is None or public_key is None: raise KeyError self.__auth_keys.append(public_key) self.ssh_authorized_keys.append(public_key.read_text()) self.__priv_keys.append(private_key) self.ssh_private_key_paths.append(str(private_key)) self.__ssh_keypairs = (*self.__ssh_keypairs, (self.__priv_keys.tail, self.__auth_keys.tail),) self.__priv_key_pref = len(self.__ssh_keypairs) - 1 self.ssh_private_key_path_pref = len(self.__ssh_keypairs) - 1 self.__update_sshd() self.__ssh_keypair_chosen = True def get_keypair(self, preference: int): if not self.__ssh_keypair_chosen: raise Exception if isinstance(self.__ssh_keypairs, SSHKey) and not isinstance(self.__ssh_keypairs(), tuple): raise ValueError if isinstance(self.__ssh_keypairs, SSHKey): if isinstance(self.__ssh_keypairs(), tuple): return self.__ssh_keypairs[preference] else: return self.__ssh_keypairs else: return self.__ssh_keypairs[preference] @property def keys(self) -> SSHKeyCollection: return self.__ssh_keys @property def public_keys(self) -> SSHKeyCollection: return self.__public_keys @property def private_keys(self) -> SSHKeyCollection: return self.__private_keys @property def keypair_preference(self) -> int: return self.__priv_key_pref def prefer_keypair(self, preference: int | RegEx | str): if isinstance(preference, int): if preference < len(self.__ssh_keypairs): self.__priv_key_pref = preference else: raise KeyError elif isinstance(preference, RegEx): count = 0 for keypair in self.__ssh_keypairs: if preference.search(keypair[0]) or preference.search(keypair[1]): self.__priv_key_pref = count count += 1 else: count = 0 for keypair in self.__ssh_keypairs: if preference in str(keypair[0]()) or preference in str(keypair[1]()): self.__priv_key_pref = count count += 1 @property def keypairs(self) -> tuple[SSHKey] | SSHKey | None: return self.__ssh_keypairs @property def keypair(self): kp = self.__ssh_keypairs[self.__priv_key_pref] return kp[0] + kp[1] @property def authorized_keys(self) -> SSHKeyCollection: return self.__auth_keys @property def credential_keys(self) -> SSHKeyCollection: return self.__priv_keys def __repr__(self) -> str: return "%s(username=%r,password=%r,services=%r,uid=%r)" % ( self.__class__.__name__, self.username, self.password, self.services, self.id )