#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # License: GNU General Public License v2 # Author: thl-cmk[at]outlook[dot]com # URL : https://thl-cmk.hopto.org # Date : 2023-10-12 # File : create_topology_classes.py from os import environ from pathlib import Path from time import strftime from typing import Dict, List, Any, NamedTuple from enum import Enum, unique from create_topology_utils import ( CREATE_TOPOLOGY_VERSION, PATH_CDP, COLUMNS_CDP, LABEL_CDP, PATH_LLDP, COLUMNS_LLDP, LABEL_LLDP, PATH_INTERFACES, USER_DATA_FILE, LQ_INTERFACES, SCRIPT, get_inventory_data, get_table_from_inventory, get_interface_items_from_lq, ExitCodes, get_data_from_toml, ) class TopologyParams(NamedTuple): path: str columns: str label: str @unique class Topologies(Enum): CDP = TopologyParams(path=PATH_CDP, columns=COLUMNS_CDP, label=LABEL_CDP) LLDP = TopologyParams(path=PATH_LLDP, columns=COLUMNS_LLDP, label=LABEL_LLDP) @unique class CacheSources(Enum): inventory = 'inventory' lq = 'lq' class InventoryColumns(NamedTuple): neighbour: str local_port: str neighbour_port: str class Interface(NamedTuple): host: str index: str name: str description: str alias: str class StaticConnection(NamedTuple): host: str local_port: str neighbour_port: str neighbour: str label: str class Settings: def __init__( self, cli_args: Dict[str, Any], ): self.__omd_root = environ['OMD_ROOT'] self.__topology_save_path = 'var/topology_data' self.__topology_file_name = 'network_data.json' self.__path_to_if_table = 'networking,interfaces' # self.__inventory_path = 'var/check_mk/inventory' # self.__autochecks_path = 'var/check_mk/autochecks' self.__settings = { 'seed_devices': None, 'path_in_inventory': PATH_CDP, 'inventory_columns': COLUMNS_CDP, 'data_source': LABEL_CDP, 'time_format': '%Y-%m-%dT%H:%M:%S.%m', 'user_data_file': f'{self.__omd_root}/local/bin/topology_data/{USER_DATA_FILE}', 'merge': None, 'output_directory': None, 'default': False, 'keep_domain': False, 'lowercase': False, 'uppercase': False, 'debug': False, 'lldp': False, 'version': False, 'dont_compare': False, 'check_user_data_only': False, 'keep': None, 'min_age': None, } # args in the form {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ... }} # we will remove 's, __' self.__args = ({k.split(',')[-1].strip(' ').strip('_'): v for k, v in cli_args.items() if v}) # first set values if self.__args.get('lldp'): self.__settings['data_source'] = LABEL_LLDP self.__settings['path_in_inventory'] = PATH_LLDP self.__settings['inventory_columns'] = COLUMNS_LLDP # Then update values with cli values self.__settings.update(self.__args) if self.version: print(f'{Path(SCRIPT).name} version: {CREATE_TOPOLOGY_VERSION}') exit(code=ExitCodes.OK.value) if self.check_user_data_only: user_data = get_data_from_toml(file=self.user_data_file) print(f'Could read/parse the user data from {self.user_data_file}') exit(code=ExitCodes.OK.value) if self.merge: _merge = list(set(self.merge.copy())) if len(_merge) != len(self.merge): print(f'-m/--merge options must be unique. Use "-m CDP LLDP" oder "-m LLDP CDP"') exit(code=ExitCodes.BAD_OPTION_LIST.value) else: if self.lldp: self.merge = ['LLDP'] else: self.merge = ['CDP'] def set_topology_param(self, topology: TopologyParams): self.__settings['path_in_inventory'] = topology.path self.__settings['inventory_columns'] = topology.columns self.__settings['data_source'] = topology.label @property def version(self) -> bool: return self.__settings['version'] @property def merge(self) -> List[str] | None: return self.__settings['merge'] @merge.setter def merge(self, topologies: List[str]): self.__settings['merge'] = topologies @property def keep(self) -> int | None: if self.__settings['keep']: return max(self.__settings['keep'], 1) @property def min_age(self) -> int: if self.__settings['min_age']: return max(self.__settings['min_age'], 1) else: return 1 @property def check_user_data_only(self) -> bool: return self.__settings['check_user_data_only'] @property def lldp(self) -> bool: return self.__settings['lldp'] @property def debug(self) -> bool: return self.__settings['debug'] @property def dont_compare(self) -> bool: return self.__settings['dont_compare'] @property def default(self) -> bool: return self.__settings['default'] @property def keep_domain(self) -> bool: return self.__settings['keep_domain'] @property def uppercase(self) -> bool: return self.__settings['uppercase'] @property def lowercase(self) -> bool: return self.__settings['lowercase'] @property def time_format(self) -> str: return self.__settings['time_format'] @property def omd_root(self) -> str: return self.__omd_root # replaced by live status query # @property # def inventory_path(self) -> str: # return self.__inventory_path # @property # def autochecks_path(self) -> str: # return self.__autochecks_path @property def user_data_file(self) -> str: return self.__settings['user_data_file'] @property def topology_save_path(self) -> str: return self.__topology_save_path @property def topology_file_name(self) -> str: return self.__topology_file_name @property def path_in_inventory(self) -> List[str]: # path = ('Nodes,' + ',Nodes,'.join(self.__settings['path_in_inventory'].split(',')) + ',Table,Rows').split(',') # return path return self.__settings['path_in_inventory'] @property def path_to_if_table(self) -> List[str]: path = ('Nodes,' + ',Nodes,'.join(self.__path_to_if_table.split(',')) + ',Table,Rows').split(',') return path @property def inventory_columns(self) -> InventoryColumns: columns = self.__settings['inventory_columns'].split(',') return InventoryColumns( neighbour=columns[0], local_port=columns[1], neighbour_port=columns[2], ) @property def seed_devices(self) -> List[str]: if self.__settings['seed_devices']: return self.__settings['seed_devices'] else: return [] @property def data_source(self) -> str: return self.__settings['data_source'] @property def output_directory(self) -> str: if not self.__settings['output_directory']: return f'{strftime(self.__settings["time_format"])}' else: return self.__settings['output_directory'] class HostCache: def __init__(self): self.__cache = {} self.__inventory_pre_fetch_list: List[str] = [ PATH_CDP, PATH_LLDP, PATH_INTERFACES, ] def __fill_cache(self, host: str): # pre fill inventory data inventory = get_inventory_data(host=host) if inventory: self.__cache[host][CacheSources.inventory.value] = {} self.__cache[host][CacheSources.inventory.value].update({ entry: get_table_from_inventory( inventory=inventory, path=entry ) for entry in self.__inventory_pre_fetch_list }) else: self.__cache[host][CacheSources.inventory.value] = None # prefill live status data self.__cache[host][CacheSources.lq.value] = {} self.__cache[host][CacheSources.lq.value][LQ_INTERFACES] = get_interface_items_from_lq(host) def get_data(self, host: str, source: CacheSources, path: str): if host not in self.__cache.keys(): self.__cache[host]: Dict[str, Any] = {} self.__fill_cache(host=host) try: return self.__cache[host][source.value][path] except (KeyError, TypeError) as e: return None def add_inventory_prefetch_path(self, path: str): self.__inventory_pre_fetch_list = list(set(self.__inventory_pre_fetch_list + [path]))