Compare commits

..

3 Commits

5 changed files with 172 additions and 208 deletions

108
anodes.py
View File

@@ -7,13 +7,14 @@ from typing import TypeAlias as Neotype
from typing import TypedDict as Dict
from typing import Never, Union, Literal, Required
from collections.abc import Callable
from custtypes import ExecutedPath, IdlePath
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from enum import Enum
from softman import Software, SoftPathGroup, SoftScope, _Apps
from softman import Software, SoftPathGroup, SoftScope, Apps
from whereami import USER_PATH, PROJ_ROOT
from ansible_vault import Vault
from cerberus import Validator as constrain_by
from random import choice
import secrets
class ControlNode:
__user_path: ExecutedPath = USER_PATH
@@ -31,41 +32,53 @@ class ControlNode:
def __init__(self, ansible_proj_root: ExecutedPath | None = None):
if ansible_proj_root is not None:
self.__proj_root = ansible_proj_root
self.proj_roles: tuple[IdlePath] | list[IdlePath] = (
role_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.__proj_root), "roles"),
PurePath(str(self.__proj_root), ".ansible/roles")
)
self.role_data: tuple[IdlePath] | list[IdlePath] = (
roledata_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.proj_roles[0]), "files"),
PurePath(str(self.proj_roles[0]), "templates"),
PurePath(str(self.proj_roles[1]), "files"),
PurePath(str(self.proj_roles[1]), "templates")
)
self.invvar_data: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.__proj_root), "group_vars"),
PurePath(str(self.__proj_root), "host_vars")
)
setattr(self, AnsibleScopes.ROLE.name, {
"data": roledata_paths,
"vars": role_paths
})
setattr(self, AnsibleScopes.GROUPVARS.name, {
"vars": (PurePath(str(self.__proj_root), "group_vars"),)
})
setattr(self, AnsibleScopes.HOSTVARS.name, {
"vars": (PurePath(str(self.__proj_root), "host_vars"),)
})
setattr(self, AnsibleScopes.INVENTORY.name, {
"vars": (PurePath(str(self.__proj_root), "hosts.yml"),)
})
def get_scope(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY.name, index = 0):
return getattr(self, scope)[index]
@property
def user_path(self) -> ExecutedPath:
def home(self) -> ExecutedPath:
return self.__user_path
@property
def proj_root(self) -> ExecutedPath:
def root(self) -> ExecutedPath:
return self.__proj_root
@property
def conf_paths(self) -> ExecutedPath:
def sys_confs(self) -> ExecutedPath:
return self.__conf_paths
@property
def data_paths(self) -> ExecutedPath:
def sys_data(self) -> ExecutedPath:
return self.__data_paths
class Softs(Enum):
ssh = 0
userSSHSubParams = {
_userSSHSubParams = {
"available": Required[tuple[ExecutedPath]],
"selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
"authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
@@ -77,7 +90,7 @@ __user_ssh_keys = {
"authorized": 1,
"used": 0
}
def userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"):
def __userSSHInit(self, username: str = "root", paths: _Apps | None = None, keys: _userSSHSubParams = __user_ssh_keys, password: str = "password123", fate: Literal["disposal", "retention"] = "disposal"):
self.username = username
self.paths = paths
self.keys = keys
@@ -101,7 +114,7 @@ __user_ssh_input = {
},
"password": "password123",
"fate": "disposal",\
"__init__": userSSHInit
"__init__": __userSSHInit
}
userSSH = type("userSSH", (), __user_ssh_input)
@@ -122,26 +135,24 @@ vpsSchema = Dict("vpsSchema", {
"keywords": list[str]
}, total=False)
class VirtualPrivateServers(Enum):
Linode = 0
class RemoteNode:
# __user_path =
_fqdn: str | None = None
def __init__(self, cnode: ControlNode, api_key: str, password: str, name: str, service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = None, keywords: list[str] | None = None):
def __init__(self, cnode: ControlNode, api_key: str = secrets.token_urlsafe(32), password: str = "password123", name: str = ".test", service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = "us-east", keywords: list[str] | None = ["server"]):
self.root: dict = dict()
self.root["username"]: str = "root"
vault = Vault(password)
self.root["password"]: str = vault.dump(password)
self.root["software"]: Software = Software()
self.owner = cnode
app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: PurePath(str(cnode.user_path), ("." + Softs.ssh.name))
SoftScope.PERSONAL.name: PurePath(str(cnode.home), ("." + Softs.ssh.name))
},
SoftPathGroup.DATA.name: {
SoftScope.GLOBAL.name: PurePath(str(cnode.conf_paths[0]), "update-motd.d")
SoftScope.GLOBAL.name: PurePath(str(cnode.sys_confs[0]), "update-motd.d")
}
}
self.root["software"].declare(Softs.ssh.name, **app_input)
@@ -164,27 +175,19 @@ class RemoteNode:
self.__usedkeys_selected = False
self.__keys_selected = False
self.__finalized_keys = False
self.model: dict | None = None
self.__accumulator = None
def set_region(self, name: Literal["us-east"] = "us-east") -> None:
self.region = name
def get_region(self) -> str:
return self.region
def set_password(self, password: str) -> None:
vault = Vault(value)
self.root["password"] = vault.dump(value)
self.ssh.password: str = self.root["password"]
def get_password(self) -> Vault | str:
return self.root["password"]
vault = Vault(self.root["password"])
self.root["password"] = vault.dump(self.root["password"])
def set_api(self, key: str) -> None:
self._api_key = key
def get_api(self) -> Vault | str:
vault = Vault(self._api_key)
return vault.dump(self._api_key)
self._api_key = vault.dump(self._api_key)
def add_tags(self, *name):
self.keywords: list = []
@@ -356,41 +359,9 @@ class RemoteNode:
self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected
return result
# @TODO test and debug below 'RemoteNode' method
# @TODO rewrite below method
def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
keyfiles = self.ssh.keys[source]
for s in selections:
if isinstance(s, int):
removed_elem = keyfiles.pop(s)
elif isinstance(s, ExecutedPath):
path_set = set([s])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
item = list(overlap)[0]
keyfiles = [p for p in keyfiles if p != item]
self.ssh.keys[source] = keyfiles
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
keyfiles = [p for p in keyfiles if str(p) != S]
self.ssh.keys[source] = keyfiles
self.ssh.keys[source] = keyfiles
if source == "available":
self.remove_keys("selected", *selections)
elif source == "selected":
self.remove_keys("authorized", *selections)
elif source == "authorized":
self.remove_keys("used", *selections)
return self.ssh.keys
raise NotImplementedError
def itemize(self):
model: dict | vpsSchema = dict()
@@ -421,3 +392,4 @@ class RemoteNode:
}
model["keywords"] = self.keywords
return model

