Compare commits

..

6 Commits

6 changed files with 407 additions and 0 deletions

207
anodes.py Normal file
View File

@@ -0,0 +1,207 @@
"""
Library of classes modeling Ansible nodes or their types.
"""
from pathlib import Path, PurePath
from typing import TypeAlias as Neotype
from typing import TypedDict as Dict
from typing import Never, Union, Literal
from collections.abc import Callable
from custtypes import ExecutedPath, IdlePath
from enum import Enum
from softman import Software, SoftPathGroup, SoftScope
from whereami import USER_PATH, PROJ_ROOT
# @TODO use below 2 imports in 'get_head_user' function to grab username of highest-privileged login user
from os import name as shell_type
# from os import environ as env_vars
class ControlNode:
__user_path: ExecutedPath = USER_PATH
__proj_root: ExecutedPath = PROJ_ROOT
__conf_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/etc"),
PurePath("/usr", "local", "etc"),
PurePath(str(__user_path), ".config")
)
__data_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/usr", "local", "share"),
PurePath(str(__user_path), ".local", "share")
)
def __init__(self, ansible__proj_root: ExecutedPath | None):
if ansible_root is not None:
self.__proj_root = ansible_proj_root
@property
def user_path(self) -> ExecutedPath:
return self.__user_path
@property
def proj_root(self) -> ExecutedPath:
return self.__proj_root
@property
def conf_paths(self) -> ExecutedPath:
return self.__conf_paths
@property
def conf_paths(self) -> ExecutedPath:
return self.__data_paths
class Softs(Enum):
OpenSSH = "ssh"
userSSHParams = Dict("userSSHParams", {
"username": Required[str],
"paths": Required[Software._Apps],
"keys": {
"available": Required[tuple[ExecutedPath]],
"selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
"authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
"used": ExecutedPath | list[ExecutedPath] | int | list[int]
},
"fate": Literal["disposal", "retention"]
}, total=False)
__user_ssh_input = {
"username": "",
"paths": None,
"keys": {
"available": tuple(),
"selected": [0, 1],
"authorized": 1,
"used": 0
},
"fate": "disposal"
}
userSSH = type("userSSH", (), **__user_ssh_input)
# @TODO continue to write below function
def get_head_user():
if shell_type == "nt":
raise NotImplementedError
else:
raise NotImplementedError
class RemoteNode:
# __user_path =
_fqdn: str | None = None
def __init__(self, cnode: ControlNode, name: str | None, keywords: list[str] | None = None):
self.root: dict = dict()
self.root["username"]: str = "root"
self.root["software"]: Software = Software()
app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.OpenSSH))
},
SoftPathGroup.DATA.name: {
SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d")
}
}
self.root["software"].declare(Softs.OpenSSH, **app_input)
if name is not None:
self._fqdn = name
self.root["software"]._fqdn = name
self.apps: list = self.root["software"].list(contents = True)
self.keywords = keywords
def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e, False)):
keyfiles: list[ExecutedPath] | list[None] | None = list()
if isinstance(key_basenames, tuple):
for basename in key_basenames:
keyfiles += Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(basename)
elif isinstance(key_basenames, str):
keyfiles = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(key_basenames)
else:
raise ValueError
updated_keyfiles: list[ExecutedPath] = []
for filename in keyfiles:
new_keyfile = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])) / str(filename)
updated_keyfiles.append(new_keyfile.resolve())
keyfiles = updated_keyfiles
root_ssh_input: userSSHParams = {
"username": "root",
"paths": self.root["software"].ssh,
"keys": {
"available": tuple(keyfiles.sort(reverse = match_sort[1], key = match_sort[0]))
}
}
self.ssh: userSSH = userSSH(**root_ssh_input)
def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available") -> tuple[tuple, str]:
delimiters: str | tuple = "{}"
gap: Callable[[bool], str | None] = lambda b: " " if b else ""
sep_format: str = "{0}"
sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b)
label_format: str = "{0}:"
label: Callable[[str, bool], str] = lambda s, b: label_format.format(s) + gap(b)
def render(kfs, source: str):
if isinstance(kfs, list):
member_ints: list = list()
for f in kfs:
if isinstance(f, int):
members_ints.append(f)
if len(member_ints) > 0:
kfs = [self.ssh.keys[source][i] for i in kfs]
elif isinstance(kfs, int):
kfs = [self.ssh.keys[source][kfs]]
elif isinstance(kfs, ExecutedPath):
kfs = [self.ssh.keys[source]]
return kfs
keyfiles = self.ssh.keys[which]
if which == "selected":
keyfiles = render(keyfiles, "available")
if which == "authorized":
keyfiles = render(keyfiles, "selected")
if which == "used":
keyfiles = render(keyfiles, "selected")
stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles)))
stringified_keyfiles = sep(",", True).join(stringified_keyfiles)
stringified_keyfiles = delimiters[0:len(delimiters)] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:]
return (tuple(keyfiles), stringified_keyfiles)
# @TODO continue to work on two below methods
def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
keyfiles = self.ssh.keys[source]
authlist = []
if source == "available":
for s in selections:
if isinstance(s, int):
authlist.append(keyfiles[s])
elif isinstance(s, ExecutedPath):
path_set = set([ExecutedPath])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
authlist.append(list(overlap)[0])
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
authlist.append(Path(s))
self.ssh.keys["selected"] = authlist
elif source == "selected":
raise NotImplementedError
def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
raise NotImplementedError

