Compare commits

..

5 Commits

5 changed files with 82 additions and 22 deletions

View File

@@ -7,9 +7,11 @@ from typing import Required
from collections.abc import Sequence
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath, PosixPath, WindowsPath
from enum import StrEnum, auto
from io import TextIOBase, BufferedIOBase, RawIOBase
ExecutedPath: Neotype = PosixPath | WindowsPath
IdlePath: Neotype = PurePosixPath | PureWindowsPath
File: Neotype = TextIOBase | BufferedIOBase | RawIOBase
class RootFate(StrEnum):
disposal = auto()

View File

@@ -1,21 +1,22 @@
from typing import Self, Literal, Never, Callable, Sequence
from custtypes import Roles, Scopes, PathCollection, PathRoles
from custtypes import Software, SoftwareRoles, PacMans
from custtypes import ExecutedPath, IdlePath
from custtypes import UserName, GroupName
from custtypes import ExecutedPath, IdlePath, File
from custtypes import UserName, GroupName, VPS, VPSRegion, RootFate
from pathlib import Path, PurePath
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from random import choice as gamble
from re import Pattern as RegEx
from softman import sshd
from yaml import YAMLObject
from ansible_vault import Vault
class Group(YAMLObject):
yaml_tag = u"!Group"
# @TODO create Enum class child for category parameter type hinting in below method
def __init__(self, group_name: GroupName = GroupName.sudo, category: Literal["system", "regular"] = "system", gid: int | str = 27):
def __init__(self, group_name: GroupName | str = GroupName.sudo, category: Literal["system", "regular"] = "system", gid: int | str | None = 27):
if isinstance(group_name, GroupName):
self.group_name = group_name.name.lower()
else:
@@ -36,7 +37,7 @@ class Group(YAMLObject):
class User(YAMLObject):
yaml_tag = u"!User"
def __init__(self, username: UserName | str = UserName.root, password: str = "test", services: list[str | Software] = [Software.openssh_server], uid: int | str = 0):
def __init__(self, username: UserName | str = UserName.root, password: str = "test", services: list[str | Software] = [Software.openssh_server], uid: int | str | None = 0):
self.exists = True
if isinstance(username, UserName):
@@ -128,7 +129,7 @@ class User(YAMLObject):
else:
continue
def add_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True):
def choose_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True):
if not self.__ssh_keypair_chosen:
self.__priv_keys = SSHKeyCollection()
self.__auth_keys = SSHKeyCollection()
@@ -297,3 +298,59 @@ class User(YAMLObject):
self.id
)
class AnsibleCrypt:
def __init__(self, string: str, source: File | None = None):
self.__args = (string, source)
self.__lock = Vault(string).dump
self.__stream = None
if source is not None:
self.__stream = source.read()
self.__data = self.__lock(string, self.__stream)
else:
self.__data = self.__lock(string)
def unlock(self, string: str):
unlock = Vault(string).load
if self.__stream is not None:
result = unlock(self.__stream)
else:
result = unlock(string)
return result
def __str__(self):
return self.__data
def __repr__(self):
return "%s(%r, source=%r)" % (
self.__class__.__name__,
*self.__args
)
class VirtualPrivateServer(YAMLObject):
yaml_tag = u"!VirtualPrivateServer"
def __init__(self, root: User, api: str, vps: VPS | str = VPS.Linode):
self.region: VPSRegion | None = None
if vps == VPS.Linode:
self.region = VPSRegion.us_east
api_key = AnsibleCrypt(api)
self.__api_key: AnsibleCrypt = api_key
self.api_key: str = str(api_key)
self.password: str = root.password
self.exists: bool = True
self.type: str = vps.name.lower()
self.__default_fate: RootFate = RootFate.disposal
self.root_fate: str = self.__default_fate.name.lower()
self.ssh_authorized_keys: list[str] = root.ssh_authorized_keys
self.ssh_private_key_paths: list[str] = root.ssh_private_key_paths
self.ssh_private_key_path_pref: int = root.ssh_private_key_path_pref
# @TODO add SSH MOTD attribute

View File

@@ -25,22 +25,26 @@ class Parser:
self.__file = filepath
filepath = str(filepath)
else:
self.__is_path = False
if Path(filepath).exists():
if isinstance(filepath, str) and Path(filepath).exists():
self.__file = Path(filepath)
self.__is_path = True
else:
self.__content = filepath
self.__is_path = False
if isinstance(filepath, str):
self.__content = filepath
if self.__is_yaml(filepath) or method == "yaml":
self.__method = "yaml"
if self.__is_path:
filepath = open(str(filepath), "r")
filepath = open(str(filepath), "r+")
if len(kwargs) > 0:
self.__data = yams.load(filepath, Loader=yams.Loader, **kwargs)
self.__data = yams.load_all(filepath, Loader=yams.Loader, **kwargs)
else:
self.__data = yams.load(filepath, Loader=yams.Loader)
self.__data = yams.load_all(filepath, Loader=yams.Loader)
filepath.close()
elif self.__is_config(filepath) or method == "config":
self.__method = "config"
self.__data = cfg()
@@ -60,15 +64,15 @@ class Parser:
return self.__data
def dump(self, obj = None, method: Literal["yaml", "config", "generic"] | None = "generic", **kwargs):
if self.__method == "yaml" or method == "yaml":
if isinstance(obj, yams.YAMLObject) or self.__method == "yaml" or method == "yaml":
if obj is None:
obj = self.__data
if len(kwargs) > 0:
self.__content = yams.dump(obj, Dumper=yams.Dumper, **kwargs)
self.__content = "---\n" + yams.dump(obj, Dumper=yams.Dumper, **kwargs)
else:
self.__content = yams.dump(obj, Dumper=yams.Dumper)
elif self.__method == "config" or method == "config":
self.__content = "---\n" + yams.dump(obj, Dumper=yams.Dumper)
elif isinstance(obj, ConfigParser) or self.__method == "config" or method == "config":
if obj is None:
if self.__is_path:
return self.__file.read_text()
@@ -77,11 +81,8 @@ class Parser:
return self.__content
else:
return self.__file.read_text()
else:
if isinstance(obj, ConfigParser):
return obj
else:
raise TypeError
raise NotImplementedError
else:
raise TypeError

View File

@@ -9,6 +9,6 @@ dependencies = [
"ansible-lint>=25.12.1",
"ansible-navigator>=25.12.0",
"ansible-vault>=4.1.0",
"cerberus>=1.3.8",
"click>=8.3.1",
"validators>=0.35.0",
]

View File

@@ -134,7 +134,7 @@ class App:
self.__current_filepath = filepath
self.__file = open(str(filepath), mode)
return __enter__
return self
# @TODO write below method to duplicate file or template in local to project role file/template
def clone(self, source_scope: Scopes = Scopes.SYS, target_scope: Scopes = Scopes.PROJ, index: int = 0) -> Never: