from typing import Self, Literal, Never, Callable, Sequence from custtypes import Roles, Scopes, PathCollection, PathRoles from custtypes import Software, SoftwareRoles, PacMans from custtypes import ExecutedPath, IdlePath, File from custtypes import UserName, GroupName, VPS, VPSRegion, RootFate from pathlib import Path, PurePath from sshkey import SSHKeyCollection, SSHKeyType, SSHKey from random import choice as gamble from re import Pattern as RegEx from softman import sshd from yaml import YAMLObject from ansible_vault import Vault class Group(YAMLObject): yaml_tag = u"!Group" # @TODO create Enum class child for category parameter type hinting in below method def __init__(self, group_name: GroupName | str = GroupName.sudo, category: Literal["system", "regular"] = "system", gid: int | str | None = 27): if isinstance(group_name, GroupName): self.group_name = group_name.name.lower() else: self.group_name = group_name self.id = str(gid) self.type = category def __repr__(self): return "%s(group_name=%r,category=%r,gid=%r)" % ( self.__class__.__name__, self.group_name, self.category, self.id ) class User(YAMLObject): yaml_tag = u"!User" def __init__(self, username: UserName | str = UserName.root, password: str = "test", services: list[str | Software] = [Software.openssh_server], uid: int | str | None = 0): self.exists = True if isinstance(username, UserName): self.username = username.name.lower() else: self.username = username self.id = str(uid) self.password = password new_services = [] for s in services: if isinstance(s, Software): new_services.append(s.name.lower()) else: new_services.append(s) self.services: tuple = tuple(new_services) self.shell = "/bin/bash" self.home = "/" self.category: Literal["system", "regular"] = "regular" group = Group(username, self.id) self.group = group self.groups: list[str | GroupName] | None = None if self.groups is None: self.admin = True elif isinstance(self.groups, Sequence) and GroupName.sudo in self.groups: self.admin = True else: self.admin = False ssh_keys = SSHKeyCollection() ssh_keys.pull() self.__ssh_keys = ssh_keys # print("here") self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list) pubkeys = ssh_keys.publish(SSHKeyType.pubkey, datatype=list) self.__auth_keys: SSHKeyCollection = SSHKeyCollection() for p in pubkeys: self.__auth_keys.append(p) self.ssh_authorized_keys: list[str | None] = [] self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0] privkeys = ssh_keys.publish(SSHKeyType.privkey, datatype=list) self.__priv_keys: SSHKeyCollection = SSHKeyCollection() for p in privkeys[0]: self.__priv_keys.append(p) self.ssh_private_key_paths: list[str | None] = [] self.__priv_key_pref: int = privkeys[1] self.ssh_private_key_path_pref: int = privkeys[1] self.__apps = (sshd,) self.__ssh_keypairs: tuple[SSHKey | tuple[SSHKey]] | SSHKey = tuple() self.__ssh_keypair_chosen = False def get_app(self, name: Software) -> Never: raise NotImplementedError def update_app(self, name: Software, attr: str, method = None) -> Never: raise NotImplementedError def __update_sshd(self, app: Software = Software.openssh_server): for a in self.__apps: if a.alt_names[PacMans.APT.name.lower()] == app.name.lower(): if hasattr(a, "users"): users = getattr(a, "users") if self.username not in users: users[self.username] = dict() users[self.username]["authorized_keys"] = self.__auth_keys users[self.username]["credential_keys"] = self.__priv_keys users[self.username]["keys"] = self.__ssh_keys users[self.username]["keypairs"] = self.__ssh_keypairs users[self.username]["preferred_priv_key"] = self.__priv_key_pref setattr(self, "users", users) else: users = { self.username: { "auth_keys": self.__auth_keys, "priv_keys": self.__priv_keys, "keypairs": self.__ssh_keypairs, "keys": self.__ssh_keys } } a.declare(users = users) else: continue def choose_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True): if not self.__ssh_keypair_chosen: self.__priv_keys = SSHKeyCollection() self.__auth_keys = SSHKeyCollection() if from_host: pubkeys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list) # print(pubkeys) if isinstance(public_key, int): public_key = pubkeys[public_key] elif isinstance(public_key, SSHKey): public_key = tuple(filter(lambda k: str(k) == str(public_key()), pubkeys)) if len(public_key) > 0: public_key = public_key[0] else: public_key = None elif isinstance(public_key, str): public_key = tuple(filter(lambda k: str(k) == public_key or public_key in str(k), pubkeys)) if len(public_key) > 0: public_key = public_key[0] else: public_key = None elif isinstance(public_key, RegEx): public_key = tuple(filter(lambda k: public_key.search(str(k)), pubkeys)) if len(public_key) > 0: public_key = public_key[0] else: public_key = None else: public_key = tuple(filter(lambda k: str(k) == str(public_key), pubkeys)) if len(public_key) > 0: public_key = public_key[0] else: public_key = None privkeys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0] if isinstance(private_key, int): private_key = privkeys[private_key] elif isinstance(private_key, SSHKey): private_key = tuple(filter(lambda k: str(k) == str(private_key()), privkeys)) if len(private_key) > 0: private_key = private_key[0] else: private_key = None elif isinstance(private_key, str): private_key = tuple(filter(lambda k: str(k) == private_key or private_key in str(k), privkeys)) if len(private_key) > 0: private_key = private_key[0] else: private_key = None elif isinstance(private_key, RegEx): private_key = tuple(filter(lambda k: private_key.search(str(k)), privkeys)) if len(private_key) > 0: private_key = private_key[0] else: private_key = None else: private_key = tuple(filter(lambda k: str(k) == str(private_key), privkeys)) if len(private_key) > 0: private_key = private_key[0] else: private_key = None else: if isinstance(public_key, SSHKey): public_key = public_key() elif isinstance(public_key, str): public_key = Path(public_key) self.__ssh_keys.append(public_key) if isinstance(private_key, SSHKey): private_key = private_key() elif isinstance(private_key, str): private_key = Path(private_key) self.__ssh_keys.append(private_key) if private_key is None or public_key is None: raise KeyError self.__auth_keys.append(public_key) self.ssh_authorized_keys.append(public_key.read_text()) self.__priv_keys.append(private_key) self.ssh_private_key_paths.append(str(private_key)) self.__ssh_keypairs = (*self.__ssh_keypairs, (self.__priv_keys.tail, self.__auth_keys.tail),) self.__priv_key_pref = len(self.__ssh_keypairs) - 1 self.ssh_private_key_path_pref = len(self.__ssh_keypairs) - 1 self.__update_sshd() self.__ssh_keypair_chosen = True def get_keypair(self, preference: int): if not self.__ssh_keypair_chosen: raise Exception if isinstance(self.__ssh_keypairs, SSHKey) and not isinstance(self.__ssh_keypairs(), tuple): raise ValueError if isinstance(self.__ssh_keypairs, SSHKey): if isinstance(self.__ssh_keypairs(), tuple): return self.__ssh_keypairs[preference] else: return self.__ssh_keypairs else: return self.__ssh_keypairs[preference] @property def keys(self) -> SSHKeyCollection: return self.__ssh_keys @property def public_keys(self) -> SSHKeyCollection: return self.__public_keys @property def private_keys(self) -> SSHKeyCollection: return self.__private_keys @property def keypair_preference(self) -> int: return self.__priv_key_pref def prefer_keypair(self, preference: int | RegEx | str): if isinstance(preference, int): if preference < len(self.__ssh_keypairs): self.__priv_key_pref = preference else: raise KeyError elif isinstance(preference, RegEx): count = 0 for keypair in self.__ssh_keypairs: if preference.search(keypair[0]) or preference.search(keypair[1]): self.__priv_key_pref = count count += 1 else: count = 0 for keypair in self.__ssh_keypairs: if preference in str(keypair[0]()) or preference in str(keypair[1]()): self.__priv_key_pref = count count += 1 @property def keypairs(self) -> tuple[SSHKey] | SSHKey | None: return self.__ssh_keypairs @property def keypair(self): kp = self.__ssh_keypairs[self.__priv_key_pref] return kp[0] + kp[1] @property def authorized_keys(self) -> SSHKeyCollection: return self.__auth_keys @property def credential_keys(self) -> SSHKeyCollection: return self.__priv_keys def __repr__(self) -> str: return "%s(username=%r,password=%r,services=%r,uid=%r)" % ( self.__class__.__name__, self.username, self.password, self.services, self.id ) class AnsibleCrypt: def __init__(self, string: str, source: File | None = None): self.__args = (string, source) self.__lock = Vault(string).dump self.__stream = None if source is not None: self.__stream = source.read() self.__data = self.__lock(string, self.__stream) else: self.__data = self.__lock(string) def unlock(self, string: str): unlock = Vault(string).load if self.__stream is not None: result = unlock(self.__stream) else: result = unlock(string) return result def __str__(self): return self.__data def __repr__(self): return "%s(%r, source=%r)" % ( self.__class__.__name__, *self.__args ) class VirtualPrivateServer(YAMLObject): yaml_tag = u"!VirtualPrivateServer" def __init__(self, root: User, api: str, vps: VPS | str = VPS.Linode): self.region: VPSRegion | None = None if vps == VPS.Linode: self.region = VPSRegion.us_east api_key = AnsibleCrypt(api) self.__api_key: AnsibleCrypt = api_key self.api_key: str = str(api_key) self.password: str = root.password self.exists: bool = True self.type: str = vps.name.lower() self.__default_fate: RootFate = RootFate.disposal self.root_fate: str = self.__default_fate.name.lower() self.ssh_authorized_keys: list[str] = root.ssh_authorized_keys self.ssh_private_key_paths: list[str] = root.ssh_private_key_paths self.ssh_private_key_path_pref: int = root.ssh_private_key_path_pref # @TODO add SSH MOTD attribute