Compare commits

...

41 Commits

Author SHA1 Message Date
ffcf6b2596 fix: made the return be self, given that the instance of the class itself is what is treated as the context manager by default 2026-01-24 14:47:44 -05:00
0d44ec029f replaced cerberus dependency with library of prebuilt valdiators 2026-01-24 14:46:51 -05:00
6af0d4b48c fix, refactor: changed file buffer mode and changed conditional sequences for shorter algorithm 2026-01-24 14:46:08 -05:00
f37c2d5998 feature: added a new type amalgamating the return types of the 'open' BIF 2026-01-24 14:39:35 -05:00
a2d921a158 feature: added a class for handling Ansible vaults that can be converted to string, and added a YAMLObject representing VPS service as in YAML doc 2026-01-24 14:38:31 -05:00
e3683fe955 feature: created file for future GPG/OpenPGP related classes 2026-01-22 17:20:00 -05:00
d9ff0443dc feature: removed dict arg option for publish method while added repr magic method in/for SSHKeyCollection 2026-01-22 17:13:14 -05:00
66da080109 feature: made User and Group classes inherit from YAMLObject class for ease in load/dump operations and conversion 2026-01-22 17:10:15 -05:00
8e6346ee21 fix: made code DRYer, changed RegEx match test to succeed when appropriate 2026-01-22 17:08:06 -05:00
c698bc831a fix, feature: accounted for additional cases in concatenating magic method for SSHKey, added Sequence methods to SSHKey 2026-01-22 14:19:01 -05:00
d46f3a1f83 feature: created a class representing the Ansible control node as context manager 2026-01-22 14:16:55 -05:00
ad8ea5d8b8 feature: added a class 'Parser' that abstracts from data loaders and dumpers 2026-01-22 14:12:37 -05:00
0137fcdad9 feature: made new static path variables and dictionaries containing paths 2026-01-22 14:11:03 -05:00
36e553092f changed Enum options for Software 2026-01-22 14:09:12 -05:00
8ccdf4547e refactor: distributed classes into different files 2026-01-22 14:08:23 -05:00
d9d81a43e0 added another command to the CLI program 2026-01-21 09:21:13 -05:00
2df09c8087 feature: added more root paths and a dictionary of root paths for specific purposes 2026-01-21 09:20:25 -05:00
73472cb073 feature: added more Enum classes useful for type checking and renamed others 2026-01-21 09:19:46 -05:00
57cf9d197d removed configuration manager module as may not be necessary 2026-01-21 09:18:43 -05:00
808434275b removed node classes as currently may not be necessary 2026-01-21 09:18:14 -05:00
aea6638243 refactor: renamed module with utilities for management of software for given users and groups 2026-01-21 09:17:37 -05:00
4eab3bd787 refactor: renamed module with utilities for management of SSH keys 2026-01-21 09:16:51 -05:00
05a680eb7e feature: added a method for 'SSHKeyCollection' that outputs its data in an Ansible-friendly data structure 2026-01-20 12:46:33 -05:00
04127d9d5f fix: changed referenced in commented-out code 2026-01-20 12:15:31 -05:00
28bd322c87 feature: added magic method for containment test of 'SSHKeyCollection' and magic method for setting the status of 'SSHKey' 2026-01-20 12:14:53 -05:00
67e645a3bd refactor,feature: simplified item method for 'SSHKeyCollection', added iteration and str methods to it, and refined str methods for 'SSHKey' 2026-01-14 19:36:27 -05:00
83af21eec8 fix: made sure to get basename rather than file stem 2026-01-14 18:25:47 -05:00
88aab2b418 feature: allowed slicing for item retrieval from 'SSHKeyCollection' instances 2026-01-14 18:25:01 -05:00
43885ec135 feature: implemented some numeric and sequence methods for 'SSHKey' class and added a function 2026-01-08 09:45:23 -05:00
877e133eec feature: added equality comparison magic method counterparts 2026-01-07 23:26:17 -05:00
a4a8260d1c fix: added missing import of Callable type hint 2026-01-07 23:16:43 -05:00
c45b282c5c fix: debugged various magic methods in 'SSHKeyCollection' and added unimplemented but planned magic methods for it and 'SSHKey' class 2026-01-07 23:16:06 -05:00
b7d35f4147 fix: added missing import of Enum 2026-01-07 23:13:09 -05:00
86cd3584b3 changed method name and added inheritance for 'Software' class, moved 'SSHKey' related classes to separate file and fixed 'Software' instance method call 2026-01-07 12:07:36 -05:00
e412e3d5ab changed method name and added inheritance for 'Software' class, moved 'SSHKey' related classes to separate file and fixed 'Software' instance method call 2026-01-07 12:07:07 -05:00
f46d397e3c refactor: moved SSH key management classes to separate file, added planned methods unimplemented for some classes 2026-01-06 21:59:05 -05:00
d75ad74d6b refactor: joined related conditionals together 2026-01-06 20:50:09 -05:00
dc97a1f0b9 refactor: implementing or changing classes to reduce decision tree branching complexity in some methods of 'RemoteNode' class 2026-01-06 19:27:59 -05:00
2f37eed0db fix: changed the algorithm/approach of the SSH key path removal method for 'RemoteNode' 2026-01-05 10:26:17 -05:00
f68a438de3 feature: created another set of cases for non-YAML parsing, tho not yet implemented 2026-01-05 09:10:35 -05:00
8193cb6de6 feature: created cases for non-YAML parsing, tho not yet implemented 2026-01-05 09:09:02 -05:00
11 changed files with 1483 additions and 623 deletions

438
anodes.py
View File

