Compare commits

..

4 Commits

4 changed files with 122 additions and 80 deletions

View File

@@ -1,58 +1,99 @@
from typing import Self, Literal, Never, Callable, Sequence
from custtypes import Roles, Scopes, PathCollection, PathRoles, Software, SoftwareRoles, ExecutedPath, PacMans, UserName, GroupName, IdlePath, ExecutedPath
from custtypes import Roles, Scopes, PathCollection, PathRoles
from custtypes import Software, SoftwareRoles, PacMans
from custtypes import ExecutedPath, IdlePath
from custtypes import UserName, GroupName
from pathlib import Path, PurePath
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from random import choice as gamble
from re import Pattern as RegEx
from softman import sshd
from yaml import YAMLObject
class Group:
def __init__(self, group_name: GroupName = GroupName.sudo, gid: int = 27):
class Group(YAMLObject):
yaml_tag = u"!Group"
# @TODO create Enum class child for category parameter type hinting in below method
def __init__(self, group_name: GroupName = GroupName.sudo, category: Literal["system", "regular"] = "system", gid: int | str = 27):
if isinstance(group_name, GroupName):
self.group_name = group_name.name.lower()
else:
self.group_name = group_name
self.id = gid
self.category: Literal["system", "regular"] = "system"
class User:
def __init__(self, username: UserName | str = UserName.root.name.lower(), password: str = "test", services: list = [Software.openssh_server.name.lower()], uid: int = 0):
self.id = str(gid)
self.type = category
def __repr__(self):
return "%s(group_name=%r,category=%r,gid=%r)" % (
self.__class__.__name__,
self.group_name,
self.category,
self.id
)
class User(YAMLObject):
yaml_tag = u"!User"
def __init__(self, username: UserName | str = UserName.root, password: str = "test", services: list[str | Software] = [Software.openssh_server], uid: int | str = 0):
self.exists = True
if isinstance(username, UserName):
self.username = username.name.lower()
else:
self.username = username
self.id = uid
self.id = str(uid)
self.password = password
self.services: tuple = tuple(services)
new_services = []
for s in services:
if isinstance(s, Software):
new_services.append(s.name.lower())
else:
new_services.append(s)
self.services: tuple = tuple(new_services)
self.shell = "/bin/bash"
self.home = "/"
self.category: Literal["system", "regular"] = "regular"
group = Group(username, self.id)
self.primary_group = group
self.supp_groups = None
self.group = group
self.groups: list[str | GroupName] | None = None
if self.supp_groups is None:
self.is_admin = True
elif isinstance(self.supp_groups, Sequence) and GroupName.sudo in self.supp_groups:
self.is_admin = True
if self.groups is None:
self.admin = True
elif isinstance(self.groups, Sequence) and GroupName.sudo in self.groups:
self.admin = True
else:
self.is_admin = False
self.admin = False
ssh_keys = SSHKeyCollection()
ssh_keys.pull()
self.__ssh_keys = ssh_keys
self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
# print("here")
self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
pubkeys = ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
self.__auth_keys: SSHKeyCollection = SSHKeyCollection()
for p in pubkeys:
self.__auth_keys.append(p)
self.ssh_authorized_keys: list[str | None] = []
self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
privkeys = ssh_keys.publish(SSHKeyType.privkey, datatype=list)
self.__priv_keys: SSHKeyCollection = SSHKeyCollection()
for p in privkeys[0]:
self.__priv_keys.append(p)
self.ssh_private_key_paths: list[str | None] = []
self.__priv_key_pref: int = privkeys[1]
self.ssh_private_key_path_pref: int = privkeys[1]
self.__apps = (sshd,)
self.__ssh_keypairs: tuple[SSHKey | tuple[SSHKey]] | SSHKey = tuple()
self.__ssh_keypair_chosen = False
def get_app(self, name: Software) -> Never:
raise NotImplementedError
@@ -68,8 +109,8 @@ class User:
if self.username not in users:
users[self.username] = dict()
users[self.username]["auth_keys"] = self.__auth_keys
users[self.username]["priv_keys"] = self.__priv_keys
users[self.username]["authorized_keys"] = self.__auth_keys
users[self.username]["credential_keys"] = self.__priv_keys
users[self.username]["keys"] = self.__ssh_keys
users[self.username]["keypairs"] = self.__ssh_keypairs
users[self.username]["preferred_priv_key"] = self.__priv_key_pref
@@ -170,14 +211,16 @@ class User:
raise KeyError
self.__auth_keys.append(public_key)
self.ssh_authorized_keys.append(public_key.read_text())
self.__priv_keys.append(private_key)
self.ssh_private_key_paths.append(str(private_key))
self.__ssh_keypairs = (*self.__ssh_keypairs, (self.__priv_keys.tail, self.__auth_keys.tail),)
self.__priv_key_pref = len(self.__ssh_keypairs) - 1
self.ssh_private_key_path_pref = len(self.__ssh_keypairs) - 1
self.__update_sshd()
self.__ssh_keypair_chosen = True
def get_keypair(self, preference: int):
if not self.__ssh_keypair_chosen:
raise Exception
@@ -194,7 +237,7 @@ class User:
return self.__ssh_keypairs[preference]
@property
def available_keys(self) -> SSHKeyCollection:
def keys(self) -> SSHKeyCollection:
return self.__ssh_keys
@property
@@ -229,7 +272,7 @@ class User:
count += 1
@property
def available_keypairs(self) -> tuple[SSHKey] | SSHKey | None:
def keypairs(self) -> tuple[SSHKey] | SSHKey | None:
return self.__ssh_keypairs
@property
@@ -245,3 +288,12 @@ class User:
def credential_keys(self) -> SSHKeyCollection:
return self.__priv_keys
def __repr__(self) -> str:
return "%s(username=%r,password=%r,services=%r,uid=%r)" % (
self.__class__.__name__,
self.username,
self.password,
self.services,
self.id
)

