Compare commits

..

4 Commits

3 changed files with 303 additions and 196 deletions

215
anodes.py
View File

@@ -5,14 +5,15 @@ Library of classes modeling Ansible nodes or their types.
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import TypeAlias as Neotype from typing import TypeAlias as Neotype
from typing import TypedDict as Dict from typing import TypedDict as Dict
from typing import Never, Union, Literal from typing import Never, Union, Literal, Required
from collections.abc import Callable from collections.abc import Callable
from custtypes import ExecutedPath, IdlePath from custtypes import ExecutedPath, IdlePath
from enum import Enum from enum import Enum
from softman import Software, SoftPathGroup, SoftScope from softman import Software, SoftPathGroup, SoftScope, _Apps
from whereami import USER_PATH, PROJ_ROOT from whereami import USER_PATH, PROJ_ROOT
from ansible_vault import Vault from ansible_vault import Vault
from cerberus import Validator as constrain_by from cerberus import Validator as constrain_by
from random import choice
class ControlNode: class ControlNode:
__user_path: ExecutedPath = USER_PATH __user_path: ExecutedPath = USER_PATH
@@ -27,7 +28,7 @@ class ControlNode:
PurePath(str(__user_path), ".local", "share") PurePath(str(__user_path), ".local", "share")
) )
def __init__(self, ansible_proj_root: ExecutedPath | None): def __init__(self, ansible_proj_root: ExecutedPath | None = None):
if ansible_proj_root is not None: if ansible_proj_root is not None:
self.__proj_root = ansible_proj_root self.__proj_root = ansible_proj_root
self.proj_roles: tuple[IdlePath] | list[IdlePath] = ( self.proj_roles: tuple[IdlePath] | list[IdlePath] = (
@@ -36,7 +37,9 @@ class ControlNode:
) )
self.role_data: tuple[IdlePath] | list[IdlePath] = ( self.role_data: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.proj_roles[0]), "files"), PurePath(str(self.proj_roles[0]), "files"),
PurePath(str(self.proj_roles[0]), "templates") PurePath(str(self.proj_roles[0]), "templates"),
PurePath(str(self.proj_roles[1]), "files"),
PurePath(str(self.proj_roles[1]), "templates")
) )
self.invvar_data: tuple[IdlePath] | list[IdlePath] = ( self.invvar_data: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.__proj_root), "group_vars"), PurePath(str(self.__proj_root), "group_vars"),
@@ -56,21 +59,34 @@ class ControlNode:
return self.__conf_paths return self.__conf_paths
@property @property
def conf_paths(self) -> ExecutedPath: def data_paths(self) -> ExecutedPath:
return self.__data_paths return self.__data_paths
class Softs(Enum): class Softs(Enum):
OpenSSH = "ssh" ssh = 0
userSSHSubParams = {
"available": Required[tuple[ExecutedPath]],
"selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
"authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
"used": ExecutedPath | list[ExecutedPath] | int | list[int]
}
__user_ssh_keys = {
"available": tuple(),
"selected": [0, 1],
"authorized": 1,
"used": 0
}
def userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"):
self.username = username
self.paths = paths
self.keys = keys
self.password = password
self.fate = fate
userSSHParams = Dict("userSSHParams", { userSSHParams = Dict("userSSHParams", {
"username": Required[str], "username": Required[str],
"paths": Software._Apps, "paths": _Apps,
"keys": { "keys": __user_ssh_keys,
"available": Required[tuple[ExecutedPath]],
"selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
"authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
"used": ExecutedPath | list[ExecutedPath] | int | list[int]
},
"password": Required[str], "password": Required[str],
"fate": Literal["disposal", "retention"] "fate": Literal["disposal", "retention"]
}, total=False) }, total=False)
@@ -84,37 +100,27 @@ __user_ssh_input = {
"used": 0 "used": 0
}, },
"password": "password123", "password": "password123",
"fate": "disposal" "fate": "disposal",\
"__init__": userSSHInit
} }
# @TODO remove unpacking of '__user_ssh_input' userSSH = type("userSSH", (), __user_ssh_input)
userSSH = type("userSSH", (), **__user_ssh_input)
vps_schema = { vpsSchema = Dict("vpsSchema", {
"fqdn": {"type": "string"}, "fqdn": Required[str],
"vps_service": { "vps_service": {
"type": "dict", "exists": Required[bool],
"schema": { "password": Required[str],
"exists": {"type": "boolean"}, "api_key": Required[str],
"password": {"type": "string"}, "type": Required[Literal["linode"]],
"api_key": {"type": "string"}, "region": Literal["us-east"],
"type": { "ssh_authorized_keys": list[IdlePath | ExecutedPath | str],
"type": "string", "ssh_private_key_paths": list[IdlePath | ExecutedPath | str],
"anyof": ["linode"] "ssh_private_key_path_pref": int,
}, "root_fate": Required[Literal["disposal","retention"]],
"region": { "ssh_motd_script_basenames": list[str]
"type": "string",
"anyof": ["us-east"]
},
"ssh_authorized_keys": {"type": "list"},
"root_fate": {
"type": "string",
"anyof": ["disposal", "retention"]
}
}
}, },
"keywords": {"type": "list"} "keywords": list[str]
} }, total=False)
vps_schema = constrain_by(vps_schema)
class VirtualPrivateServers(Enum): class VirtualPrivateServers(Enum):
Linode = 0 Linode = 0
@@ -123,7 +129,7 @@ class RemoteNode:
# __user_path = # __user_path =
_fqdn: str | None = None _fqdn: str | None = None
def __init__(self, cnode: ControlNode, api_key: str, password: str, name: str, service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, keywords: list[str] | None = None): def __init__(self, cnode: ControlNode, api_key: str, password: str, name: str, service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = None, keywords: list[str] | None = None):
self.root: dict = dict() self.root: dict = dict()
self.root["username"]: str = "root" self.root["username"]: str = "root"
vault = Vault(password) vault = Vault(password)
@@ -132,13 +138,13 @@ class RemoteNode:
app_input = { app_input = {
SoftPathGroup.CONFIG.name: { SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.OpenSSH)) SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.ssh.name))
}, },
SoftPathGroup.DATA.name: { SoftPathGroup.DATA.name: {
SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d") SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d")
} }
} }
self.root["software"].declare(Softs.OpenSSH, **app_input) self.root["software"].declare(Softs.ssh.name, **app_input)
self._fqdn = name self._fqdn = name
self.root["software"]._fqdn = name self.root["software"]._fqdn = name
@@ -153,6 +159,11 @@ class RemoteNode:
self.keywords = keywords self.keywords = keywords
self._api_key: str | None = api_key self._api_key: str | None = api_key
self.service = service self.service = service
self.region = region
self.__authkeys_selected = False
self.__usedkeys_selected = False
self.__keys_selected = False
self.__finalized_keys = False
def set_region(self, name: Literal["us-east"] = "us-east") -> None: def set_region(self, name: Literal["us-east"] = "us-east") -> None:
self.region = name self.region = name
@@ -179,7 +190,7 @@ class RemoteNode:
self.keywords: list = [] self.keywords: list = []
self.keywords += list(name) self.keywords += list(name)
def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e, False)): def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e.stem, False)):
keyfiles: list[ExecutedPath] | list[None] | None = list() keyfiles: list[ExecutedPath] | list[None] | None = list()
if isinstance(key_basenames, tuple): if isinstance(key_basenames, tuple):
for basename in key_basenames: for basename in key_basenames:
@@ -196,10 +207,10 @@ class RemoteNode:
keyfiles = updated_keyfiles keyfiles = updated_keyfiles
self.ssh.paths = self.root["software"].ssh self.ssh.paths = self.root["software"].ssh
self.ssh.keys["available"] = tuple(keyfiles.sort(reverse = match_sort[1], key = match_sort[0])) self.ssh.keys["available"] = tuple(sorted(keyfiles, key=match_sort[0], reverse=match_sort[1]))
def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available") -> tuple[tuple, str]: def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available", kformat = object) -> tuple[tuple, str]:
delimiters: str | tuple = "{}" delimiters: str | tuple = "[]"
gap: Callable[[bool], str | None] = lambda b: " " if b else "" gap: Callable[[bool], str | None] = lambda b: " " if b else ""
sep_format: str = "{0}" sep_format: str = "{0}"
sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b) sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b)
@@ -235,17 +246,34 @@ class RemoteNode:
stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles))) stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles)))
stringified_keyfiles = sep(",", True).join(stringified_keyfiles) stringified_keyfiles = sep(",", True).join(stringified_keyfiles)
stringified_keyfiles = delimiters[0:len(delimiters)] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:] stringified_keyfiles = delimiters[0] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:]
return (tuple(keyfiles), stringified_keyfiles) if kformat == str:
result = stringified_keyfiles
elif kformat == list:
result = keyfiles
elif kformat == tuple:
result = tuple(keyfiles)
elif kformat == object:
result = (tuple(keyfiles), stringified_keyfiles)
print(result)
return result
def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
keyfiles = self.ssh.keys[source] keyfiles = self.ssh.keys[source]
# print(keyfiles)
if keyfiles is None:
raise TypeError
elif isinstance(keyfiles, (tuple, list)) and len(keyfiles) < 1:
raise ValueError
authlist = [] authlist = []
if source == "available" or source == "selected": if source == "available":
for s in selections: for s in selections:
if isinstance(s, int): if isinstance(s, int):
# print(s)
authlist.append(keyfiles[s]) authlist.append(keyfiles[s])
elif isinstance(s, ExecutedPath): elif isinstance(s, ExecutedPath):
path_set = set([s]) path_set = set([s])
@@ -262,32 +290,73 @@ class RemoteNode:
if s in kf_strs: if s in kf_strs:
authlist.append(Path(s)) authlist.append(Path(s))
if source == "available": self.ssh.keys["selected"] = authlist
self.ssh.keys["selected"] = authlist self.__keys_selected = True
return self.ssh.keys["selected"] result = self.ssh.keys["selected"]
elif source == "selected": if source == "selected":
self.ssh.keys["authorized"] = authlist privkeys = list()
return self.ssh.keys["authorized"] pubkeys = list()
count = 1
for s in selections:
if isinstance(s, int):
# print(s)
if count % 2 == 0:
pubkeys.append(keyfiles[s])
else:
privkeys.append(keyfiles[s])
elif isinstance(s, ExecutedPath):
path_set = set([s])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
if count % 2 == 0:
pubkeys.append(list(overlap)[0])
else:
privkeys.append(list(overlap)[0])
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
if count % 2 == 0:
pubkeys.append(Path(s))
else:
privkeys.append(Path(s))
count += 1
self.ssh.keys["authorized"] = pubkeys
self.ssh.keys["used"] = privkeys
self.__authkeys_selected = True
self.__usedkeys_selected = True
result = (self.ssh.keys["authorized"], self.ssh.keys["used"])
elif source == "authorized": elif source == "authorized":
for s in selections: for s in selections:
if isinstance(s, int): if isinstance(s, int):
authlist.append(keyfiles[s]) if self.ssh.keys["selected"][s] in keyfiles:
authlist.append(self.ssh.keys["selected"][s])
elif isinstance(s, ExecutedPath): elif isinstance(s, ExecutedPath):
for p in keyfiles: for p in self.ssh.keys["selected"]:
if str(s) in str(p): if str(s) in str(p) and str(s) in list(map(lambda p: str(p), keyfiles)):
authlist.append(p) authlist.append(p)
else: else:
continue continue
elif isinstance(s, str): elif isinstance(s, str):
for p in keyfiles: for p in self.ssh.keys["selected"]:
if s in str(p): if s in str(p) and s in list(map(lambda p: str(p), keyfiles)):
authlist.append(p) authlist.append(p)
else: else:
continue continue
self.ssh.keys["used"] = authlist self.ssh.keys["used"] = authlist
return self.ssh.keys["used"] self.__usedkeys_selected = True
result = self.ssh.keys["used"]
self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected
return result
# @TODO test and debug below 'RemoteNode' method
def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
keyfiles = self.ssh.keys[source] keyfiles = self.ssh.keys[source]
@@ -324,9 +393,18 @@ class RemoteNode:
return self.ssh.keys return self.ssh.keys
def itemize(self): def itemize(self):
model = dict() model: dict | vpsSchema = dict()
vault_api = Vault(self._api_key) vault_api = Vault(self._api_key)
vault_pass = Vault(self.ssh.password) vault_pass = Vault(self.ssh.password)
# print(self.ssh.keys["selected"])
if not self.__keys_selected:
self.pick_keys("available", *self.ssh.keys["selected"])
self.pick_keys("selected", self.ssh.keys["authorized"], self.ssh.keys["used"])
# self.pick_keys("selected", self.ssh.keys["used"])
authorized_keys = list(map(lambda p: p.read_text(), self.ssh.keys["authorized"]))
used_keys = list(map(lambda p: str(p), self.ssh.keys["used"]))
model["fqdn"] = self._fqdn model["fqdn"] = self._fqdn
model["vps_service"] = { model["vps_service"] = {
@@ -335,10 +413,11 @@ class RemoteNode:
"api_key": vault_api.dump(self._api_key), "api_key": vault_api.dump(self._api_key),
"type": self.service.lower(), "type": self.service.lower(),
"region": self.region, "region": self.region,
"ssh_authorized_keys": self.ssh.keys["authorized"], "ssh_authorized_keys": authorized_keys,
"root_fate": self.ssh.fate "ssh_private_key_paths": used_keys,
"ssh_private_key_path_pref": choice(list(range(len(used_keys)))),
"root_fate": self.ssh.fate,
"ssh_motd_script_basenames": []
} }
model["keywords"] = self.keywords model["keywords"] = self.keywords
return model
if vps_schema.validate(model):
return model

