#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-12
# File  : create_topology_classes.py

import json
from os import environ
from pathlib import Path
from time import strftime, time_ns
from typing import Dict, List, Any, NamedTuple
from enum import Enum, unique
from abc import abstractmethod
from ast import literal_eval

from requests import session
import livestatus

from create_topology_utils import (
    CREATE_TOPOLOGY_VERSION,
    PATH_INTERFACES,
    USER_DATA_FILE,
    CACHE_INTERFACES_ITEM,
    SCRIPT,
    get_data_form_live_status,
    get_table_from_inventory,
    ExitCodes,
    get_data_from_toml,
    OMD_ROOT,
)


@unique
class CacheItems(Enum):
    """Top level keys used inside the per host cache of HostCache."""
    inventory = 'inventory'
    interfaces = 'interfaces'


class InventoryColumns(NamedTuple):
    """Names of the inventory columns describing a connection."""
    neighbour: str
    local_port: str
    neighbour_port: str


class StaticConnection(NamedTuple):
    """A statically defined connection between a host and a neighbour."""
    host: str
    local_port: str
    neighbour_port: str
    neighbour: str
    label: str


class Settings:
    """
    Read only view on the effective settings.

    The defaults defined here are overwritten by every CLI argument that has
    a truthy value. The options "version" and "check_user_data_only" and a
    non unique layer list terminate the program from within __init__.
    """

    def __init__(
            self,
            cli_args: Dict[str, Any],
    ):
        """
        Args:
            cli_args: the CLI arguments, keys in the form 's, __seed_devices'
        """
        self.__omd_root = environ['OMD_ROOT']
        self.__topology_save_path = 'var/topology_data'
        self.__topology_file_name = 'network_data.json'
        self.__path_to_if_table = 'networking,interfaces'

        self.__settings = {
            'layers': [],
            'seed_devices': None,
            # NOTE(review): the trailing '%m' is the month (again), presumably
            # a fraction of a second was intended - confirm before changing,
            # the value is used as default output directory name.
            'time_format': '%Y-%m-%dT%H:%M:%S.%m',
            'user_data_file': f'{self.__omd_root}/local/bin/topology_data/{USER_DATA_FILE}',
            'output_directory': None,
            'default': False,
            'keep_domain': False,
            'lowercase': False,
            'uppercase': False,
            'debug': False,
            'version': False,
            'dont_compare': False,
            'check_user_data_only': False,
            'keep': None,
            'min_age': None,
            'backend': 'LIVESTATUS',
        }
        # args come in the form
        # {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ...}
        # keep only the part after the last ',' without blanks and leading/
        # trailing '_' ('s, __seed_devices' -> 'seed_devices') and drop all
        # arguments without a (truthy) value so they don't hide the defaults
        self.__args = {
            key.split(',')[-1].strip(' ').strip('_'): value
            for key, value in cli_args.items()
            if value
        }
        self.__settings.update(self.__args)

        if self.version:
            print(f'{Path(SCRIPT).name} version: {CREATE_TOPOLOGY_VERSION}')
            exit(code=ExitCodes.OK.value)

        if self.check_user_data_only:
            # only called to verify that the file can be read/parsed
            _user_data = get_data_from_toml(file=self.user_data_file)
            print(f'Could read/parse the user data from {self.user_data_file}')
            exit(code=ExitCodes.OK.value)

        if self.layers and len(set(self.layers)) != len(self.layers):
            print("-l/--layers options must be unique. Don~'t use any layer more than once.")
            exit(code=ExitCodes.BAD_OPTION_LIST.value)

    @property
    def backend(self) -> str:
        return self.__settings['backend']

    @property
    def version(self) -> bool:
        return self.__settings['version']

    @property
    def layers(self) -> List[str]:
        return self.__settings['layers']

    @property
    def keep(self) -> int | None:
        """The "keep" setting, never negative; None if not set (or 0)."""
        if self.__settings['keep']:
            return max(self.__settings['keep'], 0)
        return None

    @property
    def min_age(self) -> int:
        """The "min_age" setting, never negative; 0 if not set."""
        if self.__settings['min_age']:
            return max(self.__settings['min_age'], 0)
        return 0

    @property
    def check_user_data_only(self) -> bool:
        return self.__settings['check_user_data_only']

    @property
    def debug(self) -> bool:
        return self.__settings['debug']

    @property
    def dont_compare(self) -> bool:
        return self.__settings['dont_compare']

    @property
    def default(self) -> bool:
        return self.__settings['default']

    @property
    def keep_domain(self) -> bool:
        return self.__settings['keep_domain']

    @property
    def uppercase(self) -> bool:
        return self.__settings['uppercase']

    @property
    def lowercase(self) -> bool:
        return self.__settings['lowercase']

    @property
    def time_format(self) -> str:
        return self.__settings['time_format']

    @property
    def omd_root(self) -> str:
        return self.__omd_root

    @property
    def user_data_file(self) -> str:
        return self.__settings['user_data_file']

    @property
    def topology_save_path(self) -> str:
        return self.__topology_save_path

    @property
    def topology_file_name(self) -> str:
        return self.__topology_file_name

    @property
    def seed_devices(self) -> List[str]:
        """The seed devices; an empty list if none are set."""
        if self.__settings['seed_devices']:
            return self.__settings['seed_devices']
        return []

    @property
    def output_directory(self) -> str:
        """The output directory; the current time formatted with "time_format" if not set."""
        if not self.__settings['output_directory']:
            return f'{strftime(self.__settings["time_format"])}'
        return self.__settings['output_directory']


