did some refactoring and renaming

This commit is contained in:
2026-01-04 20:35:13 -05:00
parent 0230cb4074
commit 078cdd4ab3
3 changed files with 70 additions and 104 deletions

108
anodes.py
View File

@@ -7,13 +7,14 @@ from typing import TypeAlias as Neotype
from typing import TypedDict as Dict from typing import TypedDict as Dict
from typing import Never, Union, Literal, Required from typing import Never, Union, Literal, Required
from collections.abc import Callable from collections.abc import Callable
from custtypes import ExecutedPath, IdlePath from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from enum import Enum from enum import Enum
from softman import Software, SoftPathGroup, SoftScope, _Apps from softman import Software, SoftPathGroup, SoftScope, Apps
from whereami import USER_PATH, PROJ_ROOT from whereami import USER_PATH, PROJ_ROOT
from ansible_vault import Vault from ansible_vault import Vault
from cerberus import Validator as constrain_by from cerberus import Validator as constrain_by
from random import choice from random import choice
import secrets
class ControlNode: class ControlNode:
__user_path: ExecutedPath = USER_PATH __user_path: ExecutedPath = USER_PATH
@@ -31,41 +32,53 @@ class ControlNode:
def __init__(self, ansible_proj_root: ExecutedPath | None = None): def __init__(self, ansible_proj_root: ExecutedPath | None = None):
if ansible_proj_root is not None: if ansible_proj_root is not None:
self.__proj_root = ansible_proj_root self.__proj_root = ansible_proj_root
self.proj_roles: tuple[IdlePath] | list[IdlePath] = ( role_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.__proj_root), "roles"), PurePath(str(self.__proj_root), "roles"),
PurePath(str(self.__proj_root), ".ansible/roles") PurePath(str(self.__proj_root), ".ansible/roles")
) )
self.role_data: tuple[IdlePath] | list[IdlePath] = ( roledata_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.proj_roles[0]), "files"), PurePath(str(self.proj_roles[0]), "files"),
PurePath(str(self.proj_roles[0]), "templates"), PurePath(str(self.proj_roles[0]), "templates"),
PurePath(str(self.proj_roles[1]), "files"), PurePath(str(self.proj_roles[1]), "files"),
PurePath(str(self.proj_roles[1]), "templates") PurePath(str(self.proj_roles[1]), "templates")
) )
self.invvar_data: tuple[IdlePath] | list[IdlePath] = ( setattr(self, AnsibleScopes.ROLE.name, {
PurePath(str(self.__proj_root), "group_vars"), "data": roledata_paths,
PurePath(str(self.__proj_root), "host_vars") "vars": role_paths
) })
setattr(self, AnsibleScopes.GROUPVARS.name, {
"vars": (PurePath(str(self.__proj_root), "group_vars"),)
})
setattr(self, AnsibleScopes.HOSTVARS.name, {
"vars": (PurePath(str(self.__proj_root), "host_vars"),)
})
setattr(self, AnsibleScopes.INVENTORY.name, {
"vars": (PurePath(str(self.__proj_root), "hosts.yml"),)
})
def get_scope(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY.name, index = 0):
return getattr(self, scope)[index]
@property @property
def user_path(self) -> ExecutedPath: def home(self) -> ExecutedPath:
return self.__user_path return self.__user_path
@property @property
def proj_root(self) -> ExecutedPath: def root(self) -> ExecutedPath:
return self.__proj_root return self.__proj_root
@property @property
def conf_paths(self) -> ExecutedPath: def sys_confs(self) -> ExecutedPath:
return self.__conf_paths return self.__conf_paths
@property @property
def data_paths(self) -> ExecutedPath: def sys_data(self) -> ExecutedPath:
return self.__data_paths return self.__data_paths
class Softs(Enum): class Softs(Enum):
ssh = 0 ssh = 0
userSSHSubParams = { _userSSHSubParams = {
"available": Required[tuple[ExecutedPath]], "available": Required[tuple[ExecutedPath]],
"selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]], "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
"authorized": ExecutedPath | list[ExecutedPath] | int | list[int], "authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
@@ -77,7 +90,7 @@ __user_ssh_keys = {
"authorized": 1, "authorized": 1,
"used": 0 "used": 0
} }
def userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"): def __userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"):
self.username = username self.username = username
self.paths = paths self.paths = paths
self.keys = keys self.keys = keys
@@ -101,7 +114,7 @@ __user_ssh_input = {
}, },
"password": "password123", "password": "password123",
"fate": "disposal",\ "fate": "disposal",\
"__init__": userSSHInit "__init__": __userSSHInit
} }
userSSH = type("userSSH", (), __user_ssh_input) userSSH = type("userSSH", (), __user_ssh_input)
@@ -122,26 +135,24 @@ vpsSchema = Dict("vpsSchema", {
"keywords": list[str] "keywords": list[str]
}, total=False) }, total=False)
class VirtualPrivateServers(Enum):
Linode = 0
class RemoteNode: class RemoteNode:
# __user_path = # __user_path =
_fqdn: str | None = None _fqdn: str | None = None
def __init__(self, cnode: ControlNode, api_key: str, password: str, name: str, service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = None, keywords: list[str] | None = None): def __init__(self, cnode: ControlNode, api_key: str = secrets.token_urlsafe(32), password: str = "password123", name: str = ".test", service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = "us-east", keywords: list[str] | None = ["server"]):
self.root: dict = dict() self.root: dict = dict()
self.root["username"]: str = "root" self.root["username"]: str = "root"
vault = Vault(password) vault = Vault(password)
self.root["password"]: str = vault.dump(password) self.root["password"]: str = vault.dump(password)
self.root["software"]: Software = Software() self.root["software"]: Software = Software()
self.owner = cnode
app_input = { app_input = {
SoftPathGroup.CONFIG.name: { SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.ssh.name)) SoftScope.PERSONAL.name: PurePath(str(cnode.home), ("." + Softs.ssh.name))
}, },
SoftPathGroup.DATA.name: { SoftPathGroup.DATA.name: {
SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d") SoftScope.GLOBAL.name: PurePath(str(cnode.sys_confs[0]), "update-motd.d")
} }
} }
self.root["software"].declare(Softs.ssh.name, **app_input) self.root["software"].declare(Softs.ssh.name, **app_input)
@@ -164,27 +175,19 @@ class RemoteNode:
self.__usedkeys_selected = False self.__usedkeys_selected = False
self.__keys_selected = False self.__keys_selected = False
self.__finalized_keys = False self.__finalized_keys = False
self.model: dict | None = None
self.__accumulator = None
def set_region(self, name: Literal["us-east"] = "us-east") -> None: def set_region(self, name: Literal["us-east"] = "us-east") -> None:
self.region = name self.region = name
def get_region(self) -> str:
return self.region
def set_password(self, password: str) -> None: def set_password(self, password: str) -> None:
vault = Vault(value) vault = Vault(self.root["password"])
self.root["password"] = vault.dump(value) self.root["password"] = vault.dump(self.root["password"])
self.ssh.password: str = self.root["password"]
def get_password(self) -> Vault | str:
return self.root["password"]
def set_api(self, key: str) -> None: def set_api(self, key: str) -> None:
self._api_key = key
def get_api(self) -> Vault | str:
vault = Vault(self._api_key) vault = Vault(self._api_key)
return vault.dump(self._api_key) self._api_key = vault.dump(self._api_key)
def add_tags(self, *name): def add_tags(self, *name):
self.keywords: list = [] self.keywords: list = []
@@ -356,41 +359,9 @@ class RemoteNode:
self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected
return result return result
# @TODO test and debug below 'RemoteNode' method # @TODO rewrite below method
def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
keyfiles = self.ssh.keys[source] raise NotImplementedError
for s in selections:
if isinstance(s, int):
removed_elem = keyfiles.pop(s)
elif isinstance(s, ExecutedPath):
path_set = set([s])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
item = list(overlap)[0]
keyfiles = [p for p in keyfiles if p != item]
self.ssh.keys[source] = keyfiles
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
keyfiles = [p for p in keyfiles if str(p) != S]
self.ssh.keys[source] = keyfiles
self.ssh.keys[source] = keyfiles
if source == "available":
self.remove_keys("selected", *selections)
elif source == "selected":
self.remove_keys("authorized", *selections)
elif source == "authorized":
self.remove_keys("used", *selections)
return self.ssh.keys
def itemize(self): def itemize(self):
model: dict | vpsSchema = dict() model: dict | vpsSchema = dict()
@@ -421,3 +392,4 @@ class RemoteNode:
} }
model["keywords"] = self.keywords model["keywords"] = self.keywords
return model return model