91
confctx.py Normal file
View File

@@ -0,0 +1,91 @@
from pathlib import Path, PurePath
from whereami import PROJ_ROOT
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from anodes import RemoteNode, ControlNode, _userSSHSubParams
# from ansible_vault import Vault
import yaml as yams
import re
from typing import Literal
# @TODO for 'Config' class, write methods to pull and push from Ansible YAML files
# @NOTE https://docs.python.org/3/library/configparser.html#quick-start
class Config:
path: ExecutedPath = PROJ_ROOT / "config.ini"
__controller = ControlNode()
setattr(__controller, AnsibleScopes.INTERNAL.name, {
"vars": (PurePath(str(path)),)
})
def __init__(self, filepath: str = "config.ini", scope: AnsibleScopes = AnsibleScopes.INTERNAL.name, index: int = 0, mode: str = "r+"):
self.scope = scope
parent_dir = self.__controller.get_scope(scope, index)
filepath = Path(parent_dir) / filepath
filename: str = filepath.stem
self.parse_method = "yml"
if "." in filename:
filename_arr = filename.split(".")
basename: str = filename_arr[0]
ext: str = filename_arr[len(filename_arr) - 1]
if re.match(r"toml|ini", ext):
self.parse_method: str = ext.upper()
self.filepath: ExecutedPath = filepath
self.mode = mode
self.model = None
self.file = None
self.__remote: RemoteNode | None = None
def __enter__(self):
if not self.filepath.exists():
self.__remote = RemoteNode(self.__controller)
return self.__remote
self.file = open(str(self.filepath), self.mode)
model = self.file.read()
self.model = yams.load(model)
if self.scope == AnsibleScopes.GROUPVARS.name or self.scope == AnsibleScopes.HOSTVARS.name:
api_key: str = model["vps_service"]["api_key"]
vault = Vault(model["vps_service"]["password"])
password: str = vault.load(model["vps_service"]["password"])
fqdn: str = model["fqdn"]
service: VirtualPrivateServers = model["vps_service"]["type"]
region: Literal["us-east"] = model["vps_service"]["region"]
keywords: list[str] = model["keywords"]
self.__remote = RemoteNode(self.__controller, api_key, password, fqdn, service, region, keywords)
# @TODO add portion wherein SSH keys from 'model' are made to be present in 'self.__remote'
self.__remote.import_keys("*")
keyfiles = self.__remote.show_keys("available", list)
keyfiles_contents = set(map(lambda p: p.read_text(), keyfiles))
keyfiles_pub = list(set(model["vps_service"]["ssh_authorized_keys"]) - keyfiles_contents)
self.__remote.ssh.keys["available"] = tuple(keyfiles_pub)
self.__remote.ssh.keys["selected"] = keyfiles_pub
self.__remote.ssh.keys["authorized"] += keyfiles_pub
keyfiles_paths = set(map(lambda p: str(p), keyfiles))
keyfiles_priv = list(set(model["vps_service"]["ssh_private_key_paths"]) - keyfiles_paths)
self.__remote.ssh.keys["available"] = (*self.__remote.ssh.keys["available"],*keyfiles_priv)
self.__remote.ssh.keys["selected"] += keyfiles_priv
self.__remote.ssh.keys["used"] += keyfiles_priv
self.__remote.ssh.fate: Literal["disposal", "retention"] = model["vps_service"]["root_fate"]
return self.__remote
else:
raise ValueError
def __exit__(self):
if self.scope == AnsibleScopes.GROUPVARS.name or self.scope == AnsibleScopes.HOSTVARS.name:
ansible_chunk = self.__remote.itemize()
ansible_chunk["vps_service"]["ssh_motd_script_basenames"] = self.model["ssh_motd_script_basenames"]
ansible_chunk["vps_service"]["ssh_private_key_path_pref"] = self.model["ssh_private_key_path_pref"]
del self.model["fqdn"]
del self.model["vps_service"]
del self.model["keywords"]
else:
raise ValueError
file_model = yams.dump(self.model | ansible_chunk)
self.file.write(file_model)
self.file.close()

