#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-08
# File  : create_topology_data.py

# 2023-10-10: initial release
# 2023-10-16: added options --keep-max and --min-age
# 2023-10-17: removed -p, -c, -d (use the long options instead: --path-in-inventory, --data-source, --inventory-columns)
#             changed option --keep-max to --keep
#             added option --check-user-data-only
#             added SEED_DEVICES to user data, option -s/--seed-devices is now optional
#             refactoring
# 2023-10-18: fixed --make-default not working
#             added protected topologies
#             changed get inventory data from file access to request via live status query
#             get interfaces from autochecks changed to live status query
# 2023-10-19: cleanup removed "all" unused functions
#             added option -u, --user-data-file to provide a user specific data file
# 2023-10-20: changed option -m/--make-default to -d/--default (-m is needed for -m/--merge)
#             added -m/--merge option
# 2023-10-27: improved matching of port name from CDP/LLDP data to interface (item) name of service
#             improved internal data handling (HostCache)

#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# NOTE: the topology_data configuration (layout etc.)
#       is saved under ~/var/check_mk/topology
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache
#
# USAGE:
# ~/local/lib/topology_data/create_topology_data.py --help
#

from typing import Dict, List, Any
from time import strftime, time_ns

from create_topology_utils import (
    remove_old_data,
    get_data_from_toml,
    merge_topologies,
    save_topology,
    LQ_INTERFACES,
    PATH_INTERFACES,
)
from create_topology_args import parse_arguments
from create_topology_classes import (
    InventoryColumns,
    Settings,
    StaticConnection,
    Topologies,
    HostCache,
    CacheSources,
)

# Module level state. It is (re)assigned in the "__main__" section from the
# user data file and read by the functions below.
HOST_MAP: Dict[str, str] = {}
DROP_HOSTS: List[str] = []
STATIC_CONNECTIONS: Dict = {}


def get_list_of_devices(data) -> List[str]:
    """Return the distinct neighbour names of a connections mapping.

    Args:
        data: mapping whose values are sequences with the neighbour name
            as first element (as built by create_device_from_inv).

    Returns:
        The neighbour names without duplicates. The order is not defined,
        as the names are collected in a set.
    """
    return list({connection[0] for connection in data.values()})


def get_service_by_interface(host: str, interface: str, debug: bool = False) -> str:
    """Try to find the service item that belongs to an interface of a host.

    The interface name is first looked up directly in the list of interface
    items of the host. If it is not found there, the interface inventory of
    the host is searched for an entry matching the name and the item is
    derived from that entry.

    Args:
        host: name of the host the interface belongs to.
        interface: interface name as found in the topology (CDP/LLDP) data.
        debug: print a message if no matching item was found.

    Returns:
        The matching item, or ``interface`` unchanged if nothing matched.
    """

    def _match_entry_with_item(_entry: Dict[str, str]):
        # Only use the attributes of the entry handed in (the original read
        # 'alias' from the loop variable of the enclosing function).
        # Missing attributes are skipped, they can not match an item.
        values = [
            value
            for value in (_entry.get('name'), _entry.get('description'), _entry.get('alias'))
            if value
        ]
        for value in values:
            if value in items:
                return value

        index = str(_entry.get('index'))
        # for index try with padding
        for i in range(1, 6):
            index_padded = f'{index:0>{i}}'
            if index_padded in items:
                return index_padded
            # still not found try values + index
            for value in values:
                candidate = f'{value} {index_padded}'
                if candidate in items:
                    return candidate

        return interface

    # empty host/neighbour should never happen here
    if not host:
        return interface

    # get list of interface items; fall back to an empty list so the
    # membership tests below can not fail on a missing result
    items = HOST_CACHE.get_data(host=host, source=CacheSources.lq, path=LQ_INTERFACES) or []

    # the easy case
    if interface in items:
        return interface

    # try to find the interface in the host interface inventory list
    inventory = HOST_CACHE.get_data(host=host, source=CacheSources.inventory, path=PATH_INTERFACES)
    if inventory:
        for entry in inventory:
            if interface in [
                entry.get('name'),
                entry.get('description'),
                entry.get('alias'),
                str(entry.get('index')),
                entry.get('phys_address'),
            ]:
                return _match_entry_with_item(entry)

    if debug:
        print(f'Device: {host}: service for interface {interface} not found')

    return interface


def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
        debug: bool = False,
) -> Dict[str, Any] | None:
    """Build the topology entry of one host from its inventory data.

    Args:
        host: name of the host the inventory data belongs to.
        inv_data: list of neighbour entries from the inventory.
        inv_columns: names of the columns holding the neighbour, the local
            port and the neighbour port within an entry.
        data_source: label stored with every connection.
        debug: print a message whenever a port name was rewritten.

    Returns:
        ``{host: {'connections': {...}, 'interfaces': [...]}}`` where
        connections maps the local port to
        ``[neighbour, neighbour_port, data_source]``.
    """
    connections: Dict[str, List[str]] = {}
    interfaces: List[str] = []

    for topo_neighbour in inv_data:
        neighbour = topo_neighbour.get(inv_columns.neighbour)
        if not neighbour:
            continue

        if not SETTINGS.keep_domain:
            neighbour = neighbour.split('.')[0]
        if SETTINGS.uppercase:
            neighbour = neighbour.upper()
        if SETTINGS.lowercase:
            neighbour = neighbour.lower()

        # drop neighbour if inventory neighbour is invalid
        if neighbour in DROP_HOSTS:
            continue

        # rewrite neighbour if inventory neighbour and checkmk host don't match
        neighbour = HOST_MAP.get(neighbour, neighbour)

        # getting/checking interfaces
        _local_port = topo_neighbour.get(inv_columns.local_port)
        local_port = get_service_by_interface(host, _local_port, debug=debug)
        if debug and local_port != _local_port:
            print(f'host: {host}, local_port: {local_port}, _local_port: {_local_port}')

        _neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
        neighbour_port = get_service_by_interface(neighbour, _neighbour_port, debug=debug)
        if debug and neighbour_port != _neighbour_port:
            print(f'neighbour: {neighbour}, neighbour_port {neighbour_port}, _neighbour_port {_neighbour_port}')

        # only complete connections are added
        if not (neighbour and local_port and neighbour_port):
            continue

        connections[local_port] = [neighbour, neighbour_port, data_source]
        if local_port not in interfaces:
            interfaces.append(local_port)

    return {host: {'connections': connections, "interfaces": interfaces}}


def create_static_connections(connections: List[List[str]], debug: bool = False) -> Dict:
    """Build topology entries from the static connections of the user data.

    Every connection is added in both directions, from the host to the
    neighbour and from the neighbour to the host.

    Args:
        connections: list of connections, each one a list of values that is
            unpacked into a StaticConnection.
        debug: print every connection and the resulting data.

    Returns:
        Mapping of device name to ``{'connections': {...}, 'interfaces': [...]}``.
    """
    data: Dict[str, Dict[str, Any]] = {}

    def _add(device: str, port: str, peer: str, peer_port: str, label: str) -> None:
        # add one direction of a connection, each interface is listed once
        device_data = data[device]
        device_data['connections'][port] = [peer, peer_port, label]
        if port not in device_data['interfaces']:
            device_data['interfaces'].append(port)

    for connection in connections:
        connection = StaticConnection(*connection)
        if debug:
            print(f'connection: {connection}')

        data.setdefault(connection.host, {'connections': {}, 'interfaces': []})
        data.setdefault(connection.neighbour, {'connections': {}, 'interfaces': []})

        # add connection from host to neighbour
        _add(
            connection.host, connection.local_port,
            connection.neighbour, connection.neighbour_port, connection.label,
        )
        # add connection from neighbour to host
        _add(
            connection.neighbour, connection.neighbour_port,
            connection.host, connection.local_port, connection.label,
        )

    if debug:
        print(data)

    if data:
        print(f'Devices added: {len(data)}, source static')

    return data


