Compare commits

...

4 Commits

4 changed files with 122 additions and 80 deletions

View File

@@ -1,58 +1,99 @@
from typing import Self, Literal, Never, Callable, Sequence from typing import Self, Literal, Never, Callable, Sequence
from custtypes import Roles, Scopes, PathCollection, PathRoles, Software, SoftwareRoles, ExecutedPath, PacMans, UserName, GroupName, IdlePath, ExecutedPath from custtypes import Roles, Scopes, PathCollection, PathRoles
from custtypes import Software, SoftwareRoles, PacMans
from custtypes import ExecutedPath, IdlePath
from custtypes import UserName, GroupName
from pathlib import Path, PurePath from pathlib import Path, PurePath
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from random import choice as gamble from random import choice as gamble
from re import Pattern as RegEx from re import Pattern as RegEx
from softman import sshd from softman import sshd
from yaml import YAMLObject
class Group:
def __init__(self, group_name: GroupName = GroupName.sudo, gid: int = 27):
self.group_name = group_name
self.id = gid
self.category: Literal["system", "regular"] = "system"
class User: class Group(YAMLObject):
def __init__(self, username: UserName | str = UserName.root.name.lower(), password: str = "test", services: list = [Software.openssh_server.name.lower()], uid: int = 0): yaml_tag = u"!Group"
# @TODO create Enum class child for category parameter type hinting in below method
def __init__(self, group_name: GroupName = GroupName.sudo, category: Literal["system", "regular"] = "system", gid: int | str = 27):
if isinstance(group_name, GroupName):
self.group_name = group_name.name.lower()
else:
self.group_name = group_name
self.id = str(gid)
self.type = category
def __repr__(self):
return "%s(group_name=%r,category=%r,gid=%r)" % (
self.__class__.__name__,
self.group_name,
self.category,
self.id
)
class User(YAMLObject):
yaml_tag = u"!User"
def __init__(self, username: UserName | str = UserName.root, password: str = "test", services: list[str | Software] = [Software.openssh_server], uid: int | str = 0):
self.exists = True self.exists = True
self.username = username
self.id = uid if isinstance(username, UserName):
self.username = username.name.lower()
else:
self.username = username
self.id = str(uid)
self.password = password self.password = password
self.services: tuple = tuple(services)
new_services = []
for s in services:
if isinstance(s, Software):
new_services.append(s.name.lower())
else:
new_services.append(s)
self.services: tuple = tuple(new_services)
self.shell = "/bin/bash" self.shell = "/bin/bash"
self.home = "/" self.home = "/"
self.category: Literal["system", "regular"] = "regular" self.category: Literal["system", "regular"] = "regular"
group = Group(username, self.id) group = Group(username, self.id)
self.primary_group = group self.group = group
self.supp_groups = None self.groups: list[str | GroupName] | None = None
if self.supp_groups is None: if self.groups is None:
self.is_admin = True self.admin = True
elif isinstance(self.supp_groups, Sequence) and GroupName.sudo in self.supp_groups: elif isinstance(self.groups, Sequence) and GroupName.sudo in self.groups:
self.is_admin = True self.admin = True
else: else:
self.is_admin = False self.admin = False
ssh_keys = SSHKeyCollection() ssh_keys = SSHKeyCollection()
ssh_keys.pull() ssh_keys.pull()
self.__ssh_keys = ssh_keys self.__ssh_keys = ssh_keys
self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
# print("here") # print("here")
self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
pubkeys = ssh_keys.publish(SSHKeyType.pubkey, datatype=list) pubkeys = ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
self.__auth_keys: SSHKeyCollection = SSHKeyCollection() self.__auth_keys: SSHKeyCollection = SSHKeyCollection()
for p in pubkeys: for p in pubkeys:
self.__auth_keys.append(p) self.__auth_keys.append(p)
self.ssh_authorized_keys: list[str | None] = []
self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
privkeys = ssh_keys.publish(SSHKeyType.privkey, datatype=list) privkeys = ssh_keys.publish(SSHKeyType.privkey, datatype=list)
self.__priv_keys: SSHKeyCollection = SSHKeyCollection() self.__priv_keys: SSHKeyCollection = SSHKeyCollection()
for p in privkeys[0]: for p in privkeys[0]:
self.__priv_keys.append(p) self.__priv_keys.append(p)
self.ssh_private_key_paths: list[str | None] = []
self.__priv_key_pref: int = privkeys[1] self.__priv_key_pref: int = privkeys[1]
self.ssh_private_key_path_pref: int = privkeys[1]
self.__apps = (sshd,) self.__apps = (sshd,)
self.__ssh_keypairs: tuple[SSHKey | tuple[SSHKey]] | SSHKey = tuple() self.__ssh_keypairs: tuple[SSHKey | tuple[SSHKey]] | SSHKey = tuple()
self.__ssh_keypair_chosen = False self.__ssh_keypair_chosen = False
def get_app(self, name: Software) -> Never: def get_app(self, name: Software) -> Never:
raise NotImplementedError raise NotImplementedError
@@ -68,8 +109,8 @@ class User:
if self.username not in users: if self.username not in users:
users[self.username] = dict() users[self.username] = dict()
users[self.username]["auth_keys"] = self.__auth_keys users[self.username]["authorized_keys"] = self.__auth_keys
users[self.username]["priv_keys"] = self.__priv_keys users[self.username]["credential_keys"] = self.__priv_keys
users[self.username]["keys"] = self.__ssh_keys users[self.username]["keys"] = self.__ssh_keys
users[self.username]["keypairs"] = self.__ssh_keypairs users[self.username]["keypairs"] = self.__ssh_keypairs
users[self.username]["preferred_priv_key"] = self.__priv_key_pref users[self.username]["preferred_priv_key"] = self.__priv_key_pref
@@ -170,14 +211,16 @@ class User:
raise KeyError raise KeyError
self.__auth_keys.append(public_key) self.__auth_keys.append(public_key)
self.ssh_authorized_keys.append(public_key.read_text())
self.__priv_keys.append(private_key) self.__priv_keys.append(private_key)
self.ssh_private_key_paths.append(str(private_key))
self.__ssh_keypairs = (*self.__ssh_keypairs, (self.__priv_keys.tail, self.__auth_keys.tail),) self.__ssh_keypairs = (*self.__ssh_keypairs, (self.__priv_keys.tail, self.__auth_keys.tail),)
self.__priv_key_pref = len(self.__ssh_keypairs) - 1 self.__priv_key_pref = len(self.__ssh_keypairs) - 1
self.ssh_private_key_path_pref = len(self.__ssh_keypairs) - 1
self.__update_sshd() self.__update_sshd()
self.__ssh_keypair_chosen = True self.__ssh_keypair_chosen = True
def get_keypair(self, preference: int): def get_keypair(self, preference: int):
if not self.__ssh_keypair_chosen: if not self.__ssh_keypair_chosen:
raise Exception raise Exception
@@ -194,7 +237,7 @@ class User:
return self.__ssh_keypairs[preference] return self.__ssh_keypairs[preference]
@property @property
def available_keys(self) -> SSHKeyCollection: def keys(self) -> SSHKeyCollection:
return self.__ssh_keys return self.__ssh_keys
@property @property
@@ -229,7 +272,7 @@ class User:
count += 1 count += 1
@property @property
def available_keypairs(self) -> tuple[SSHKey] | SSHKey | None: def keypairs(self) -> tuple[SSHKey] | SSHKey | None:
return self.__ssh_keypairs return self.__ssh_keypairs
@property @property
@@ -245,3 +288,12 @@ class User:
def credential_keys(self) -> SSHKeyCollection: def credential_keys(self) -> SSHKeyCollection:
return self.__priv_keys return self.__priv_keys
def __repr__(self) -> str:
return "%s(username=%r,password=%r,services=%r,uid=%r)" % (
self.__class__.__name__,
self.username,
self.password,
self.services,
self.id
)

