Compare commits

..

6 Commits

7 changed files with 594 additions and 121 deletions

75
anodes.py Normal file
View File

@@ -0,0 +1,75 @@
from custtypes import AnsibleRoles, AnsibleScopes, ExecutedPath
from whereami import USER_PATH, ANSIBLE_CONFIG, ANSIBLE_ROOTS
from configparser import ConfigParser as cfg
from typing import Sequence
from re import Pattern as RegEx
from pathlib import PurePath
from parse import Parser
# @TODO below class should mostly work as a context manager
class ControlNode:
__parser = Parser()
def __init__(self):
self.__user_path = USER_PATH()
self.__config = cfg()
if ANSIBLE_CONFIG.exists():
self.__config.read(str(ANSIBLE_CONFIG))
else:
raise Exception
self.__data = None
self.__filepath: ExecutedPath | None = None
self.__file = None
def __enter__(self):
self.__file = open(str(self.__filepath), "r+")
self.__data = self.__parser.load(self.__filepath)
return self.__data
def __exit__(self, exc_type, exc_value, exc_traceback):
result = self.__parser.dump(self.__data)
if isinstance(result, str):
self.__file.write(result)
else:
result.write(self.__file)
self.__file.close()
def __call__(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY, pick: int | str | RegEx | AnsibleRoles = 0, filepath: ExecutedPath | str | None = None):
if scope is None:
raise NotImplementedError
else:
paths = ANSIBLE_ROOTS[scope.name.lower()]
if isinstance(paths, Sequence):
path = None
if isinstance(pick, int):
path = paths[pick]
elif isinstance(pick, str):
path = tuple(filter(lambda p: str(p) == pick or pick in str(p), paths))
if len(path) > 0:
path = path[0]
elif isinstance(pick, RegEx):
path = tuple(filter(lambda p: pick.search(str(p)), paths))
if len(path) > 0:
path = path[0]
else:
path = tuple(filter(lambda p: str(p) == pick.name.lower() or pick.name.lower() in str(p), paths))
if len(path) > 0:
path = path[0]
if path is None:
raise KeyError
if isinstance(filepath, ExecutedPath):
filepath = str(filepath)
if path.is_dir():
self.__filepath = path / filepath
elif path.is_file():
self.__filepath = path
return __enter__

View File

@@ -56,8 +56,8 @@ class GroupName(StrEnum):
sudo = auto() sudo = auto()
class Software(StrEnum): class Software(StrEnum):
ssh = auto() openssh_client = auto()
sshd = auto() openssh_server = auto()
class SoftwareRoles(StrEnum): class SoftwareRoles(StrEnum):
client = auto() client = auto()

247
entities.py Normal file
View File

