#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-12
# File  : create_topology_classes.py
"""Settings container and per-host data caches for the create topology script."""

from os import environ
from pathlib import Path
from time import strftime
from typing import Dict, List, Any, NamedTuple
from enum import Enum, unique
from abc import abstractmethod
from ast import literal_eval

from create_topology_utils import (
    CREATE_TOPOLOGY_VERSION,
    PATH_INTERFACES,
    USER_DATA_FILE,
    CACHE_INTERFACES_ITEM,
    SCRIPT,
    get_data_form_live_status,
    get_table_from_inventory,
    ExitCodes,
    get_data_from_toml,
    OMD_ROOT,
)


@unique
class CacheItems(Enum):
    """Top-level sections stored per host in a ``HostCache``."""

    inventory = 'inventory'
    interfaces = 'interfaces'


class InventoryColumns(NamedTuple):
    """Three strings: neighbour, local port and neighbour port.

    NOTE(review): presumably these are column names of an inventory table;
    confirm against the callers, which are not part of this file.
    """

    neighbour: str
    local_port: str
    neighbour_port: str


class StaticConnection(NamedTuple):
    """One connection record: host, both port names, neighbour and a label."""

    host: str
    local_port: str
    neighbour_port: str
    neighbour: str
    label: str


class Settings:
    """Effective settings: built-in defaults overridden by the CLI arguments.

    Constructing an instance may terminate the program (``SystemExit``):
    with ``ExitCodes.OK`` for ``version`` / ``check_user_data_only`` and with
    ``ExitCodes.BAD_OPTION_LIST`` when a layer is given more than once.
    """

    def __init__(
            self,
            cli_args: Dict[str, Any],
    ):
        """
        Args:
            cli_args: parsed command line arguments, keyed like
                ``'s, __seed_devices'``; entries with a falsy value are
                ignored so that the built-in default stays in effect.

        Raises:
            KeyError: if the environment variable ``OMD_ROOT`` is not set.
            SystemExit: see the class docstring.
        """
        self.__omd_root = environ['OMD_ROOT']
        self.__topology_save_path = 'var/topology_data'
        self.__topology_file_name = 'network_data.json'
        self.__path_to_if_table = 'networking,interfaces'

        self.__settings = {
            'layers': [],
            'seed_devices': None,
            # NOTE(review): the trailing '%m' is the month again, not a
            # sub-second field (time.strftime has none) - confirm intent.
            'time_format': '%Y-%m-%dT%H:%M:%S.%m',
            'user_data_file': f'{self.__omd_root}/local/bin/topology_data/{USER_DATA_FILE}',
            'output_directory': None,
            'default': False,
            'keep_domain': False,
            'lowercase': False,
            'uppercase': False,
            'debug': False,
            'version': False,
            'dont_compare': False,
            'check_user_data_only': False,
            'keep': None,
            'min_age': None,
            'backend': 'LIVESTATUS',
        }
        # args arrive in the form
        #   {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ...}
        # keep only the long name without the leading 's, __' part
        self.__args = {
            key.split(',')[-1].strip(' ').strip('_'): value
            for key, value in cli_args.items()
            if value
        }
        self.__settings.update(self.__args)

        # SystemExit is raised directly: the exit() builtin is injected by
        # the site module and is not guaranteed to be available.
        if self.version:
            print(f'{Path(SCRIPT).name} version: {CREATE_TOPOLOGY_VERSION}')
            raise SystemExit(ExitCodes.OK.value)

        if self.check_user_data_only:
            # called only to prove that the file can be read and parsed
            get_data_from_toml(file=self.user_data_file)
            print(f'Could read/parse the user data from {self.user_data_file}')
            raise SystemExit(ExitCodes.OK.value)

        if self.layers and len(set(self.layers)) != len(self.layers):
            print('-l/--layers options must be unique. Don\'t use any layer more than once.')
            raise SystemExit(ExitCodes.BAD_OPTION_LIST.value)

    @property
    def backend(self) -> str:
        return self.__settings['backend']

    @property
    def version(self) -> bool:
        return self.__settings['version']

    @property
    def layers(self) -> List[str]:
        return self.__settings['layers']

    @property
    def keep(self) -> int | None:
        """The 'keep' value clamped to >= 0, or None if unset (or 0)."""
        if self.__settings['keep']:
            return max(self.__settings['keep'], 0)
        return None

    @property
    def min_age(self) -> int:
        """The 'min_age' value clamped to >= 0; 0 if unset."""
        if self.__settings['min_age']:
            return max(self.__settings['min_age'], 0)
        return 0

    @property
    def check_user_data_only(self) -> bool:
        return self.__settings['check_user_data_only']

    @property
    def debug(self) -> bool:
        return self.__settings['debug']

    @property
    def dont_compare(self) -> bool:
        return self.__settings['dont_compare']

    @property
    def default(self) -> bool:
        return self.__settings['default']

    @property
    def keep_domain(self) -> bool:
        return self.__settings['keep_domain']

    @property
    def uppercase(self) -> bool:
        return self.__settings['uppercase']

    @property
    def lowercase(self) -> bool:
        return self.__settings['lowercase']

    @property
    def time_format(self) -> str:
        return self.__settings['time_format']

    @property
    def omd_root(self) -> str:
        return self.__omd_root

    @property
    def user_data_file(self) -> str:
        return self.__settings['user_data_file']

    @property
    def topology_save_path(self) -> str:
        return self.__topology_save_path

    @property
    def topology_file_name(self) -> str:
        return self.__topology_file_name

    @property
    def seed_devices(self) -> List[str]:
        """The configured seed devices, or an empty list if none were given."""
        return self.__settings['seed_devices'] or []

    @property
    def output_directory(self) -> str:
        """The configured output directory, else the current time formatted
        with 'time_format'."""
        if not self.__settings['output_directory']:
            return strftime(self.__settings['time_format'])
        return self.__settings['output_directory']