1
gpgkey.py Normal file
View File

@@ -0,0 +1 @@
# @TODO create classes similar to those in sshkey module, for GPG keys

View File

@@ -3,21 +3,23 @@ import yaml as yams
from configparser import ConfigParser as cfg
from custtypes import ExecutedPath
from re import compile as rgx, IGNORECASE
from whereami import PROJ_ROOT
from typing import Literal
class Parser:
def __init__(self):
self.__is_yaml = rgx(r"\.ya?ml$", flags=IGNORECASE).match
self.__is_ini = rgx(r"\.ini$", flags=IGNORECASE).match
self.__is_config = rgx(r"\.cfg$", flags=IGNORECASE).match
self.__is_json = rgx(r"\.[jb]son$", flags=IGNORECASE).match
self.__is_yaml = rgx(r".*\.ya?ml$", flags=IGNORECASE).match
self.__is_ini = rgx(r".*\.ini$", flags=IGNORECASE).match
self.__is_config = rgx(r".*\.cfg$", flags=IGNORECASE).match
self.__is_json = rgx(r".*\.[jb]son$", flags=IGNORECASE).match
self.__data = None
self.__content: str | None = None
self.__is_path: bool = False
# @TODO use Enum child class for below type hint instead
self.__method: Literal["yaml", "config"] | None = None
self.__method: Literal["yaml", "config", "generic"] = "generic"
self.__file: ExecutedPath | None = None
def load(self, filepath: ExecutedPath | str, **kwargs):
def load(self, filepath: ExecutedPath | str, method: Literal["yaml", "config", "generic"] = "generic", **kwargs):
if isinstance(filepath, ExecutedPath):
self.__is_path = True
self.__file = filepath
@@ -29,32 +31,36 @@ class Parser:
else:
self.__content = filepath
if self.__is_yaml(filepath):
if self.__is_yaml(filepath) or method == "yaml":
self.__method = "yaml"
if self.__is_path:
filepath = open(str(filepath), "r")
if len(kwargs) > 0:
self.__data = yams.load(filepath, Loader=yams.Loader, **kwargs)
else:
self.__data = yams.load(filepath, Loader=yams.Loader)
elif self.__is_config(filepath):
elif self.__is_config(filepath) or method == "config":
self.__method = "config"
self.__data = cfg()
if self.__is_path:
read = self.__data.read
else:
read = self.__data.read_string
if len(kwargs) > 0:
self.__data.read(filepath, **kwargs)
else:
self.__data.read(filepath)
else:
if len(kwargs) > 0:
self.__data.read_string(filepath, **kwargs)
else:
self.__data.read_string(filepath, **kwargs)
else:
raise TypeError
return self.__data
def dump(self, obj = None, **kwargs):
if self.__method == "yaml":
def dump(self, obj = None, method: Literal["yaml", "config", "generic"] | None = "generic", **kwargs):
if self.__method == "yaml" or method == "yaml":
if obj is None:
obj = self.__data
@@ -62,15 +68,15 @@ class Parser:
self.__content = yams.dump(obj, Dumper=yams.Dumper, **kwargs)
else:
self.__content = yams.dump(obj, Dumper=yams.Dumper)
elif self.__method == "config":
elif self.__method == "config" or method == "config":
if obj is None:
if self.__is_path:
return self.__file
return self.__file.read_text()
else:
if self.__file is None:
return self.__content
else:
return self.__file
return self.__file.read_text()
else:
if isinstance(obj, ConfigParser):
return obj