9
custtypes.py Normal file
View File

@@ -0,0 +1,9 @@
"""
Library of custom type hints.
"""
from typing import TypeAlias as Neotype
from pathlib import PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
ExecutedPath: Neotype = Union[PosixPath, WindowsPath]
IdlePath: Neotype = Union[PurePosixPath, PureWindowsPath]

28
main.py Normal file
View File

@@ -0,0 +1,28 @@
import click
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
from typing import TypeAlias as Neotype
from typing import Union
# import configparser as ini
# from cerberus import Validator as constrain_by
# import skansible_types as skato
ExecutedPath: Neotype = Union[PosixPath, WindowsPath]
IdlePath: Neotype = Union[PurePosixPath, PureWindowsPath]
# @NOTE https://docs.python.org/3/library/configparser.html#quick-start
class Config:
path: IdlePath = PurePath(str(Path(__file__).parent.resolve())) / "config.ini"
@click.group()
def skansible():
raise NotImplementedError
@click.command()
def init():
if Path(str(Config.path)).exists():
click.echo("")
else:
pass
if __name__ == "__main__":
skansible()

View File

@@ -8,5 +8,6 @@ dependencies = [
"ansible>=13.1.0", "ansible>=13.1.0",
"ansible-lint>=25.12.1", "ansible-lint>=25.12.1",
"ansible-navigator>=25.12.0", "ansible-navigator>=25.12.0",
"cerberus>=1.3.8",
"click>=8.3.1", "click>=8.3.1",
] ]

153
softman.py Normal file
View File

@@ -0,0 +1,153 @@
"""
Library of classes modeling software and software-related
data as represented in or used by Ansible.
"""
from typing import TypeAlias as Neotype
from typing import TypedDict as Dict
from typing import Never, Union
from custtypes import ExecutedPath, IdlePath
from enum import Enum
from pathlib import Path, PurePath
from whereami import USER_PATH, PROJ_ROOT
AppPath: Neotype = Union[ExecutedPath, IdlePath]
class SoftScope(Enum):
PERSONAL = 0
LOCAL = 1
GLOBAL = 2
class SoftPathGroup(Enum):
CONFIG = 0
DATA = 1
MEM = 2
# @TODO continue adding magic methods to below class
# @NOTE https://rszalski.github.io/magicmethods/#sequence
class Software:
# @TODO 2 options: add additional requirements to type definition below,
# or ensure attribute which stores data of this type also stores other data
_AppParams = Dict("_AppParams", {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.DATA.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.MEM.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}
}, total=False)
_SubAppParams = Dict("_SubAppParams", {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}, total=False)
__app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.DATA.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.MEM.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
}
}
_Apps = type("_Apps", (), **__app_input)
__user_path: ExecutedPath = USER_PATH
def __init__(self):
self._fqdn: str | None = None
def declare(self, name: str, **kwpaths: Software._SubAppParams) -> Software._AppParams:
keyword_args: Software._AppParams = kwpaths
app = Software._Apps(**keyword_args)
setattr(self, name, app)
return app
def __getitem__(self, key: str) -> Software._AppParams | Never:
if hasattr(self, key):
app: Software._Apps = getattr(self, key)
else:
raise KeyError
return app
def __setitem__(self, key: tuple[str, SoftPathGroup], **value: IdlePath | list[IdlePath]) -> None | Never:
if len(value) < 1 or len(value) > 3:
raise ValueError
app_params: Software._SubAppParams = value
if hasattr(self, key[0]):
app: Software._Apps = getattr(self, key[0])
if hasattr(app, key[1]):
app_child: Software._SubAppParams = getattr(app, key[1])
for k, v in app_params.items():
v = [v] if not isinstance(v, list) else v
app_child[k]: IdlePath | list[IdlePath] = v
setattr(app, key[1], app_child)
else:
raise KeyError
setattr(self, key[0], app)
else:
raise KeyError
def __delitem__(self, key: tuple[str | SoftPathGroup]) -> None | Never:
if len(key) < 1 or len(key) > 3:
raise KeyError
if not hasattr(self, key[0]):
raise KeyError
if len(key) == 1:
delattr(self, key[0])
elif len(key) > 1:
app: Software._Apps = getattr(self, key[0])
delattr(app, key[1])
setattr(self, key[0], app)
def list(self, contents: bool = False) -> tuple[str]:
apps: tuple[str] | tuple[Software._Apps] = tuple(
filter(
lambda a: isinstance(getattr(self, a), Software._Apps),
dir(self)
)
)
if contents:
apps = tuple(
map(
lambda a: getattr(self, a),
apps
)
)
return apps
def __len__(self) -> int:
apps: tuple[str] = tuple(
filter(
lambda a: isinstance(getattr(self, a), Software._Apps),
dir(self)
)
)
return len(apps)

9
whereami.py Normal file
View File

@@ -0,0 +1,9 @@
"""
Library of path constants to be used or referenced elsewhere.
"""
from custtypes import ExecutedPath
from pathlib import Path
USER_PATH: ExecutedPath = Path.home()
PROJ_ROOT: ExecutedPath = Path(__file__).parent.resolve()