151
main.py
View File

@@ -1,89 +1,112 @@
import click import click as cli
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath, PosixPath, WindowsPath from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
from whereami import PROJ_ROOT from whereami import PROJ_ROOT
from custtypes import ExecutedPath, IdlePath from custtypes import ExecutedPath, IdlePath
import yaml as yams import yaml as yams
from anodes import RemoteNode, ControlNode from anodes import RemoteNode, ControlNode
# from typing import TypeAlias as Neotype # from typing import TypeAlias as Neotype
# from typing import Literal from typing import Literal
from enum import Enum
from ansible_vault import Vault
from random import choice
# import configparser as ini # import configparser as ini
# from cerberus import Validator as constrain_by # from cerberus import Validator as constrain_by
# import skansible_types as skato # import skansible_types as skato
class AnsibleScope(Enum):
INTERNAL = 0
INVENTORY = 1
GROUPVARS = 2
HOSTVARS = 3
ROLE = 4
# @TODO for 'Config' class, write methods to pull and push from Ansible YAML files
# @NOTE https://docs.python.org/3/library/configparser.html#quick-start # @NOTE https://docs.python.org/3/library/configparser.html#quick-start
class Config: class Config:
path: ExecutedPath = PROJ_ROOT / "config.ini" path: ExecutedPath = PROJ_ROOT / "config.ini"
_cnode = ControlNode(PROJ_ROOT)
__scope_paths = {
AnsibleScope.INTERNAL.name: path,
AnsibleScope.GROUPVARS.name: Path(str(_cnode.invvar_data[0])),
AnsibleScope.HOSTVARS.name: Path(str(_cnode.invvar_data[1]))
}
config = Config() def __init__(self, filepath: str, anscope: AnsibleScope = AnsibleScope.INTERNAL.name, mode: str = "r+"):
filepath: ExecutedPath = self.__scope_paths[anscope] / filepath
@click.group() if filepath.exists():
def skansible(): self.filepath = filepath
else:
raise FileNotFoundError
self.mode: str = mode
self.file = None
self.model = None
def __enter__(self):
self.file = open(str(self.filepath), self.mode)
self.model = yams.load(self.file.read())
return self.model
def __exit__(self, exc_type, exc_value, exc_traceback):
file_content = yams.dump(self.model)
self.file.write(file_content)
self.file.close()
@cli.group()
@cli.pass_context
def skansible(ctx):
ctx.ensure_object(dict)
@skansible.group(help="Replace conventionally-typed Ansible variables")
@cli.pass_context
def mod(ctx):
with Config(entity, AnsibleScope.HOSTVARS.name) as config:
ctx.obj["fqdn"] = config["fqdn"]
@skansible.group(help="Remove conventionally-typed Ansible variables")
def rm():
pass pass
@click.group() @skansible.group(help="Append conventionally-typed entries to Ansible variables")
@click.argument("hostname", nargs=-1, type=str) def append():
@click.option("-g", "--group", type=str, show_default=True) pass
@click.option("-p", "--password", type=str, prompt=True, prompt_required=False)
@click.option("-a", "--api", type=str)
@click.option("-k", "--key", multiple=True)
@click.option("-t", "--tag", multiple=True, type=str)
@click.option("-n", "--fqdn", type=(str, None))
@click.option("-s", "--service", type=(str, None))
def addhost(hostname, password, api, key, group = "ungrouped", service = None, fqdn = None, tags = None):
hosts_model = dict()
hosts_model[group]["hosts"] = dict()
for name in hostname: @skansible.group(help="Initialize Ansible variable or object with defaults")
hosts_model[group]["hosts"][name] = None def init():
pass
with open(str(PROJ_ROOT / "hosts.yml")) as hosts_file: # @skansible.group(help="Show Ansible assets")
yams.dump(hosts_model, hosts_file) # @cli.argument("-d", "--datatype", type=str, help="Specify type of Ansible object or variable to show")
# def show(datatype):
# pass
cnode = ControlNode() @mod.command("vps", help="Add 'vps_service' Ansible variable")
if fqdn is not None: @cli.argument("entity", type=str, help="Specify FQDN/alias or host group to associate with the VPS")
rnode = RemoteNode(cnode, api, password, fqdn) @cli.option("-p", "--password", type=str, prompt=True, prompt_required=False, help="Prompt for password of root or administrative user of VPS")
else: @cli.option("-a", "--api", type=str, help="API key for host service providing VPS")
if isinstance(hostname, (tuple, list)): @cli.option("-k", "--pubkey", multiple=True, help="A file glob pattern for acquiring user's public SSH keys")
if len(hostname) > 0 and len(hostname) < 2: @cli.option("-k", "--privkey", multiple=True, help="A file glob pattern for acquiring user's private SSH keys")
rnode = RemoteNode(cnode, api, password, hostname[0]) @cli.option("-m", "--motd", multiple=True, help="Provide basenames for MOTD SSH scripts")
else: @cli.option("-t", "--tag", multiple=True, type=str)
raise ValueError @cli.option("-s", "--service", type=str)
elif isinstance(hostname, str): @cli.option("-r", "--region", type=str)
rnode = RemoteNode(cnode, api, password, hostname) @cli.option("-f", "--fate", type=str)
else: @cli.pass_context
raise TypeError # @TODO rewrite below command function, using 'ctx' parameter for values shared with sibling commands
def mod_vps(ctx, entity: str, password: str | None = None, api: str | None = None, pubkey: tuple[str] = None, privkey: tuple[str] = None, service: str | int | None = None, region: Literal["us-east"] | None = None, tags: tuple[str] = None, fate: Literal["disposal", "retention"] | None = None):
with Config(entity, AnsibleScope.HOSTVARS.name) as config:
raise NotImplementedError
if tag is not None: @mod.command("host", help="Add host to Ansible inventory")
if isinstance(tag, (tuple, list)) and len(tag) > 0: @cli.argument("hostname", multiple=True, type=str, help="Provide aliases / FQDN / IP addresses for host(s)")
rnode.add_tags(*tag) @cli.option("-g", "--group", type=str, default="ungrouped", help="Provide group name given host(s) fall under")
@cli.pass_context
# @TODO rewrite below command function, using 'ctx' parameter for values shared with sibling commands
def mod_host(group, hostname):
with Config(entity, AnsibleScope.INVENTORY.name) as config:
raise NotImplementedError
if key is not None:
if isinstance(key, tuple) and len(key) > 0:
rnode.import_keys(key)
if service is not None:
rnode.service = service
if group == "ungrouped":
if isinstance(hostname, (tuple, list)):
if len(hostname) > 0 and len(hostname) < 2:
filepath = PurePath(str(cnode.invvar_data[1]), hostname[0])
else:
raise ValueError
elif isinstance(hostname, str):
filepath = PurePath(str(cnode.invvar_data[1]), hostname)
else:
raise TypeError
else:
filepath = PurePath(str(cnode.invvar_data[0]), group)
inv_model = rnode.itemize()
with open(str(filepath), "w") as inv_file:
yams.dump(inv_model, inv_file)
print((hosts_model, inv_model))
skansible.add_command(addhost)
if __name__ == "__main__": if __name__ == "__main__":
skansible() skansible()

