Compare commits

..

7 Commits

8 changed files with 361 additions and 503 deletions

152
anodes.py
View File

@@ -1,152 +0,0 @@
"""
Library of classes modeling Ansible nodes or their types.
"""
from enum import Enum
from pathlib import Path, PurePath
from typing import TypedDict as Dict
from typing import Union, Literal, Required, Self
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from softman import Software, SoftPathGroup, SoftScope, Apps, Softs
from whereami import USER_PATH, PROJ_ROOT
from ansible_vault import Vault
import secrets
class ControlNode:
__user_path: ExecutedPath = USER_PATH
__proj_root: ExecutedPath = PROJ_ROOT
__conf_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/etc"),
PurePath("/usr", "local", "etc"),
PurePath(str(__user_path), ".config")
)
__data_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath("/usr", "local", "share"),
PurePath(str(__user_path), ".local", "share")
)
def __init__(self, ansible_proj_root: ExecutedPath | None = None):
if ansible_proj_root is not None:
self.__proj_root = ansible_proj_root
role_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.__proj_root), "roles"),
PurePath(str(self.__proj_root), ".ansible/roles")
)
roledata_paths: tuple[IdlePath] | list[IdlePath] = (
PurePath(str(self.proj_roles[0]), "files"),
PurePath(str(self.proj_roles[0]), "templates"),
PurePath(str(self.proj_roles[1]), "files"),
PurePath(str(self.proj_roles[1]), "templates")
)
setattr(self, AnsibleScopes.ROLE.name, {
"data": roledata_paths,
"vars": role_paths
})
setattr(self, AnsibleScopes.GROUPVARS.name, {
"vars": (PurePath(str(self.__proj_root), "group_vars"),)
})
setattr(self, AnsibleScopes.HOSTVARS.name, {
"vars": (PurePath(str(self.__proj_root), "host_vars"),)
})
setattr(self, AnsibleScopes.INVENTORY.name, {
"vars": (PurePath(str(self.__proj_root), "hosts.yml"),)
})
def get_scope(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY.name, index = 0):
return getattr(self, scope)[index]
@property
def home(self) -> ExecutedPath:
return self.__user_path
@property
def root(self) -> ExecutedPath:
return self.__proj_root
@property
def sys_confs(self) -> ExecutedPath:
return self.__conf_paths
@property
def sys_data(self) -> ExecutedPath:
return self.__data_paths
# userSSHParams = Dict("userSSHParams", {
# "username": Required[str],
# "paths": Apps,
# "keys": dict,
# "password": Required[str],
# "fate": Literal["disposal", "retention"]
# }, total=False)
vpsSchema = Dict("vpsSchema", {
"fqdn": Required[str],
"vps_service": {
"exists": Required[bool],
"password": Required[str],
"api_key": Required[str],
"type": Required[Literal["linode"]],
"region": Literal["us-east"],
"ssh_authorized_keys": list[IdlePath | ExecutedPath | str],
"ssh_private_key_paths": list[IdlePath | ExecutedPath | str],
"ssh_private_key_path_pref": int,
"root_fate": Required[Literal["disposal","retention"]],
"ssh_motd_script_basenames": list[str]
},
"keywords": list[str]
}, total=False)
class RemoteNode:
# __user_path =
_fqdn: str | None = None
def __init__(self, cnode: ControlNode, api_key: str = secrets.token_urlsafe(32), password: str = "password123", name: str = ".test", service: VirtualPrivateServers = VirtualPrivateServers.Linode.name, region: Literal["us-east"] | None = "us-east", keywords: list[str] | None = ["server"]):
self.root: dict = dict()
self.root["username"]: str = "root"
vault = Vault(password)
self.root["password"]: str = vault.dump(password)
self.root["software"]: Software = Software()
self.owner = cnode
app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: PurePath(str(cnode.home), ("." + Softs.ssh.name))
},
SoftPathGroup.DATA.name: {
SoftScope.GLOBAL.name: PurePath(str(cnode.sys_confs[0]), "update-motd.d")
}
}
self.root["software"].append(Softs.ssh.name, **app_input)
self._fqdn = name
self.root["software"]._fqdn = name
# root_ssh_input: userSSHParams = {
root_ssh_input = {
"username": self.root["username"],
"password": self.root["password"]
}
self.ssh: UserSSH = UserSSH(**root_ssh_input)
self.apps: list = self.root["software"].show(contents = True)
self.keywords = keywords
self._api_key: str | None = api_key
self.service = service
self.region = region
self.model: dict | None = None
def set_region(self, name: Literal["us-east"] = "us-east") -> None:
self.region = name
def set_password(self, password: str) -> None:
vault = Vault(self.root["password"])
self.root["password"] = vault.dump(self.root["password"])
def set_api(self, key: str) -> None:
vault = Vault(self._api_key)
self._api_key = vault.dump(self._api_key)
def add_tags(self, *name):
self.keywords: list = []
self.keywords += list(name)