@@ -1,395 +1,75 @@
""" from custtypes import AnsibleRoles, AnsibleScopes, ExecutedPath
Library of classes modeling Ansible nodes or their types. from whereami import USER_PATH, ANSIBLE_CONFIG, ANSIBLE_ROOTS
""" from configparser import ConfigParser as cfg
from typing import Sequence
from pathlib import Path, PurePath from re import Pattern as RegEx
from typing import TypeAlias as Neotype from pathlib import PurePath
from typing import TypedDict as Dict from parse import Parser
from typing import Never, Union, Literal, Required
from collections.abc import Callable
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from enum import Enum
from softman import Software, SoftPathGroup, SoftScope, Apps
from whereami import USER_PATH, PROJ_ROOT
from ansible_vault import Vault
from cerberus import Validator as constrain_by
from random import choice
import secrets
# @TODO below class should mostly work as a context manager
class ControlNode: class ControlNode:
__user_path: ExecutedPath = USER_PATH __parser = Parser()
__proj_root: ExecutedPath = PROJ_ROOT
__conf_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/etc"),
PurePath("/usr", "local", "etc"),
PurePath(str(__user_path), ".config")
)
__data_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/usr", "local", "share"),
PurePath(str(__user_path), ".local", "share")
)
def __init__(self, ansible_proj_root: ExecutedPath | None = None):
if ansible_proj_root is not None:
self.__proj_root = ansible_proj_root
role_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.__proj_root), "roles"),
PurePath(str(self.__proj_root), ".ansible/roles")
)
roledata_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.proj_roles[0]), "files"),
PurePath(str(self.proj_roles[0]), "templates"),
PurePath(str(self.proj_roles[1]), "files"),
PurePath(str(self.proj_roles[1]), "templates")
)
setattr(self, AnsibleScopes.ROLE.name, {
"data": roledata_paths,
"vars": role_paths
})
setattr(self, AnsibleScopes.GROUPVARS.name, {
"vars": (PurePath(str(self.__proj_root), "group_vars"),)
})
setattr(self, AnsibleScopes.HOSTVARS.name, {
"vars": (PurePath(str(self.__proj_root), "host_vars"),)
})
setattr(self, AnsibleScopes.INVENTORY.name, {
"vars": (PurePath(str(self.__proj_root), "hosts.yml"),)
})
def get_scope(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY.name, index = 0): def __init__(self):
return getattr(self, scope)[index] self.__user_path = USER_PATH()
self.__config = cfg()
@property if ANSIBLE_CONFIG.exists():
def home(self) -> ExecutedPath: self.__config.read(str(ANSIBLE_CONFIG))
return self.__user_path
@property
def root(self) -> ExecutedPath:
return self.__proj_root
@property
def sys_confs(self) -> ExecutedPath:
return self.__conf_paths
@property
def sys_data(self) -> ExecutedPath:
return self.__data_paths
class Softs(Enum):
ssh = 0
_userSSHSubParams = {
"available": Required[tuple[ExecutedPath]],
"selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
"authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
"used": ExecutedPath | list[ExecutedPath] | int | list[int]
}
__user_ssh_keys = {
"available": tuple(),
"selected": [0, 1],
"authorized": 1,
"used": 0
}
def __userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"):
self.username = username
self.paths = paths
self.keys = keys
self.password = password
self.fate = fate
userSSHParams = Dict("userSSHParams", {
"username": Required[str],
"paths": _Apps,
"keys": __user_ssh_keys,
"password": Required[str],
"fate": Literal["disposal", "retention"]
}, total=False)
__user_ssh_input = {
"username": "",
"paths": None,
"keys": {
"available": tuple(),
"selected": [0, 1],
"authorized": 1,
"used": 0
},
"password": "password123",
"fate": "disposal",\
"__init__": __userSSHInit
}
userSSH = type("userSSH", (), __user_ssh_input)
vpsSchema = Dict("vpsSchema", {
"fqdn": Required[str],
"vps_service": {
"exists": Required[bool],
"password": Required[str],
"api_key": Required[str],
"type": Required[Literal["linode"]],
"region": Literal["us-east"],
"ssh_authorized_keys": list[IdlePath | ExecutedPath | str],
"ssh_private_key_paths": list[IdlePath | ExecutedPath | str],
"ssh_private_key_path_pref": int,
"root_fate": Required[Literal["disposal","retention"]],
"ssh_motd_script_basenames": list[str]
},
"keywords": list[str]
}, total=False)
class RemoteNode:
# __user_path =
_fqdn: str | None = None
def __init__(self, cnode: ControlNode, api_key: str = secrets.token_urlsafe(32), password: str = "password123", name: str = ".test", service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = "us-east", keywords: list[str] | None = ["server"]):
self.root: dict = dict()
self.root["username"]: str = "root"
vault = Vault(password)
self.root["password"]: str = vault.dump(password)
self.root["software"]: Software = Software()
self.owner = cnode
app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: PurePath(str(cnode.home), ("." + Softs.ssh.name))
},
SoftPathGroup.DATA.name: {
SoftScope.GLOBAL.name: PurePath(str(cnode.sys_confs[0]), "update-motd.d")
}
}
self.root["software"].declare(Softs.ssh.name, **app_input)
self._fqdn = name
self.root["software"]._fqdn = name
root_ssh_input: userSSHParams = {
"username": self.root["username"],
"password": self.root["password"]
}
self.ssh: userSSH = userSSH(**root_ssh_input)
self.apps: list = self.root["software"].list(contents = True)
self.keywords = keywords
self._api_key: str | None = api_key
self.service = service
self.region = region
self.__authkeys_selected = False
self.__usedkeys_selected = False
self.__keys_selected = False
self.__finalized_keys = False
self.model: dict | None = None
self.__accumulator = None
def set_region(self, name: Literal["us-east"] = "us-east") -> None:
self.region = name
def set_password(self, password: str) -> None:
vault = Vault(self.root["password"])
self.root["password"] = vault.dump(self.root["password"])
def set_api(self, key: str) -> None:
vault = Vault(self._api_key)
self._api_key = vault.dump(self._api_key)
def add_tags(self, *name):
self.keywords: list = []
self.keywords += list(name)
def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e.stem, False)):
keyfiles: list[ExecutedPath] | list[None] | None = list()
if isinstance(key_basenames, tuple):
for basename in key_basenames:
keyfiles += Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(basename)
elif isinstance(key_basenames, str):
keyfiles = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])).glob(key_basenames)
else: else:
raise ValueError raise Exception
updated_keyfiles: list[ExecutedPath] = [] self.__data = None
for filename in keyfiles: self.__filepath: ExecutedPath | None = None
new_keyfile = Path(str(self.root["software"].ssh.CONFIG[SoftScope.PERSONAL.name])) / str(filename) self.__file = None
updated_keyfiles.append(new_keyfile.resolve())
keyfiles = updated_keyfiles
self.ssh.paths = self.root["software"].ssh def __enter__(self):
self.ssh.keys["available"] = tuple(sorted(keyfiles, key=match_sort[0], reverse=match_sort[1])) self.__file = open(str(self.__filepath), "r+")
self.__data = self.__parser.load(self.__filepath)
return self.__data
def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available", kformat = object) -> tuple[tuple, str]: def __exit__(self, exc_type, exc_value, exc_traceback):
delimiters: str | tuple = "[]" result = self.__parser.dump(self.__data)
gap: Callable[[bool], str | None] = lambda b: " " if b else ""
sep_format: str = "{0}"
sep: Callable[[str, bool], str] = lambda s, b: sep_format.format(s) + gap(b)
label_format: str = "{0}:"
label: Callable[[str, bool], str] = lambda s, b: label_format.format(s) + gap(b)
def render(kfs, source: str): if isinstance(result, str):
if isinstance(kfs, list): self.__file.write(result)
member_ints: list = list() else:
for f in kfs: result.write(self.__file)
if isinstance(f, int):
members_ints.append(f)
if len(member_ints) > 0:
kfs = [self.ssh.keys[source][i] for i in kfs]
elif isinstance(kfs, int):
kfs = [self.ssh.keys[source][kfs]]
elif isinstance(kfs, ExecutedPath):
kfs = [self.ssh.keys[source]]
return kfs
keyfiles = self.ssh.keys[which] self.__file.close()
if which == "selected":
keyfiles = render(keyfiles, "available")
if which == "authorized": def __call__(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY, pick: int | str | RegEx | AnsibleRoles = 0, filepath: ExecutedPath | str | None = None):
keyfiles = render(keyfiles, "selected") if scope is None:
raise NotImplementedError
else:
paths = ANSIBLE_ROOTS[scope.name.lower()]
if which == "used": if isinstance(paths, Sequence):
keyfiles = render(keyfiles, "selected") path = None
if isinstance(pick, int):
path = paths[pick]
elif isinstance(pick, str):
path = tuple(filter(lambda p: str(p) == pick or pick in str(p), paths))
if len(path) > 0:
path = path[0]
elif isinstance(pick, RegEx):
path = tuple(filter(lambda p: pick.search(str(p)), paths))
if len(path) > 0:
path = path[0]
else:
path = tuple(filter(lambda p: str(p) == pick.name.lower() or pick.name.lower() in str(p), paths))
if len(path) > 0:
path = path[0]
stringified_keyfiles = list(map(lambda t: label(t[0], True) + str(t[1]), enumerate(keyfiles))) if path is None:
stringified_keyfiles = sep(",", True).join(stringified_keyfiles) raise KeyError
stringified_keyfiles = delimiters[0] + stringified_keyfiles[:(len(stringified_keyfiles) - 2)] + delimiters[1:]
if kformat == str: if isinstance(filepath, ExecutedPath):
result = stringified_keyfiles filepath = str(filepath)
elif kformat == list:
result = keyfiles
elif kformat == tuple:
result = tuple(keyfiles)
elif kformat == object:
result = (tuple(keyfiles), stringified_keyfiles)
print(result) if path.is_dir():
return result self.__filepath = path / filepath
elif path.is_file():
def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: self.__filepath = path
keyfiles = self.ssh.keys[source]
# print(keyfiles)
if keyfiles is None:
raise TypeError
elif isinstance(keyfiles, (tuple, list)) and len(keyfiles) < 1:
raise ValueError
authlist = []
if source == "available":
for s in selections:
if isinstance(s, int):
# print(s)
authlist.append(keyfiles[s])
elif isinstance(s, ExecutedPath):
path_set = set([s])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
authlist.append(list(overlap)[0])
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
authlist.append(Path(s))
self.ssh.keys["selected"] = authlist
self.__keys_selected = True
result = self.ssh.keys["selected"]
if source == "selected":
privkeys = list()
pubkeys = list()
count = 1
for s in selections:
if isinstance(s, int):
# print(s)
if count % 2 == 0:
pubkeys.append(keyfiles[s])
else:
privkeys.append(keyfiles[s])
elif isinstance(s, ExecutedPath):
path_set = set([s])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
if count % 2 == 0:
pubkeys.append(list(overlap)[0])
else:
privkeys.append(list(overlap)[0])
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
if count % 2 == 0:
pubkeys.append(Path(s))
else:
privkeys.append(Path(s))
count += 1
self.ssh.keys["authorized"] = pubkeys
self.ssh.keys["used"] = privkeys
self.__authkeys_selected = True
self.__usedkeys_selected = True
result = (self.ssh.keys["authorized"], self.ssh.keys["used"])
elif source == "authorized":
for s in selections:
if isinstance(s, int):
if self.ssh.keys["selected"][s] in keyfiles:
authlist.append(self.ssh.keys["selected"][s])
elif isinstance(s, ExecutedPath):
for p in self.ssh.keys["selected"]:
if str(s) in str(p) and str(s) in list(map(lambda p: str(p), keyfiles)):
authlist.append(p)
else:
continue
elif isinstance(s, str):
for p in self.ssh.keys["selected"]:
if s in str(p) and s in list(map(lambda p: str(p), keyfiles)):
authlist.append(p)
else:
continue
self.ssh.keys["used"] = authlist
self.__usedkeys_selected = True
result = self.ssh.keys["used"]
self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected
return result
# @TODO rewrite below method
def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
raise NotImplementedError
def itemize(self):
model: dict | vpsSchema = dict()
vault_api = Vault(self._api_key)
vault_pass = Vault(self.ssh.password)
# print(self.ssh.keys["selected"])
if not self.__keys_selected:
self.pick_keys("available", *self.ssh.keys["selected"])
self.pick_keys("selected", self.ssh.keys["authorized"], self.ssh.keys["used"])
# self.pick_keys("selected", self.ssh.keys["used"])
authorized_keys = list(map(lambda p: p.read_text(), self.ssh.keys["authorized"]))
used_keys = list(map(lambda p: str(p), self.ssh.keys["used"]))
model["fqdn"] = self._fqdn
model["vps_service"] = {
"exists": True,
"password": vault_pass.dump(self.ssh.password),
"api_key": vault_api.dump(self._api_key),
"type": self.service.lower(),
"region": self.region,
"ssh_authorized_keys": authorized_keys,
"ssh_private_key_paths": used_keys,
"ssh_private_key_path_pref": choice(list(range(len(used_keys)))),
"root_fate": self.ssh.fate,
"ssh_motd_script_basenames": []
}
model["keywords"] = self.keywords
return model
return __enter__

