Compare commits

...

7 Commits

6 changed files with 261 additions and 49 deletions

1
.gitignore vendored
View File

@@ -18,3 +18,4 @@ banner
.galaxy_cache/ .galaxy_cache/
galaxy_token galaxy_token
uv.lock uv.lock
__pycache__/

197
anodes.py
View File

@@ -11,9 +11,8 @@ from custtypes import ExecutedPath, IdlePath
from enum import Enum from enum import Enum
from softman import Software, SoftPathGroup, SoftScope from softman import Software, SoftPathGroup, SoftScope
from whereami import USER_PATH, PROJ_ROOT from whereami import USER_PATH, PROJ_ROOT
# @TODO use below 2 imports in 'get_head_user' function to grab username of highest-privileged login user from ansible_vault import Vault
from os import name as shell_type from cerberus import Validator as constrain_by
# from os import environ as env_vars
class ControlNode: class ControlNode:
__user_path: ExecutedPath = USER_PATH __user_path: ExecutedPath = USER_PATH
@@ -29,8 +28,20 @@ class ControlNode:
) )
def __init__(self, ansible_proj_root: ExecutedPath | None): def __init__(self, ansible_proj_root: ExecutedPath | None):
if ansible_root is not None: if ansible_proj_root is not None:
self.__proj_root = ansible_proj_root self.__proj_root = ansible_proj_root
self.proj_roles: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.__proj_root), "roles"),
PurePath(str(self.__proj_root), ".ansible/roles")
)
self.role_data: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.proj_roles[0]), "files"),
PurePath(str(self.proj_roles[0]), "templates")
)
self.invvar_data: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.__proj_root), "group_vars"),
PurePath(str(self.__proj_root), "host_vars")
)
@property @property
def user_path(self) -> ExecutedPath: def user_path(self) -> ExecutedPath:
@@ -53,13 +64,14 @@ class Softs(Enum):
userSSHParams = Dict("userSSHParams", { userSSHParams = Dict("userSSHParams", {
"username": Required[str], "username": Required[str],
"paths": Required[Software._Apps], "paths": Software._Apps,
"keys": { "keys": {
"available": Required[tuple[ExecutedPath]], "available": Required[tuple[ExecutedPath]],
"selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]], "selected": Required[ExecutedPath | list[ExecutedPath] | int | list[int]],
"authorized": ExecutedPath | list[ExecutedPath] | int | list[int], "authorized": ExecutedPath | list[ExecutedPath] | int | list[int],
"used": ExecutedPath | list[ExecutedPath] | int | list[int] "used": ExecutedPath | list[ExecutedPath] | int | list[int]
}, },
"password": Required[str],
"fate": Literal["disposal", "retention"] "fate": Literal["disposal", "retention"]
}, total=False) }, total=False)
__user_ssh_input = { __user_ssh_input = {
@@ -71,24 +83,51 @@ __user_ssh_input = {
"authorized": 1, "authorized": 1,
"used": 0 "used": 0
}, },
"password": "password123",
"fate": "disposal" "fate": "disposal"
} }
# @TODO remove unpacking of '__user_ssh_input'
userSSH = type("userSSH", (), **__user_ssh_input) userSSH = type("userSSH", (), **__user_ssh_input)
# @TODO continue to write below function vps_schema = {
def get_head_user(): "fqdn": {"type": "string"},
if shell_type == "nt": "vps_service": {
raise NotImplementedError "type": "dict",
else: "schema": {
raise NotImplementedError "exists": {"type": "boolean"},
"password": {"type": "string"},
"api_key": {"type": "string"},
"type": {
"type": "string",
"anyof": ["linode"]
},
"region": {
"type": "string",
"anyof": ["us-east"]
},
"ssh_authorized_keys": {"type": "list"},
"root_fate": {
"type": "string",
"anyof": ["disposal", "retention"]
}
}
},
"keywords": {"type": "list"}
}
vps_schema = constrain_by(vps_schema)
class VirtualPrivateServers(Enum):
Linode = 0
class RemoteNode: class RemoteNode:
# __user_path = # __user_path =
_fqdn: str | None = None _fqdn: str | None = None
def __init__(self, cnode: ControlNode, name: str | None, keywords: list[str] | None = None): def __init__(self, cnode: ControlNode, api_key: str, password: str, name: str, service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, keywords: list[str] | None = None):
self.root: dict = dict() self.root: dict = dict()
self.root["username"]: str = "root" self.root["username"]: str = "root"
vault = Vault(password)
self.root["password"]: str = vault.dump(password)
self.root["software"]: Software = Software() self.root["software"]: Software = Software()
app_input = { app_input = {
@@ -101,12 +140,44 @@ class RemoteNode:
} }
self.root["software"].declare(Softs.OpenSSH, **app_input) self.root["software"].declare(Softs.OpenSSH, **app_input)
if name is not None: self._fqdn = name
self._fqdn = name self.root["software"]._fqdn = name
self.root["software"]._fqdn = name
root_ssh_input: userSSHParams = {
"username": self.root["username"],
"password": self.root["password"]
}
self.ssh: userSSH = userSSH(**root_ssh_input)
self.apps: list = self.root["software"].list(contents = True) self.apps: list = self.root["software"].list(contents = True)
self.keywords = keywords self.keywords = keywords
self._api_key: str | None = api_key
self.service = service
def set_region(self, name: Literal["us-east"] = "us-east") -> None:
self.region = name
def get_region(self) -> str:
return self.region
def set_password(self, password: str) -> None:
vault = Vault(value)
self.root["password"] = vault.dump(value)
self.ssh.password: str = self.root["password"]
def get_password(self) -> Vault | str:
return self.root["password"]
def set_api(self, key: str) -> None:
self._api_key = key
def get_api(self) -> Vault | str:
vault = Vault(self._api_key)
return vault.dump(self._api_key)
def add_tags(self, *name):
self.keywords: list = []
self.keywords += list(name)
def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e, False)): def import_keys(self, key_basenames: str | tuple[str], match_sort: tuple[Callable, bool] = (lambda e: e, False)):
keyfiles: list[ExecutedPath] | list[None] | None = list() keyfiles: list[ExecutedPath] | list[None] | None = list()
@@ -124,14 +195,8 @@ class RemoteNode:
updated_keyfiles.append(new_keyfile.resolve()) updated_keyfiles.append(new_keyfile.resolve())
keyfiles = updated_keyfiles keyfiles = updated_keyfiles
root_ssh_input: userSSHParams = { self.ssh.paths = self.root["software"].ssh
"username": "root", self.ssh.keys["available"] = tuple(keyfiles.sort(reverse = match_sort[1], key = match_sort[0]))
"paths": self.root["software"].ssh,
"keys": {
"available": tuple(keyfiles.sort(reverse = match_sort[1], key = match_sort[0]))
}
}
self.ssh: userSSH = userSSH(**root_ssh_input)
def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available") -> tuple[tuple, str]: def show_keys(self, which: Literal["authorized", "used", "available", "selected"] = "available") -> tuple[tuple, str]:
delimiters: str | tuple = "{}" delimiters: str | tuple = "{}"
@@ -174,17 +239,16 @@ class RemoteNode:
return (tuple(keyfiles), stringified_keyfiles) return (tuple(keyfiles), stringified_keyfiles)
# @TODO continue to work on two below methods
def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: def pick_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
keyfiles = self.ssh.keys[source] keyfiles = self.ssh.keys[source]
authlist = [] authlist = []
if source == "available": if source == "available" or source == "selected":
for s in selections: for s in selections:
if isinstance(s, int): if isinstance(s, int):
authlist.append(keyfiles[s]) authlist.append(keyfiles[s])
elif isinstance(s, ExecutedPath): elif isinstance(s, ExecutedPath):
path_set = set([ExecutedPath]) path_set = set([s])
kf_set = set(keyfiles) kf_set = set(keyfiles)
overlap = kf_set & path_set overlap = kf_set & path_set
@@ -197,11 +261,84 @@ class RemoteNode:
if s in kf_strs: if s in kf_strs:
authlist.append(Path(s)) authlist.append(Path(s))
self.ssh.keys["selected"] = authlist
elif source == "selected":
raise NotImplementedError
if source == "available":
self.ssh.keys["selected"] = authlist
return self.ssh.keys["selected"]
elif source == "selected":
self.ssh.keys["authorized"] = authlist
return self.ssh.keys["authorized"]
elif source == "authorized":
for s in selections:
if isinstance(s, int):
authlist.append(keyfiles[s])
elif isinstance(s, ExecutedPath):
for p in keyfiles:
if str(s) in str(p):
authlist.append(p)
else:
continue
elif isinstance(s, str):
for p in keyfiles:
if s in str(p):
authlist.append(p)
else:
continue
self.ssh.keys["used"] = authlist
return self.ssh.keys["used"]
def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never: def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
raise NotImplementedError keyfiles = self.ssh.keys[source]
for s in selections:
if isinstance(s, int):
removed_elem = keyfiles.pop(s)
elif isinstance(s, ExecutedPath):
path_set = set([s])
kf_set = set(keyfiles)
overlap = kf_set & path_set
if overlap is not None and len(overlap) > 0:
item = list(overlap)[0]
keyfiles = [p for p in keyfiles if p != item]
self.ssh.keys[source] = keyfiles
else:
continue
elif isinstance(s, str):
kf_strs = list(map(lambda p: str(p), keyfiles))
if s in kf_strs:
keyfiles = [p for p in keyfiles if str(p) != S]
self.ssh.keys[source] = keyfiles
self.ssh.keys[source] = keyfiles
if source == "available":
self.remove_keys("selected", *selections)
elif source == "selected":
self.remove_keys("authorized", *selections)
elif source == "authorized":
self.remove_keys("used", *selections)
return self.ssh.keys
def itemize(self):
model = dict()
vault_api = Vault(self._api_key)
vault_pass = Vault(self.ssh.password)
model["fqdn"] = self._fqdn
model["vps_service"] = {
"exists": True,
"password": vault_pass.dump(self.ssh.password),
"api_key": vault_api.dump(self._api_key),
"type": self.service.lower(),
"region": self.region,
"ssh_authorized_keys": self.ssh.keys["authorized"],
"root_fate": self.ssh.fate
}
model["keywords"] = self.keywords
if vps_schema.validate(model):
return model