View File

@@ -1,103 +0,0 @@
from pathlib import Path, PurePath
from whereami import PROJ_ROOT
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from anodes import RemoteNode, ControlNode, _userSSHSubParams
# from ansible_vault import Vault
import yaml as yams
import re
from typing import Literal
# @TODO for 'Config' class, write methods to pull and push from Ansible YAML files
# @NOTE https://docs.python.org/3/library/configparser.html#quick-start
class Config:
path: ExecutedPath = PROJ_ROOT / "config.ini"
__controller = ControlNode()
setattr(__controller, AnsibleScopes.INTERNAL.name, {
"vars": (PurePath(str(path)),)
})
def __init__(self, filepath: str = "config.ini", scope: AnsibleScopes = AnsibleScopes.INTERNAL.name, index: int = 0, mode: str = "r+"):
self.scope = scope
parent_dir = self.__controller.get_scope(scope, index)
filepath = Path(parent_dir) / filepath
filename: str = filepath.name
self.parse_method = "YML"
if "." in filename:
filename_arr = filename.split(".")
basename: str = filename_arr[0]
ext: str = filename_arr[len(filename_arr) - 1]
if re.match(r"toml|ini", ext):
self.parse_method: str = ext.upper()
self.filepath: ExecutedPath = filepath
self.mode = mode
self.model = None
self.file = None
self.__remote: RemoteNode | None = None
def __enter__(self):
if not self.filepath.exists():
self.__remote = RemoteNode(self.__controller)
return self.__remote
self.file = open(str(self.filepath), self.mode)
model = self.file.read()
if self.parse_method == "INI":
raise NotImplementedError
elif self.parse_method == "TOML":
raise NotImplementedError
else:
self.model = yams.load(model)
if self.scope == AnsibleScopes.GROUPVARS.name or self.scope == AnsibleScopes.HOSTVARS.name:
api_key: str = model["vps_service"]["api_key"]
vault = Vault(model["vps_service"]["password"])
password: str = vault.load(model["vps_service"]["password"])
fqdn: str = model["fqdn"]
service: VirtualPrivateServers = model["vps_service"]["type"]
region: Literal["us-east"] = model["vps_service"]["region"]
keywords: list[str] = model["keywords"]
self.__remote = RemoteNode(self.__controller, api_key, password, fqdn, service, region, keywords)
# @TODO add portion wherein SSH keys from 'model' are made to be present in 'self.__remote'
self.__remote.import_keys("*")
keyfiles = self.__remote.show_keys("available", list)
keyfiles_contents = set(map(lambda p: p.read_text(), keyfiles))
keyfiles_pub = list(set(model["vps_service"]["ssh_authorized_keys"]) - keyfiles_contents)
self.__remote.ssh.keys["available"] = tuple(keyfiles_pub)
self.__remote.ssh.keys["selected"] = keyfiles_pub
self.__remote.ssh.keys["authorized"] += keyfiles_pub
keyfiles_paths = set(map(lambda p: str(p), keyfiles))
keyfiles_priv = list(set(model["vps_service"]["ssh_private_key_paths"]) - keyfiles_paths)
self.__remote.ssh.keys["available"] = (*self.__remote.ssh.keys["available"],*keyfiles_priv)
self.__remote.ssh.keys["selected"] += keyfiles_priv
self.__remote.ssh.keys["used"] += keyfiles_priv
self.__remote.ssh.fate: Literal["disposal", "retention"] = model["vps_service"]["root_fate"]
return self.__remote
else:
raise ValueError
def __exit__(self):
if self.scope == AnsibleScopes.GROUPVARS.name or self.scope == AnsibleScopes.HOSTVARS.name:
ansible_chunk = self.__remote.itemize()
ansible_chunk["vps_service"]["ssh_motd_script_basenames"] = self.model["ssh_motd_script_basenames"]
ansible_chunk["vps_service"]["ssh_private_key_path_pref"] = self.model["ssh_private_key_path_pref"]
del self.model["fqdn"]
del self.model["vps_service"]
del self.model["keywords"]
else:
raise ValueError
if self.parse_method == "INI":
raise NotImplementedError
elif self.parse_method == "TOML":
raise NotImplementedError
else:
file_model = yams.dump(self.model | ansible_chunk)
self.file.write(file_model)
self.file.close()

View File

@@ -2,19 +2,79 @@
Library of custom type hints.
"""
from typing import TypeAlias as Neotype
from pathlib import PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
from enum import Enum
from typing import TypeAlias as Neotype, TypedDict as Dict
from typing import Required
from collections.abc import Sequence
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
from enum import StrEnum, auto
ExecutedPath: Neotype = PosixPath | WindowsPath
IdlePath: Neotype = PurePosixPath | PureWindowsPath
class VirtualPrivateServers(Enum):
Linode = 0
class RootFate(StrEnum):
disposal = auto()
retention = auto()
class AnsibleScopes(Enum):
INTERNAL = 0
INVENTORY = 1
GROUPVARS = 2
HOSTVARS = 3
ROLE = 4
class NodeType(StrEnum):
remote = auto()
control = auto()
class VPS(StrEnum):
Linode = auto()
class VPSRegion(StrEnum):
us_east = auto()
class AnsibleScopes(StrEnum):
INTERNAL = auto()
INVENTORY = auto()
GROUPVARS = auto()
HOSTVARS = auto()
ROLE = auto()
class AnsibleRoles(StrEnum):
bootstrap = auto()
class Scopes(StrEnum):
SYS = auto()
USER = auto()
LOCAL = auto()
PROJ = auto()
SHARED = auto()
class Roles(StrEnum):
CONF = auto()
DATA = auto()
MEM = auto()
EXE = auto()
class UserName(StrEnum):
root = auto()
class GroupName(StrEnum):
remote = auto()
sudo = auto()
class Software(StrEnum):
ssh = auto()
sshd = auto()
class SoftwareRoles(StrEnum):
client = auto()
server = auto()
class PacMans(StrEnum):
APT = auto()
class PathCollection(Dict, total=False):
sys: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
user: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
local: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
proj: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
shared: ExecutedPath | IdlePath | str | Sequence[ExecutedPath | IdlePath | str]
class PathRoles(Dict, total=False):
conf: PathCollection
data: PathCollection
mem: PathCollection
exe: PathCollection

58
main.py
View File

@@ -3,17 +3,63 @@ Library for the CLI commands and the related classes and functions
"""
import click as cli
domain_pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$'
# @TODO create regex pattern for matching IP addresses
# ip_pattern = r''
from custtypes import AnsibleScopes, VPS, VPSRegion, RootFate, UserName
from whereami import PROJ_ROOT, ANSIBLE_ROOTS
from servs import User
from pathlib import PurePath, Path
from sshkey import SSHKeyType
from ansible_vault import Vault
import yaml as yams
@cli.group()
@cli.option("-d", "--debug", type=bool, is_flag=True, default=True, help="Use debugging mode")
@cli.option("-d", "--debug", type=bool, is_flag=True, default=False, help="Use debugging mode")
@cli.pass_context
def skansible(ctx, debug):
ctx.ensure_object(dict)
ctx.obj["DEBUG"] = True
ctx.obj["DEBUG"] = debug
@skansible.command()
@cli.argument("api_key")
@cli.option("-s", "--vps", type=cli.Choice(VPS, case_sensitive=False), default="Linode", help="Set the type of VPS")
@cli.option("-r", "--region", type=cli.Choice(VPSRegion, case_sensitive=False), default="us_east", help="Set the VPS region")
@cli.option("-0", "--root", type=bool, is_flag=True, default=True, help="Declare root SSH login credentials")
@cli.option("-f", "--fate", type=cli.Choice(RootFate, case_sensitive=False), default="disposal", help="Choose the eventual fate of the root account")
@cli.option("-h", "--host", multiple=True, type=str, default="all", help="Specify what inventory host or group this is being set")
@cli.pass_context
def init(ctx, vps, region, root, fate, host, api_key):
if root:
password = cli.prompt("Please enter a password: ", type=str, hide_input=True, confirmation_prompt=True)
root = User(UserName.root.name.lower(), password)
pubkeys = root.ssh_keys.publish(SSHKeyType.pubkey.name.lower(), datatype=list)
pubkey_opts = map(lambda k: str(k), pubkeys)
chosen_pubkey = cli.prompt("Authorize one of the following SSH public keys: ", type=cli.Choice(pubkey_opts, case_sensitive=True), show_choices=True)
chosen_pubkey = Path(chosen_pubkey)
privkeys = root.ssh_keys.publish(SSHKeyType.privkey.name.lower(), datatype=list)[0]
chosen_privkey = tuple(filter(lambda k: k.stem == chosen_pubkey.stem, privkeys))[0]
inv_vars = []
for h in host:
inv_vars += list(ANSIBLE_ROOTS[AnsibleScopes.HOSTVARS.name.lower()].glob(h)) + list(ANSIBLE_ROOTS[AnsibleScopes.GROUPVARS.name.lower()].glob(h))
if len(inv_vars) > 0:
for p in inv_vars:
with open(str(p), "r+") as file:
content = yams.load(file.read(), Loader=yams.Loader)
if "vps_service" in content:
content["vps_service"]["exists"] = True
crypt_key = Vault(api_key)
content["vps_service"]["api_key"] = crypt_key.dump(api_key)
content["vps_service"]["type"] = vps.lower()
content["vps_service"]["region"] = region.replace("_", "-")
content["vps_service"]["root_fate"] = fate
crypt_key = Vault(root.password)
content["vps_service"]["password"] = crypt_key.dump(root.password)
else:
for h in host:
path = ANSIBLE_ROOTS[AnsibleScopes.GROUPVARS.name.lower()] / h
with open(str(path), "w") as file:
pass
if __name__ == "__main__":
skansible(obj={})

