feature: created a class representing the Ansible control node as context manager
This commit is contained in:
75
anodes.py
Normal file
75
anodes.py
Normal file
@@ -0,0 +1,75 @@
|
||||
from custtypes import AnsibleRoles, AnsibleScopes, ExecutedPath
|
||||
from whereami import USER_PATH, ANSIBLE_CONFIG, ANSIBLE_ROOTS
|
||||
from configparser import ConfigParser as cfg
|
||||
from typing import Sequence
|
||||
from re import Pattern as RegEx
|
||||
from pathlib import PurePath
|
||||
from parse import Parser
|
||||
|
||||
# @TODO below class should mostly work as a context manager
|
||||
class ControlNode:
|
||||
__parser = Parser()
|
||||
|
||||
def __init__(self):
|
||||
self.__user_path = USER_PATH()
|
||||
self.__config = cfg()
|
||||
|
||||
if ANSIBLE_CONFIG.exists():
|
||||
self.__config.read(str(ANSIBLE_CONFIG))
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
self.__data = None
|
||||
self.__filepath: ExecutedPath | None = None
|
||||
self.__file = None
|
||||
|
||||
def __enter__(self):
|
||||
self.__file = open(str(self.__filepath), "r+")
|
||||
self.__data = self.__parser.load(self.__filepath)
|
||||
return self.__data
|
||||
|
||||
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||
result = self.__parser.dump(self.__data)
|
||||
|
||||
if isinstance(result, str):
|
||||
self.__file.write(result)
|
||||
else:
|
||||
result.write(self.__file)
|
||||
|
||||
self.__file.close()
|
||||
|
||||
def __call__(self, scope: AnsibleScopes = AnsibleScopes.INVENTORY, pick: int | str | RegEx | AnsibleRoles = 0, filepath: ExecutedPath | str | None = None):
|
||||
if scope is None:
|
||||
raise NotImplementedError
|
||||
else:
|
||||
paths = ANSIBLE_ROOTS[scope.name.lower()]
|
||||
|
||||
if isinstance(paths, Sequence):
|
||||
path = None
|
||||
if isinstance(pick, int):
|
||||
path = paths[pick]
|
||||
elif isinstance(pick, str):
|
||||
path = tuple(filter(lambda p: str(p) == pick or pick in str(p), paths))
|
||||
if len(path) > 0:
|
||||
path = path[0]
|
||||
elif isinstance(pick, RegEx):
|
||||
path = tuple(filter(lambda p: pick.search(str(p)), paths))
|
||||
if len(path) > 0:
|
||||
path = path[0]
|
||||
else:
|
||||
path = tuple(filter(lambda p: str(p) == pick.name.lower() or pick.name.lower() in str(p), paths))
|
||||
if len(path) > 0:
|
||||
path = path[0]
|
||||
|
||||
if path is None:
|
||||
raise KeyError
|
||||
|
||||
if isinstance(filepath, ExecutedPath):
|
||||
filepath = str(filepath)
|
||||
|
||||
if path.is_dir():
|
||||
self.__filepath = path / filepath
|
||||
elif path.is_file():
|
||||
self.__filepath = path
|
||||
|
||||
return __enter__
|
||||
Reference in New Issue
Block a user