View File

@@ -6,4 +6,14 @@ from typing import TypeAlias as Neotype
from pathlib import PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
ExecutedPath: Neotype = PosixPath | WindowsPath
IdlePath: Neotype = PurePosixPath | PureWindowsPath
IdlePath: Neotype = PurePosixPath | PureWindowsPath
class VirtualPrivateServers(Enum):
Linode = 0
class AnsibleScopes(Enum):
INTERNAL = 0
INVENTORY = 1
GROUPVARS = 2
HOSTVARS = 3
ROLE = 4

115
main.py
View File

@@ -1,112 +1,19 @@
"""
Library for the CLI commands and the related classes and functions
"""
import click as cli
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
from whereami import PROJ_ROOT
from custtypes import ExecutedPath, IdlePath
import yaml as yams
from anodes import RemoteNode, ControlNode
# from typing import TypeAlias as Neotype
from typing import Literal
from enum import Enum
from ansible_vault import Vault
from random import choice
# import configparser as ini
# from cerberus import Validator as constrain_by
# import skansible_types as skato
class AnsibleScope(Enum):
INTERNAL = 0
INVENTORY = 1
GROUPVARS = 2
HOSTVARS = 3
ROLE = 4
# @TODO for 'Config' class, write methods to pull and push from Ansible YAML files
# @NOTE https://docs.python.org/3/library/configparser.html#quick-start
class Config:
path: ExecutedPath = PROJ_ROOT / "config.ini"
_cnode = ControlNode(PROJ_ROOT)
__scope_paths = {
AnsibleScope.INTERNAL.name: path,
AnsibleScope.GROUPVARS.name: Path(str(_cnode.invvar_data[0])),
AnsibleScope.HOSTVARS.name: Path(str(_cnode.invvar_data[1]))
}
def __init__(self, filepath: str, anscope: AnsibleScope = AnsibleScope.INTERNAL.name, mode: str = "r+"):
filepath: ExecutedPath = self.__scope_paths[anscope] / filepath
if filepath.exists():
self.filepath = filepath
else:
raise FileNotFoundError
self.mode: str = mode
self.file = None
self.model = None
def __enter__(self):
self.file = open(str(self.filepath), self.mode)
self.model = yams.load(self.file.read())
return self.model
def __exit__(self, exc_type, exc_value, exc_traceback):
file_content = yams.dump(self.model)
self.file.write(file_content)
self.file.close()
domain_pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$'
# @TODO create regex pattern for matching IP addresses
# ip_pattern = r''
@cli.group()
@cli.option("-d", "--debug", type=bool, is_flag=True, default=True, help="Use debugging mode")
@cli.pass_context
def skansible(ctx):
def skansible(ctx, debug):
ctx.ensure_object(dict)
@skansible.group(help="Replace conventionally-typed Ansible variables")
@cli.pass_context
def mod(ctx):
with Config(entity, AnsibleScope.HOSTVARS.name) as config:
ctx.obj["fqdn"] = config["fqdn"]
@skansible.group(help="Remove conventionally-typed Ansible variables")
def rm():
pass
@skansible.group(help="Append conventionally-typed entries to Ansible variables")
def append():
pass
@skansible.group(help="Initialize Ansible variable or object with defaults")
def init():
pass
# @skansible.group(help="Show Ansible assets")
# @cli.argument("-d", "--datatype", type=str, help="Specify type of Ansible object or variable to show")
# def show(datatype):
# pass
@mod.command("vps", help="Add 'vps_service' Ansible variable")
@cli.argument("entity", type=str, help="Specify FQDN/alias or host group to associate with the VPS")
@cli.option("-p", "--password", type=str, prompt=True, prompt_required=False, help="Prompt for password of root or administrative user of VPS")
@cli.option("-a", "--api", type=str, help="API key for host service providing VPS")
@cli.option("-k", "--pubkey", multiple=True, help="A file glob pattern for acquiring user's public SSH keys")
@cli.option("-k", "--privkey", multiple=True, help="A file glob pattern for acquiring user's private SSH keys")
@cli.option("-m", "--motd", multiple=True, help="Provide basenames for MOTD SSH scripts")
@cli.option("-t", "--tag", multiple=True, type=str)
@cli.option("-s", "--service", type=str)
@cli.option("-r", "--region", type=str)
@cli.option("-f", "--fate", type=str)
@cli.pass_context
# @TODO rewrite below command function, using 'ctx' parameter for values shared with sibling commands
def mod_vps(ctx, entity: str, password: str | None = None, api: str | None = None, pubkey: tuple[str] = None, privkey: tuple[str] = None, service: str | int | None = None, region: Literal["us-east"] | None = None, tags: tuple[str] = None, fate: Literal["disposal", "retention"] | None = None):
with Config(entity, AnsibleScope.HOSTVARS.name) as config:
raise NotImplementedError
@mod.command("host", help="Add host to Ansible inventory")
@cli.argument("hostname", multiple=True, type=str, help="Provide aliases / FQDN / IP addresses for host(s)")
@cli.option("-g", "--group", type=str, default="ungrouped", help="Provide group name given host(s) fall under")
@cli.pass_context
# @TODO rewrite below command function, using 'ctx' parameter for values shared with sibling commands
def mod_host(ctx, group, hostname):
with Config(entity, AnsibleScope.INVENTORY.name) as config:
raise NotImplementedError
ctx.obj["DEBUG"] = True
if __name__ == "__main__":
skansible()
skansible(obj={})