173
servs.py Normal file
View File

@@ -0,0 +1,173 @@
from typing import TypedDict as Dict
from typing import Self, Literal, Never
from collections.abc import Sequence
from custtypes import Roles, Scopes, PathCollection, PathRoles, Software, SoftwareRoles, ExecutedPath, PacMans, UserName, GroupName, IdlePath, ExecutedPath
from pathlib import Path, PurePath
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from whereami import PROJ_ROLES
class App:
def __init__(self, name: Software, role: SoftwareRoles = SoftwareRoles.client, paths: PathRoles | None = None):
self.__name = name
# @TODO create dict type hint for below data struct
self.alt_names = dict()
self.alt_names[PacMans.APT] = self.__name
self.role = role
if paths is not None:
if Roles.EXE in paths:
setattr(self, "_" + Roles.EXE.name.lower(), paths[Roles.EXE.name.lower()])
if Roles.CONF in paths:
setattr(self, "_" + Roles.CONF.name.lower(), paths[Roles.CONF.name.lower()])
if Roles.DATA in paths:
setattr(self, "_" + Roles.DATA.name.lower(), paths[Roles.DATA.name.lower()])
self.__parents: tuple[Self] | None = None
self.__children: tuple[Self| None] = []
self.__api: str | None = None
self.__current_filepath: IdlePath | ExecutedPath | None = None
self.__content: str | None = None
@property
def conf_paths(self):
if hasattr(self, "_" + Roles.CONF.name.lower()):
return self._conf
else:
raise Exception
@property
def data_paths(self):
if hasattr(self, "_" + Roles.DATA.name.lower()):
return self._data
else:
raise Exception
@property
def exec_paths(self):
if hasattr(self, "_" + Roles.EXE.name.lower()):
return self._exe
else:
raise Exception
def append(self, datatype: Roles = Roles.CONF, scope: Scopes | None = None, path: IdlePath | ExecutedPath | str | PathCollection | None = None):
if path is None:
raise TypeError
datatype = datatype.name.lower()
if hasattr(self, "_" + datatype):
paths = getattr(self, "_" + datatype)
else:
setattr(self, "_" + datatype, dict())
if scope is not None:
if isinstance(path, str):
path: ExecutedPath = Path(path)
if scope.name.lower() not in paths:
paths[scope.name.lower()] = []
paths[scope.name.lower()].append(path)
else:
paths: PathCollection = path
setattr(self, "_" + datatype, paths)
def inherit(self, other):
if other._App__children is None:
other._App__children = tuple()
if self not in other._App__children:
other.adopt(self)
self.__parents = (*self.__parents, other)
def adopt(self, other: Self):
if other._App__parents is None:
other._App__parents = tuple()
if self not in other._App__parent:
other.inherit(self)
self.__children = (*self.__children, other)
def __enter__(self) -> dict | Sequence:
self.__content = self.__current_filepath.read_text()
return self.__content
def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
# txt = yams.dump(self.__content)
# self.__file.write(txt)
# del txt
self.__file.close()
def __call__(self, path = str, mode = "r+", scope: Scopes = Scopes.PROJ, index: int = 0) -> None:
if not hasattr(self, "_" + Roles.CONF.name.lower()):
raise Exception
conf_coll = self._conf[scope.name.lower()]
if isinstance(conf_coll, Sequence):
conf_coll = conf_coll[index]
if isinstance(conf_coll, str):
conf_coll = Path(conf_coll)
filepath = conf_coll / path
self.__current_filepath = filepath
self.__file = open(str(filepath), mode)
# @TODO write below method to duplicate file or template in local to project role file/template
def clone(self, source_scope: Scopes = Scopes.SYS, target_scope: Scopes = Scopes.PROJ, index: int = 0) -> Never:
raise NotImplementedError
# @TODO rewrite below using DIR_ROOTS var from whereami module
sshd_paths: PathRoles = {
Roles.CONF.name.lower(): {
Scopes.PROJ.name.lower(): [
PROJ_ROLES / "bootstrap" / "files" / "sshd_config.d",
PROJ_ROLES / "bootstrap" / "templates" / "sshd_config.d"
],
}
}
sshd = App(Software.sshd, SoftwareRoles.server, sshd_paths)
class Group:
def __init__(self, group_name: GroupName = GroupName.sudo, gid: int = 27):
self.group_name = group_name
self.id = gid
self.category: Literal["system", "regular"] = "system"
class User:
def __init__(self, username: UserName = UserName.root.name, password: str = "test", services: list = [Software.sshd.name.lower()], uid: int = 0):
self.exists = True
self.username = username
self.id = uid
self.password = password
self.services: tuple = tuple(services)
self.shell = "/bin/bash"
self.home = "/"
self.category: Literal["system", "regular"] = "regular"
group = Group(username, self.id)
self.primary_group = group
self.supp_groups = None
if self.supp_groups is None:
self.is_admin = True
elif isinstance(self.supp_groups, Sequence) and GroupName.sudo in self.supp_groups:
self.is_admin = True
else:
self.is_admin = False
ssh_keys = SSHKeyCollection()
ssh_keys.pull()
self.ssh_keys = ssh_keys
pubkeys = ssh_keys.publish(SSHKeyType.pubkey.name.lower(), datatype=list)
self.__auth_keys: list[str] = list(map(lambda k: k.read_text(), pubkeys))
privkeys = ssh_keys.publish(SSHKeyType.privkey.name.lower(), datatype=list)
self.__priv_keys: list[str] = list(map(lambda k: str(k), privkeys[0]))
self.__priv_key_pref: int = privkeys[1]
self.__apps = (sshd,)
def add_keypair(self, private_key, public_key):
raise NotImplementedError