View File

@@ -24,78 +24,83 @@ class SoftPathGroup(Enum):
MEM = 2 MEM = 2
EXE = 3 EXE = 3
# @TODO continue adding magic methods to below class _AppParams = Dict("_AppParams", {
# @NOTE https://rszalski.github.io/magicmethods/#sequence SoftPathGroup.CONFIG.name: {
class Software: SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
# @TODO move below 2 class variables declarations outside of class, but make class variables equal to them SoftScope.LOCAL.name: IdlePath | list[IdlePath],
# @TODO when done with above, rename all intra-class references to such variables appropriately SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
_AppParams = Dict("_AppParams", { },
SoftPathGroup.CONFIG.name: { SoftPathGroup.DATA.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath], SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath], SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath] SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}, },
SoftPathGroup.DATA.name: { SoftPathGroup.MEM.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath], SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath], SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath] SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}, },
SoftPathGroup.MEM.name: { SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}
}, total=False)
_SubAppParams = Dict("_SubAppParams", {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath], SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath], SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath] SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}, total=False)
__app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.DATA.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.MEM.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
}
} }
_Apps = type("_Apps", (), __app_input) }, total=False)
_SubAppParams = Dict("_SubAppParams", {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}, total=False)
def _AppsInit(self, CONFIG = None, DATA = None, MEM = None, EXE = None):
self.CONFIG = CONFIG
self.DATA = DATA
self.MEM = MEM
self.EXE = EXE
__app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.DATA.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.MEM.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
"__init__": _AppsInit
}
_Apps = type("_Apps", (), __app_input)
# @TODO continue adding magic methods to below class
# @NOTE https://rszalski.github.io/magicmethods/#sequence
class Software:
__user_path: ExecutedPath = USER_PATH __user_path: ExecutedPath = USER_PATH
def __init__(self): def __init__(self):
self._fqdn: str | None = None self._fqdn: str | None = None
# @TODO fix NameError for 'Software' in parameter type check # @TODO fix NameError for 'Software' in parameter type check
def declare(self, name: str, **kwpaths: Software._SubAppParams) -> Software._AppParams: def declare(self, name: str, **kwpaths: _SubAppParams) -> _AppParams:
keyword_args: Software._AppParams = kwpaths keyword_args: _AppParams = kwpaths
app = Software._Apps(**keyword_args) app = _Apps(**keyword_args)
setattr(self, name, app) setattr(self, name, app)
return app return app
def __getitem__(self, key: str) -> Software._AppParams | Never: def __getitem__(self, key: str) -> _AppParams | Never:
if hasattr(self, key): if hasattr(self, key):
app: Software._Apps = getattr(self, key) app: _Apps = getattr(self, key)
else: else:
raise KeyError raise KeyError
return app return app
@@ -104,13 +109,13 @@ class Software:
if len(value) < 1 or len(value) > 3: if len(value) < 1 or len(value) > 3:
raise ValueError raise ValueError
app_params: Software._SubAppParams = value app_params: _SubAppParams = value
if hasattr(self, key[0]): if hasattr(self, key[0]):
app: Software._Apps = getattr(self, key[0]) app: _Apps = getattr(self, key[0])
if hasattr(app, key[1]): if hasattr(app, key[1]):
app_child: Software._SubAppParams = getattr(app, key[1]) app_child: _SubAppParams = getattr(app, key[1])
for k, v in app_params.items(): for k, v in app_params.items():
v = [v] if not isinstance(v, list) else v v = [v] if not isinstance(v, list) else v
@@ -133,14 +138,14 @@ class Software:
if len(key) == 1: if len(key) == 1:
delattr(self, key[0]) delattr(self, key[0])
elif len(key) > 1: elif len(key) > 1:
app: Software._Apps = getattr(self, key[0]) app: _Apps = getattr(self, key[0])
delattr(app, key[1]) delattr(app, key[1])
setattr(self, key[0], app) setattr(self, key[0], app)
def list(self, contents: bool = False) -> tuple[str]: def list(self, contents: bool = False) -> tuple[str]:
apps: tuple[str] | tuple[Software._Apps] = tuple( apps: tuple[str] | tuple[_Apps] = tuple(
filter( filter(
lambda a: isinstance(getattr(self, a), Software._Apps), lambda a: isinstance(getattr(self, a), _Apps),
dir(self) dir(self)
) )
) )
@@ -158,7 +163,7 @@ class Software:
def __len__(self) -> int: def __len__(self) -> int:
apps: tuple[str] = tuple( apps: tuple[str] = tuple(
filter( filter(
lambda a: isinstance(getattr(self, a), Software._Apps), lambda a: isinstance(getattr(self, a), _Apps),
dir(self) dir(self)
) )
) )