View File

@@ -5,5 +5,5 @@ Library of custom type hints.
from typing import TypeAlias as Neotype from typing import TypeAlias as Neotype
from pathlib import PurePosixPath, PureWindowsPath, PosixPath, WindowsPath from pathlib import PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
ExecutedPath: Neotype = Union[PosixPath, WindowsPath] ExecutedPath: Neotype = PosixPath | WindowsPath
IdlePath: Neotype = Union[PurePosixPath, PureWindowsPath] IdlePath: Neotype = PurePosixPath | PureWindowsPath

85
main.py
View File

@@ -1,28 +1,89 @@
import click import click
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath, PosixPath, WindowsPath from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
from typing import TypeAlias as Neotype from whereami import PROJ_ROOT
from typing import Union from custtypes import ExecutedPath, IdlePath
import yaml as yams
from anodes import RemoteNode, ControlNode
# from typing import TypeAlias as Neotype
# from typing import Literal
# import configparser as ini # import configparser as ini
# from cerberus import Validator as constrain_by # from cerberus import Validator as constrain_by
# import skansible_types as skato # import skansible_types as skato
ExecutedPath: Neotype = Union[PosixPath, WindowsPath]
IdlePath: Neotype = Union[PurePosixPath, PureWindowsPath]
# @NOTE https://docs.python.org/3/library/configparser.html#quick-start # @NOTE https://docs.python.org/3/library/configparser.html#quick-start
class Config: class Config:
path: IdlePath = PurePath(str(Path(__file__).parent.resolve())) / "config.ini" path: ExecutedPath = PROJ_ROOT / "config.ini"
config = Config()
@click.group() @click.group()
def skansible(): def skansible():
raise NotImplementedError pass
@click.command() @click.group()
def init(): @click.argument("hostname", nargs=-1, type=str)
if Path(str(Config.path)).exists(): @click.option("-g", "--group", type=str, show_default=True)
click.echo("") @click.option("-p", "--password", type=str, prompt=True, prompt_required=False)
@click.option("-a", "--api", type=str)
@click.option("-k", "--key", multiple=True)
@click.option("-t", "--tag", multiple=True, type=str)
@click.option("-n", "--fqdn", type=(str, None))
@click.option("-s", "--service", type=(str, None))
def addhost(hostname, password, api, key, group = "ungrouped", service = None, fqdn = None, tags = None):
hosts_model = dict()
hosts_model[group]["hosts"] = dict()
for name in hostname:
hosts_model[group]["hosts"][name] = None
with open(str(PROJ_ROOT / "hosts.yml")) as hosts_file:
yams.dump(hosts_model, hosts_file)
cnode = ControlNode()
if fqdn is not None:
rnode = RemoteNode(cnode, api, password, fqdn)
else: else:
pass if isinstance(hostname, (tuple, list)):
if len(hostname) > 0 and len(hostname) < 2:
rnode = RemoteNode(cnode, api, password, hostname[0])
else:
raise ValueError
elif isinstance(hostname, str):
rnode = RemoteNode(cnode, api, password, hostname)
else:
raise TypeError
if tag is not None:
if isinstance(tag, (tuple, list)) and len(tag) > 0:
rnode.add_tags(*tag)
if key is not None:
if isinstance(key, tuple) and len(key) > 0:
rnode.import_keys(key)
if service is not None:
rnode.service = service
if group == "ungrouped":
if isinstance(hostname, (tuple, list)):
if len(hostname) > 0 and len(hostname) < 2:
filepath = PurePath(str(cnode.invvar_data[1]), hostname[0])
else:
raise ValueError
elif isinstance(hostname, str):
filepath = PurePath(str(cnode.invvar_data[1]), hostname)
else:
raise TypeError
else:
filepath = PurePath(str(cnode.invvar_data[0]), group)
inv_model = rnode.itemize()
with open(str(filepath), "w") as inv_file:
yams.dump(inv_model, inv_file)
print((hosts_model, inv_model))
skansible.add_command(addhost)
if __name__ == "__main__": if __name__ == "__main__":
skansible() skansible()

