#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-08
# File  : create_topology_data.py
#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
#
# USAGE LLDP:
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -p "networking,lldp_cache" \
# -c "system_name,local_port_num,port_id" -m
#

import sys
from ast import literal_eval
from collections import deque
from json import dumps
from pathlib import Path
from time import strftime, time_ns
from typing import Any, Dict, List, Optional

from create_topology_classes import InventoryColumns, Settings
from create_topology_utils import parse_arguments

CREATE_TOPOLOGY_VERSION = '0.0.2-20231012'

# map inventory neighbour name to CMK host name
HOST_MAP = {
    'neighbour': 'cmk_host',
    'nexus01': 'NX01',
    'nexus02': 'NX02',
    'nexus03': 'NX03',
    'isr-ap01': 'ro01-ap',
}

# drop neighbours with invalid names
DROP_HOSTS = [
    'not advertised',
]

# cache: host name -> list of interface service items read from the autochecks file
ITEMS = {}


def get_inventory_data(host: str, path: List[str]) -> Optional[List[Dict[str, str]]]:
    """Return the inventory sub-tree found under ``path`` for ``host``.

    The inventory file is read from
    ``{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}`` and parsed with
    ``ast.literal_eval``. The tree is then descended one ``path`` element at a
    time.

    Returns ``None`` (after printing a message where applicable) when the file
    does not exist, when it evaluates to an empty value, or when the path is
    not present in the tree.
    """
    inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}')
    if not inventory_file.exists():
        print(f'Device: {host}: not found in inventory path!')
        return None

    data = literal_eval(inventory_file.read_text())
    if not data:
        print(f'Device: {host}: no inventory data found!')
        return None

    for key in path:
        try:
            data = data[key]
        except (KeyError, TypeError):
            # TypeError: an intermediate node is not a mapping, so the path
            # cannot exist below it either.
            return None
    return data


def get_items_from_autochecks(host: str):
    """Fill ``ITEMS[host]`` with the items of the host's ``if64`` services.

    The autochecks file ``{host}.mk`` below ``SETTINGS.autochecks_path`` is
    parsed with ``ast.literal_eval``. Each host is read at most once; an empty
    host name is ignored. A host without an autochecks file gets an empty item
    list and a message is printed. Always returns ``None``.
    """
    if not host or host in ITEMS:
        return None

    ITEMS[host] = []
    autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk')
    if not autochecks_file.exists():
        print(f'Device: {host}: not found in auto checks path!')
        return None

    data: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
    for service in data:
        # use .get() so entries lacking one of the keys are skipped instead of
        # raising KeyError
        if service.get('check_plugin_name') in ('if64',) and service.get('item') is not None:
            ITEMS[host].append(service['item'])
    return None


def check_interface_name(host: str, interface: str) -> str:
    """Map an inventory interface name to the matching service item of ``host``.

    Lookup order within the cached items of ``host``:
      1. exact match,
      2. first item that starts with ``interface`` or with the variant whose
         last ``/``-separated component is left-padded with ``0`` to two
         characters (``Gi0/1`` -> ``Gi0/01``).

    If ``host`` or ``interface`` is empty/``None`` the interface is returned
    unchanged. If nothing matches, a message is printed and the interface is
    returned unchanged.
    """
    if not host or not interface:
        # nothing to look up; a missing interface must not crash on .split()
        return interface

    items = ITEMS.get(host, [])
    if interface in items:
        return interface

    *parents, last = interface.split('/')
    if_padding = '/'.join(parents) + f'/{last:0>2}'
    for item in items:
        # interface = Gi0/1
        # item = Gi0/1 - Access port
        if item.startswith((interface, if_padding)):
            return item

    print(f'Device: {host}: interface ({interface}) not found in services')
    return interface