View File

@@ -24,35 +24,19 @@ class SoftPathGroup(Enum):
MEM = 2
EXE = 3
_AppParams = Dict("_AppParams", {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.DATA.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.MEM.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}
}, total=False)
_SubAppParams = Dict("_SubAppParams", {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}, total=False)
AppParams = Dict("AppParams", {
SoftPathGroup.CONFIG.name: _SubAppParams,
SoftPathGroup.DATA.name: _SubAppParams,
SoftPathGroup.MEM.name: _SubAppParams,
SoftPathGroup.EXE.name: _SubAppParams
}, total=False)
def _AppsInit(self, CONFIG = None, DATA = None, MEM = None, EXE = None):
def __AppsInit(self, CONFIG = None, DATA = None, MEM = None, EXE = None):
self.CONFIG = CONFIG
self.DATA = DATA
self.MEM = MEM
@@ -78,9 +62,9 @@ __app_input = {
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
"__init__": _AppsInit
"__init__": __AppsInit
}
_Apps = type("_Apps", (), __app_input)
Apps = type("Apps", (), __app_input)
# @TODO continue adding magic methods to below class
# @NOTE https://rszalski.github.io/magicmethods/#sequence
@@ -91,16 +75,16 @@ class Software:
self._fqdn: str | None = None
# @TODO fix NameError for 'Software' in parameter type check
def declare(self, name: str, **kwpaths: _SubAppParams) -> _AppParams:
keyword_args: _AppParams = kwpaths
def declare(self, name: str, **kwpaths: _SubAppParams) -> AppParams:
keyword_args: AppParams = kwpaths
app = _Apps(**keyword_args)
app = Apps(**keyword_args)
setattr(self, name, app)
return app
def __getitem__(self, key: str) -> _AppParams | Never:
def __getitem__(self, key: str) -> AppParams | Never:
if hasattr(self, key):
app: _Apps = getattr(self, key)
app: Apps = getattr(self, key)
else:
raise KeyError
return app
@@ -112,7 +96,7 @@ class Software:
app_params: _SubAppParams = value
if hasattr(self, key[0]):
app: _Apps = getattr(self, key[0])
app: Apps = getattr(self, key[0])
if hasattr(app, key[1]):
app_child: _SubAppParams = getattr(app, key[1])
@@ -138,14 +122,14 @@ class Software:
if len(key) == 1:
delattr(self, key[0])
elif len(key) > 1:
app: _Apps = getattr(self, key[0])
app: Apps = getattr(self, key[0])
delattr(app, key[1])
setattr(self, key[0], app)
def list(self, contents: bool = False) -> tuple[str]:
apps: tuple[str] | tuple[_Apps] = tuple(
apps: tuple[str] | tuple[Apps] = tuple(
filter(
lambda a: isinstance(getattr(self, a), _Apps),
lambda a: isinstance(getattr(self, a), Apps),
dir(self)
)
)
@@ -163,7 +147,7 @@ class Software:
def __len__(self) -> int:
apps: tuple[str] = tuple(
filter(
lambda a: isinstance(getattr(self, a), _Apps),
lambda a: isinstance(getattr(self, a), Apps),
dir(self)
)
)