1
gpgkey.py Normal file
View File

@@ -0,0 +1 @@
# @TODO create classes similar to those in sshkey module, for GPG keys

View File

@@ -3,21 +3,23 @@ import yaml as yams
from configparser import ConfigParser as cfg from configparser import ConfigParser as cfg
from custtypes import ExecutedPath from custtypes import ExecutedPath
from re import compile as rgx, IGNORECASE from re import compile as rgx, IGNORECASE
from whereami import PROJ_ROOT
from typing import Literal
class Parser: class Parser:
def __init__(self): def __init__(self):
self.__is_yaml = rgx(r"\.ya?ml$", flags=IGNORECASE).match self.__is_yaml = rgx(r".*\.ya?ml$", flags=IGNORECASE).match
self.__is_ini = rgx(r"\.ini$", flags=IGNORECASE).match self.__is_ini = rgx(r".*\.ini$", flags=IGNORECASE).match
self.__is_config = rgx(r"\.cfg$", flags=IGNORECASE).match self.__is_config = rgx(r".*\.cfg$", flags=IGNORECASE).match
self.__is_json = rgx(r"\.[jb]son$", flags=IGNORECASE).match self.__is_json = rgx(r".*\.[jb]son$", flags=IGNORECASE).match
self.__data = None self.__data = None
self.__content: str | None = None self.__content: str | None = None
self.__is_path: bool = False self.__is_path: bool = False
# @TODO use Enum child class for below type hint instead # @TODO use Enum child class for below type hint instead
self.__method: Literal["yaml", "config"] | None = None self.__method: Literal["yaml", "config", "generic"] = "generic"
self.__file: ExecutedPath | None = None self.__file: ExecutedPath | None = None
def load(self, filepath: ExecutedPath | str, **kwargs): def load(self, filepath: ExecutedPath | str, method: Literal["yaml", "config", "generic"] = "generic", **kwargs):
if isinstance(filepath, ExecutedPath): if isinstance(filepath, ExecutedPath):
self.__is_path = True self.__is_path = True
self.__file = filepath self.__file = filepath
@@ -29,32 +31,36 @@ class Parser:
else: else:
self.__content = filepath self.__content = filepath
if self.__is_yaml(filepath): if self.__is_yaml(filepath) or method == "yaml":
self.__method = "yaml" self.__method = "yaml"
if self.__is_path:
filepath = open(str(filepath), "r")
if len(kwargs) > 0: if len(kwargs) > 0:
self.__data = yams.load(filepath, Loader=yams.Loader, **kwargs) self.__data = yams.load(filepath, Loader=yams.Loader, **kwargs)
else: else:
self.__data = yams.load(filepath, Loader=yams.Loader) self.__data = yams.load(filepath, Loader=yams.Loader)
elif self.__is_config(filepath): elif self.__is_config(filepath) or method == "config":
self.__method = "config" self.__method = "config"
self.__data = cfg() self.__data = cfg()
if self.__is_path: if self.__is_path:
if len(kwargs) > 0: read = self.__data.read
self.__data.read(filepath, **kwargs)
else:
self.__data.read(filepath)
else: else:
if len(kwargs) > 0: read = self.__data.read_string
self.__data.read_string(filepath, **kwargs)
else: if len(kwargs) > 0:
self.__data.read_string(filepath, **kwargs) self.__data.read(filepath, **kwargs)
else:
self.__data.read(filepath)
else: else:
raise TypeError raise TypeError
return self.__data return self.__data
def dump(self, obj = None, **kwargs): def dump(self, obj = None, method: Literal["yaml", "config", "generic"] | None = "generic", **kwargs):
if self.__method == "yaml": if self.__method == "yaml" or method == "yaml":
if obj is None: if obj is None:
obj = self.__data obj = self.__data
@@ -62,15 +68,15 @@ class Parser:
self.__content = yams.dump(obj, Dumper=yams.Dumper, **kwargs) self.__content = yams.dump(obj, Dumper=yams.Dumper, **kwargs)
else: else:
self.__content = yams.dump(obj, Dumper=yams.Dumper) self.__content = yams.dump(obj, Dumper=yams.Dumper)
elif self.__method == "config": elif self.__method == "config" or method == "config":
if obj is None: if obj is None:
if self.__is_path: if self.__is_path:
return self.__file return self.__file.read_text()
else: else:
if self.__file is None: if self.__file is None:
return self.__content return self.__content
else: else:
return self.__file return self.__file.read_text()
else: else:
if isinstance(obj, ConfigParser): if isinstance(obj, ConfigParser):
return obj return obj