class HostCache:
    """Per-host cache for inventory tables and interface items.

    Subclasses provide the data source by implementing
    ``get_inventory_data`` and ``get_interface_items``.

    NOTE(review): the class does not derive from ``abc.ABC``, so the
    ``@abstractmethod`` markers are not enforced at instantiation time.
    """

    def __init__(self):
        self.__cache = {}
        self.__inventory_pre_fetch_list: List[str] = [
            PATH_INTERFACES,
        ]

    @abstractmethod
    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the inventory data as dictionary
        """

    @abstractmethod
    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Args:
            host: the host name to return the interface items
            debug: enable debug output

        Returns:
            list of the interface items
        """

    def __fill_cache(self, host: str):
        """Fetch inventory tables and interface items for host and cache them.

        The entry is stored only after both fetches succeeded, so a failing
        backend call does not leave an empty entry behind that would make
        later lookups silently return None.
        """
        inventory = self.get_inventory_data(host=host)
        if inventory:
            # pre fill inventory data
            inventory_tables = {
                path: get_table_from_inventory(
                    inventory=inventory,
                    path=path,
                ) for path in self.__inventory_pre_fetch_list
            }
        else:
            inventory_tables = None

        interface_items = self.get_interface_items(host)

        self.__cache[host] = {
            CacheItems.inventory.value: inventory_tables,
            CacheItems.interfaces.value: {CACHE_INTERFACES_ITEM: interface_items},
        }

    def get_data(self, host: str, item: CacheItems, path: str):
        """Return the cached data for host/item/path, filling the cache on
        first access to a host.

        Args:
            host: the host name
            item: the cache section to read from
            path: key inside that section

        Returns:
            the cached value, or None if it is not available
        """
        if host not in self.__cache:
            self.__fill_cache(host=host)
        try:
            return self.__cache[host][item.value][path]
        except (KeyError, TypeError):
            # KeyError: unknown section/path, TypeError: section is None
            return None

    def add_inventory_prefetch_path(self, path: str):
        """Add path to the inventory paths fetched when a host is cached.

        Hosts that are already cached are not re-fetched.
        """
        if path not in self.__inventory_pre_fetch_list:
            self.__inventory_pre_fetch_list.append(path)


