started creation of classes representing/modeling Ansible remote and control nodes

This commit is contained in:
2025-12-25 09:39:19 -05:00
parent db426eff81
commit c8fc487996

207
anodes.py Normal file
View File

@@ -0,0 +1,207 @@
"""
Library of classes modeling Ansible nodes or their types.
"""
from pathlib import Path, PurePath
from typing import TypeAlias as Neotype
from typing import TypedDict as Dict
from typing import Never, Union, Literal
from collections.abc import Callable
from custtypes import ExecutedPath, IdlePath
from enum import Enum
from softman import Software, SoftPathGroup, SoftScope
from whereami import USER_PATH, PROJ_ROOT
# @TODO use below 2 imports in 'get_head_user' function to grab username of highest-privileged login user
from os import name as shell_type
# from os import environ as env_vars
class ControlNode:
__user_path: ExecutedPath = USER_PATH
__proj_root: ExecutedPath = PROJ_ROOT
__conf_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/etc"),
PurePath("/usr", "local", "etc"),
PurePath(str(__user_path), ".config")
)
__data_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/usr", "local", "share"),
PurePath(str(__user_path), ".local", "share")
)
def __init__(self, ansible__proj_root: ExecutedPath | None):
if ansible_root is not None:
self.__proj_root = ansible_proj_root
@property
def user_path(self) -> ExecutedPath:
return self.__user_path
@property
def proj_root(self) -> ExecutedPath:
return self.__proj_root
@property
def conf_paths(self) -> ExecutedPath:
return self.__conf_paths
@property
def conf_paths(self) -> ExecutedPath:
return self.__data_paths
class Softs(Enum):
OpenSSH = "ssh"
userSSHParams = Dict("userSSHParams", {
"username": Required[str],
"paths": Required[Software._Apps],
"keys": {
"available": Required[tuple[ExecutedPath]],
"selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
"authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
"used": ExecutedPath | list[ExecutedPath] | int | list[int]
},
"fate": Literal["disposal", "retention"]
}, total=False)
__user_ssh_input = {
"username": "",
"paths": None,
"keys": {
"available": tuple(),
"selected": [0, 1],
"authorized": 1,
"used": 0
},
"fate": "disposal"
}
userSSH = type("userSSH", (), **__user_ssh_input)
# @TODO continue to write below function
def get_head_user():
if shell_type == "nt":
raise NotImplementedError
else:
raise NotImplementedError
class RemoteNode:
# __user_path =
_fqdn: str | None = None
def __init__(self, cnode: ControlNode, name: str | None, keywords: list[str] | None = None):
self.root: dict = dict()
self.root["username"]: str = "root"
self.root["software"]: Software = Software()
app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.OpenSSH))
},
SoftPathGroup.DATA.name: {
SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d")
}
}
self.root["software"].declare(Softs.OpenSSH, **app_input)
if name is not None:
self._fqdn = name
self.root["software"]._fqdn = name
self.apps: list = self.root["software"].list(contents = True)
self.keywords = keywords
def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e, False)):
keyfiles: list[ExecutedPath] | list[None] | None = list()
if isinstance(key_basenames, tuple):
for basename in key_basenames:
keyfiles += Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(basename)
elif isinstance(key_basenames, str):
keyfiles = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(key_basenames)
else:
raise ValueError
updated_keyfiles: list[ExecutedPath] = []
for filename in keyfiles:
new_keyfile = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])) / str(filename)
updated_keyfiles.append(new_keyfile.resolve())
keyfiles = updated_keyfiles
root_ssh_input: userSSHParams = {
"username": "root",
"paths": self.root["software"].ssh,
"keys": {
"available": tuple(keyfiles.sort(reverse = match_sort[1], key = match_sort[0]))
}
}
self.ssh: userSSH = userSSH(**root_ssh_input)
def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available") -> tuple[tuple, str]:
delimiters: str | tuple = "{}"
gap: Callable[[bool], str | None] = lambda b: " " if b else ""
sep_format: str = "{0}"
sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b)
label_format: str = "{0}:"
label: Callable[[str, bool], str] = lambda s, b: label_format.format(s) + gap(b)
def render(kfs, source: str):
if isinstance(kfs, list):
member_ints: list = list()
for f in kfs:
if isinstance(f, int):
members_ints.append(f)
if len(member_ints) > 0:
kfs = [self.ssh.keys[source][i] for i in kfs]
elif isinstance(kfs, int):
kfs = [self.ssh.keys[source][kfs]]
elif isinstance(kfs, ExecutedPath):
kfs = [self.ssh.keys[source]]
return kfs
keyfiles = self.ssh.keys[which]
if which == "selected":
keyfiles = render(keyfiles, "available")
if which == "authorized":
keyfiles = render(keyfiles, "selected")
if which == "used":
keyfiles = render(keyfiles, "selected")
stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles)))
stringified_keyfiles = sep(",", True).join(stringified_keyfiles)
stringified_keyfiles = delimiters[0:len(delimiters)] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:]
return (tuple(keyfiles), stringified_keyfiles)
# @TODO continue to work on two below methods
def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
keyfiles = self.ssh.keys[source]
authlist = []
if source == "available":
for s in selections:
if isinstance(s, int):
authlist.append(keyfiles[s])
elif isinstance(s, ExecutedPath):
path_set = set([ExecutedPath])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
authlist.append(list(overlap)[0])
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
authlist.append(Path(s))
self.ssh.keys["selected"] = authlist
elif source == "selected":
raise NotImplementedError
def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
raise NotImplementedError