class HostCache:
    """
    Per host cache for inventory tables and interface items.

    The data of a host is fetched on the first call of get_data() for this
    host via get_inventory_data()/get_interface_items(). These two methods
    must be implemented by the backend specific subclasses.

    NOTE: the class does not derive from abc.ABC, so @abstractmethod is not
    enforced at instantiation time, the base methods raise
    NotImplementedError when called instead.
    """

    def __init__(self, debug: bool = False):
        """
        Args:
            debug: enable debug output (timing of the queries)
        """
        self.__cache = {}
        self.__inventory_pre_fetch_list: List[str] = [
            PATH_INTERFACES,
        ]
        self._count = 0
        self._debug = debug
        if debug:
            print('init HOST_CACHE')

    @abstractmethod
    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the inventory data as dictionary
        """
        raise NotImplementedError()

    @abstractmethod
    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Args:
            host: the host name to return the interface items
            debug: enable debug output

        Returns:
            list of the interface items
        """
        raise NotImplementedError()

    def __fill_cache(self, host: str):
        """
        Fetch inventory tables and interface items of a host into the cache.

        Expects that the cache entry (dict) for the host already exists.

        Args:
            host: the host name to fetch the data for
        """
        # pre fill inventory data
        if self._debug:
            self._count += 1
            _pre_query = time_ns()

        inventory = self.get_inventory_data(host=host)

        if self._debug:
            print(f'{(time_ns() - _pre_query) / 1e9}|{self._count:0>4}|inventory|{host}')

        if inventory:
            # one table per pre fetch path
            self.__cache[host][CacheItems.inventory.value] = {
                entry: get_table_from_inventory(
                    inventory=inventory,
                    path=entry
                ) for entry in self.__inventory_pre_fetch_list
            }
        else:
            # get_data() turns the lookup on None into a None result
            self.__cache[host][CacheItems.inventory.value] = None

        self.__cache[host][CacheItems.interfaces.value] = {}

        if self._debug:
            self._count += 1
            _pre_query = time_ns()

        self.__cache[host][CacheItems.interfaces.value][CACHE_INTERFACES_ITEM] = self.get_interface_items(host)

        if self._debug:
            print(f'{(time_ns() - _pre_query) / 1e9}|{self._count:0>4}|items|{host}')

    def get_data(self, host: str, item: CacheItems, path: str):
        """
        Args:
            host: the host name to return the data for
            item: the kind of data (inventory or interfaces)
            path: the key of the data below item

        Returns:
            the cached data or None if there is no data for item/path
        """
        if host not in self.__cache:
            self.__cache[host] = {}
            self.__fill_cache(host=host)
        try:
            return self.__cache[host][item.value][path]
        except (KeyError, TypeError):
            # KeyError: unknown path, TypeError: no inventory (None) for host
            return None

    def add_inventory_prefetch_path(self, path: str):
        """
        Add an inventory path to the list of tables fetched by __fill_cache.

        Args:
            path: the inventory path to add
        """
        self.__inventory_pre_fetch_list = list(set(self.__inventory_pre_fetch_list + [path]))


