Compare commits
3 Commits
310fd28495
...
2f37eed0db
| Author | SHA1 | Date | |
|---|---|---|---|
|
2f37eed0db
|
|||
|
f68a438de3
|
|||
|
8193cb6de6
|
54
anodes.py
54
anodes.py
@@ -176,7 +176,7 @@ class RemoteNode:
|
||||
self.__keys_selected = False
|
||||
self.__finalized_keys = False
|
||||
self.model: dict | None = None
|
||||
self.__accumulator = None
|
||||
self.__key_accumulator: tuple | list | None = []
|
||||
|
||||
def set_region(self, name: Literal["us-east"] = "us-east") -> None:
|
||||
self.region = name
|
||||
@@ -359,9 +359,55 @@ class RemoteNode:
|
||||
self.__finalized_keys = self.__keys_selected and self.__authkeys_selected and self.__usedkeys_selected
|
||||
return result
|
||||
|
||||
# @TODO rewrite below method
|
||||
def remove_keys(self, source: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | ExecutedPath | str) -> list[ExecutedPath] | Never:
|
||||
raise NotImplementedError
|
||||
# @TODO continue writing below method
|
||||
def remove_keys(self, target: Literal["authorized", "used", "available", "selected"] = "available", *selections: int | str | ExecutedPath):
|
||||
keyfiles = self.ssh.keys[target]
|
||||
|
||||
key_accumulator_populated = (slf.__key_accumulator is not None and isinstance(slf.__key_accumulator, (tuple, list)) and len(slf.__key_accumulator) > 0)
|
||||
for s in selections:
|
||||
if isinstance(s, int):
|
||||
if target == "available":
|
||||
self.__key_accumulator.append(list(keyfiles).pop(s))
|
||||
else:
|
||||
self.__key_accumulator.append(keyfiles.pop(s))
|
||||
elif isinstance(s, (str, ExecutedPath)):
|
||||
if isinstance(s, str):
|
||||
removed_keyfiles = list(filter(lambda p: str(p) == s, keyfiles))
|
||||
else:
|
||||
removed_keyfiles = list(filter(lambda p: p == s, keyfiles))
|
||||
keyfiles = filter(lambda p: str(p) != s, keyfiles)
|
||||
|
||||
self.__key_accumulator += removed_keyfiles
|
||||
|
||||
self.ssh.keys[target] = keyfiles
|
||||
if target == "available":
|
||||
selected_diff = list(set(self.ssh.keys["selected"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["selected"] = selected_diff if len(selected_diff) >= 2 else [0, 1]
|
||||
auth_diff = list(set(self.ssh.keys["authorized"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["authorized"] = auth_diff
|
||||
used_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["used"] = used_diff
|
||||
elif target == "selected":
|
||||
available_diff = list(set(self.ssh.keys["available"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["available"] = available_diff if len(available_diff) >= 2 else [0, 1]
|
||||
auth_diff = list(set(self.ssh.keys["authorized"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["authorized"] = auth_diff
|
||||
used_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["used"] = used_diff
|
||||
elif target == "authorized":
|
||||
available_diff = list(set(self.ssh.keys["available"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["available"] = available_diff if len(available_diff) >= 2 else [0, 1]
|
||||
selected_diff = list(set(self.ssh.keys["selected"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["selected"] = selected_diff
|
||||
used_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["used"] = used_diff
|
||||
elif target == "used":
|
||||
available_diff = list(set(self.ssh.keys["available"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["available"] = available_diff if len(available_diff) >= 2 else [0, 1]
|
||||
selected_diff = list(set(self.ssh.keys["selected"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["selected"] = selected_diff
|
||||
auth_diff = list(set(self.ssh.keys["used"]) - set(self.__key_accumulator))
|
||||
self.ssh.keys["authorized"] = auth_diff
|
||||
|
||||
def itemize(self):
|
||||
model: dict | vpsSchema = dict()
|
||||
|
||||
18
confctx.py
18
confctx.py
@@ -23,7 +23,7 @@ class Config:
|
||||
filepath = Path(parent_dir) / filepath
|
||||
filename: str = filepath.stem
|
||||
|
||||
self.parse_method = "yml"
|
||||
self.parse_method = "YML"
|
||||
if "." in filename:
|
||||
filename_arr = filename.split(".")
|
||||
basename: str = filename_arr[0]
|
||||
@@ -45,7 +45,13 @@ class Config:
|
||||
|
||||
self.file = open(str(self.filepath), self.mode)
|
||||
model = self.file.read()
|
||||
self.model = yams.load(model)
|
||||
|
||||
if self.parse_method == "INI":
|
||||
raise NotImplementedError
|
||||
elif self.parse_method == "TOML":
|
||||
raise NotImplementedError
|
||||
else:
|
||||
self.model = yams.load(model)
|
||||
|
||||
if self.scope == AnsibleScopes.GROUPVARS.name or self.scope == AnsibleScopes.HOSTVARS.name:
|
||||
api_key: str = model["vps_service"]["api_key"]
|
||||
@@ -86,6 +92,12 @@ class Config:
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
file_model = yams.dump(self.model | ansible_chunk)
|
||||
if self.parse_method == "INI":
|
||||
raise NotImplementedError
|
||||
elif self.parse_method == "TOML":
|
||||
raise NotImplementedError
|
||||
else:
|
||||
file_model = yams.dump(self.model | ansible_chunk)
|
||||
|
||||
self.file.write(file_model)
|
||||
self.file.close()
|
||||
|
||||
Reference in New Issue
Block a user