View File

@@ -8,6 +8,7 @@ dependencies = [
"ansible>=13.1.0", "ansible>=13.1.0",
"ansible-lint>=25.12.1", "ansible-lint>=25.12.1",
"ansible-navigator>=25.12.0", "ansible-navigator>=25.12.0",
"ansible-vault>=4.1.0",
"cerberus>=1.3.8", "cerberus>=1.3.8",
"click>=8.3.1", "click>=8.3.1",
] ]

View File

@@ -22,12 +22,13 @@ class SoftPathGroup(Enum):
CONFIG = 0 CONFIG = 0
DATA = 1 DATA = 1
MEM = 2 MEM = 2
EXE = 3
# @TODO continue adding magic methods to below class # @TODO continue adding magic methods to below class
# @NOTE https://rszalski.github.io/magicmethods/#sequence # @NOTE https://rszalski.github.io/magicmethods/#sequence
class Software: class Software:
# @TODO 2 options: add additional requirements to type definition below, # @TODO move below 2 class variables declarations outside of class, but make class variables equal to them
# or ensure attribute which stores data of this type also stores other data # @TODO when done with above, rename all intra-class references to such variables appropriately
_AppParams = Dict("_AppParams", { _AppParams = Dict("_AppParams", {
SoftPathGroup.CONFIG.name: { SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath], SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
@@ -43,6 +44,11 @@ class Software:
SoftScope.PERSONAL.name: IdlePath | list[IdlePath], SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath], SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath] SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
},
SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
} }
}, total=False) }, total=False)
_SubAppParams = Dict("_SubAppParams", { _SubAppParams = Dict("_SubAppParams", {
@@ -66,14 +72,20 @@ class Software:
SoftScope.PERSONAL.name: [], SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [], SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: [] SoftScope.GLOBAL.name: []
},
SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
} }
} }
_Apps = type("_Apps", (), **__app_input) _Apps = type("_Apps", (), __app_input)
__user_path: ExecutedPath = USER_PATH __user_path: ExecutedPath = USER_PATH
def __init__(self): def __init__(self):
self._fqdn: str | None = None self._fqdn: str | None = None
# @TODO fix NameError for 'Software' in parameter type check
def declare(self, name: str, **kwpaths: Software._SubAppParams) -> Software._AppParams: def declare(self, name: str, **kwpaths: Software._SubAppParams) -> Software._AppParams:
keyword_args: Software._AppParams = kwpaths keyword_args: Software._AppParams = kwpaths