@@ -0,0 +1,247 @@
from typing import Self, Literal, Never, Callable, Sequence
from custtypes import Roles, Scopes, PathCollection, PathRoles, Software, SoftwareRoles, ExecutedPath, PacMans, UserName, GroupName, IdlePath, ExecutedPath
from pathlib import Path, PurePath
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from random import choice as gamble
from re import Pattern as RegEx
from softman import sshd
class Group:
def __init__(self, group_name: GroupName = GroupName.sudo, gid: int = 27):
self.group_name = group_name
self.id = gid
self.category: Literal["system", "regular"] = "system"
class User:
def __init__(self, username: UserName | str = UserName.root.name.lower(), password: str = "test", services: list = [Software.openssh_server.name.lower()], uid: int = 0):
self.exists = True
self.username = username
self.id = uid
self.password = password
self.services: tuple = tuple(services)
self.shell = "/bin/bash"
self.home = "/"
self.category: Literal["system", "regular"] = "regular"
group = Group(username, self.id)
self.primary_group = group
self.supp_groups = None
if self.supp_groups is None:
self.is_admin = True
elif isinstance(self.supp_groups, Sequence) and GroupName.sudo in self.supp_groups:
self.is_admin = True
else:
self.is_admin = False
ssh_keys = SSHKeyCollection()
ssh_keys.pull()
self.__ssh_keys = ssh_keys
self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
# print("here")
pubkeys = ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
self.__auth_keys: SSHKeyCollection = SSHKeyCollection()
for p in pubkeys:
self.__auth_keys.append(p)
privkeys = ssh_keys.publish(SSHKeyType.privkey, datatype=list)
self.__priv_keys: SSHKeyCollection = SSHKeyCollection()
for p in privkeys[0]:
self.__priv_keys.append(p)
self.__priv_key_pref: int = privkeys[1]
self.__apps = (sshd,)
self.__ssh_keypairs: tuple[SSHKey | tuple[SSHKey]] | SSHKey = tuple()
self.__ssh_keypair_chosen = False
def get_app(self, name: Software) -> Never:
raise NotImplementedError
def update_app(self, name: Software, attr: str, method = None) -> Never:
raise NotImplementedError
def __update_sshd(self, app: Software = Software.openssh_server):
for a in self.__apps:
if a.alt_names[PacMans.APT.name.lower()] == app.name.lower():
if hasattr(a, "users"):
users = getattr(a, "users")
if self.username not in users:
users[self.username] = dict()
users[self.username]["auth_keys"] = self.__auth_keys
users[self.username]["priv_keys"] = self.__priv_keys
users[self.username]["keys"] = self.__ssh_keys
users[self.username]["keypairs"] = self.__ssh_keypairs
users[self.username]["preferred_priv_key"] = self.__priv_key_pref
setattr(self, "users", users)
else:
users = {
self.username: {
"auth_keys": self.__auth_keys,
"priv_keys": self.__priv_keys,
"keypairs": self.__ssh_keypairs,
"keys": self.__ssh_keys
}
}
a.declare(users = users)
else:
continue
def add_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True):
if not self.__ssh_keypair_chosen:
self.__priv_keys = SSHKeyCollection()
self.__auth_keys = SSHKeyCollection()
if from_host:
pubkeys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
# print(pubkeys)
if isinstance(public_key, int):
public_key = pubkeys[public_key]
elif isinstance(public_key, SSHKey):
public_key = tuple(filter(lambda k: str(k) == str(public_key()), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
elif isinstance(public_key, str):
public_key = tuple(filter(lambda k: str(k) == public_key or public_key in str(k), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
elif isinstance(public_key, RegEx):
public_key = tuple(filter(lambda k: public_key.search(str(k)), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
else:
public_key = tuple(filter(lambda k: str(k) == str(public_key), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
privkeys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
if isinstance(private_key, int):
private_key = privkeys[private_key]
elif isinstance(private_key, SSHKey):
private_key = tuple(filter(lambda k: str(k) == str(private_key()), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
elif isinstance(private_key, str):
private_key = tuple(filter(lambda k: str(k) == private_key or private_key in str(k), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
elif isinstance(private_key, RegEx):
private_key = tuple(filter(lambda k: private_key.search(str(k)), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
else:
private_key = tuple(filter(lambda k: str(k) == str(private_key), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
else:
if isinstance(public_key, SSHKey):
public_key = public_key()
elif isinstance(public_key, str):
public_key = Path(public_key)
self.__ssh_keys.append(public_key)
if isinstance(private_key, SSHKey):
private_key = private_key()
elif isinstance(private_key, str):
private_key = Path(private_key)
self.__ssh_keys.append(private_key)
if private_key is None or public_key is None:
raise KeyError
self.__auth_keys.append(public_key)
self.__priv_keys.append(private_key)
self.__ssh_keypairs = (*self.__ssh_keypairs, (self.__priv_keys.tail, self.__auth_keys.tail),)
self.__priv_key_pref = len(self.__ssh_keypairs) - 1
self.__update_sshd()
self.__ssh_keypair_chosen = True
def get_keypair(self, preference: int):
if not self.__ssh_keypair_chosen:
raise Exception
if isinstance(self.__ssh_keypairs, SSHKey) and not isinstance(self.__ssh_keypairs(), tuple):
raise ValueError
if isinstance(self.__ssh_keypairs, SSHKey):
if isinstance(self.__ssh_keypairs(), tuple):
return self.__ssh_keypairs[preference]
else:
return self.__ssh_keypairs
else:
return self.__ssh_keypairs[preference]
@property
def available_keys(self) -> SSHKeyCollection:
return self.__ssh_keys
@property
def public_keys(self) -> SSHKeyCollection:
return self.__public_keys
@property
def private_keys(self) -> SSHKeyCollection:
return self.__private_keys
@property
def keypair_preference(self) -> int:
return self.__priv_key_pref
def prefer_keypair(self, preference: int | RegEx | str):
if isinstance(preference, int):
if preference < len(self.__ssh_keypairs):
self.__priv_key_pref = preference
else:
raise KeyError
elif isinstance(preference, RegEx):
count = 0
for keypair in self.__ssh_keypairs:
if preference.search(keypair[0]) or preference.search(keypair[1]):
self.__priv_key_pref = count
count += 1
else:
count = 0
for keypair in self.__ssh_keypairs:
if preference in str(keypair[0]()) or preference in str(keypair[1]()):
self.__priv_key_pref = count
count += 1
@property
def available_keypairs(self) -> tuple[SSHKey] | SSHKey | None:
return self.__ssh_keypairs
@property
def keypair(self):
kp = self.__ssh_keypairs[self.__priv_key_pref]
return kp[0] + kp[1]
@property
def authorized_keys(self) -> SSHKeyCollection:
return self.__auth_keys
@property
def credential_keys(self) -> SSHKeyCollection:
return self.__priv_keys

82
parse.py Normal file
View File

@@ -0,0 +1,82 @@
from pathlib import Path
import yaml as yams
from configparser import ConfigParser as cfg
from custtypes import ExecutedPath
from re import compile as rgx, IGNORECASE
class Parser:
def __init__(self):
self.__is_yaml = rgx(r"\.ya?ml$", flags=IGNORECASE).match
self.__is_ini = rgx(r"\.ini$", flags=IGNORECASE).match
self.__is_config = rgx(r"\.cfg$", flags=IGNORECASE).match
self.__is_json = rgx(r"\.[jb]son$", flags=IGNORECASE).match
self.__data = None
self.__content: str | None = None
self.__is_path: bool = False
# @TODO use Enum child class for below type hint instead
self.__method: Literal["yaml", "config"] | None = None
self.__file: ExecutedPath | None = None
def load(self, filepath: ExecutedPath | str, **kwargs):
if isinstance(filepath, ExecutedPath):
self.__is_path = True
self.__file = filepath
filepath = str(filepath)
else:
self.__is_path = False
if Path(filepath).exists():
self.__file = Path(filepath)
else:
self.__content = filepath
if self.__is_yaml(filepath):
self.__method = "yaml"
if len(kwargs) > 0:
self.__data = yams.load(filepath, Loader=yams.Loader, **kwargs)
else:
self.__data = yams.load(filepath, Loader=yams.Loader)
elif self.__is_config(filepath):
self.__method = "config"
self.__data = cfg()
if self.__is_path:
if len(kwargs) > 0:
self.__data.read(filepath, **kwargs)
else:
self.__data.read(filepath)
else:
if len(kwargs) > 0:
self.__data.read_string(filepath, **kwargs)
else:
self.__data.read_string(filepath, **kwargs)
else:
raise TypeError
return self.__data
def dump(self, obj = None, **kwargs):
if self.__method == "yaml":
if obj is None:
obj = self.__data
if len(kwargs) > 0:
self.__content = yams.dump(obj, Dumper=yams.Dumper, **kwargs)
else:
self.__content = yams.dump(obj, Dumper=yams.Dumper)
elif self.__method == "config":
if obj is None:
if self.__is_path:
return self.__file
else:
if self.__file is None:
return self.__content
else:
return self.__file
else:
if isinstance(obj, ConfigParser):
return obj
else:
raise TypeError
else:
raise TypeError
return self.__content

View File

@@ -1,18 +1,18 @@
from typing import TypedDict as Dict from typing import Self, Never, Callable, Sequence
from typing import Self, Literal, Never from custtypes import Roles, Scopes, PathCollection, PathRoles
from collections.abc import Sequence from custtypes import Software, SoftwareRoles, ExecutedPath
from custtypes import Roles, Scopes, PathCollection, PathRoles, Software, SoftwareRoles, ExecutedPath, PacMans, UserName, GroupName, IdlePath, ExecutedPath from custtypes import PacMans, UserName, GroupName, IdlePath
from pathlib import Path, PurePath from custtypes import ExecutedPath, AnsibleRoles
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from whereami import PROJ_ROLES from whereami import PROJ_ROLES
class App: class App:
def __init__(self, name: Software, role: SoftwareRoles = SoftwareRoles.client, paths: PathRoles | None = None): def __init__(self, name: Software, role: SoftwareRoles = SoftwareRoles.client, paths: PathRoles | None = None):
self.__name = name self.__name = name.name.lower().replace("_", "-")
# @TODO create dict type hint for below data struct # @TODO create dict type hint for below data struct
self.alt_names = dict() self.alt_names = dict()
self.alt_names[PacMans.APT] = self.__name self.alt_names[PacMans.APT.name.lower()] = self.__name
self.role = role self.role = role.name.lower()
if paths is not None: if paths is not None:
if Roles.EXE in paths: if Roles.EXE in paths:
setattr(self, "_" + Roles.EXE.name.lower(), paths[Roles.EXE.name.lower()]) setattr(self, "_" + Roles.EXE.name.lower(), paths[Roles.EXE.name.lower()])
@@ -20,6 +20,8 @@ class App:
setattr(self, "_" + Roles.CONF.name.lower(), paths[Roles.CONF.name.lower()]) setattr(self, "_" + Roles.CONF.name.lower(), paths[Roles.CONF.name.lower()])
if Roles.DATA in paths: if Roles.DATA in paths:
setattr(self, "_" + Roles.DATA.name.lower(), paths[Roles.DATA.name.lower()]) setattr(self, "_" + Roles.DATA.name.lower(), paths[Roles.DATA.name.lower()])
if Roles.MEM in paths:
setattr(self, "_" + Roles.MEM.name.lower(), paths[Roles.MEM.name.lower()])
self.__parents: tuple[Self] | None = None self.__parents: tuple[Self] | None = None
self.__children: tuple[Self| None] = [] self.__children: tuple[Self| None] = []
self.__api: str | None = None self.__api: str | None = None
@@ -47,6 +49,19 @@ class App:
else: else:
raise Exception raise Exception
@property
def mem_paths(self):
if hasattr(self, "_" + Roles.MEM.name.lower()):
return self._mem
else:
raise Exception
def get_paths(self, role: Roles = Roles.CONF):
if hasattr(self, "_" + role.name.lower()):
return getattr(self, "_" + role.name.lower())
else:
raise Exception
def append(self, datatype: Roles = Roles.CONF, scope: Scopes | None = None, path: IdlePath | ExecutedPath | str | PathCollection | None = None): def append(self, datatype: Roles = Roles.CONF, scope: Scopes | None = None, path: IdlePath | ExecutedPath | str | PathCollection | None = None):
if path is None: if path is None:
raise TypeError raise TypeError
@@ -57,6 +72,7 @@ class App:
paths = getattr(self, "_" + datatype) paths = getattr(self, "_" + datatype)
else: else:
setattr(self, "_" + datatype, dict()) setattr(self, "_" + datatype, dict())
paths = getattr(self, "_" + datatype)
if scope is not None: if scope is not None:
if isinstance(path, str): if isinstance(path, str):
@@ -99,11 +115,13 @@ class App:
# del txt # del txt
self.__file.close() self.__file.close()
def __call__(self, path = str, mode = "r+", scope: Scopes = Scopes.PROJ, index: int = 0) -> None: def __call__(self, path = str, mode = "r+", scope: Scopes = Scopes.PROJ, role: Roles = Roles.CONF, index: int = 0) -> Callable:
if not hasattr(self, "_" + Roles.CONF.name.lower()): if not hasattr(self, "_" + role.name.lower()):
raise Exception raise Exception
else:
config = getattr(self, "_" + role.name.lower())
conf_coll = self._conf[scope.name.lower()] conf_coll = config[scope.name.lower()]
if isinstance(conf_coll, Sequence): if isinstance(conf_coll, Sequence):
conf_coll = conf_coll[index] conf_coll = conf_coll[index]
@@ -116,58 +134,24 @@ class App:
self.__current_filepath = filepath self.__current_filepath = filepath
self.__file = open(str(filepath), mode) self.__file = open(str(filepath), mode)
return __enter__
# @TODO write below method to duplicate file or template in local to project role file/template # @TODO write below method to duplicate file or template in local to project role file/template
def clone(self, source_scope: Scopes = Scopes.SYS, target_scope: Scopes = Scopes.PROJ, index: int = 0) -> Never: def clone(self, source_scope: Scopes = Scopes.SYS, target_scope: Scopes = Scopes.PROJ, index: int = 0) -> Never:
raise NotImplementedError raise NotImplementedError
def declare(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
sshd_proj_files = map(lambda r: r / AnsibleRoles.bootstrap.name.lower() / "files" / "sshd_config.d", PROJ_ROLES)
sshd_proj_files = list(filter(lambda p: p.exists(), sshd_proj_files))
sshd_proj_templates = map(lambda r: r / AnsibleRoles.bootstrap.name.lower() / "templates" / "sshd_config.d", PROJ_ROLES)
sshd_proj_templates = list(filter(lambda p: p.exists(), sshd_proj_templates))
# @TODO rewrite below using DIR_ROOTS var from whereami module # @TODO rewrite below using DIR_ROOTS var from whereami module
sshd_paths: PathRoles = { sshd_paths: PathRoles = {
Roles.CONF.name.lower(): { Roles.CONF.name.lower(): {
Scopes.PROJ.name.lower(): [ Scopes.PROJ.name.lower(): sshd_proj_files + sshd_proj_templates
PROJ_ROLES / "bootstrap" / "files" / "sshd_config.d",
PROJ_ROLES / "bootstrap" / "templates" / "sshd_config.d"
],
} }
} }
sshd = App(Software.sshd, SoftwareRoles.server, sshd_paths) sshd = App(Software.openssh_server, SoftwareRoles.server, sshd_paths)
class Group:
def __init__(self, group_name: GroupName = GroupName.sudo, gid: int = 27):
self.group_name = group_name
self.id = gid
self.category: Literal["system", "regular"] = "system"
class User:
def __init__(self, username: UserName = UserName.root.name, password: str = "test", services: list = [Software.sshd.name.lower()], uid: int = 0):
self.exists = True
self.username = username
self.id = uid
self.password = password
self.services: tuple = tuple(services)
self.shell = "/bin/bash"
self.home = "/"
self.category: Literal["system", "regular"] = "regular"
group = Group(username, self.id)
self.primary_group = group
self.supp_groups = None
if self.supp_groups is None:
self.is_admin = True
elif isinstance(self.supp_groups, Sequence) and GroupName.sudo in self.supp_groups:
self.is_admin = True
else:
self.is_admin = False
ssh_keys = SSHKeyCollection()
ssh_keys.pull()
self.ssh_keys = ssh_keys
pubkeys = ssh_keys.publish(SSHKeyType.pubkey.name.lower(), datatype=list)
self.__auth_keys: list[str] = list(map(lambda k: k.read_text(), pubkeys))
privkeys = ssh_keys.publish(SSHKeyType.privkey.name.lower(), datatype=list)
self.__priv_keys: list[str] = list(map(lambda k: str(k), privkeys[0]))
self.__priv_key_pref: int = privkeys[1]
self.__apps = (sshd,)
def add_keypair(self, private_key, public_key):
raise NotImplementedError

163
sshkey.py
View File

@@ -37,7 +37,34 @@ class SSHKey:
def __int__(self) -> int: def __int__(self) -> int:
return self.__idx return self.__idx
def update_status(self) -> None:
if isinstance(self.__value, tuple):
privkey_present = False
pubkey_present = False
for p in self.__value:
if "-----BEGIN OPENSSH PRIVATE KEY-----" in p.read_text():
privkey_present = True
else:
pubkey_present = True
if pubkey_present and privkey_present:
self.category = SSHKeyType.dual.name.lower()
elif pubkey_present or privkey_present:
if pubkey_present:
self.category = SSHKeyType.pubkey.name.lower()
if privkey_present:
self.category = SSHKeyType.privkey.name.lower()
elif isinstance(self.__value, ExecutedPath):
if "-----BEGIN OPENSSH PRIVATE KEY-----" in self.__value.read_text():
self.category = SSHKeyType.privkey.name.lower()
else:
self.category = SSHKeyType.pubkey.name.lower()
def __str__(self) -> str: def __str__(self) -> str:
if isinstance(self.__value, tuple):
key_basename = Path(str(self.__value[0])).stem + "." + self.category
else:
key_basename = Path(str(self.__value)).name key_basename = Path(str(self.__value)).name
return "🔑" + key_basename return "🔑" + key_basename
@@ -107,41 +134,75 @@ class SSHKey:
return self return self
def __add__(self, other: Self | ExecutedPath | str): def __add__(self, other: Self | ExecutedPath | str) -> Self:
if isinstance(self.__value, tuple):
raise ValueError
if isinstance(other, (str, ExecutedPath)): if isinstance(other, (str, ExecutedPath)):
result = self.update(self.__value, other)
else:
if isinstance(other.__SSHKey__value, tuple):
raise ValueError
result = self.update(self.__value, other._SSHKey__value)
return result
def __radd__(self, other: Self | ExecutedPath | str):
if isinstance(self.__value, tuple): if isinstance(self.__value, tuple):
raise ValueError self.update(*self.__value, other)
if isinstance(other, (str, ExecutedPath)):
result = self.update(other, self.__value)
else: else:
if isinstance(other.__SSHKey__value, tuple): self.update(*self.__value, other)
raise ValueError else:
if isinstance(other._SSHKey__value, tuple):
if isinstance(self.__value, tuple):
self.update(*self.__value, *other._SSHKey__value)
else:
self.update(self.__value, *other._SSHKey__value)
else:
if isinstance(self.__value, tuple):
self.update(*self.__value, other._SSHKey__value)
else:
self.update(self.__value, other._SSHKey__value)
result = self.update(other._SSHKey__value, self.__value) self.update_status()
return result return self
def __radd__(self, other: Self | ExecutedPath | str) -> Self:
if isinstance(self.__value, tuple):
if isinstance(other, (ExecutedPath, str)):
other.update(other, *self.__value)
else:
if isinstance(other._SSHKey__value, tuple):
other.update(*other._SSHKey__value, *self.__value)
else:
other.update(other._SSHKey__value, *self.__value)
else:
if isinstance(other, (ExecutedPath, str)):
other.update(other, self.__value)
else:
if isinstance(other._SSHKey__value, tuple):
other.update(*other._SSHKey__value, self.__value)
else:
other.update(other._SSHKey__value, self.__value)
other.update_status()
return self
# @TODO write following 2 subtraction algorithms using 'set' data type conversion and methods # @TODO write following 2 subtraction algorithms using 'set' data type conversion and methods
def __sub__(self, other: Self | ExecutedPath | str): def __sub__(self, other: Self | ExecutedPath | str) -> Never:
raise NotImplementedError raise NotImplementedError
def __rsub__(self, other: Self | ExecutedPath | str): def __rsub__(self, other: Self | ExecutedPath | str) -> Never:
raise NotImplementedError raise NotImplementedError
def __getitem__(self, key: int) -> ExecutedPath | str | Never:
if isinstance(self.__value, tuple):
return self.__value[key]
else:
raise KeyError
def __setitem__(self, key: int, value: ExecutedPath | str) -> None:
if isinstance(self.__value, tuple):
new_entry = list(self.__value)
if isinstance(value, str):
value = Path(value)
new_entry[key] = value
self.__value = tuple(new_entry)
else:
raise KeyError
def replace(self, old: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str], new: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str]) -> Self | Never: def replace(self, old: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str], new: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str]) -> Self | Never:
if isinstance(old, str): if isinstance(old, str):
old = Path(old) old = Path(old)
@@ -206,29 +267,6 @@ class SSHKey:
return result return result
def update_status(self) -> None:
if isinstance(self.__value, tuple):
privkey_present = False
pubkey_present = False
for p in self.__value:
if "-----BEGIN OPENSSH PRIVATE KEY-----" in p.read_text():
privkey_present = True
else:
pubkey_present = True
if pubkey_present and privkey_present:
self.category = SSHKeyType.dual.name.lower()
elif pubkey_present or privkey_present:
if pubkey_present:
self.category = SSHKeyType.pubkey.name.lower()
if privkey_present:
self.category = SSHKeyType.privkey.name.lower()
elif isinstance(self.__value, ExecutedPath):
if "-----BEGIN OPENSSH PRIVATE KEY-----" in self.__value.read_text():
self.category = SSHKeyType.privkey.name.lower()
else:
self.category = SSHKeyType.pubkey.name.lower()
@property @property
def status(self) -> str: def status(self) -> str:
self.update_status() self.update_status()
@@ -265,7 +303,7 @@ def stream_unequal(s1, s2) -> bool | Never:
# @TODO create unit tests for below class # @TODO create unit tests for below class
class SSHKeyCollection(Sequence): class SSHKeyCollection(Sequence):
__user_path: ExecutedPath = USER_PATH __user_path: ExecutedPath = USER_PATH()
__ssh_path: ExecutedPath = __user_path / ".ssh" __ssh_path: ExecutedPath = __user_path / ".ssh"
def __init__(self): def __init__(self):
@@ -502,6 +540,8 @@ class SSHKeyCollection(Sequence):
return self.__last return self.__last
def __contains__(self, value: ExecutedPath | str) -> bool: def __contains__(self, value: ExecutedPath | str) -> bool:
self.__current = self.__first
if isinstance(value, ExecutedPath): if isinstance(value, ExecutedPath):
value = str(value) value = str(value)
@@ -522,6 +562,7 @@ class SSHKeyCollection(Sequence):
self.__current = next(self.__current) self.__current = next(self.__current)
if self.__current is not None: if self.__current is not None:
return self.__current return self.__current
else:
raise StopIteration raise StopIteration
def __iter__(self) -> Self | Never: def __iter__(self) -> Self | Never:
@@ -562,7 +603,7 @@ class SSHKeyCollection(Sequence):
self.__current = self.__first self.__current = self.__first
concat = lambda s: str(s)[1:] + ", " concat = lambda s: str(s) + ", "
content = str() content = str()
count = 0 count = 0
while self.__current is not None: while self.__current is not None:
@@ -573,13 +614,17 @@ class SSHKeyCollection(Sequence):
return prefix + content + postfix return prefix + content + postfix
def publish(self, category: SSHKeyType = SSHKeyType.pubkey.name.lower(), pref: int | None = None, datatype = dict): def index(self, item: str | ExecutedPath) -> Never:
raise NotImplementedError
def publish(self, category: SSHKeyType | str = SSHKeyType.pubkey, pref: int | None = None, datatype = dict):
privkey = list() privkey = list()
pubkey = list() pubkey = list()
self.__current = self.__first self.__current = self.__first
if datatype == list: if datatype == list:
while self.__current is not None: while self.__current is not None:
# print(self.__current)
if self.__current.category == SSHKeyType.privkey.name.lower(): if self.__current.category == SSHKeyType.privkey.name.lower():
privkey.append(self.__current._SSHKey__value) privkey.append(self.__current._SSHKey__value)
elif self.__current.category == SSHKeyType.pubkey.name.lower(): elif self.__current.category == SSHKeyType.pubkey.name.lower():
@@ -587,16 +632,18 @@ class SSHKeyCollection(Sequence):
elif self.__current.category == SSHKeyType.dual.name.lower(): elif self.__current.category == SSHKeyType.dual.name.lower():
privkey.append(self.__current._SSHKey__value[0]) privkey.append(self.__current._SSHKey__value[0])
pubkey.append(self.__current._SSHKey__value[1]) pubkey.append(self.__current._SSHKey__value[1])
self.__current = next(self.__first) self.__current = next(self.__current)
# print("publish running...")
if pref is None: if pref is None:
preference = gamble(range(len(privkey))) preference = gamble(range(len(privkey)))
else: else:
preference = pref preference = pref
if category == SSHKeyType.pubkey.name.lower(): # print(category)
if category.name.lower() == SSHKeyType.pubkey.name.lower():
return pubkey return pubkey
elif category == SSHKeyType.privkey.name.lower(): elif category.name.lower() == SSHKeyType.privkey.name.lower():
return (privkey, preference) return (privkey, preference)
else: else:
return (privkey, pubkey, preference) return (privkey, pubkey, preference)
@@ -604,6 +651,7 @@ class SSHKeyCollection(Sequence):
result = dict() result = dict()
while self.__current is not None: while self.__current is not None:
# print(self.__current)
if self.__current.category == SSHKeyType.privkey.name.lower(): if self.__current.category == SSHKeyType.privkey.name.lower():
privkey.append(str(self.__current._SSHKey__value)) privkey.append(str(self.__current._SSHKey__value))
elif self.__current.category == SSHKeyType.pubkey.name.lower(): elif self.__current.category == SSHKeyType.pubkey.name.lower():
@@ -611,16 +659,17 @@ class SSHKeyCollection(Sequence):
elif self.__current.category == SSHKeyType.dual.name.lower(): elif self.__current.category == SSHKeyType.dual.name.lower():
privkey.append(str(self.__current._SSHKey__value[0])) privkey.append(str(self.__current._SSHKey__value[0]))
pubkey.append(self.__current._SSHKey__value[1].read_text()) pubkey.append(self.__current._SSHKey__value[1].read_text())
self.__current = next(self.__first) self.__current = next(self.__current)
# print("publish running...")
if category == SSHKeyType.pubkey.name.lower(): if category.name.lower() == SSHKeyType.pubkey.name.lower():
result["ssh_authorized_keys"]: list[str] = pubkey result["ssh_authorized_keys"]: list[str] = pubkey
if category == SSHKeyType.privkey.name.lower(): if category.name.lower() == SSHKeyType.privkey.name.lower():
result["ssh_private_key_paths"]: list[str] = privkey result["ssh_private_key_paths"]: list[str] = privkey
result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey))) result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey)))
if category == SSHKeyType.dual.name.lower(): if category.name.lower() == SSHKeyType.dual.name.lower():
result["ssh_authorized_keys"]: list[str] = pubkey result["ssh_authorized_keys"]: list[str] = pubkey
result["ssh_private_key_paths"]: list[str] = privkey result["ssh_private_key_paths"]: list[str] = privkey
result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey))) result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey)))

View File

@@ -2,25 +2,61 @@
Library of path constants to be used or referenced elsewhere. Library of path constants to be used or referenced elsewhere.
""" """
from custtypes import ExecutedPath, Roles, Scopes, AnsibleScopes from custtypes import ExecutedPath, Roles, Scopes, AnsibleScopes, NodeType, UserName
from pathlib import Path from pathlib import Path
from configparser import ConfigParser as cfg
from itertools import chain
from typing import Callable
USER_PATH: ExecutedPath = Path.home() def get_home(node: NodeType = NodeType.control, home: UserName | str | None = None) -> ExecutedPath:
if node == NodeType.control:
return Path.home()
else:
if home is None:
return Path("~")
else:
if isinstance(home, UserName):
return Path("/home") / home.name
else:
return Path(home)
USER_PATH: Callable = get_home
PROJ_ROOT: ExecutedPath = Path(__file__).parent.resolve() PROJ_ROOT: ExecutedPath = Path(__file__).parent.resolve()
PROJ_ROLES: ExecutedPath = PROJ_ROOT / "roles"
config = cfg()
ANSIBLE_CONFIG = PROJ_ROOT / "ansible.cfg"
if ANSIBLE_CONFIG.exists():
config.read(str(ANSIBLE_CONFIG))
role_candidates = config["defaults"]["roles_path"].split(":")
role_candidates = map(lambda s: PROJ_ROOT / s, role_candidates)
role_paths = list(filter(lambda p: p.exists(), role_candidates))
inv_candidates = config["defaults"]["inventory"].split(",")
inv_candidates = map(lambda s: PROJ_ROOT / s, inv_candidates)
inv_paths = list(filter(lambda p: p.exists(), inv_candidates))
else:
role_paths = [PROJ_ROOT / "roles"]
inv_paths = PROJ_ROOT.glob("hosts*.*")
PROJ_ROLES: list[ExecutedPath] = role_paths
PROJ_INVENTORIES: list[ExecutedPath] = inv_paths
proj_paths = map(lambda r: r.glob("**/files"), PROJ_ROLES)
proj_paths = list(chain.from_iterable(proj_paths))
template_paths = map(lambda r: r.glob("**/templates"), PROJ_ROLES)
template_paths = list(chain.from_iterable(template_paths))
APP_ROOTS = { APP_ROOTS = {
Roles.CONF.name.lower(): { Roles.CONF.name.lower(): {
Scopes.SYS.name.lower(): Path("/etc"), Scopes.SYS.name.lower(): Path("/etc"),
Scopes.USER.name.lower(): USER_PATH / ".config", Scopes.USER.name.lower(): USER_PATH(NodeType.remote) / ".config",
Scopes.SHARED.name.lower(): Path("/usr/share"), Scopes.SHARED.name.lower(): Path("/usr/share"),
Scopes.PROJ.name.lower(): tuple(list(PROJ_ROLES.glob("**/files")) + list(PROJ_ROLES.glob("**/templates"))) Scopes.PROJ.name.lower(): tuple(proj_paths + template_paths)
} }
} }
ANSIBLE_ROOTS = { ANSIBLE_ROOTS = {
AnsibleScopes.GROUPVARS.name.lower(): PROJ_ROOT / "group_vars", AnsibleScopes.GROUPVARS.name.lower(): PROJ_ROOT / "group_vars",
AnsibleScopes.HOSTVARS.name.lower(): PROJ_ROOT / "host_vars", AnsibleScopes.HOSTVARS.name.lower(): PROJ_ROOT / "host_vars",
AnsibleScopes.INVENTORY.name.lower(): None, AnsibleScopes.INVENTORY.name.lower(): tuple(PROJ_INVENTORIES),
AnsibleScopes.ROLE.name.lower(): None AnsibleScopes.ROLE.name.lower(): tuple(PROJ_ROLES)
} }