View File

@@ -4,9 +4,10 @@ from pathlib import Path, PurePath
from custtypes import ExecutedPath, IdlePath from custtypes import ExecutedPath, IdlePath
from enum import StrEnum, auto from enum import StrEnum, auto
from random import choice as gamble from random import choice as gamble
from collections.abc import Sequence # from collections.abc import Sequence, Iterable
from typing import Never, Self, Callable from typing import Never, Self, Callable, Iterable, Sequence
from whereami import USER_PATH from whereami import USER_PATH
from itertools import chain
# import os # import os
class SSHKeyType(StrEnum): class SSHKeyType(StrEnum):
@@ -37,7 +38,6 @@ class SSHKey:
def __int__(self) -> int: def __int__(self) -> int:
return self.__idx return self.__idx
def update_status(self) -> None: def update_status(self) -> None:
if isinstance(self.__value, tuple): if isinstance(self.__value, tuple):
privkey_present = False privkey_present = False
@@ -69,7 +69,7 @@ class SSHKey:
return "🔑" + key_basename return "🔑" + key_basename
def __repr__(self) -> str: def __repr__(self) -> str:
return "SSHKey(" + str(self.__value) + ")" return "%s(%r)" % (self.__class__.__name__, self.__value)
def __nonzero__(self) -> bool: def __nonzero__(self) -> bool:
return True return True
@@ -282,6 +282,8 @@ class SSHKey:
result = self result = self
return result return result
def prev(arg): def prev(arg):
if isinstance(arg, SSHKey): if isinstance(arg, SSHKey):
return arg._SSHKey__prev__() return arg._SSHKey__prev__()
@@ -312,7 +314,8 @@ class SSHKeyCollection(Sequence):
self.__last: SSHKey | None = None self.__last: SSHKey | None = None
self.__indices: range | None = None self.__indices: range | None = None
# @TODO have other item magic methods mimic this one for slicing purposes # @TODO allow initialization with unpacked parameter or sequence/iterable argument
def __getitem__(self, key: int | slice) -> SSHKey | Never: def __getitem__(self, key: int | slice) -> SSHKey | Never:
self.__current = self.__first self.__current = self.__first
@@ -573,7 +576,6 @@ class SSHKeyCollection(Sequence):
def count(self, query: RegEx | str) -> int | Never: def count(self, query: RegEx | str) -> int | Never:
raise NotImplementedError raise NotImplementedError
# @TODO make sure to implement below method
def pull(self, query: RegEx | str = "*") -> None: def pull(self, query: RegEx | str = "*") -> None:
if isinstance(query, RegEx): if isinstance(query, RegEx):
keypaths = self.__ssh_path.glob("*") keypaths = self.__ssh_path.glob("*")
@@ -617,11 +619,12 @@ class SSHKeyCollection(Sequence):
def index(self, item: str | ExecutedPath) -> Never: def index(self, item: str | ExecutedPath) -> Never:
raise NotImplementedError raise NotImplementedError
def publish(self, category: SSHKeyType | str = SSHKeyType.pubkey, pref: int | None = None, datatype = dict): def publish(self, category: SSHKeyType | str | None = SSHKeyType.pubkey, pref: int | None = None, datatype = list):
privkey = list() privkey = list()
pubkey = list() pubkey = list()
self.__current = self.__first self.__current = self.__first
# @TODO create conditional case that publishes all keys
if datatype == list: if datatype == list:
while self.__current is not None: while self.__current is not None:
# print(self.__current) # print(self.__current)
@@ -648,30 +651,10 @@ class SSHKeyCollection(Sequence):
else: else:
return (privkey, pubkey, preference) return (privkey, pubkey, preference)
elif datatype == dict: elif datatype == dict:
result = dict() # @TODO have result var equal to instance of a yaml.YAMLObject class from parse module
raise NotImplementedError
while self.__current is not None:
# print(self.__current)
if self.__current.category == SSHKeyType.privkey.name.lower():
privkey.append(str(self.__current._SSHKey__value))
elif self.__current.category == SSHKeyType.pubkey.name.lower():
pubkey.append(self.__current._SSHKey__value.read_text())
elif self.__current.category == SSHKeyType.dual.name.lower():
privkey.append(str(self.__current._SSHKey__value[0]))
pubkey.append(self.__current._SSHKey__value[1].read_text())
self.__current = next(self.__current)
# print("publish running...")
if category.name.lower() == SSHKeyType.pubkey.name.lower():
result["ssh_authorized_keys"]: list[str] = pubkey
if category.name.lower() == SSHKeyType.privkey.name.lower():
result["ssh_private_key_paths"]: list[str] = privkey
result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey)))
if category.name.lower() == SSHKeyType.dual.name.lower():
result["ssh_authorized_keys"]: list[str] = pubkey
result["ssh_private_key_paths"]: list[str] = privkey
result["ssh_private_key_path_pref"]: int = pref if pref is not None else gamble(range(len(privkey)))
return result return result
def __repr__(self) -> str:
return "%s()" % (self.__class__.__name__)