Files
skato-ansible/anodes.py

396 lines
15 KiB
Python

"""
Library of classes modeling Ansible nodes or their types.
"""
from pathlib import Path, PurePath
from typing import TypeAlias as Neotype
from typing import TypedDict as Dict
from typing import Never, Union, Literal, Required
from collections.abc import Callable
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from enum import Enum
from softman import Software, SoftPathGroup, SoftScope, Apps
from whereami import USER_PATH, PROJ_ROOT
from ansible_vault import Vault
from cerberus import Validator as constrain_by
from random import choice
import secrets
class ControlNode:
__user_path: ExecutedPath = USER_PATH
__proj_root: ExecutedPath = PROJ_ROOT
__conf_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/etc"),
PurePath("/usr", "local", "etc"),
PurePath(str(__user_path), ".config")
)
__data_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/usr", "local", "share"),
PurePath(str(__user_path), ".local", "share")
)
def __init__(self, ansible_proj_root: ExecutedPath | None = None):
if ansible_proj_root is not None:
self.__proj_root = ansible_proj_root
role_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.__proj_root), "roles"),
PurePath(str(self.__proj_root), ".ansible/roles")
)
roledata_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.proj_roles[0]), "files"),
PurePath(str(self.proj_roles[0]), "templates"),
PurePath(str(self.proj_roles[1]), "files"),
PurePath(str(self.proj_roles[1]), "templates")
)
setattr(self, AnsibleScopes.ROLE.name, {
"data": roledata_paths,
"vars": role_paths
})
setattr(self, AnsibleScopes.GROUPVARS.name, {
"vars": (PurePath(str(self.__proj_root), "group_vars"),)
})
setattr(self, AnsibleScopes.HOSTVARS.name, {
"vars": (PurePath(str(self.__proj_root), "host_vars"),)
})
setattr(self, AnsibleScopes.INVENTORY.name, {
"vars": (PurePath(str(self.__proj_root), "hosts.yml"),)
})
def get_scope(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY.name, index = 0):
return getattr(self, scope)[index]
@property
def home(self) -> ExecutedPath:
return self.__user_path
@property
def root(self) -> ExecutedPath:
return self.__proj_root
@property
def sys_confs(self) -> ExecutedPath:
return self.__conf_paths
@property
def sys_data(self) -> ExecutedPath:
return self.__data_paths
class Softs(Enum):
ssh = 0
_userSSHSubParams = {
"available": Required[tuple[ExecutedPath]],
"selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
"authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
"used": ExecutedPath | list[ExecutedPath] | int | list[int]
}
__user_ssh_keys = {
"available": tuple(),
"selected": [0, 1],
"authorized": 1,
"used": 0
}
def __userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"):
self.username = username
self.paths = paths
self.keys = keys
self.password = password
self.fate = fate
userSSHParams = Dict("userSSHParams", {
"username": Required[str],
"paths": _Apps,
"keys": __user_ssh_keys,
"password": Required[str],
"fate": Literal["disposal", "retention"]
}, total=False)
__user_ssh_input = {
"username": "",
"paths": None,
"keys": {
"available": tuple(),
"selected": [0, 1],
"authorized": 1,
"used": 0
},
"password": "password123",
"fate": "disposal",\
"__init__": __userSSHInit
}
userSSH = type("userSSH", (), __user_ssh_input)
vpsSchema = Dict("vpsSchema", {
"fqdn": Required[str],
"vps_service": {
"exists": Required[bool],
"password": Required[str],
"api_key": Required[str],
"type": Required[Literal["linode"]],
"region": Literal["us-east"],
"ssh_authorized_keys": list[IdlePath | ExecutedPath | str],
"ssh_private_key_paths": list[IdlePath | ExecutedPath | str],
"ssh_private_key_path_pref": int,
"root_fate": Required[Literal["disposal","retention"]],
"ssh_motd_script_basenames": list[str]
},
"keywords": list[str]
}, total=False)
class RemoteNode:
# __user_path =
_fqdn: str | None = None
def __init__(self, cnode: ControlNode, api_key: str = secrets.token_urlsafe(32), password: str = "password123", name: str = ".test", service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = "us-east", keywords: list[str] | None = ["server"]):
self.root: dict = dict()
self.root["username"]: str = "root"
vault = Vault(password)
self.root["password"]: str = vault.dump(password)
self.root["software"]: Software = Software()
self.owner = cnode
app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: PurePath(str(cnode.home), ("." + Softs.ssh.name))
},
SoftPathGroup.DATA.name: {
SoftScope.GLOBAL.name: PurePath(str(cnode.sys_confs[0]), "update-motd.d")
}
}
self.root["software"].declare(Softs.ssh.name, **app_input)
self._fqdn = name
self.root["software"]._fqdn = name
root_ssh_input: userSSHParams = {
"username": self.root["username"],
"password": self.root["password"]
}
self.ssh: userSSH = userSSH(**root_ssh_input)
self.apps: list = self.root["software"].list(contents = True)
self.keywords = keywords
self._api_key: str | None = api_key
self.service = service
self.region = region
self.__authkeys_selected = False
self.__usedkeys_selected = False
self.__keys_selected = False
self.__finalized_keys = False
self.model: dict | None = None
self.__accumulator = None
def set_region(self, name: Literal["us-east"] = "us-east") -> None:
self.region = name
def set_password(self, password: str) -> None:
vault = Vault(self.root["password"])
self.root["password"] = vault.dump(self.root["password"])
def set_api(self, key: str) -> None:
vault = Vault(self._api_key)
self._api_key = vault.dump(self._api_key)
def add_tags(self, *name):
self.keywords: list = []
self.keywords += list(name)
def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e.stem, False)):
keyfiles: list[ExecutedPath] | list[None] | None = list()
if isinstance(key_basenames, tuple):
for basename in key_basenames:
keyfiles += Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(basename)
elif isinstance(key_basenames, str):
keyfiles = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(key_basenames)
else:
raise ValueError
updated_keyfiles: list[ExecutedPath] = []
for filename in keyfiles:
new_keyfile = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])) / str(filename)
updated_keyfiles.append(new_keyfile.resolve())
keyfiles = updated_keyfiles
self.ssh.paths = self.root["software"].ssh
self.ssh.keys["available"] = tuple(sorted(keyfiles, key=match_sort[0], reverse=match_sort[1]))
def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available", kformat = object) -> tuple[tuple, str]:
delimiters: str | tuple = "[]"
gap: Callable[[bool], str | None] = lambda b: " " if b else ""
sep_format: str = "{0}"
sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b)
label_format: str = "{0}:"
label: Callable[[str, bool], str] = lambda s, b: label_format.format(s) + gap(b)
def render(kfs, source: str):
if isinstance(kfs, list):
member_ints: list = list()
for f in kfs:
if isinstance(f, int):
members_ints.append(f)
if len(member_ints) > 0:
kfs = [self.ssh.keys[source][i] for i in kfs]
elif isinstance(kfs, int):
kfs = [self.ssh.keys[source][kfs]]
elif isinstance(kfs, ExecutedPath):
kfs = [self.ssh.keys[source]]
return kfs
keyfiles = self.ssh.keys[which]
if which == "selected":
keyfiles = render(keyfiles, "available")
if which == "authorized":
keyfiles = render(keyfiles, "selected")
if which == "used":
keyfiles = render(keyfiles, "selected")
stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles)))
stringified_keyfiles = sep(",", True).join(stringified_keyfiles)
stringified_keyfiles = delimiters[0] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:]
if kformat == str:
result = stringified_keyfiles
elif kformat == list:
result = keyfiles
elif kformat == tuple:
result = tuple(keyfiles)
elif kformat == object:
result = (tuple(keyfiles), stringified_keyfiles)
print(result)
return result
def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
keyfiles = self.ssh.keys[source]
# print(keyfiles)
if keyfiles is None:
raise TypeError
elif isinstance(keyfiles, (tuple, list)) and len(keyfiles) < 1:
raise ValueError
authlist = []
if source == "available":
for s in selections:
if isinstance(s, int):
# print(s)
authlist.append(keyfiles[s])
elif isinstance(s, ExecutedPath):
path_set = set([s])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
authlist.append(list(overlap)[0])
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
authlist.append(Path(s))
self.ssh.keys["selected"] = authlist
self.__keys_selected = True
result = self.ssh.keys["selected"]
if source == "selected":
privkeys = list()
pubkeys = list()
count = 1
for s in selections:
if isinstance(s, int):
# print(s)
if count % 2 == 0:
pubkeys.append(keyfiles[s])
else:
privkeys.append(keyfiles[s])
elif isinstance(s, ExecutedPath):
path_set = set([s])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
if count % 2 == 0:
pubkeys.append(list(overlap)[0])
else:
privkeys.append(list(overlap)[0])
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
if count % 2 == 0:
pubkeys.append(Path(s))
else:
privkeys.append(Path(s))
count += 1
self.ssh.keys["authorized"] = pubkeys
self.ssh.keys["used"] = privkeys
self.__authkeys_selected = True
self.__usedkeys_selected = True
result = (self.ssh.keys["authorized"], self.ssh.keys["used"])
elif source == "authorized":
for s in selections:
if isinstance(s, int):
if self.ssh.keys["selected"][s] in keyfiles:
authlist.append(self.ssh.keys["selected"][s])
elif isinstance(s, ExecutedPath):
for p in self.ssh.keys["selected"]:
if str(s) in str(p) and str(s) in list(map(lambda p: str(p), keyfiles)):
authlist.append(p)
else:
continue
elif isinstance(s, str):
for p in self.ssh.keys["selected"]:
if s in str(p) and s in list(map(lambda p: str(p), keyfiles)):
authlist.append(p)
else:
continue
self.ssh.keys["used"] = authlist
self.__usedkeys_selected = True
result = self.ssh.keys["used"]
self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected
return result
# @TODO rewrite below method
def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
raise NotImplementedError
def itemize(self):
model: dict | vpsSchema = dict()
vault_api = Vault(self._api_key)
vault_pass = Vault(self.ssh.password)
# print(self.ssh.keys["selected"])
if not self.__keys_selected:
self.pick_keys("available", *self.ssh.keys["selected"])
self.pick_keys("selected", self.ssh.keys["authorized"], self.ssh.keys["used"])
# self.pick_keys("selected", self.ssh.keys["used"])
authorized_keys = list(map(lambda p: p.read_text(), self.ssh.keys["authorized"]))
used_keys = list(map(lambda p: str(p), self.ssh.keys["used"]))
model["fqdn"] = self._fqdn
model["vps_service"] = {
"exists": True,
"password": vault_pass.dump(self.ssh.password),
"api_key": vault_api.dump(self._api_key),
"type": self.service.lower(),
"region": self.region,
"ssh_authorized_keys": authorized_keys,
"ssh_private_key_paths": used_keys,
"ssh_private_key_path_pref": choice(list(range(len(used_keys)))),
"root_fate": self.ssh.fate,
"ssh_motd_script_basenames": []
}
model["keywords"] = self.keywords
return model