feature: added a class for handling Ansible vaults that can be converted to string, and added a YAMLObject representing VPS service as in YAML doc

This commit is contained in:
2026-01-24 14:38:31 -05:00
parent e3683fe955
commit a2d921a158

View File

@@ -1,21 +1,22 @@
from typing import Self, Literal, Never, Callable, Sequence from typing import Self, Literal, Never, Callable, Sequence
from custtypes import Roles, Scopes, PathCollection, PathRoles from custtypes import Roles, Scopes, PathCollection, PathRoles
from custtypes import Software, SoftwareRoles, PacMans from custtypes import Software, SoftwareRoles, PacMans
from custtypes import ExecutedPath, IdlePath from custtypes import ExecutedPath, IdlePath, File
from custtypes import UserName, GroupName from custtypes import UserName, GroupName, VPS, VPSRegion, RootFate
from pathlib import Path, PurePath from pathlib import Path, PurePath
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from random import choice as gamble from random import choice as gamble
from re import Pattern as RegEx from re import Pattern as RegEx
from softman import sshd from softman import sshd
from yaml import YAMLObject from yaml import YAMLObject
from ansible_vault import Vault
class Group(YAMLObject): class Group(YAMLObject):
yaml_tag = u"!Group" yaml_tag = u"!Group"
# @TODO create Enum class child for category parameter type hinting in below method # @TODO create Enum class child for category parameter type hinting in below method
def __init__(self, group_name: GroupName = GroupName.sudo, category: Literal["system", "regular"] = "system", gid: int | str = 27): def __init__(self, group_name: GroupName | str = GroupName.sudo, category: Literal["system", "regular"] = "system", gid: int | str | None = 27):
if isinstance(group_name, GroupName): if isinstance(group_name, GroupName):
self.group_name = group_name.name.lower() self.group_name = group_name.name.lower()
else: else:
@@ -36,7 +37,7 @@ class Group(YAMLObject):
class User(YAMLObject): class User(YAMLObject):
yaml_tag = u"!User" yaml_tag = u"!User"
def __init__(self, username: UserName | str = UserName.root, password: str = "test", services: list[str | Software] = [Software.openssh_server], uid: int | str = 0): def __init__(self, username: UserName | str = UserName.root, password: str = "test", services: list[str | Software] = [Software.openssh_server], uid: int | str | None = 0):
self.exists = True self.exists = True
if isinstance(username, UserName): if isinstance(username, UserName):
@@ -128,7 +129,7 @@ class User(YAMLObject):
else: else:
continue continue
def add_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True): def choose_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True):
if not self.__ssh_keypair_chosen: if not self.__ssh_keypair_chosen:
self.__priv_keys = SSHKeyCollection() self.__priv_keys = SSHKeyCollection()
self.__auth_keys = SSHKeyCollection() self.__auth_keys = SSHKeyCollection()
@@ -297,3 +298,59 @@ class User(YAMLObject):
self.id self.id
) )
class AnsibleCrypt:
def __init__(self, string: str, source: File | None = None):
self.__args = (string, source)
self.__lock = Vault(string).dump
self.__stream = None
if source is not None:
self.__stream = source.read()
self.__data = self.__lock(string, self.__stream)
else:
self.__data = self.__lock(string)
def unlock(self, string: str):
unlock = Vault(string).load
if self.__stream is not None:
result = unlock(self.__stream)
else:
result = unlock(string)
return result
def __str__(self):
return self.__data
def __repr__(self):
return "%s(%r, source=%r)" % (
self.__class__.__name__,
*self.__args
)
class VirtualPrivateServer(YAMLObject):
yaml_tag = u"!VirtualPrivateServer"
def __init__(self, root: User, api: str, vps: VPS | str = VPS.Linode):
self.region: VPSRegion | None = None
if vps == VPS.Linode:
self.region = VPSRegion.us_east
api_key = AnsibleCrypt(api)
self.__api_key: AnsibleCrypt = api_key
self.api_key: str = str(api_key)
self.password: str = root.password
self.exists: bool = True
self.type: str = vps.name.lower()
self.__default_fate: RootFate = RootFate.disposal
self.root_fate: str = self.__default_fate.name.lower()
self.ssh_authorized_keys: list[str] = root.ssh_authorized_keys
self.ssh_private_key_paths: list[str] = root.ssh_private_key_paths
self.ssh_private_key_path_pref: int = root.ssh_private_key_path_pref
# @TODO add SSH MOTD attribute