#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-12
# File  : create_topology_classes.py
"""Settings container and per-host data cache used by the topology creation script."""

import sys
from os import environ
from pathlib import Path
from time import strftime
from typing import Dict, List, Any, NamedTuple
from enum import Enum, unique

from create_topology_utils import (
    CREATE_TOPOLOGY_VERSION,
    PATH_CDP,
    PATH_LLDP,
    PATH_INTERFACES,
    USER_DATA_FILE,
    LQ_INTERFACES,
    SCRIPT,
    get_inventory_data,
    get_table_from_inventory,
    get_interface_items_from_lq,
    ExitCodes,
    get_data_from_toml,
)


@unique
class CacheSources(Enum):
    """Names of the data sources stored per host in :class:`HostCache`."""

    inventory = 'inventory'
    lq = 'lq'


class InventoryColumns(NamedTuple):
    """Column names for neighbour, local port and neighbour port."""

    neighbour: str
    local_port: str
    neighbour_port: str


class StaticConnection(NamedTuple):
    """A single connection described by host, ports, neighbour and label."""

    host: str
    local_port: str
    neighbour_port: str
    neighbour: str
    label: str


class Settings:
    """Merge command line arguments with built-in defaults and expose them read-only.

    Construction has side effects: it reads ``OMD_ROOT`` from the environment
    (``KeyError`` if unset) and terminates the process via ``sys.exit`` when
    ``version`` or ``check_user_data_only`` is set, or when a layer is given
    more than once.
    """

    def __init__(
            self,
            cli_args: Dict[str, Any],
    ):
        """Build the settings from ``cli_args``.

        Args:
            cli_args: mapping of option names to values. Keys have the form
                ``'s, __seed_devices'``; only the part after the last comma,
                stripped of blanks and underscores, is used as setting name.
                Entries with a falsy value are ignored, so the defaults stay
                in effect for them.
        """
        self.__omd_root = environ['OMD_ROOT']
        self.__topology_save_path = 'var/topology_data'
        self.__topology_file_name = 'network_data.json'
        self.__path_to_if_table = 'networking,interfaces'
        # self.__inventory_path = 'var/check_mk/inventory'
        # self.__autochecks_path = 'var/check_mk/autochecks'

        self.__settings = {
            'layers': [],
            'seed_devices': None,
            # NOTE(review): '%m' is the month directive, so the trailing '.%m'
            # repeats the month number. Left unchanged because names derived
            # from this format may be parsed elsewhere - confirm before fixing.
            'time_format': '%Y-%m-%dT%H:%M:%S.%m',
            'user_data_file': f'{self.__omd_root}/local/bin/topology_data/{USER_DATA_FILE}',
            'output_directory': None,
            'default': False,
            'keep_domain': False,
            'lowercase': False,
            'uppercase': False,
            'debug': False,
            'version': False,
            'dont_compare': False,
            'check_user_data_only': False,
            'keep': None,
            'min_age': None,
        }

        # args come in the form
        # {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ...}
        # strip the leading 's, __' part so only the long option name remains
        self.__args = {
            key.split(',')[-1].strip(' ').strip('_'): value
            for key, value in cli_args.items()
            if value
        }
        self.__settings.update(self.__args)

        if self.version:
            print(f'{Path(SCRIPT).name} version: {CREATE_TOPOLOGY_VERSION}')
            # sys.exit instead of the site builtin exit(), which is meant for
            # interactive use and is missing when python runs with -S
            sys.exit(ExitCodes.OK.value)

        if self.check_user_data_only:
            # only the call matters here, the parsed data is not used;
            # presumably get_data_from_toml reports/aborts on a broken file
            # itself - verify in create_topology_utils
            get_data_from_toml(file=self.user_data_file)
            print(f'Could read/parse the user data from {self.user_data_file}')
            sys.exit(ExitCodes.OK.value)

        if self.layers and len(set(self.layers)) != len(self.layers):
            print("-l/--layers options must be unique. Don't use any layer more than once.")
            sys.exit(ExitCodes.BAD_OPTION_LIST.value)

    @property
    def version(self) -> bool:
        """The ``version`` setting."""
        return self.__settings['version']

    @property
    def layers(self) -> List[str]:
        """The ``layers`` setting (empty list by default)."""
        return self.__settings['layers']

    @property
    def keep(self) -> int | None:
        """The ``keep`` setting clamped to >= 0, or ``None`` if unset/falsy."""
        keep = self.__settings['keep']
        if keep:
            return max(keep, 0)
        return None

    @property
    def min_age(self) -> int:
        """The ``min_age`` setting clamped to >= 0; 0 if unset/falsy."""
        min_age = self.__settings['min_age']
        if min_age:
            return max(min_age, 0)
        return 0

    @property
    def check_user_data_only(self) -> bool:
        """The ``check_user_data_only`` setting."""
        return self.__settings['check_user_data_only']

    @property
    def debug(self) -> bool:
        """The ``debug`` setting."""
        return self.__settings['debug']

    @property
    def dont_compare(self) -> bool:
        """The ``dont_compare`` setting."""
        return self.__settings['dont_compare']

    @property
    def default(self) -> bool:
        """The ``default`` setting."""
        return self.__settings['default']

    @property
    def keep_domain(self) -> bool:
        """The ``keep_domain`` setting."""
        return self.__settings['keep_domain']

    @property
    def uppercase(self) -> bool:
        """The ``uppercase`` setting."""
        return self.__settings['uppercase']

    @property
    def lowercase(self) -> bool:
        """The ``lowercase`` setting."""
        return self.__settings['lowercase']

    @property
    def time_format(self) -> str:
        """The ``time_format`` setting (a ``strftime`` format string)."""
        return self.__settings['time_format']

    @property
    def omd_root(self) -> str:
        """Value of the ``OMD_ROOT`` environment variable at construction time."""
        return self.__omd_root

    @property
    def user_data_file(self) -> str:
        """The ``user_data_file`` setting."""
        return self.__settings['user_data_file']

    @property
    def topology_save_path(self) -> str:
        """Fixed path ``var/topology_data``."""
        return self.__topology_save_path

    @property
    def topology_file_name(self) -> str:
        """Fixed file name ``network_data.json``."""
        return self.__topology_file_name

    @property
    def seed_devices(self) -> List[str]:
        """The ``seed_devices`` setting, or an empty list if unset/falsy."""
        seed_devices = self.__settings['seed_devices']
        if seed_devices:
            return seed_devices
        return []

    @property
    def output_directory(self) -> str:
        """The ``output_directory`` setting.

        If it is unset/falsy, the current local time formatted with
        ``time_format`` is returned instead (evaluated on every access).
        """
        output_directory = self.__settings['output_directory']
        if not output_directory:
            return f'{strftime(self.__settings["time_format"])}'
        return output_directory