class HostCacheLiveStatus(HostCache):
    """HostCache backend using get_data_form_live_status."""

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the evaluated mk_inventory column of the host, None if the query
            has no result or the result can not be evaluated
        """
        query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
        data = get_data_form_live_status(query=query)

        if data:
            try:
                data = literal_eval(data[0][0].decode('utf-8'))
            # literal_eval raises ValueError for input that is valid python
            # but not a literal, handle this the same way as a syntax error
            except (SyntaxError, ValueError) as e:
                if debug:
                    print(f'data: |{data}|')
                    print(f'type: {type(data)}')
                    print(f'exception: {e}')
                return None
            return data

        if debug:
            print(f'Device: {host}: no inventory data found!')
        return None

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Sample data from lq query, we keep only the item (description without "Interface ").
        [
         ['C9540-7-1', 'Interface Vlan-999', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Vlan-998', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Te2/0/2', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Te2/0/30', 'check_mk-if64']
        ]

        Args:
            host: the host name to return the interface items for
            debug: enable debug output

        Returns:
            list of the interface items
        """
        query = (
            'GET services\n'
            'Columns: host_name description check_command\n'
            'Filter: description ~ Interface\n'
            f'Filter: host_name = {host}\n'
            'OutputFormat: python3\n'
        )
        data = get_data_form_live_status(query=query)
        # remove 'Interface ' from description
        # (don't reuse "host" as loop variable, it is needed for the debug output)
        items = [description[10:] for _host_name, description, _check_command in data]

        if debug:
            print(f'Interfaces items found: {len(items)} an host {host}')
        return items


class HostCacheFileSystem(HostCache):
    """HostCache backend reading the files below OMD_ROOT/var/check_mk."""

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the evaluated content of the inventory file of the host, None if
            there is no inventory file
        """
        inventory_file = Path(f'{OMD_ROOT}/var/check_mk/inventory/{host}')
        if not inventory_file.exists():
            if debug:
                print(f'Device: {host}: not found in inventory data path!')
            return None

        return literal_eval(inventory_file.read_text())

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Sample autochecks data, we keep only the item
        [
         {'check_plugin_name': 'if64', 'item': 'Fa0', 'parameters': {'discovered_oper_status': ['2'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/1', 'parameters': {'discovered_oper_status': ['1'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/10', 'parameters': {'discovered_oper_status': ['2'], ...}}
        ]

        Args:
            host: name of the host object in cmk to fetch the data for
            debug: output debug information

        Returns:
            List of interface service items
        """
        autochecks_file = Path(f'{OMD_ROOT}/var/check_mk/autochecks/{host}.mk')
        if not autochecks_file.exists():
            if debug:
                print(f'Device: {host}: not found in auto checks path!')
            return []

        services: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
        return [
            service['item']
            for service in services
            if service['check_plugin_name'] in ['if64']
        ]