def create_topology(
        seed_devices: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        data_source: str,
        debug: bool = False,
) -> Dict:
    """Walk the topology starting at the seed devices.

    Starting with the seed devices and the hosts of STATIC_CONNECTIONS, the
    inventory data of every device is read and all neighbours found are
    visited in turn until no unvisited device is left.

    Args:
        seed_devices: names of the devices to start with.
        path_in_inventory: inventory path of the topology data.
        inv_columns: column names handed on to create_device_from_inv.
        data_source: label stored with every connection.
        debug: print a message for every device done.

    Returns:
        Mapping of device name to its topology entry, for all visited
        devices that have inventory data.
    """
    # a set removes duplicates and makes removing a device cheap
    devices_to_go = set(seed_devices)
    devices_to_go.update(STATIC_CONNECTIONS)

    topology_data = {}
    devices_done = set()

    while devices_to_go:
        device = devices_to_go.pop()
        # rewrite the device name if there is a mapping for it. The mapped
        # name is usually not in devices_to_go, so it must not be removed
        # from there unconditionally (this raised ValueError before).
        device = HOST_MAP.get(device, device)
        devices_to_go.discard(device)
        if device in devices_done:
            continue
        devices_done.add(device)

        topo_data = HOST_CACHE.get_data(host=device, source=CacheSources.inventory, path=path_in_inventory)
        if topo_data:
            topology_data.update(
                create_device_from_inv(
                    host=device,
                    inv_data=topo_data,
                    inv_columns=inv_columns,
                    data_source=data_source,
                    debug=debug,
                ))
            neighbours = get_list_of_devices(topology_data[device]['connections'])
            devices_to_go.update(entry for entry in neighbours if entry not in devices_done)

        if debug:
            print(f'Device done: {device}, source: {data_source}')

    print(f'Devices added: {len(devices_done)}, source {data_source}')

    return topology_data


if __name__ == '__main__':
    start_time = time_ns()
    SETTINGS = Settings(vars(parse_arguments()))
    HOST_CACHE = HostCache()

    print()
    print(f'Start time: {strftime(SETTINGS.time_format)}')

    user_data = get_data_from_toml(file=SETTINGS.user_data_file)
    HOST_MAP = user_data.get('HOST_MAP', {})
    DROP_HOSTS = user_data.get('DROP_HOSTS', [])
    # The static connections have to exist before create_topology() runs,
    # it uses their hosts as additional seed devices.
    STATIC_CONNECTIONS = create_static_connections(
        connections=user_data.get('STATIC_CONNECTIONS', []),
        debug=SETTINGS.debug,
    )

    final_topology = {}

    # the topologies are processed from the last to the first one, a later
    # processed topology is handed to merge_topologies as topo_pri
    while SETTINGS.merge:
        SETTINGS.set_topology_param(Topologies[SETTINGS.merge[-1]].value)
        topology = create_topology(
            seed_devices=list(set(SETTINGS.seed_devices + user_data.get('SEED_DEVICES', []))),
            path_in_inventory=SETTINGS.path_in_inventory,
            inv_columns=SETTINGS.inventory_columns,
            data_source=SETTINGS.data_source,
            debug=SETTINGS.debug,
        )
        if topology:
            final_topology = merge_topologies(topo_pri=topology, topo_sec=final_topology)
        SETTINGS.merge.remove(SETTINGS.merge[-1])

    final_topology = merge_topologies(topo_pri=STATIC_CONNECTIONS, topo_sec=final_topology)

    save_topology(
        data=final_topology,
        base_directory=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
        output_directory=SETTINGS.output_directory,
        topology_file_name=SETTINGS.topology_file_name,
        dont_compare=SETTINGS.dont_compare,
        make_default=SETTINGS.default,
    )

    if SETTINGS.keep:
        remove_old_data(
            keep=SETTINGS.keep,
            min_age=SETTINGS.min_age,
            path=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
            protected=user_data.get('PROTECTED_TOPOLOGIES', []),
        )

    print(f'Time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')
    print()