View File

@@ -6,4 +6,14 @@ from typing import TypeAlias as Neotype
from pathlib import PurePosixPath, PureWindowsPath, PosixPath, WindowsPath from pathlib import PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
ExecutedPath: Neotype = PosixPath | WindowsPath ExecutedPath: Neotype = PosixPath | WindowsPath
IdlePath: Neotype = PurePosixPath | PureWindowsPath IdlePath: Neotype = PurePosixPath | PureWindowsPath
class VirtualPrivateServers(Enum):
Linode = 0
class AnsibleScopes(Enum):
INTERNAL = 0
INVENTORY = 1
GROUPVARS = 2
HOSTVARS = 3
ROLE = 4

View File

@@ -24,35 +24,19 @@ class SoftPathGroup(Enum):
MEM = 2 MEM = 2
EXE = 3 EXE = 3
_AppParams = Dict("_AppParams", {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.DATA.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.MEM.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}
}, total=False)
_SubAppParams = Dict("_SubAppParams", { _SubAppParams = Dict("_SubAppParams", {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath], SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath], SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath] SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}, total=False) }, total=False)
AppParams = Dict("AppParams", {
SoftPathGroup.CONFIG.name: _SubAppParams,
SoftPathGroup.DATA.name: _SubAppParams,
SoftPathGroup.MEM.name: _SubAppParams,
SoftPathGroup.EXE.name: _SubAppParams
}, total=False)
def _AppsInit(self, CONFIG = None, DATA = None, MEM = None, EXE = None): def __AppsInit(self, CONFIG = None, DATA = None, MEM = None, EXE = None):
self.CONFIG = CONFIG self.CONFIG = CONFIG
self.DATA = DATA self.DATA = DATA
self.MEM = MEM self.MEM = MEM
@@ -78,9 +62,9 @@ __app_input = {
SoftScope.LOCAL.name: [], SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: [] SoftScope.GLOBAL.name: []
}, },
"__init__": _AppsInit "__init__": __AppsInit
} }
_Apps = type("_Apps", (), __app_input) Apps = type("Apps", (), __app_input)
# @TODO continue adding magic methods to below class # @TODO continue adding magic methods to below class
# @NOTE https://rszalski.github.io/magicmethods/#sequence # @NOTE https://rszalski.github.io/magicmethods/#sequence
@@ -91,16 +75,16 @@ class Software:
self._fqdn: str | None = None self._fqdn: str | None = None
# @TODO fix NameError for 'Software' in parameter type check # @TODO fix NameError for 'Software' in parameter type check
def declare(self, name: str, **kwpaths: _SubAppParams) -> _AppParams: def declare(self, name: str, **kwpaths: _SubAppParams) -> AppParams:
keyword_args: _AppParams = kwpaths keyword_args: AppParams = kwpaths
app = _Apps(**keyword_args) app = Apps(**keyword_args)
setattr(self, name, app) setattr(self, name, app)
return app return app
def __getitem__(self, key: str) -> _AppParams | Never: def __getitem__(self, key: str) -> AppParams | Never:
if hasattr(self, key): if hasattr(self, key):
app: _Apps = getattr(self, key) app: Apps = getattr(self, key)
else: else:
raise KeyError raise KeyError
return app return app
@@ -112,7 +96,7 @@ class Software:
app_params: _SubAppParams = value app_params: _SubAppParams = value
if hasattr(self, key[0]): if hasattr(self, key[0]):
app: _Apps = getattr(self, key[0]) app: Apps = getattr(self, key[0])
if hasattr(app, key[1]): if hasattr(app, key[1]):
app_child: _SubAppParams = getattr(app, key[1]) app_child: _SubAppParams = getattr(app, key[1])
@@ -138,14 +122,14 @@ class Software:
if len(key) == 1: if len(key) == 1:
delattr(self, key[0]) delattr(self, key[0])
elif len(key) > 1: elif len(key) > 1:
app: _Apps = getattr(self, key[0]) app: Apps = getattr(self, key[0])
delattr(app, key[1]) delattr(app, key[1])
setattr(self, key[0], app) setattr(self, key[0], app)
def list(self, contents: bool = False) -> tuple[str]: def list(self, contents: bool = False) -> tuple[str]:
apps: tuple[str] | tuple[_Apps] = tuple( apps: tuple[str] | tuple[Apps] = tuple(
filter( filter(
lambda a: isinstance(getattr(self, a), _Apps), lambda a: isinstance(getattr(self, a), Apps),
dir(self) dir(self)
) )
) )
@@ -163,7 +147,7 @@ class Software:
def __len__(self) -> int: def __len__(self) -> int:
apps: tuple[str] = tuple( apps: tuple[str] = tuple(
filter( filter(
lambda a: isinstance(getattr(self, a), _Apps), lambda a: isinstance(getattr(self, a), Apps),
dir(self) dir(self)
) )
) )