class HostCacheLiveStatus(HostCache):
    """Host cache that reads its data via Livestatus queries."""

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """Query the 'mk_inventory' column of host and parse it.

        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the parsed inventory data, or None if the query returned nothing
            or the data could not be parsed
        """
        query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
        rows = get_data_form_live_status(query=query)
        if not rows:
            return None
        try:
            return literal_eval(rows[0][0].decode('utf-8'))
        except (SyntaxError, ValueError) as e:
            # literal_eval raises SyntaxError for unparsable (e.g. empty)
            # input and ValueError for input that is not a plain literal
            if debug:
                print(f'data: |{rows}|')
                print(f'type: {type(rows)}')
                print(f'exception: {e}')
            return None

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Sample data from lq query, we keep only the item (description without "Interface" ).
        [
         ['C9540-7-1', 'Interface Vlan-999', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Vlan-998', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Vlan-997', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Vlan-996', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Vlan-8', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Te2/0/2', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Te2/0/30', 'check_mk-if64']
        ]

        The Livestatus filter matches 'Interface' anywhere in the description,
        so services whose description does not start with 'Interface ' are
        skipped instead of being cut at a fixed position.

        Args:
            host: the host name to return the interface items
            debug: enable debug output

        Returns:
            list of the interface items
        """
        query = (
            'GET services\n'
            'Columns: host_name description check_command\n'
            'Filter: description ~ Interface\n'
            f'Filter: host_name = {host}\n'
            'OutputFormat: python3\n'
        )
        rows = get_data_form_live_status(query=query)

        prefix = 'Interface '
        items = [
            description.removeprefix(prefix)
            for _host_name, description, _check_command in rows or []
            if description.startswith(prefix)
        ]

        if debug:
            print(f'Interfaces items found: {len(items)} an host {host}')
        return items


class HostCacheFileSystem(HostCache):
    """Host cache that reads its data from files below OMD_ROOT."""

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """Read and parse the inventory file of host.

        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the parsed inventory data, or None if the file does not exist or
            could not be parsed
        """
        inventory_path = 'var/check_mk/inventory'
        inventory_file = Path(f'{OMD_ROOT}/{inventory_path}/{host}')
        if not inventory_file.exists():
            return None
        try:
            return literal_eval(inventory_file.read_text())
        except (SyntaxError, ValueError) as e:
            # same handling as the Livestatus variant: unparsable -> None
            if debug:
                print(f'Device: {host}: could not parse inventory file: {e}')
            return None

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Sample autochecks data we keep only the item
        [
         {'check_plugin_name': 'if64', 'item': 'Fa0', 'parameters': {'discovered_oper_status': ['2'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/1', 'parameters': {'discovered_oper_status': ['1'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/10', 'parameters': {'discovered_oper_status': ['2'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/11', 'parameters': {'discovered_oper_status': ['2'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/12', 'parameters': {'discovered_oper_status': ['1'], ...}}
        ]

        Args:
            host: the host name to return the interface items
            debug: enable debug output

        Returns:
            list of the items of all 'if64' services; empty if the autochecks
            file does not exist or could not be parsed
        """
        autochecks_path = 'var/check_mk/autochecks'
        autochecks_file = Path(f'{OMD_ROOT}/{autochecks_path}/{host}.mk')

        if not autochecks_file.exists():
            if debug:
                print(f'Device: {host}: not found in auto checks path!')
            return []

        try:
            services: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
        except (SyntaxError, ValueError) as e:
            if debug:
                print(f'Device: {host}: could not parse auto checks file: {e}')
            return []

        return [
            service['item']
            for service in services
            if service['check_plugin_name'] == 'if64'
        ]