refactor: renamed module with utilities for management of software for given users and groups

This commit is contained in:
2026-01-21 09:17:37 -05:00
parent 4eab3bd787
commit aea6638243
2 changed files with 173 additions and 182 deletions

173
servs.py Normal file
View File

@@ -0,0 +1,173 @@
from typing import TypedDict as Dict
from typing import Self, Literal, Never
from collections.abc import Sequence
from custtypes import Roles, Scopes, PathCollection, PathRoles, Software, SoftwareRoles, ExecutedPath, PacMans, UserName, GroupName, IdlePath, ExecutedPath
from pathlib import Path, PurePath
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from whereami import PROJ_ROLES
class App:
def __init__(self, name: Software, role: SoftwareRoles = SoftwareRoles.client, paths: PathRoles | None = None):
self.__name = name
# @TODO create dict type hint for below data struct
self.alt_names = dict()
self.alt_names[PacMans.APT] = self.__name
self.role = role
if paths is not None:
if Roles.EXE in paths:
setattr(self, "_" + Roles.EXE.name.lower(), paths[Roles.EXE.name.lower()])
if Roles.CONF in paths:
setattr(self, "_" + Roles.CONF.name.lower(), paths[Roles.CONF.name.lower()])
if Roles.DATA in paths:
setattr(self, "_" + Roles.DATA.name.lower(), paths[Roles.DATA.name.lower()])
self.__parents: tuple[Self] | None = None
self.__children: tuple[Self| None] = []
self.__api: str | None = None
self.__current_filepath: IdlePath | ExecutedPath | None = None
self.__content: str | None = None
@property
def conf_paths(self):
if hasattr(self, "_" + Roles.CONF.name.lower()):
return self._conf
else:
raise Exception
@property
def data_paths(self):
if hasattr(self, "_" + Roles.DATA.name.lower()):
return self._data
else:
raise Exception
@property
def exec_paths(self):
if hasattr(self, "_" + Roles.EXE.name.lower()):
return self._exe
else:
raise Exception
def append(self, datatype: Roles = Roles.CONF, scope: Scopes | None = None, path: IdlePath | ExecutedPath | str | PathCollection | None = None):
if path is None:
raise TypeError
datatype = datatype.name.lower()
if hasattr(self, "_" + datatype):
paths = getattr(self, "_" + datatype)
else:
setattr(self, "_" + datatype, dict())
if scope is not None:
if isinstance(path, str):
path: ExecutedPath = Path(path)
if scope.name.lower() not in paths:
paths[scope.name.lower()] = []
paths[scope.name.lower()].append(path)
else:
paths: PathCollection = path
setattr(self, "_" + datatype, paths)
def inherit(self, other):
if other._App__children is None:
other._App__children = tuple()
if self not in other._App__children:
other.adopt(self)
self.__parents = (*self.__parents, other)
def adopt(self, other: Self):
if other._App__parents is None:
other._App__parents = tuple()
if self not in other._App__parent:
other.inherit(self)
self.__children = (*self.__children, other)
def __enter__(self) -> dict | Sequence:
self.__content = self.__current_filepath.read_text()
return self.__content
def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
# txt = yams.dump(self.__content)
# self.__file.write(txt)
# del txt
self.__file.close()
def __call__(self, path = str, mode = "r+", scope: Scopes = Scopes.PROJ, index: int = 0) -> None:
if not hasattr(self, "_" + Roles.CONF.name.lower()):
raise Exception
conf_coll = self._conf[scope.name.lower()]
if isinstance(conf_coll, Sequence):
conf_coll = conf_coll[index]
if isinstance(conf_coll, str):
conf_coll = Path(conf_coll)
filepath = conf_coll / path
self.__current_filepath = filepath
self.__file = open(str(filepath), mode)
# @TODO write below method to duplicate file or template in local to project role file/template
def clone(self, source_scope: Scopes = Scopes.SYS, target_scope: Scopes = Scopes.PROJ, index: int = 0) -> Never:
raise NotImplementedError
# @TODO rewrite below using DIR_ROOTS var from whereami module
sshd_paths: PathRoles = {
Roles.CONF.name.lower(): {
Scopes.PROJ.name.lower(): [
PROJ_ROLES / "bootstrap" / "files" / "sshd_config.d",
PROJ_ROLES / "bootstrap" / "templates" / "sshd_config.d"
],
}
}
sshd = App(Software.sshd, SoftwareRoles.server, sshd_paths)
class Group:
def __init__(self, group_name: GroupName = GroupName.sudo, gid: int = 27):
self.group_name = group_name
self.id = gid
self.category: Literal["system", "regular"] = "system"
class User:
def __init__(self, username: UserName = UserName.root.name, password: str = "test", services: list = [Software.sshd.name.lower()], uid: int = 0):
self.exists = True
self.username = username
self.id = uid
self.password = password
self.services: tuple = tuple(services)
self.shell = "/bin/bash"
self.home = "/"
self.category: Literal["system", "regular"] = "regular"
group = Group(username, self.id)
self.primary_group = group
self.supp_groups = None
if self.supp_groups is None:
self.is_admin = True
elif isinstance(self.supp_groups, Sequence) and GroupName.sudo in self.supp_groups:
self.is_admin = True
else:
self.is_admin = False
ssh_keys = SSHKeyCollection()
ssh_keys.pull()
self.ssh_keys = ssh_keys
pubkeys = ssh_keys.publish(SSHKeyType.pubkey.name.lower(), datatype=list)
self.__auth_keys: list[str] = list(map(lambda k: k.read_text(), pubkeys))
privkeys = ssh_keys.publish(SSHKeyType.privkey.name.lower(), datatype=list)
self.__priv_keys: list[str] = list(map(lambda k: str(k), privkeys[0]))
self.__priv_key_pref: int = privkeys[1]
self.__apps = (sshd,)
def add_keypair(self, private_key, public_key):
raise NotImplementedError