View File

@@ -1,182 +0,0 @@
"""
Library of classes modeling software and software-related
data as represented in or used by Ansible.
"""
from typing import TypeAlias as Neotype
from typing import TypedDict as Dict
from typing import Never, Union, Callable
from custtypes import ExecutedPath, IdlePath
from enum import Enum
from pathlib import Path, PurePath
from whereami import USER_PATH, PROJ_ROOT
from collections.abc import Sequence
AppPath: Neotype = Union[ExecutedPath, IdlePath]
class SoftScope(Enum):
PERSONAL = 0
LOCAL = 1
GLOBAL = 2
class SoftPathGroup(Enum):
CONFIG = 0
DATA = 1
MEM = 2
EXE = 3
_SubAppParams = Dict("_SubAppParams", {
SoftScope.PERSONAL.name: IdlePath | list[IdlePath],
SoftScope.LOCAL.name: IdlePath | list[IdlePath],
SoftScope.GLOBAL.name: IdlePath | list[IdlePath]
}, total=False)
AppParams = Dict("AppParams", {
SoftPathGroup.CONFIG.name: _SubAppParams,
SoftPathGroup.DATA.name: _SubAppParams,
SoftPathGroup.MEM.name: _SubAppParams,
SoftPathGroup.EXE.name: _SubAppParams
}, total=False)
def __AppsInit(self, CONFIG = None, DATA = None, MEM = None, EXE = None):
self.CONFIG = CONFIG
self.DATA = DATA
self.MEM = MEM
self.EXE = EXE
__app_input = {
SoftPathGroup.CONFIG.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.DATA.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.MEM.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
SoftPathGroup.EXE.name: {
SoftScope.PERSONAL.name: [],
SoftScope.LOCAL.name: [],
SoftScope.GLOBAL.name: []
},
"__init__": __AppsInit
}
Apps = type("Apps", (), __app_input)
# @TODO continue adding magic methods to below class
# @NOTE https://rszalski.github.io/magicmethods/#sequence
class Software (Sequence):
__user_path: ExecutedPath = USER_PATH
def __init__(self):
self._fqdn: str | None = None
# @TODO fix NameError for 'Software' in parameter type check
def append(self, name: str, **kwpaths: _SubAppParams) -> AppParams:
keyword_args: AppParams = kwpaths
app = Apps(**keyword_args)
setattr(self, name, app)
return app
def __getitem__(self, key: str) -> AppParams | Never:
if hasattr(self, key):
app: Apps = getattr(self, key)
else:
raise KeyError
return app
def __setitem__(self, key: tuple[str, SoftPathGroup], **value: IdlePath | list[IdlePath]) -> None | Never:
if len(value) < 1 or len(value) > 3:
raise ValueError
app_params: _SubAppParams = value
if hasattr(self, key[0]):
app: Apps = getattr(self, key[0])
if hasattr(app, key[1]):
app_child: _SubAppParams = getattr(app, key[1])
for k, v in app_params.items():
v = [v] if not isinstance(v, list) else v
app_child[k]: IdlePath | list[IdlePath] = v
setattr(app, key[1], app_child)
else:
raise KeyError
setattr(self, key[0], app)
else:
raise KeyError
def __delitem__(self, key: tuple[str | SoftPathGroup]) -> None | Never:
if len(key) < 1 or len(key) > 3:
raise KeyError
if not hasattr(self, key[0]):
raise KeyError
if len(key) == 1:
delattr(self, key[0])
elif len(key) > 1:
app: Apps = getattr(self, key[0])
delattr(app, key[1])
setattr(self, key[0], app)
def show(self, contents: bool = False) -> tuple[str]:
apps: tuple[str] | tuple[Apps] = tuple(
filter(
lambda a: isinstance(getattr(self, a), Apps),
dir(self)
)
)
if contents:
apps = tuple(
map(
lambda a: getattr(self, a),
apps
)
)
return apps
def __len__(self) -> int:
apps: tuple[str] = tuple(
filter(
lambda a: isinstance(getattr(self, a), Apps),
dir(self)
)
)
return len(apps)
def pop(self) -> Never:
raise NotImplementedError
def remove(self) -> Never:
raise NotImplementedError
def __contains__(self) -> Never:
raise NotImplementedError
def count(self) -> Never:
return NotImplementedError
def __missing__(self) -> Never:
raise NotImplementedError
def __iter__(self) -> Never:
raise NotImplementedError
def reverse(self) -> Never:
raise NotImplementedError
def sort(self, key: Callable = (lambda e: e), reverse: bool = False) -> Never:
raise NotImplementedError
class Softs(Enum):
ssh = 0

