Files
skato-ansible/entities.py

248 lines
9.6 KiB
Python

from typing import Self, Literal, Never, Callable, Sequence
from custtypes import Roles, Scopes, PathCollection, PathRoles, Software, SoftwareRoles, ExecutedPath, PacMans, UserName, GroupName, IdlePath, ExecutedPath
from pathlib import Path, PurePath
from sshkey import SSHKeyCollection, SSHKeyType, SSHKey
from random import choice as gamble
from re import Pattern as RegEx
from softman import sshd
class Group:
def __init__(self, group_name: GroupName = GroupName.sudo, gid: int = 27):
self.group_name = group_name
self.id = gid
self.category: Literal["system", "regular"] = "system"
class User:
def __init__(self, username: UserName | str = UserName.root.name.lower(), password: str = "test", services: list = [Software.openssh_server.name.lower()], uid: int = 0):
self.exists = True
self.username = username
self.id = uid
self.password = password
self.services: tuple = tuple(services)
self.shell = "/bin/bash"
self.home = "/"
self.category: Literal["system", "regular"] = "regular"
group = Group(username, self.id)
self.primary_group = group
self.supp_groups = None
if self.supp_groups is None:
self.is_admin = True
elif isinstance(self.supp_groups, Sequence) and GroupName.sudo in self.supp_groups:
self.is_admin = True
else:
self.is_admin = False
ssh_keys = SSHKeyCollection()
ssh_keys.pull()
self.__ssh_keys = ssh_keys
self.__public_keys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
self.__private_keys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
# print("here")
pubkeys = ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
self.__auth_keys: SSHKeyCollection = SSHKeyCollection()
for p in pubkeys:
self.__auth_keys.append(p)
privkeys = ssh_keys.publish(SSHKeyType.privkey, datatype=list)
self.__priv_keys: SSHKeyCollection = SSHKeyCollection()
for p in privkeys[0]:
self.__priv_keys.append(p)
self.__priv_key_pref: int = privkeys[1]
self.__apps = (sshd,)
self.__ssh_keypairs: tuple[SSHKey | tuple[SSHKey]] | SSHKey = tuple()
self.__ssh_keypair_chosen = False
def get_app(self, name: Software) -> Never:
raise NotImplementedError
def update_app(self, name: Software, attr: str, method = None) -> Never:
raise NotImplementedError
def __update_sshd(self, app: Software = Software.openssh_server):
for a in self.__apps:
if a.alt_names[PacMans.APT.name.lower()] == app.name.lower():
if hasattr(a, "users"):
users = getattr(a, "users")
if self.username not in users:
users[self.username] = dict()
users[self.username]["auth_keys"] = self.__auth_keys
users[self.username]["priv_keys"] = self.__priv_keys
users[self.username]["keys"] = self.__ssh_keys
users[self.username]["keypairs"] = self.__ssh_keypairs
users[self.username]["preferred_priv_key"] = self.__priv_key_pref
setattr(self, "users", users)
else:
users = {
self.username: {
"auth_keys": self.__auth_keys,
"priv_keys": self.__priv_keys,
"keypairs": self.__ssh_keypairs,
"keys": self.__ssh_keys
}
}
a.declare(users = users)
else:
continue
def add_keypair(self, private_key: SSHKey | ExecutedPath | str | int | RegEx, public_key: SSHKey | ExecutedPath | str | int | RegEx, from_host = True):
if not self.__ssh_keypair_chosen:
self.__priv_keys = SSHKeyCollection()
self.__auth_keys = SSHKeyCollection()
if from_host:
pubkeys = self.__ssh_keys.publish(SSHKeyType.pubkey, datatype=list)
# print(pubkeys)
if isinstance(public_key, int):
public_key = pubkeys[public_key]
elif isinstance(public_key, SSHKey):
public_key = tuple(filter(lambda k: str(k) == str(public_key()), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
elif isinstance(public_key, str):
public_key = tuple(filter(lambda k: str(k) == public_key or public_key in str(k), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
elif isinstance(public_key, RegEx):
public_key = tuple(filter(lambda k: public_key.search(str(k)), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
else:
public_key = tuple(filter(lambda k: str(k) == str(public_key), pubkeys))
if len(public_key) > 0:
public_key = public_key[0]
else:
public_key = None
privkeys = self.__ssh_keys.publish(SSHKeyType.privkey, datatype=list)[0]
if isinstance(private_key, int):
private_key = privkeys[private_key]
elif isinstance(private_key, SSHKey):
private_key = tuple(filter(lambda k: str(k) == str(private_key()), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
elif isinstance(private_key, str):
private_key = tuple(filter(lambda k: str(k) == private_key or private_key in str(k), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
elif isinstance(private_key, RegEx):
private_key = tuple(filter(lambda k: private_key.search(str(k)), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
else:
private_key = tuple(filter(lambda k: str(k) == str(private_key), privkeys))
if len(private_key) > 0:
private_key = private_key[0]
else:
private_key = None
else:
if isinstance(public_key, SSHKey):
public_key = public_key()
elif isinstance(public_key, str):
public_key = Path(public_key)
self.__ssh_keys.append(public_key)
if isinstance(private_key, SSHKey):
private_key = private_key()
elif isinstance(private_key, str):
private_key = Path(private_key)
self.__ssh_keys.append(private_key)
if private_key is None or public_key is None:
raise KeyError
self.__auth_keys.append(public_key)
self.__priv_keys.append(private_key)
self.__ssh_keypairs = (*self.__ssh_keypairs, (self.__priv_keys.tail, self.__auth_keys.tail),)
self.__priv_key_pref = len(self.__ssh_keypairs) - 1
self.__update_sshd()
self.__ssh_keypair_chosen = True
def get_keypair(self, preference: int):
if not self.__ssh_keypair_chosen:
raise Exception
if isinstance(self.__ssh_keypairs, SSHKey) and not isinstance(self.__ssh_keypairs(), tuple):
raise ValueError
if isinstance(self.__ssh_keypairs, SSHKey):
if isinstance(self.__ssh_keypairs(), tuple):
return self.__ssh_keypairs[preference]
else:
return self.__ssh_keypairs
else:
return self.__ssh_keypairs[preference]
@property
def available_keys(self) -> SSHKeyCollection:
return self.__ssh_keys
@property
def public_keys(self) -> SSHKeyCollection:
return self.__public_keys
@property
def private_keys(self) -> SSHKeyCollection:
return self.__private_keys
@property
def keypair_preference(self) -> int:
return self.__priv_key_pref
def prefer_keypair(self, preference: int | RegEx | str):
if isinstance(preference, int):
if preference < len(self.__ssh_keypairs):
self.__priv_key_pref = preference
else:
raise KeyError
elif isinstance(preference, RegEx):
count = 0
for keypair in self.__ssh_keypairs:
if preference.search(keypair[0]) or preference.search(keypair[1]):
self.__priv_key_pref = count
count += 1
else:
count = 0
for keypair in self.__ssh_keypairs:
if preference in str(keypair[0]()) or preference in str(keypair[1]()):
self.__priv_key_pref = count
count += 1
@property
def available_keypairs(self) -> tuple[SSHKey] | SSHKey | None:
return self.__ssh_keypairs
@property
def keypair(self):
kp = self.__ssh_keypairs[self.__priv_key_pref]
return kp[0] + kp[1]
@property
def authorized_keys(self) -> SSHKeyCollection:
return self.__auth_keys
@property
def credential_keys(self) -> SSHKeyCollection:
return self.__priv_keys