class HostCacheMultiSite(HostCache):
    """HostCache backend using a livestatus.MultiSiteConnection."""

    def __init__(self, debug: bool = False):
        """
        Args:
            debug: enable debug output
        """
        super().__init__(debug=debug)
        self.__sites = {}
        self.get_sites(debug=debug)
        if debug:
            print('Create livestatus connection(s)')
        self.__c = livestatus.MultiSiteConnection(self.__sites)
        # self.__c.set_prepend_site(False)  # is default
        # self.__c.parallelize = True  # is default
        self.__dead_sites = [site['site']['alias'] for site in self.__c.dead_sites().values()]
        if self.__dead_sites:
            self.__dead_sites = ', '.join(self.__dead_sites)
            print(f'WARNING: use of dead site(s) {self.__dead_sites} is disabled')
            self.__c.set_only_sites(self.__c.alive_sites())

    def get_sites(self, debug: bool = False):
        """
        Read the site connections from etc/check_mk/multisite.d/sites.mk.

        Does nothing if the file does not exist.

        Args:
            debug: enable debug output

        Raises:
            NameError: if sites.mk uses a name other than "sites"/"update"
        """
        sites_mk = Path(f'{OMD_ROOT}/etc/check_mk/multisite.d/sites.mk')
        socket_path = f'unix:{OMD_ROOT}/tmp/run/live'
        if not sites_mk.exists():
            return

        # make eval() "secure"
        # https://realpython.com/python-eval-function/#minimizing-the-security-issues-of-eval
        # SECURITY: this still evaluates the content of a file. The file is
        # read only once, and the code object that passed the name check is
        # the one that gets evaluated (not a second read of the file).
        _code = compile(sites_mk.read_text(), "<string>", "eval")
        allowed_names = ['sites', 'update']
        for name in _code.co_names:
            if name not in allowed_names:
                raise NameError(f'Use of {name} in {sites_mk.name} not allowed.')

        sites_raw = {}
        eval(_code, {"__builtins__": {}}, {"sites": sites_raw})

        for site, data in sites_raw.items():
            self.__sites[site] = {
                'alias': data['alias'],
                'timeout': data['timeout'],
                'nagios_url': '/nagios/',
            }
            if data['socket'] == ('local', None):
                self.__sites[site]['socket'] = socket_path
            else:
                protocol, socket = data['socket']
                address, port = socket['address']
                self.__sites[site]['socket'] = f'{protocol}:{address}:{port}'
                self.__sites[site]['tls'] = socket['tls']
            if debug:
                print(f'Multisite: Site {site} found, '
                      f'Socket: {self.__sites[site]["socket"]}, '
                      f'TLS: {self.__sites[site].get("tls", "N/A")}.')

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the evaluated mk_inventory column of the host, None if the query
            has no result or the result can not be evaluated
        """
        query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
        data = self.__c.query(query=query)

        if data:
            try:
                data = literal_eval(data[0][0].decode('utf-8'))
            # literal_eval raises ValueError for input that is valid python
            # but not a literal, handle this the same way as a syntax error
            except (SyntaxError, ValueError) as e:
                if debug:
                    print(f'data: |{data}|')
                    print(f'type: {type(data)}')
                    print(f'exception: {e}')
                return None
            return data
        return None

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Args:
            host: the host name to return the interface items for
            debug: enable debug output

        Returns:
            list of the interface items (description without "Interface ")
        """
        query = (
            'GET services\n'
            'Columns: host_name description check_command\n'
            'Filter: description ~ Interface\n'
            f'Filter: host_name = {host}\n'
            'OutputFormat: python3\n'
        )
        data = self.__c.query(query=query)
        # remove 'Interface ' from description
        # (don't reuse "host" as loop variable, it is needed for the debug output)
        items = [description[10:] for _host_name, description, _check_command in data]

        if debug:
            if items:
                print(f'Interfaces items found: {len(items)} an host {host}')
            else:
                print(f'No Interfaces items found for host {host}')
        return items


class HostCacheRestApi(HostCache):
    """HostCache backend using the REST API of the local site."""

    def __init__(self, debug: bool = False):
        """
        Args:
            debug: enable debug output
        """
        super().__init__(debug=debug)
        try:
            # strip only the line break, a ")" is a valid part of the secret
            self.__secret = Path(
                f'{OMD_ROOT}/var/check_mk/web/automation/automation.secret'
            ).read_text().strip('\n')
        except FileNotFoundError as e:
            print(f'automation.secret not found, {e}')
            exit()
        self.__hostname = 'localhost'
        self.__site = OMD_ROOT.split('/')[-1]
        self.__api_url = f"http://{self.__hostname}/{self.__site}/check_mk/api/1.0"
        self.__user = 'automation'

        if debug:
            print('Create REST API session')
        self.__session = session()
        self.__session.headers['Authorization'] = f"Bearer {self.__user} {self.__secret}"
        self.__session.headers['Accept'] = 'application/json'

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the mk_inventory extension of the host, None if the request fails
            or the response contains no inventory for the host
        """
        # build the query with json.dumps so the host name is always escaped
        query = json.dumps({"op": "=", "left": "name", "right": host})
        resp = self.__session.get(
            f"{self.__api_url}/domain-types/host/collections/all",
            params={
                "query": query,
                "columns": ['mk_inventory'],
            },
        )
        if resp.status_code == 200:
            try:
                return resp.json()['value'][0]['extensions']['mk_inventory']
            except (IndexError, KeyError):
                # host not found or no inventory column in the response
                return None

        if debug:
            print(f'Device: {host}: no inventory data found!')
        return None

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Args:
            host: the host name to return the interface items for
            debug: enable debug output

        Returns:
            list of the interface items (description without "Interface "),
            an empty list if the request fails
        """
        query = '{"op": "~", "left": "description", "right": "Interface "}'
        resp = self.__session.get(
            # query the services of the requested host (was hard coded to one host)
            f"{self.__api_url}/objects/host/{host}/collections/services",
            params={
                "query": query,
                "columns": ['host_name', 'description', 'check_command'],
            },
        )
        if resp.status_code == 200:
            # remove 'Interface ' from description
            items = [service['extensions']['description'][10:] for service in resp.json()['value']]
            if debug:
                print(f'Interfaces items found: {len(items)} an host {host}')
            return items

        if debug:
            print(f'No Interfaces items found for host {host}')
        return []