View File

@@ -1,27 +1,18 @@
from re import Pattern as RegEx
from re import fullmatch as Match
from pathlib import Path, PurePath
from custtypes import ExecutedPath, IdlePath, VirtualPrivateServers, AnsibleScopes
from enum import Enum
from softman import Apps
from custtypes import ExecutedPath, IdlePath
from enum import StrEnum, auto
from random import choice as gamble
from collections.abc import Sequence
from typing import Never, Union, Self, Callable, Required, Literal
from typing import TypedDict as Dict
from glob import glob as globbify
from typing import Never, Self, Callable
from whereami import USER_PATH
from softman import Softs
# import os
class RootFate(Enum):
disposal = 0
retention = 1
class SSHKeyType(Enum):
pubkey = 0
privkey = 1
dual = 2
class SSHKeyType(StrEnum):
pubkey = auto()
privkey = auto()
dual = auto()
# @TODO create unit tests for below class
@@ -40,7 +31,7 @@ class SSHKey:
if len(path) < 2:
self.__value: ExecutedPath | tuple[ExecutedPath] = path[0]
else:
self.category = SSHKeyType.dual.name
self.category = SSHKeyType.dual.name.lower()
self.__value: ExecutedPath | tuple[ExecutedPath] = path
def __int__(self) -> int:
@@ -226,17 +217,17 @@ class SSHKey:
pubkey_present = True
if pubkey_present and privkey_present:
self.category = SSHKeyType.dual.name
self.category = SSHKeyType.dual.name.lower()
elif pubkey_present or privkey_present:
if pubkey_present:
self.category = SSHKeyType.pubkey.name
self.category = SSHKeyType.pubkey.name.lower()
if privkey_present:
self.category = SSHKeyType.privkey.name
self.category = SSHKeyType.privkey.name.lower()
elif isinstance(self.__value, ExecutedPath):
if "-----BEGIN OPENSSH PRIVATE KEY-----" in self.__value.read_text():
self.category = SSHKeyType.privkey.name
self.category = SSHKeyType.privkey.name.lower()
else:
self.category = SSHKeyType.pubkey.name
self.category = SSHKeyType.pubkey.name.lower()
@property
def status(self) -> str:
@@ -582,48 +573,56 @@ class SSHKeyCollection(Sequence):
return prefix + content + postfix
def publish(self, pref: int | None = None, datatype = dict):
def publish(self, category: SSHKeyType = SSHKeyType.pubkey.name.lower(), pref: int | None = None, datatype = dict):
privkey = list()
pubkey = list()
self.__current = self.__first
if datatype == list:
while self.__current is not None:
if self.__current.category == SSHKeyType.privkey.name:
privkey.append(str(self.__current._SSHKey__value))
elif self.__current.category == SSHKeyType.pubkey.name:
pubkey.append(self.__current._SSHKey__value.read_text())
elif self.__current.category == SSHKeyType.dual.name:
privkey.append(str(self.__current._SSHKey__value[0]))
pubkey.append(self.__current._SSHKey__value[1].read_text())
if self.__current.category == SSHKeyType.privkey.name.lower():
privkey.append(self.__current._SSHKey__value)
elif self.__current.category == SSHKeyType.pubkey.name.lower():
pubkey.append(self.__current._SSHKey__value)
elif self.__current.category == SSHKeyType.dual.name.lower():
privkey.append(self.__current._SSHKey__value[0])
pubkey.append(self.__current._SSHKey__value[1])
self.__current = next(self.__first)
return (privkey, pubkey, gamble(range(len(privkey))))
if pref is None:
preference = gamble(range(len(privkey)))
else:
preference = pref
if category == SSHKeyType.pubkey.name.lower():
return pubkey
elif category == SSHKeyType.privkey.name.lower():
return (privkey, preference)
else:
return (privkey, pubkey, preference)
elif datatype == dict:
result = dict()
while self.__current is not None:
if self.__current.category == SSHKeyType.privkey.name:
if self.__current.category == SSHKeyType.privkey.name.lower():
privkey.append(str(self.__current._SSHKey__value))
elif self.__current.category == SSHKeyType.pubkey.name:
elif self.__current.category == SSHKeyType.pubkey.name.lower():
pubkey.append(self.__current._SSHKey__value.read_text())
elif self.__current.category == SSHKeyType.dual.name:
elif self.__current.category == SSHKeyType.dual.name.lower():
privkey.append(str(self.__current._SSHKey__value[0]))
pubkey.append(self.__current._SSHKey__value[1].read_text())
self.__current = next(self.__first)
result["ssh_authorized_keys"]: list[str] = pubkey
result["ssh_private_key_paths"]: list[str] = privkey
result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey)))
if category == SSHKeyType.pubkey.name.lower():
result["ssh_authorized_keys"]: list[str] = pubkey
if category == SSHKeyType.privkey.name.lower():
result["ssh_private_key_paths"]: list[str] = privkey
result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey)))
if category == SSHKeyType.dual.name.lower():
result["ssh_authorized_keys"]: list[str] = pubkey
result["ssh_private_key_paths"]: list[str] = privkey
result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey)))
return result
# @TODO maybe move to separate module for classes for handling users and groups
class UserSSH:
def __init__(self, username: str = "root", paths: Apps | None = None, keys: dict = dict(), password: str = "password123", fate: RootFate = RootFate.disposal.name):
self.username = username
self.paths = paths
self.keys = keys
self.password = password
self.fate = fate

View File

@@ -2,8 +2,25 @@
Library of path constants to be used or referenced elsewhere.
"""
from custtypes import ExecutedPath
from custtypes import ExecutedPath, Roles, Scopes, AnsibleScopes
from pathlib import Path
USER_PATH: ExecutedPath = Path.home()
PROJ_ROOT: ExecutedPath = Path(__file__).parent.resolve()
PROJ_ROLES: ExecutedPath = PROJ_ROOT / "roles"
APP_ROOTS = {
Roles.CONF.name.lower(): {
Scopes.SYS.name.lower(): Path("/etc"),
Scopes.USER.name.lower(): USER_PATH / ".config",
Scopes.SHARED.name.lower(): Path("/usr/share"),
Scopes.PROJ.name.lower(): tuple(list(PROJ_ROLES.glob("**/files")) + list(PROJ_ROLES.glob("**/templates")))
}
}
ANSIBLE_ROOTS = {
AnsibleScopes.GROUPVARS.name.lower(): PROJ_ROOT / "group_vars",
AnsibleScopes.HOSTVARS.name.lower(): PROJ_ROOT / "host_vars",
AnsibleScopes.INVENTORY.name.lower(): None,
AnsibleScopes.ROLE.name.lower(): None
}