class HostCache:
    """Per-host cache for inventory tables and live status interface items.

    A host's data is fetched once, on the first ``get_data`` call for that
    host. Prefetch paths added afterwards are not fetched for hosts that are
    already cached; ``get_data`` returns ``None`` for them.
    """

    def __init__(self):
        self.__cache: Dict[str, Any] = {}
        self.__inventory_pre_fetch_list: List[str] = [
            # PATH_CDP,
            # PATH_LLDP,
            PATH_INTERFACES,
        ]

    def __fill_cache(self, host: str):
        """Fetch inventory tables and live status interface items for ``host``.

        Expects ``self.__cache[host]`` to exist already.
        """
        host_cache = self.__cache[host]

        # pre fill inventory data
        inventory = get_inventory_data(host=host)
        if inventory:
            host_cache[CacheSources.inventory.value] = {
                entry: get_table_from_inventory(
                    inventory=inventory,
                    path=entry,
                )
                for entry in self.__inventory_pre_fetch_list
            }
        else:
            # no inventory: get_data() turns the resulting TypeError into None
            host_cache[CacheSources.inventory.value] = None

        # prefill live status data
        host_cache[CacheSources.lq.value] = {
            LQ_INTERFACES: get_interface_items_from_lq(host),
        }

    def get_data(self, host: str, source: CacheSources, path: str) -> Any:
        """Return the cached data for ``host`` / ``source`` / ``path``.

        The host's cache is filled on first access. Returns ``None`` if the
        source has no data or the path was not prefetched.
        """
        if host not in self.__cache:
            self.__cache[host] = {}
            self.__fill_cache(host=host)

        try:
            return self.__cache[host][source.value][path]
        except (KeyError, TypeError):
            return None

    def add_inventory_prefetch_path(self, path: str):
        """Add ``path`` to the inventory prefetch list if it is not in it yet.

        The existing order is kept, so the prefetch order is deterministic.
        """
        if path not in self.__inventory_pre_fetch_list:
            self.__inventory_pre_fetch_list.append(path)