View File

@@ -4,9 +4,10 @@ from pathlib import Path, PurePath
from custtypes import ExecutedPath, IdlePath
from enum import StrEnum, auto
from random import choice as gamble
from collections.abc import Sequence
from typing import Never, Self, Callable
# from collections.abc import Sequence, Iterable
from typing import Never, Self, Callable, Iterable, Sequence
from whereami import USER_PATH
from itertools import chain
# import os
class SSHKeyType(StrEnum):
@@ -37,7 +38,6 @@ class SSHKey:
def __int__(self) -> int:
return self.__idx
def update_status(self) -> None:
if isinstance(self.__value, tuple):
privkey_present = False
@@ -69,7 +69,7 @@ class SSHKey:
return "🔑" + key_basename
def __repr__(self) -> str:
return "SSHKey(" + str(self.__value) + ")"
return "%s(%r)" % (self.__class__.__name__, self.__value)
def __nonzero__(self) -> bool:
return True
@@ -282,6 +282,8 @@ class SSHKey:
result = self
return result
def prev(arg):
if isinstance(arg, SSHKey):
return arg._SSHKey__prev__()
@@ -312,7 +314,8 @@ class SSHKeyCollection(Sequence):
self.__last: SSHKey | None = None
self.__indices: range | None = None
# @TODO have other item magic methods mimic this one for slicing purposes
# @TODO allow initialization with unpacked parameter or sequence/iterable argument
def __getitem__(self, key: int | slice) -> SSHKey | Never:
self.__current = self.__first
@@ -573,7 +576,6 @@ class SSHKeyCollection(Sequence):
def count(self, query: RegEx | str) -> int | Never:
raise NotImplementedError
# @TODO make sure to implement below method
def pull(self, query: RegEx | str = "*") -> None:
if isinstance(query, RegEx):
keypaths = self.__ssh_path.glob("*")
@@ -617,11 +619,12 @@ class SSHKeyCollection(Sequence):
def index(self, item: str | ExecutedPath) -> Never:
raise NotImplementedError
def publish(self, category: SSHKeyType | str = SSHKeyType.pubkey, pref: int | None = None, datatype = dict):
def publish(self, category: SSHKeyType | str | None = SSHKeyType.pubkey, pref: int | None = None, datatype = list):
privkey = list()
pubkey = list()
self.__current = self.__first
# @TODO create conditional case that publishes all keys
if datatype == list:
while self.__current is not None:
# print(self.__current)
@@ -648,30 +651,10 @@ class SSHKeyCollection(Sequence):
else:
return (privkey, pubkey, preference)
elif datatype == dict:
result = dict()
while self.__current is not None:
# print(self.__current)
if self.__current.category == SSHKeyType.privkey.name.lower():
privkey.append(str(self.__current._SSHKey__value))
elif self.__current.category == SSHKeyType.pubkey.name.lower():
pubkey.append(self.__current._SSHKey__value.read_text())
elif self.__current.category == SSHKeyType.dual.name.lower():
privkey.append(str(self.__current._SSHKey__value[0]))
pubkey.append(self.__current._SSHKey__value[1].read_text())
self.__current = next(self.__current)
# print("publish running...")
if category.name.lower() == SSHKeyType.pubkey.name.lower():
result["ssh_authorized_keys"]: list[str] = pubkey
if category.name.lower() == SSHKeyType.privkey.name.lower():
result["ssh_private_key_paths"]: list[str] = privkey
result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey)))
if category.name.lower() == SSHKeyType.dual.name.lower():
result["ssh_authorized_keys"]: list[str] = pubkey
result["ssh_private_key_paths"]: list[str] = privkey
result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey)))
# @TODO have result var equal to instance of a yaml.YAMLObject class from parse module
raise NotImplementedError
return result
def __repr__(self) -> str:
return "%s()" % (self.__class__.__name__)