View File

@@ -1,91 +0,0 @@
from pathlib import Path, PurePath
from whereami import PROJ_ROOT
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from anodes import RemoteNode, ControlNode, _userSSHSubParams
# from ansible_vault import Vault
import yaml as yams
import re
from typing import Literal
# @TODO for 'Config' class, write methods to pull and push from Ansible YAML files
# @NOTE https://docs.python.org/3/library/configparser.html#quick-start
class Config:
path: ExecutedPath = PROJ_ROOT / "config.ini"
__controller = ControlNode()
setattr(__controller, AnsibleScopes.INTERNAL.name, {
"vars": (PurePath(str(path)),)
})
def __init__(self, filepath: str = "config.ini", scope: AnsibleScopes = AnsibleScopes.INTERNAL.name, index: int = 0, mode: str = "r+"):
self.scope = scope
parent_dir = self.__controller.get_scope(scope, index)
filepath = Path(parent_dir) / filepath
filename: str = filepath.stem
self.parse_method = "yml"
if "." in filename:
filename_arr = filename.split(".")
basename: str = filename_arr[0]
ext: str = filename_arr[len(filename_arr) - 1]
if re.match(r"toml|ini", ext):
self.parse_method: str = ext.upper()
self.filepath: ExecutedPath = filepath
self.mode = mode
self.model = None
self.file = None
self.__remote: RemoteNode | None = None
def __enter__(self):
if not self.filepath.exists():
self.__remote = RemoteNode(self.__controller)
return self.__remote
self.file = open(str(self.filepath), self.mode)
model = self.file.read()
self.model = yams.load(model)
if self.scope == AnsibleScopes.GROUPVARS.name or self.scope == AnsibleScopes.HOSTVARS.name:
api_key: str = model["vps_service"]["api_key"]
vault = Vault(model["vps_service"]["password"])
password: str = vault.load(model["vps_service"]["password"])
fqdn: str = model["fqdn"]
service: VirtualPrivateServers = model["vps_service"]["type"]
region: Literal["us-east"] = model["vps_service"]["region"]
keywords: list[str] = model["keywords"]
self.__remote = RemoteNode(self.__controller, api_key, password, fqdn, service, region, keywords)
# @TODO add portion wherein SSH keys from 'model' are made to be present in 'self.__remote'
self.__remote.import_keys("*")
keyfiles = self.__remote.show_keys("available", list)
keyfiles_contents = set(map(lambda p: p.read_text(), keyfiles))
keyfiles_pub = list(set(model["vps_service"]["ssh_authorized_keys"]) - keyfiles_contents)
self.__remote.ssh.keys["available"] = tuple(keyfiles_pub)
self.__remote.ssh.keys["selected"] = keyfiles_pub
self.__remote.ssh.keys["authorized"] += keyfiles_pub
keyfiles_paths = set(map(lambda p: str(p), keyfiles))
keyfiles_priv = list(set(model["vps_service"]["ssh_private_key_paths"]) - keyfiles_paths)
self.__remote.ssh.keys["available"] = (*self.__remote.ssh.keys["available"],*keyfiles_priv)
self.__remote.ssh.keys["selected"] += keyfiles_priv
self.__remote.ssh.keys["used"] += keyfiles_priv
self.__remote.ssh.fate: Literal["disposal", "retention"] = model["vps_service"]["root_fate"]
return self.__remote
else:
raise ValueError
def __exit__(self):
if self.scope == AnsibleScopes.GROUPVARS.name or self.scope == AnsibleScopes.HOSTVARS.name:
ansible_chunk = self.__remote.itemize()
ansible_chunk["vps_service"]["ssh_motd_script_basenames"] = self.model["ssh_motd_script_basenames"]
ansible_chunk["vps_service"]["ssh_private_key_path_pref"] = self.model["ssh_private_key_path_pref"]
del self.model["fqdn"]
del self.model["vps_service"]
del self.model["keywords"]
else:
raise ValueError
file_model = yams.dump(self.model | ansible_chunk)
self.file.write(file_model)
self.file.close()

View File

@@ -2,18 +2,81 @@
Library of custom type hints. Library of custom type hints.
""" """
from typing import TypeAlias as Neotype from typing import TypeAlias as Neotype, TypedDict as Dict
from pathlib import PurePosixPath, PureWindowsPath, PosixPath, WindowsPath from typing import Required
from collections.abc import Sequence
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
from enum import StrEnum, auto
from io import TextIOBase, BufferedIOBase, RawIOBase
ExecutedPath: Neotype = PosixPath | WindowsPath ExecutedPath: Neotype = PosixPath | WindowsPath
IdlePath: Neotype = PurePosixPath | PureWindowsPath IdlePath: Neotype = PurePosixPath | PureWindowsPath
File: Neotype = TextIOBase | BufferedIOBase | RawIOBase
class VirtualPrivateServers(Enum): class RootFate(StrEnum):
Linode = 0 disposal = auto()
retention = auto()
class AnsibleScopes(Enum): class NodeType(StrEnum):
INTERNAL = 0 remote = auto()
INVENTORY = 1 control = auto()
GROUPVARS = 2
HOSTVARS = 3 class VPS(StrEnum):
ROLE = 4 Linode = auto()
class VPSRegion(StrEnum):
us_east = auto()
class AnsibleScopes(StrEnum):
INTERNAL = auto()
INVENTORY = auto()
GROUPVARS = auto()
HOSTVARS = auto()
ROLE = auto()
class AnsibleRoles(StrEnum):
bootstrap = auto()
class Scopes(StrEnum):
SYS = auto()
USER = auto()
LOCAL = auto()
PROJ = auto()
SHARED = auto()
class Roles(StrEnum):
CONF = auto()
DATA = auto()
MEM = auto()
EXE = auto()
class UserName(StrEnum):
root = auto()
class GroupName(StrEnum):
remote = auto()
sudo = auto()
class Software(StrEnum):
openssh_client = auto()
openssh_server = auto()
class SoftwareRoles(StrEnum):
client = auto()
server = auto()
class PacMans(StrEnum):
APT = auto()
class PathCollection(Dict, total=False):
sys: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
user: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
local: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
proj: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
shared: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
class PathRoles(Dict, total=False):
conf: PathCollection
data: PathCollection
mem: PathCollection
exe: PathCollection

356
entities.py Normal file
View File

@@ -0,0 +1,356 @@
from typing import Self, Literal, Never, Callable, Sequence
from custtypes import Roles, Scopes, PathCollection, PathRoles
from custtypes import Software, SoftwareRoles, PacMans
from custtypes import ExecutedPath, IdlePath, File
from custtypes import UserName, GroupName, VPS, VPSRegion, RootFate
from pathlib import Path, PurePath
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from random import choice as gamble
from re import Pattern as RegEx
from softman import sshd
from yaml import YAMLObject
from ansible_vault import Vault
class Group(YAMLObject):
yaml_tag = u"!Group"
# @TODO create Enum class child for category parameter type hinting in below method
def __init__(self, group_name: GroupName | str = GroupName.sudo, category: Literal["system", "regular"] = "system", gid: int | str | None = 27):
if isinstance(group_name, GroupName):
self.group_name = group_name.name.lower()
else:
self.group_name = group_name
self.id = str(gid)
self.type = category
def __repr__(self):
return "%s(group_name=%r,category=%r,gid=%r)" % (
self.__class__.__name__,
self.group_name,
self.category,
self.id
)
class User(YAMLObject):
yaml_tag = u"!User"
def __init__(self, username: UserName | str = UserName.root, password: str = "test", services: list[str | Software] = [Software.openssh_server], uid: int | str | None = 0):
self.exists = True
if isinstance(username, UserName):
self.username = username.name.lower()
else:
self.username = username
self.id = str(uid)
self.password = password
new_services = []
for s in services:
if isinstance(s, Software):
new_services.append(s.name.lower())
else:
new_services.append(s)
self.services: tuple = tuple(new_services)
self.shell = "/bin/bash"
self.home = "/"
self.category: Literal["system", "regular"] = "regular"
group = Group(username, self.id)
self.group = group
self.groups: list[str | GroupName] | None = None
if self.groups is None:
self.admin = True
elif isinstance(self.groups, Sequence) and GroupName.sudo in self.groups:
self.admin = True
else:
self.admin = False
ssh_keys = SSHKeyCollection()
ssh_keys.pull()
self.__ssh_keys = ssh_keys
# print("here")
self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
pubkeys = ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
self.__auth_keys: SSHKeyCollection = SSHKeyCollection()
for p in pubkeys:
self.__auth_keys.append(p)
self.ssh_authorized_keys: list[str | None] = []
self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
privkeys = ssh_keys.publish(SSHKeyType.privkey, datatype=list)
self.__priv_keys: SSHKeyCollection = SSHKeyCollection()
for p in privkeys[0]:
self.__priv_keys.append(p)
self.ssh_private_key_paths: list[str | None] = []
self.__priv_key_pref: int = privkeys[1]
self.ssh_private_key_path_pref: int = privkeys[1]
self.__apps = (sshd,)
self.__ssh_keypairs: tuple[SSHKey | tuple[SSHKey]] | SSHKey = tuple()
self.__ssh_keypair_chosen = False
def get_app(self, name: Software) -> Never:
raise NotImplementedError
def update_app(self, name: Software, attr: str, method = None) -> Never:
raise NotImplementedError
def __update_sshd(self, app: Software = Software.openssh_server):
for a in self.__apps:
if a.alt_names[PacMans.APT.name.lower()] == app.name.lower():
if hasattr(a, "users"):
users = getattr(a, "users")
if self.username not in users:
users[self.username] = dict()
users[self.username]["authorized_keys"] = self.__auth_keys
users[self.username]["credential_keys"] = self.__priv_keys
users[self.username]["keys"] = self.__ssh_keys
users[self.username]["keypairs"] = self.__ssh_keypairs
users[self.username]["preferred_priv_key"] = self.__priv_key_pref
setattr(self, "users", users)
else:
users = {
self.username: {
"auth_keys": self.__auth_keys,
"priv_keys": self.__priv_keys,
"keypairs": self.__ssh_keypairs,
"keys": self.__ssh_keys
}
}
a.declare(users = users)
else:
continue
def choose_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True):
if not self.__ssh_keypair_chosen:
self.__priv_keys = SSHKeyCollection()
self.__auth_keys = SSHKeyCollection()
if from_host:
pubkeys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
# print(pubkeys)
if isinstance(public_key, int):
public_key = pubkeys[public_key]
elif isinstance(public_key, SSHKey):
public_key = tuple(filter(lambda k: str(k) == str(public_key()), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
elif isinstance(public_key, str):
public_key = tuple(filter(lambda k: str(k) == public_key or public_key in str(k), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
elif isinstance(public_key, RegEx):
public_key = tuple(filter(lambda k: public_key.search(str(k)), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
else:
public_key = tuple(filter(lambda k: str(k) == str(public_key), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
privkeys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
if isinstance(private_key, int):
private_key = privkeys[private_key]
elif isinstance(private_key, SSHKey):
private_key = tuple(filter(lambda k: str(k) == str(private_key()), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
elif isinstance(private_key, str):
private_key = tuple(filter(lambda k: str(k) == private_key or private_key in str(k), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
elif isinstance(private_key, RegEx):
private_key = tuple(filter(lambda k: private_key.search(str(k)), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
else:
private_key = tuple(filter(lambda k: str(k) == str(private_key), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
else:
if isinstance(public_key, SSHKey):
public_key = public_key()
elif isinstance(public_key, str):
public_key = Path(public_key)
self.__ssh_keys.append(public_key)
if isinstance(private_key, SSHKey):
private_key = private_key()
elif isinstance(private_key, str):
private_key = Path(private_key)
self.__ssh_keys.append(private_key)
if private_key is None or public_key is None:
raise KeyError
self.__auth_keys.append(public_key)
self.ssh_authorized_keys.append(public_key.read_text())
self.__priv_keys.append(private_key)
self.ssh_private_key_paths.append(str(private_key))
self.__ssh_keypairs = (*self.__ssh_keypairs, (self.__priv_keys.tail, self.__auth_keys.tail),)
self.__priv_key_pref = len(self.__ssh_keypairs) - 1
self.ssh_private_key_path_pref = len(self.__ssh_keypairs) - 1
self.__update_sshd()
self.__ssh_keypair_chosen = True
def get_keypair(self, preference: int):
if not self.__ssh_keypair_chosen:
raise Exception
if isinstance(self.__ssh_keypairs, SSHKey) and not isinstance(self.__ssh_keypairs(), tuple):
raise ValueError
if isinstance(self.__ssh_keypairs, SSHKey):
if isinstance(self.__ssh_keypairs(), tuple):
return self.__ssh_keypairs[preference]
else:
return self.__ssh_keypairs
else:
return self.__ssh_keypairs[preference]
@property
def keys(self) -> SSHKeyCollection:
return self.__ssh_keys
@property
def public_keys(self) -> SSHKeyCollection:
return self.__public_keys
@property
def private_keys(self) -> SSHKeyCollection:
return self.__private_keys
@property
def keypair_preference(self) -> int:
return self.__priv_key_pref
def prefer_keypair(self, preference: int | RegEx | str):
if isinstance(preference, int):
if preference < len(self.__ssh_keypairs):
self.__priv_key_pref = preference
else:
raise KeyError
elif isinstance(preference, RegEx):
count = 0
for keypair in self.__ssh_keypairs:
if preference.search(keypair[0]) or preference.search(keypair[1]):
self.__priv_key_pref = count
count += 1
else:
count = 0
for keypair in self.__ssh_keypairs:
if preference in str(keypair[0]()) or preference in str(keypair[1]()):
self.__priv_key_pref = count
count += 1
@property
def keypairs(self) -> tuple[SSHKey] | SSHKey | None:
return self.__ssh_keypairs
@property
def keypair(self):
kp = self.__ssh_keypairs[self.__priv_key_pref]
return kp[0] + kp[1]
@property
def authorized_keys(self) -> SSHKeyCollection:
return self.__auth_keys
@property
def credential_keys(self) -> SSHKeyCollection:
return self.__priv_keys
def __repr__(self) -> str:
return "%s(username=%r,password=%r,services=%r,uid=%r)" % (
self.__class__.__name__,
self.username,
self.password,
self.services,
self.id
)
class AnsibleCrypt:
def __init__(self, string: str, source: File | None = None):
self.__args = (string, source)
self.__lock = Vault(string).dump
self.__stream = None
if source is not None:
self.__stream = source.read()
self.__data = self.__lock(string, self.__stream)
else:
self.__data = self.__lock(string)
def unlock(self, string: str):
unlock = Vault(string).load
if self.__stream is not None:
result = unlock(self.__stream)
else:
result = unlock(string)
return result
def __str__(self):
return self.__data
def __repr__(self):
return "%s(%r, source=%r)" % (
self.__class__.__name__,
*self.__args
)
class VirtualPrivateServer(YAMLObject):
yaml_tag = u"!VirtualPrivateServer"
def __init__(self, root: User, api: str, vps: VPS | str = VPS.Linode):
self.region: VPSRegion | None = None
if vps == VPS.Linode:
self.region = VPSRegion.us_east
api_key = AnsibleCrypt(api)
self.__api_key: AnsibleCrypt = api_key
self.api_key: str = str(api_key)
self.password: str = root.password
self.exists: bool = True
self.type: str = vps.name.lower()
self.__default_fate: RootFate = RootFate.disposal
self.root_fate: str = self.__default_fate.name.lower()
self.ssh_authorized_keys: list[str] = root.ssh_authorized_keys
self.ssh_private_key_paths: list[str] = root.ssh_private_key_paths
self.ssh_private_key_path_pref: int = root.ssh_private_key_path_pref
# @TODO add SSH MOTD attribute

1
gpgkey.py Normal file
View File

@@ -0,0 +1 @@
# @TODO create classes similar to those in sshkey module, for GPG keys

58
main.py
View File

@@ -3,17 +3,63 @@ Library for the CLI commands and the related classes and functions
""" """
import click as cli import click as cli
from custtypes import AnsibleScopes, VPS, VPSRegion, RootFate, UserName
domain_pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$' from whereami import PROJ_ROOT, ANSIBLE_ROOTS
# @TODO create regex pattern for matching IP addresses from servs import User
# ip_pattern = r'' from pathlib import PurePath, Path
from sshkey import SSHKeyType
from ansible_vault import Vault
import yaml as yams
@cli.group() @cli.group()
@cli.option("-d", "--debug", type=bool, is_flag=True, default=True, help="Use debugging mode") @cli.option("-d", "--debug", type=bool, is_flag=True, default=False, help="Use debugging mode")
@cli.pass_context @cli.pass_context
def skansible(ctx, debug): def skansible(ctx, debug):
ctx.ensure_object(dict) ctx.ensure_object(dict)
ctx.obj["DEBUG"] = True ctx.obj["DEBUG"] = debug
@skansible.command()
@cli.argument("api_key")
@cli.option("-s", "--vps", type=cli.Choice(VPS, case_sensitive=False), default="Linode", help="Set the type of VPS")
@cli.option("-r", "--region", type=cli.Choice(VPSRegion, case_sensitive=False), default="us_east", help="Set the VPS region")
@cli.option("-0", "--root", type=bool, is_flag=True, default=True, help="Declare root SSH login credentials")
@cli.option("-f", "--fate", type=cli.Choice(RootFate, case_sensitive=False), default="disposal", help="Choose the eventual fate of the root account")
@cli.option("-h", "--host", multiple=True, type=str, default="all", help="Specify what inventory host or group this is being set")
@cli.pass_context
def init(ctx, vps, region, root, fate, host, api_key):
if root:
password = cli.prompt("Please enter a password: ", type=str, hide_input=True, confirmation_prompt=True)
root = User(UserName.root.name.lower(), password)
pubkeys = root.ssh_keys.publish(SSHKeyType.pubkey.name.lower(), datatype=list)
pubkey_opts = map(lambda k: str(k), pubkeys)
chosen_pubkey = cli.prompt("Authorize one of the following SSH public keys: ", type=cli.Choice(pubkey_opts, case_sensitive=True), show_choices=True)
chosen_pubkey = Path(chosen_pubkey)
privkeys = root.ssh_keys.publish(SSHKeyType.privkey.name.lower(), datatype=list)[0]
chosen_privkey = tuple(filter(lambda k: k.stem == chosen_pubkey.stem, privkeys))[0]
inv_vars = []
for h in host:
inv_vars += list(ANSIBLE_ROOTS[AnsibleScopes.HOSTVARS.name.lower()].glob(h)) + list(ANSIBLE_ROOTS[AnsibleScopes.GROUPVARS.name.lower()].glob(h))
if len(inv_vars) > 0:
for p in inv_vars:
with open(str(p), "r+") as file:
content = yams.load(file.read(), Loader=yams.Loader)
if "vps_service" in content:
content["vps_service"]["exists"] = True
crypt_key = Vault(api_key)
content["vps_service"]["api_key"] = crypt_key.dump(api_key)
content["vps_service"]["type"] = vps.lower()
content["vps_service"]["region"] = region.replace("_", "-")
content["vps_service"]["root_fate"] = fate
crypt_key = Vault(root.password)
content["vps_service"]["password"] = crypt_key.dump(root.password)
else:
for h in host:
path = ANSIBLE_ROOTS[AnsibleScopes.GROUPVARS.name.lower()] / h
with open(str(path), "w") as file:
pass
if __name__ == "__main__": if __name__ == "__main__":
skansible(obj={}) skansible(obj={})

89
parse.py Normal file
View File

@@ -0,0 +1,89 @@
from pathlib import Path
import yaml as yams
from configparser import ConfigParser as cfg
from custtypes import ExecutedPath
from re import compile as rgx, IGNORECASE
from whereami import PROJ_ROOT
from typing import Literal
class Parser:
def __init__(self):
self.__is_yaml = rgx(r".*\.ya?ml$", flags=IGNORECASE).match
self.__is_ini = rgx(r".*\.ini$", flags=IGNORECASE).match
self.__is_config = rgx(r".*\.cfg$", flags=IGNORECASE).match
self.__is_json = rgx(r".*\.[jb]son$", flags=IGNORECASE).match
self.__data = None
self.__content: str | None = None
self.__is_path: bool = False
# @TODO use Enum child class for below type hint instead
self.__method: Literal["yaml", "config", "generic"] = "generic"
self.__file: ExecutedPath | None = None
def load(self, filepath: ExecutedPath | str, method: Literal["yaml", "config", "generic"] = "generic", **kwargs):
if isinstance(filepath, ExecutedPath):
self.__is_path = True
self.__file = filepath
filepath = str(filepath)
else:
if isinstance(filepath, str) and Path(filepath).exists():
self.__file = Path(filepath)
self.__is_path = True
else:
self.__is_path = False
if isinstance(filepath, str):
self.__content = filepath
if self.__is_yaml(filepath) or method == "yaml":
self.__method = "yaml"
if self.__is_path:
filepath = open(str(filepath), "r+")
if len(kwargs) > 0:
self.__data = yams.load_all(filepath, Loader=yams.Loader, **kwargs)
else:
self.__data = yams.load_all(filepath, Loader=yams.Loader)
filepath.close()
elif self.__is_config(filepath) or method == "config":
self.__method = "config"
self.__data = cfg()
if self.__is_path:
read = self.__data.read
else:
read = self.__data.read_string
if len(kwargs) > 0:
self.__data.read(filepath, **kwargs)
else:
self.__data.read(filepath)
else:
raise TypeError
return self.__data
def dump(self, obj = None, method: Literal["yaml", "config", "generic"] | None = "generic", **kwargs):
if isinstance(obj, yams.YAMLObject) or self.__method == "yaml" or method == "yaml":
if obj is None:
obj = self.__data
if len(kwargs) > 0:
self.__content = "---\n" + yams.dump(obj, Dumper=yams.Dumper, **kwargs)
else:
self.__content = "---\n" + yams.dump(obj, Dumper=yams.Dumper)
elif isinstance(obj, ConfigParser) or self.__method == "config" or method == "config":
if obj is None:
if self.__is_path:
return self.__file.read_text()
else:
if self.__file is None:
return self.__content
else:
return self.__file.read_text()
raise NotImplementedError
else:
raise TypeError
return self.__content

View File

@@ -9,6 +9,6 @@ dependencies = [
"ansible-lint>=25.12.1", "ansible-lint>=25.12.1",
"ansible-navigator>=25.12.0", "ansible-navigator>=25.12.0",
"ansible-vault>=4.1.0", "ansible-vault>=4.1.0",
"cerberus>=1.3.8",
"click>=8.3.1", "click>=8.3.1",
"validators>=0.35.0",
] ]

View File

@@ -1,154 +1,157 @@
""" from typing import Self, Never, Callable, Sequence
Library of classes modeling software and software-related from custtypes import Roles, Scopes, PathCollection, PathRoles
data as represented in or used by Ansible. from custtypes import Software, SoftwareRoles, ExecutedPath
""" from custtypes import PacMans, UserName, GroupName, IdlePath
from custtypes import ExecutedPath, AnsibleRoles
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from whereami import PROJ_ROLES
from typing import TypeAlias as Neotype class App:
from typing import TypedDict as Dict def __init__(self, name: Software, role: SoftwareRoles = SoftwareRoles.client, paths: PathRoles | None = None):
from typing import Never, Union self.__name = name.name.lower().replace("_", "-")
from custtypes import ExecutedPath, IdlePath # @TODO create dict type hint for below data struct
from enum import Enum self.alt_names = dict()
from pathlib import Path, PurePath self.alt_names[PacMans.APT.name.lower()] = self.__name
from whereami import USER_PATH, PROJ_ROOT self.role = role.name.lower()
if paths is not None:
if Roles.EXE in paths:
setattr(self, "_" + Roles.EXE.name.lower(), paths[Roles.EXE.name.lower()])
if Roles.CONF in paths:
setattr(self, "_" + Roles.CONF.name.lower(), paths[Roles.CONF.name.lower()])
if Roles.DATA in paths:
setattr(self, "_" + Roles.DATA.name.lower(), paths[Roles.DATA.name.lower()])
if Roles.MEM in paths:
setattr(self, "_" + Roles.MEM.name.lower(), paths[Roles.MEM.name.lower()])
self.__parents: tuple[Self] | None = None
self.__children: tuple[Self| None] = []
self.__api: str | None = None
self.__current_filepath: IdlePath | ExecutedPath | None = None
self.__content: str | None = None
AppPath: Neotype = Union[ExecutedPath, IdlePath] @property
def conf_paths(self):
if hasattr(self, "_" + Roles.CONF.name.lower()):
return self._conf
else:
raise Exception
class SoftScope(Enum): @property
PERSONAL = 0 def data_paths(self):
LOCAL = 1 if hasattr(self, "_" + Roles.DATA.name.lower()):
GLOBAL = 2 return self._data
else:
raise Exception
class SoftPathGroup(Enum): @property
CONFIG = 0 def exec_paths(self):
DATA = 1 if hasattr(self, "_" + Roles.EXE.name.lower()):
MEM = 2 return self._exe
EXE = 3 else:
raise Exception
_SubAppParams = Dict("_SubAppParams", { @property
SoftScope.PERSONAL.name: IdlePath | list[IdlePath], def mem_paths(self):
SoftScope.LOCAL.name: IdlePath | list[IdlePath], if hasattr(self, "_" + Roles.MEM.name.lower()):
SoftScope.GLOBAL.name: IdlePath | list[IdlePath] return self._mem
}, total=False) else:
AppParams = Dict("AppParams", { raise Exception
SoftPathGroup.CONFIG.name: _SubAppParams,
SoftPathGroup.DATA.name: _SubAppParams,
SoftPathGroup.MEM.name: _SubAppParams,
SoftPathGroup.EXE.name: _SubAppParams
}, total=False)
def __AppsInit(self, CONFIG = None, DATA = None, MEM = None, EXE = None): def get_paths(self, role: Roles = Roles.CONF):
self.CONFIG = CONFIG if hasattr(self, "_" + role.name.lower()):
self.DATA = DATA return getattr(self, "_" + role.name.lower())
self.MEM = MEM else:
self.EXE = EXE raise Exception
__app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.DATA.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.MEM.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
"__init__": __AppsInit
}
Apps = type("Apps", (), __app_input)
# @TODO continue adding magic methods to below class def append(self, datatype: Roles = Roles.CONF, scope: Scopes | None = None, path: IdlePath | ExecutedPath | str | PathCollection | None = None):
# @NOTE https://rszalski.github.io/magicmethods/#sequence if path is None:
class Software: raise TypeError
__user_path: ExecutedPath = USER_PATH
def __init__(self): datatype = datatype.name.lower()
self._fqdn: str | None = None
# @TODO fix NameError for 'Software' in parameter type check
def declare(self, name: str, **kwpaths: _SubAppParams) -> AppParams:
keyword_args: AppParams = kwpaths
app = Apps(**keyword_args) if hasattr(self, "_" + datatype):
setattr(self, name, app) paths = getattr(self, "_" + datatype)
return app
def __getitem__(self, key: str) -> AppParams | Never:
if hasattr(self, key):
app: Apps = getattr(self, key)
else: else:
raise KeyError setattr(self, "_" + datatype, dict())
return app paths = getattr(self, "_" + datatype)
def __setitem__(self, key: tuple[str, SoftPathGroup], **value: IdlePath | list[IdlePath]) -> None | Never: if scope is not None:
if len(value) < 1 or len(value) > 3: if isinstance(path, str):
raise ValueError path: ExecutedPath = Path(path)
app_params: _SubAppParams = value if scope.name.lower() not in paths:
paths[scope.name.lower()] = []
if hasattr(self, key[0]): paths[scope.name.lower()].append(path)
app: Apps = getattr(self, key[0])
if hasattr(app, key[1]):
app_child: _SubAppParams = getattr(app, key[1])
for k, v in app_params.items():
v = [v] if not isinstance(v, list) else v
app_child[k]: IdlePath | list[IdlePath] = v
setattr(app, key[1], app_child)
else:
raise KeyError
setattr(self, key[0], app)
else: else:
raise KeyError paths: PathCollection = path
def __delitem__(self, key: tuple[str | SoftPathGroup]) -> None | Never: setattr(self, "_" + datatype, paths)
if len(key) < 1 or len(key) > 3:
raise KeyError
if not hasattr(self, key[0]): def inherit(self, other):
raise KeyError if other._App__children is None:
other._App__children = tuple()
if self not in other._App__children:
other.adopt(self)
if len(key) == 1: self.__parents = (*self.__parents, other)
delattr(self, key[0])
elif len(key) > 1:
app: Apps = getattr(self, key[0])
delattr(app, key[1])
setattr(self, key[0], app)
def list(self, contents: bool = False) -> tuple[str]: def adopt(self, other: Self):
apps: tuple[str] | tuple[Apps] = tuple( if other._App__parents is None:
filter( other._App__parents = tuple()
lambda a: isinstance(getattr(self, a), Apps),
dir(self)
)
)
if contents: if self not in other._App__parent:
apps = tuple( other.inherit(self)
map(
lambda a: getattr(self, a),
apps
)
)
return apps self.__children = (*self.__children, other)
def __len__(self) -> int: def __enter__(self) -> dict | Sequence:
apps: tuple[str] = tuple( self.__content = self.__current_filepath.read_text()
filter( return self.__content
lambda a: isinstance(getattr(self, a), Apps),
dir(self) def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
) # txt = yams.dump(self.__content)
) # self.__file.write(txt)
return len(apps) # del txt
self.__file.close()
def __call__(self, path = str, mode = "r+", scope: Scopes = Scopes.PROJ, role: Roles = Roles.CONF, index: int = 0) -> Callable:
if not hasattr(self, "_" + role.name.lower()):
raise Exception
else:
config = getattr(self, "_" + role.name.lower())
conf_coll = config[scope.name.lower()]
if isinstance(conf_coll, Sequence):
conf_coll = conf_coll[index]
if isinstance(conf_coll, str):
conf_coll = Path(conf_coll)
filepath = conf_coll / path
self.__current_filepath = filepath
self.__file = open(str(filepath), mode)
return self
# @TODO write below method to duplicate file or template in local to project role file/template
def clone(self, source_scope: Scopes = Scopes.SYS, target_scope: Scopes = Scopes.PROJ, index: int = 0) -> Never:
raise NotImplementedError
def declare(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
sshd_proj_files = map(lambda r: r / AnsibleRoles.bootstrap.name.lower() / "files" / "sshd_config.d", PROJ_ROLES)
sshd_proj_files = list(filter(lambda p: p.exists(), sshd_proj_files))
sshd_proj_templates = map(lambda r: r / AnsibleRoles.bootstrap.name.lower() / "templates" / "sshd_config.d", PROJ_ROLES)
sshd_proj_templates = list(filter(lambda p: p.exists(), sshd_proj_templates))
# @TODO rewrite below using DIR_ROOTS var from whereami module
sshd_paths: PathRoles = {
Roles.CONF.name.lower(): {
Scopes.PROJ.name.lower(): sshd_proj_files + sshd_proj_templates
}
}
sshd = App(Software.openssh_server, SoftwareRoles.server, sshd_paths)

660
sshkey.py Normal file
View File

@@ -0,0 +1,660 @@
from re import Pattern as RegEx
from re import fullmatch as Match
from pathlib import Path, PurePath
from custtypes import ExecutedPath, IdlePath
from enum import StrEnum, auto
from random import choice as gamble
# from collections.abc import Sequence, Iterable
from typing import Never, Self, Callable, Iterable, Sequence
from whereami import USER_PATH
from itertools import chain
# import os
class SSHKeyType(StrEnum):
pubkey = auto()
privkey = auto()
dual = auto()
# @TODO create unit tests for below class
class SSHKey:
def __init__(self, *path: ExecutedPath | str):
if len(path) > 2 or len(path) < 1:
raise ValueError
path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path))
self.__idx: int = 0
self.__prev: Self | None = None
self.__next: Self | None = None
self.category: SSHKeyType | None = None
if len(path) < 2:
self.__value: ExecutedPath | tuple[ExecutedPath] = path[0]
else:
self.category = SSHKeyType.dual.name.lower()
self.__value: ExecutedPath | tuple[ExecutedPath] = path
def __int__(self) -> int:
return self.__idx
def update_status(self) -> None:
if isinstance(self.__value, tuple):
privkey_present = False
pubkey_present = False
for p in self.__value:
if "-----BEGIN OPENSSH PRIVATE KEY-----" in p.read_text():
privkey_present = True
else:
pubkey_present = True
if pubkey_present and privkey_present:
self.category = SSHKeyType.dual.name.lower()
elif pubkey_present or privkey_present:
if pubkey_present:
self.category = SSHKeyType.pubkey.name.lower()
if privkey_present:
self.category = SSHKeyType.privkey.name.lower()
elif isinstance(self.__value, ExecutedPath):
if "-----BEGIN OPENSSH PRIVATE KEY-----" in self.__value.read_text():
self.category = SSHKeyType.privkey.name.lower()
else:
self.category = SSHKeyType.pubkey.name.lower()
def __str__(self) -> str:
if isinstance(self.__value, tuple):
key_basename = Path(str(self.__value[0])).stem + "." + self.category
else:
key_basename = Path(str(self.__value)).name
return "🔑" + key_basename
def __repr__(self) -> str:
return "%s(%r)" % (self.__class__.__name__, self.__value)
def __nonzero__(self) -> bool:
return True
def __format__(self, formatstr) -> str:
match formatstr:
case "item":
return str(self.__idx) + ": " + str(self.__value)
case "int":
return str(self.__idx)
case _:
return str(self)
def __next__(self) -> ExecutedPath | tuple[ExecutedPath]:
return self.__next
def __prev__(self) -> ExecutedPath | tuple[ExecutedPath]:
return self.__prev
def __call__(self, *path: ExecutedPath | str | None) -> ExecutedPath | tuple[ExecutedPath]:
if path is not None or len(path) > 0:
if len(path) > 2:
raise ValueError
path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path))
if len(path) < 2:
self.__value = path[0]
else:
self.__value = path
return self.__value
def __eq__(self, other: Self) -> bool:
return self.__value == other._SSHKey__value
def __ne__(self, other: Self) -> bool | Never:
return self.__value != other._SSHKey__value
def __eqcontent__(self, other: Self) -> bool:
return self.__value.read_text() == other._SSHKey__value.read_text()
def __neqcontent__(self, other: Self) -> bool:
return self.__value.read_text() != other._SSHKey__value.read_text()
def __len__(self):
if isinstance(self.__value, tuple):
return len(self.__value)
else:
return 1
def update(self, *path: ExecutedPath | str) -> Self | Never:
if len(path) > 2 or len(path) < 1:
raise ValueError
path = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, path))
if len(path) < 2:
self.__value = path[0]
else:
self.__value = path
return self
def __add__(self, other: Self | ExecutedPath | str) -> Self:
if isinstance(other, (str, ExecutedPath)):
if isinstance(self.__value, tuple):
self.update(*self.__value, other)
else:
self.update(*self.__value, other)
else:
if isinstance(other._SSHKey__value, tuple):
if isinstance(self.__value, tuple):
self.update(*self.__value, *other._SSHKey__value)
else:
self.update(self.__value, *other._SSHKey__value)
else:
if isinstance(self.__value, tuple):
self.update(*self.__value, other._SSHKey__value)
else:
self.update(self.__value, other._SSHKey__value)
self.update_status()
return self
def __radd__(self, other: Self | ExecutedPath | str) -> Self:
if isinstance(self.__value, tuple):
if isinstance(other, (ExecutedPath, str)):
other.update(other, *self.__value)
else:
if isinstance(other._SSHKey__value, tuple):
other.update(*other._SSHKey__value, *self.__value)
else:
other.update(other._SSHKey__value, *self.__value)
else:
if isinstance(other, (ExecutedPath, str)):
other.update(other, self.__value)
else:
if isinstance(other._SSHKey__value, tuple):
other.update(*other._SSHKey__value, self.__value)
else:
other.update(other._SSHKey__value, self.__value)
other.update_status()
return self
# @TODO write following 2 subtraction algorithms using 'set' data type conversion and methods
def __sub__(self, other: Self | ExecutedPath | str) -> Never:
raise NotImplementedError
def __rsub__(self, other: Self | ExecutedPath | str) -> Never:
raise NotImplementedError
def __getitem__(self, key: int) -> ExecutedPath | str | Never:
if isinstance(self.__value, tuple):
return self.__value[key]
else:
raise KeyError
def __setitem__(self, key: int, value: ExecutedPath | str) -> None:
if isinstance(self.__value, tuple):
new_entry = list(self.__value)
if isinstance(value, str):
value = Path(value)
new_entry[key] = value
self.__value = tuple(new_entry)
else:
raise KeyError
def replace(self, old: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str], new: ExecutedPath | str | tuple[ExecutedPath | str] | list[ExecutedPath | str]) -> Self | Never:
if isinstance(old, str):
old = Path(old)
if isinstance(new, str):
new = Path(new)
if isinstance(old, (list, tuple)):
if len(old) > 2 or len(old) < 1:
raise ValueError
old = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, old))
if isinstance(new, (list, tuple)):
if len(new) > 2 or len(new) < 1:
raise ValueError
new = tuple(map(lambda p: Path(p) if isinstance(p, str) else p, new))
if isinstance(self.__value, (tuple, list)):
if isinstance(old, tuple):
remaining_value = list(filter(lambda p: p not in old, self.__value))
if isinstance(new, tuple):
self.__value = (*remaining_value, *new)
else:
self.__value = (*remaining_value, new)
else:
remaining_value = list(filter(lambda p: p != old, self.__value))
if isinstance(new, tuple):
self.__value = (*remaining_value, *new)
else:
self.__value = (*remaining_value, new)
if len(self.__value) > 2:
self.__value = self.__value[0]
elif isinstance(self.__value, ExecutedPath):
if isinstance(old, tuple):
remaining_value = None if self.__value in old else self.__value
else:
remaining_value = None if self.__value == old else self.__value
if remaining_value is None:
self.__value = new
else:
raise ValueError
return self
def read(self, idx: int | None = None) -> str | tuple[str]:
if idx is not None and isinstance(self.__value, tuple):
result = self.__value[idx]
else:
if idx is not None:
raise KeyError
result = self.__value
if isinstance(result, tuple):
result = tuple(map(lambda p: p.read_text(), result))
else:
result = result.read_text()
return result
@property
def status(self) -> str:
self.update_status()
return self.category
def reverse(self) -> Never:
if isinstance(self.__value, tuple):
v1 = self.__value[0]
v2 = self.__value[1]
result = self.update(v2, v1)
else:
result = self
return result
def prev(arg):
if isinstance(arg, SSHKey):
return arg._SSHKey__prev__()
else:
raise TypeError
def stream_equal(s1, s2) -> bool | Never:
if isinstance(s1, SSHKey) and isinstance(s2, SSHKey):
return s1._SSHKey__eqcontent__(s2)
else:
raise TypeError
def stream_unequal(s1, s2) -> bool | Never:
if isinstance(s1, SSHKey) and isinstance(s2, SSHKey):
return s1._SSHKey__neqcontent__(s2)
else:
raise TypeError
# @TODO create unit tests for below class
class SSHKeyCollection(Sequence):
__user_path: ExecutedPath = USER_PATH()
__ssh_path: ExecutedPath = __user_path / ".ssh"
def __init__(self):
self.__current: SSHKey | None = None
self.__first: SSHKey | None = None
self.__last: SSHKey | None = None
self.__indices: range | None = None
# @TODO allow initialization with unpacked parameter or sequence/iterable argument
def __getitem__(self, key: int | slice) -> SSHKey | Never:
self.__current = self.__first
if self.__current is None:
raise KeyError
if isinstance(key, int):
if int(self.__current) == key:
return self.__current
else:
while int(self.__current) != key:
if self.__current is None:
raise KeyError
self.__current = next(self.__current)
result = self.__current
elif isinstance(key, slice):
step = key.step
sshkcoll = SSHKeyCollection()
if hasattr(key, "start"):
if getattr(key, "start") is None:
start = 0
else:
start = key.start
if hasattr(key, "stop"):
if getattr(key, "stop") is None:
stop = len(self.__indices)
else:
stop = key.stop
if hasattr(key, "step"):
if getattr(key, "step") is None:
step = 1
else:
step = key.step
indices = range(start, stop, step)
# test_coll = []
while int(self.__current) < stop:
if int(self.__current) < start:
continue
elif int(self.__current) >= start:
if int(self.__current) in indices:
sshkcoll.append(self.__current)
# test_coll.append(self.__current)
else:
continue
self.__current = next(self.__current)
if self.__current is None:
break
# print(test_coll)
result = sshkcoll
return result
def __len__(self) -> int:
if self.__indices is None:
return 0
return len(self.__indices)
def pop(self, key: int = -1) -> Never:
self.__current = self.__first
if self.__current is None:
raise KeyError
if key == -1:
if self.__last is not None:
past = self.__last
self.__last._SSHKey__prev._SSHKey__next = None
self.__last = self.__last._SSHKey__prev
self.__current = self.__last
else:
past = self.__first
self.__first = None
return past
elif key <= -2:
raise NotImplementedError
else:
while int(self.__current) != key:
self.__current = next(self.__current)
if self.__current is None:
raise KeyError
past = self.__current
count = self.__current._SSHKey__idx
prior = self.__current._SSHKey__prev
posterior = self.__current._SSHKey__next
posterior._SSHKey__prev = prior
posterior._SSHKey__prev._SSHKey__next = posterior
self.__current = posterior
while self.__current is not None:
self.__current._SSHKey__idx = count
self.__current = next(self.__current)
count += 1
return past
def remove(self) -> Never:
raise NotImplementedError
def append(self, *value: ExecutedPath | str) -> SSHKey:
if len(value) < 1 or len(value) > 2:
raise ValueError
value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value))
ssh_key = SSHKey(*value)
if self.__first is None:
# print("branch1")
ssh_key._SSHKey__idx = 0
# print(ssh_key._SSHKey__idx)
ssh_key.update_status()
self.__indices = range(ssh_key._SSHKey__idx + 1)
self.__first = ssh_key
self.__current = self.__first
else:
# print("branch2")
if self.__last is not None:
# print("branch2.1")
ssh_key._SSHKey__idx = self.__last._SSHKey__idx + 1
# print(ssh_key._SSHKey__idx)
ssh_key.update_status()
self.__last._SSHKey__next = ssh_key
self.__last._SSHKey__next._SSHKey__prev = self.__last
self.__last = next(self.__last)
else:
# print("branch2.2")
ssh_key._SSHKey__idx = self.__first._SSHKey__idx + 1
# print(ssh_key._SSHKey__idx)
ssh_key.update_status()
self.__first._SSHKey__next = ssh_key
self.__first._SSHKey__next._SSHKey__prev = self.__first
self.__last = self.__first._SSHKey__next
self.__indices = range(ssh_key._SSHKey__idx + 1)
self.__current = self.__last
#print(self.__current)
return self.__current
def __setitem__(self, key: int | slice, *value: ExecutedPath | str) -> None | Never:
if len(value) < 1 or len(value) > 2:
raise ValueError
value = tuple(map(lambda s: Path(s) if isinstance(s, str) else s, value))
self.__current = self.__first
if self.__current is None:
raise KeyError
if isinstance(key, int):
if int(self.__current) == key:
if self.__current() is None or len(self.__current()) < 1:
self.__current(*value)
else:
if int(self.__current) == key:
return self.__current(*value)
while int(self.__current) != key:
if self.__current is None:
raise KeyError
self.__current = next(self.__current)
self.__current(*value)
elif isinstance(key, slice):
raise NotImplementedError
def __delitem__(self, key: int | slice) -> None | Never:
self.__current = self.__first
if self.__current is None:
raise KeyError
if isinstance(key, int):
if key == -1:
if self.__last is not None:
self.__last._SSHKey__prev._SSHKey__next = None
self.__last = self.__last._SSHKey__prev
self.__current = self.__last
else:
self.__first = None
return past
elif key <= -2:
raise NotImplementedError
else:
while int(self.__current) != key:
self.__current = next(self.__current)
if self.__current is None:
raise KeyError
count = self.__current._SSHKey__idx
prior = self.__current._SSHKey__prev
posterior = self.__current._SSHKey__next
posterior._SSHKey__prev = prior
posterior._SSHKey__prev._SSHKey__next = posterior
self.__current = posterior
while self.__current is not None:
self.__current._SSHKey__idx = count
self.__current = next(self.__current)
count += 1
elif isinstance(key, slice):
raise NotImplementedError
@property
def head(self) -> SSHKey | None:
return self.__first
@property
def tail(self) -> SSHKey | None:
if self.__last is None:
return self.__first
return self.__last
def __contains__(self, value: ExecutedPath | str) -> bool:
self.__current = self.__first
if isinstance(value, ExecutedPath):
value = str(value)
is_contained = False
while self.__current is not None:
if str(self.__current._SSHKey__value) == value:
is_contained = True
break
self.__current = next(self.__current)
return is_contained
def __missing__(self, value: ExecutedPath | str) -> Never:
raise NotImplementedError
def __next__(self):
self.__current = next(self.__current)
if self.__current is not None:
return self.__current
else:
raise StopIteration
def __iter__(self) -> Self | Never:
self.__current = self.__first
# return self.__current
return self
def count(self, query: RegEx | str) -> int | Never:
raise NotImplementedError
def pull(self, query: RegEx | str = "*") -> None:
if isinstance(query, RegEx):
keypaths = self.__ssh_path.glob("*")
for p in keypaths:
if query.fullmatch(p.name):
if not Match("(known_hosts|authorized_keys|config).*", p.name):
self.append(p)
else:
continue
else:
keypaths = self.__ssh_path.glob(query)
for p in keypaths:
if not Match("(known_hosts|authorized_keys|config).*", p.name):
self.append(p)
def reverse(self) -> None | Never:
raise NotImplementedError
def sort(self, key: Callable = (lambda e: e), reverse: bool = False) -> None | Never:
raise NotImplementedError
def __str__(self) -> str:
prefix = "[("
postfix = "|]"
self.__current = self.__first
concat = lambda s: str(s) + ", "
content = str()
count = 0
while self.__current is not None:
content += str(count) + ">" + concat(self.__current)
self.__current = next(self.__current)
count += 1
content = content[0:len(content)-2]
return prefix + content + postfix
def index(self, item: str | ExecutedPath) -> Never:
raise NotImplementedError
def publish(self, category: SSHKeyType | str | None = SSHKeyType.pubkey, pref: int | None = None, datatype = list):
privkey = list()
pubkey = list()
self.__current = self.__first
# @TODO create conditional case that publishes all keys
if datatype == list:
while self.__current is not None:
# print(self.__current)
if self.__current.category == SSHKeyType.privkey.name.lower():
privkey.append(self.__current._SSHKey__value)
elif self.__current.category == SSHKeyType.pubkey.name.lower():
pubkey.append(self.__current._SSHKey__value)
elif self.__current.category == SSHKeyType.dual.name.lower():
privkey.append(self.__current._SSHKey__value[0])
pubkey.append(self.__current._SSHKey__value[1])
self.__current = next(self.__current)
# print("publish running...")
if pref is None:
preference = gamble(range(len(privkey)))
else:
preference = pref
# print(category)
if category.name.lower() == SSHKeyType.pubkey.name.lower():
return pubkey
elif category.name.lower() == SSHKeyType.privkey.name.lower():
return (privkey, preference)
else:
return (privkey, pubkey, preference)
elif datatype == dict:
# @TODO have result var equal to instance of a yaml.YAMLObject class from parse module
raise NotImplementedError
return result
def __repr__(self) -> str:
return "%s()" % (self.__class__.__name__)

View File

@@ -2,8 +2,61 @@
Library of path constants to be used or referenced elsewhere. Library of path constants to be used or referenced elsewhere.
""" """
from custtypes import ExecutedPath from custtypes import ExecutedPath, Roles, Scopes, AnsibleScopes, NodeType, UserName
from pathlib import Path from pathlib import Path
from configparser import ConfigParser as cfg
from itertools import chain
from typing import Callable
USER_PATH: ExecutedPath = Path.home() def get_home(node: NodeType = NodeType.control, home: UserName | str | None = None) -> ExecutedPath:
PROJ_ROOT: ExecutedPath = Path(__file__).parent.resolve() if node == NodeType.control:
return Path.home()
else:
if home is None:
return Path("~")
else:
if isinstance(home, UserName):
return Path("/home") / home.name
else:
return Path(home)
USER_PATH: Callable = get_home
PROJ_ROOT: ExecutedPath = Path(__file__).parent.resolve()
config = cfg()
ANSIBLE_CONFIG = PROJ_ROOT / "ansible.cfg"
if ANSIBLE_CONFIG.exists():
config.read(str(ANSIBLE_CONFIG))
role_candidates = config["defaults"]["roles_path"].split(":")
role_candidates = map(lambda s: PROJ_ROOT / s, role_candidates)
role_paths = list(filter(lambda p: p.exists(), role_candidates))
inv_candidates = config["defaults"]["inventory"].split(",")
inv_candidates = map(lambda s: PROJ_ROOT / s, inv_candidates)
inv_paths = list(filter(lambda p: p.exists(), inv_candidates))
else:
role_paths = [PROJ_ROOT / "roles"]
inv_paths = PROJ_ROOT.glob("hosts*.*")
PROJ_ROLES: list[ExecutedPath] = role_paths
PROJ_INVENTORIES: list[ExecutedPath] = inv_paths
proj_paths = map(lambda r: r.glob("**/files"), PROJ_ROLES)
proj_paths = list(chain.from_iterable(proj_paths))
template_paths = map(lambda r: r.glob("**/templates"), PROJ_ROLES)
template_paths = list(chain.from_iterable(template_paths))
APP_ROOTS = {
Roles.CONF.name.lower(): {
Scopes.SYS.name.lower(): Path("/etc"),
Scopes.USER.name.lower(): USER_PATH(NodeType.remote) / ".config",
Scopes.SHARED.name.lower(): Path("/usr/share"),
Scopes.PROJ.name.lower(): tuple(proj_paths + template_paths)
}
}
ANSIBLE_ROOTS = {
AnsibleScopes.GROUPVARS.name.lower(): PROJ_ROOT / "group_vars",
AnsibleScopes.HOSTVARS.name.lower(): PROJ_ROOT / "host_vars",
AnsibleScopes.INVENTORY.name.lower(): tuple(PROJ_INVENTORIES),
AnsibleScopes.ROLE.name.lower(): tuple(PROJ_ROLES)
}