def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
) -> Optional[Dict[str, Any]]:
    """Build the topology entry for ``host`` from its neighbour inventory rows.

    For every row the neighbour name is normalised according to ``SETTINGS``
    (domain stripping, upper/lower casing), dropped if listed in
    ``DROP_HOSTS`` and rewritten through ``HOST_MAP``. Local and neighbour
    port names are resolved with ``check_interface_name``.

    Returns ``{host: {'connections': {local_port: [neighbour, neighbour_port,
    data_source]}, 'interfaces': [local_port, ...]}}``. Rows with a missing
    neighbour name or a missing port are skipped.
    """
    data = {'connections': {}, 'interfaces': []}

    get_items_from_autochecks(host)

    for topo_neighbour in inv_data:
        neighbour = topo_neighbour.get(inv_columns.neighbour)
        if not neighbour:
            # row has no neighbour name; nothing to connect to
            continue

        if not SETTINGS.keep_domain:
            neighbour = neighbour.split('.')[0]
        if SETTINGS.uppercase:
            neighbour = neighbour.upper()
        if SETTINGS.lowercase:
            neighbour = neighbour.lower()

        # drop neighbour if inventory neighbour is invalid
        if neighbour in DROP_HOSTS:
            continue

        # rewrite neighbour if inventory neighbour and checkmk host don't match
        neighbour = HOST_MAP.get(neighbour, neighbour)

        # has to be done before checking interfaces
        get_items_from_autochecks(neighbour)

        # getting/checking interfaces
        local_port = check_interface_name(host, topo_neighbour.get(inv_columns.local_port))
        neighbour_port = check_interface_name(neighbour, topo_neighbour.get(inv_columns.neighbour_port))

        if neighbour and local_port and neighbour_port:
            data['connections'][local_port] = [neighbour, neighbour_port, data_source]
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)

    return {host: data}


def get_list_of_devices(data) -> List[str]:
    """Return the unique neighbour names (first element of each connection).

    ``data`` is the ``connections`` mapping of one device. The result keeps the
    order of first appearance.
    """
    return list(dict.fromkeys(connection[0] for connection in data.values()))


def save_topology_data(data: Dict, path: str):
    """Write ``data`` as JSON to ``{path}/{SETTINGS.topology_file_name}``.

    Missing directories are created. If ``SETTINGS.make_default`` is set, the
    ``default`` entry next to ``path`` is replaced by a symlink to ``path``.
    """
    save_file = Path(f'{path}/{SETTINGS.topology_file_name}')
    save_file.parent.mkdir(exist_ok=True, parents=True)
    save_file.write_text(dumps(data))

    if SETTINGS.make_default:
        default_link = Path(path).parent / 'default'
        default_link.unlink(missing_ok=True)
        default_link.symlink_to(target=Path(path), target_is_directory=True)


def create_topology(
        seed_devicees: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        output_directory: str,
        data_source: str
):
    """Walk the neighbour graph starting at the seed devices and save it.

    Every device is processed once: its inventory data is read, its topology
    entry is created and all neighbours not handled yet are queued. Device
    names that are keys in ``HOST_MAP`` are replaced by their mapped name
    before processing. If any topology data was collected it is saved below
    ``{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/{output_directory}``.
    Finally the number of processed devices is printed.
    """
    devices_to_go = deque(seed_devicees)
    devices_done = set()
    topology_data = {}
    number_of_devices = 0

    while devices_to_go:
        device = devices_to_go.popleft()
        # Seed devices may be given by their inventory name. The queue entry is
        # already consumed by popleft(), so the mapped name never has to be
        # removed from the queue again (that removal used to raise ValueError).
        device = HOST_MAP.get(device, device)
        if device in devices_done:
            continue
        devices_done.add(device)
        number_of_devices += 1

        topo_data = get_inventory_data(device, path_in_inventory)
        if topo_data:
            device_data = create_device_from_inv(
                host=device,
                inv_data=topo_data,
                inv_columns=inv_columns,
                data_source=data_source,
            )
            topology_data.update(device_data)
            for entry in get_list_of_devices(device_data[device]['connections']):
                if entry not in devices_done:
                    devices_to_go.append(entry)
        # print(f'Device done: {device}, source: {data_source}')

    if topology_data:
        save_topology_data(topology_data, f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/{output_directory}')
    print(f'Devices added: {number_of_devices}, source {data_source}')


if __name__ == '__main__':
    # SETTINGS is a module-level name read by the functions above
    SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))

    if SETTINGS.version:
        print(f'create_topology_data.py version: {CREATE_TOPOLOGY_VERSION}')
        sys.exit(0)

    if not SETTINGS.seed_devices:
        print('create_topology_data.py: error: the following arguments are required: -s, --seed-devices')
        print('see create_topology_data.py -h')
        sys.exit(1)

    print(f'Start time: {strftime(SETTINGS.time_format)}')
    start_time = time_ns()

    create_topology(
        seed_devicees=SETTINGS.seed_devices,
        path_in_inventory=SETTINGS.path_in_inventory,
        inv_columns=SETTINGS.inventory_columns,
        output_directory=SETTINGS.output_directory,
        data_source=SETTINGS.data_source
    )

    print(f'time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')