#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # License: GNU General Public License v2 # Author: thl-cmk[at]outlook[dot]com # URL : https://thl-cmk.hopto.org # Date : 2023-10-08 # File : create_topology_data.py # 2023-10-10: initial release # 2023-10-16: added options --keep-max and --min-age # 2023-10-17: removed -p, -c, -d (use the long options instead: --path-in-inventory, --data-source, --inventory-columns) # changed option --keep-max to --keep # added option --check-user-data-only # added SEED_DEVICES to user data, option -s/--sed-devices is now optional # refactoring # # PoC for creating topology_data.json from inventory data # # This script creates the topology data file needed for the Checkmk "network_visualization" plugin by # Andreas Boesl and schnetz. See # https://forum.checkmk.com/t/network-visualization/41680 # and # https://exchange.checkmk.com/p/network-visualization # for more information. # # NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology # # The inventory data could be created with my CDP/LLDP inventory plugins: # # CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\ # CDP path-in-inventory: 'networking,cdp_cache' # CDP inventory-columns: 'device_id,local_port,device_port' # # LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\ # LLDP path-in-inventory: 'networking,lldp_cache' # LLDP inventory-columns: 'system_name,local_port_num,port_id' # # USAGE CDP (is the default): # ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m # # USAGE LLDP: # ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m --lldp # from ast import literal_eval from pathlib import Path from typing import Dict, List, Any, Tuple from time import strftime, time_ns from create_topology_utils import ( parse_arguments, save_data_to_file, remove_old_data, get_data_from_toml, is_mac_address, is_equal_with_default, merge_topologies, ) from create_topology_classes import ( InventoryColumns, Interface, Settings, StaticConnection, ) CREATE_TOPOLOGY_VERSION = '0.0.6-202310117' ITEMS = {} MAC_TABLE: Dict[str, Interface] = {} HOST_MAP: Dict[str, str] = {} DROP_HOSTS: List[str] = [] STATIC_CONNECTIONS: Dict = {} def get_items_from_autochecks(host: str, debug: bool = False): if host and host not in ITEMS.keys(): ITEMS[host] = [] autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk') if autochecks_file.exists(): data: List[Dict[str, str]] = literal_eval(autochecks_file.read_text()) for service in data: if service['check_plugin_name'] in ['if64']: ITEMS[host].append(service['item']) else: if debug: print(f'Device: {host}: not found in auto checks path!') def get_list_of_devices(data) -> List[str]: devices = [] for connection in data.values(): devices.append(connection[0]) return list(set(devices)) def get_service_by_interface(host: str, interface: str, debug: bool = False) -> str | None: # empty host/neighbour should never happen here if not host: return interface items = ITEMS.get(host, []) if interface in items: return interface else: if_padding = '/'.join(interface.split('/')[:-1]) + f'/{interface.split("/")[-1]:0>2}' for item in items: # interface = Gi0/1 # item = Gi0/1 - Access port if item.startswith(interface): return item elif item.startswith(if_padding): return item if debug: print(f'Device: {host}: interface ({interface}) not found in services') return interface def get_if_host_by_mac(mac_address: str) -> Tuple[str, str] | None: try: interface: Interface = MAC_TABLE[mac_address] except KeyError: return host = interface.host if interface.name: if_name = interface.name elif interface.description: if_name = interface.description elif interface.alias: if_name = interface.alias else: if_name = interface.index return host, if_name def add_static_connections(host: str, data: Dict) -> Dict: if host in STATIC_CONNECTIONS.keys(): host_connections = data.get('connections', {}) host_connections.update(STATIC_CONNECTIONS[host].get('connections', {})) host_interfaces = data.get('interfaces', []) for interface in STATIC_CONNECTIONS[host].get('connections', {}).keys(): host_interfaces.append(interface) data.update({'connections': host_connections, 'interfaces': list(set(host_interfaces))}) return data def get_inventory_data(host: str, path: List[str], debug: bool = False, ) -> List[Dict[str, str]] | None: inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}') data = [] if inventory_file.exists(): data = literal_eval(inventory_file.read_text()) if data: for m in path: try: data = data[m] except KeyError: return [] else: if debug: print(f'Device: {host}: no inventory data found!') else: if debug: print(f'Device: {host}: not found in inventory path!') return data def create_mac_to_interface_table(debug: bool = False): path = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/') files = [str(file) for file in list(path.iterdir()) if not str(file).endswith('.gz')] while files: host = files[0].split('/')[-1] files.pop(0) if host.startswith('.'): continue if_table = get_inventory_data(host=host, path=SETTINGS.path_to_if_table, debug=debug) if if_table and if_table != []: if debug: print(f'host: {host}, interfaces: {len(if_table)}') for interface in if_table: # { # 'index': 436756992, # 'description': 'Ethernet2/50', # 'alias': '', # 'speed': 100000000000, # 'phys_address': 'AC:7A:56:C7:37:C8', # 'oper_status': 2, # 'port_type': 6, # 'admin_status': 2, # 'available': True, # 'name': 'Ethernet2/50' # } try: MAC_TABLE[interface['phys_address'].upper()] = Interface( host=host, index=interface.get('index'), name=interface.get('name'), description=interface.get('description'), alias=interface.get('alias'), ) except KeyError: continue def save_topology( data: dict, base_directory: str, output_directory: str, dont_compare: bool, make_default: bool, topology_file_name: str, ): path = f'{base_directory}/{output_directory}' if dont_compare: save_data_to_file( data=data, path=path, file=topology_file_name, make_default=make_default, ) else: if not is_equal_with_default( data=data, file=f'{base_directory}/default/{topology_file_name}' ): save_data_to_file( data=data, path=path, file=topology_file_name, make_default=make_default, ) else: print( 'Topology matches default topology, not saved! Use "--dont-compare" to save identical topologies.' ) def create_device_from_inv( host: str, inv_data: List[Dict[str, str]], inv_columns: InventoryColumns, data_source: str, ) -> Dict[str, Any] | None: data = {'connections': {}, "interfaces": []} get_items_from_autochecks(host) for topo_neighbour in inv_data: neighbour = topo_neighbour.get(inv_columns.neighbour) if not neighbour: continue # try to find neighbour against known MAC addresses (LLDP) if is_mac_address(neighbour): try: neighbour = MAC_TABLE[neighbour].host except KeyError: pass if not SETTINGS.keep_domain: neighbour = neighbour.split('.')[0] if SETTINGS.uppercase: neighbour = neighbour.upper() if SETTINGS.lowercase: neighbour = neighbour.lower() # drop neighbour if inventory neighbour is invalid if neighbour in DROP_HOSTS: continue # rewrite neighbour if inventory neighbour and checkmk host don't match if neighbour in HOST_MAP.keys(): neighbour = HOST_MAP[neighbour] # has to be done before checking interfaces get_items_from_autochecks(neighbour) # getting/checking interfaces local_port = topo_neighbour.get(inv_columns.local_port) if is_mac_address(local_port): try: print(f'host: {host}, local_port: {local_port}, {MAC_TABLE[local_port]}') except KeyError: pass local_port = get_service_by_interface(host, local_port) neighbour_port = topo_neighbour.get(inv_columns.neighbour_port) if is_mac_address(neighbour): try: print(f'neighbour: {neighbour}, neighbour_port: {neighbour_port}, {MAC_TABLE[neighbour_port]}') except KeyError: pass neighbour_port = get_service_by_interface(neighbour, neighbour_port) if neighbour and local_port and neighbour_port: data['connections'].update({local_port: [neighbour, neighbour_port, data_source]}) if local_port not in data['interfaces']: data['interfaces'].append(local_port) # add static connections # data = add_static_connections(host, data) return {host: data} def create_static_connections(connections: List[List[str]], debug: bool = False) -> Dict: data = {} for connection in connections: connection = StaticConnection(*connection) if debug: print(f'connection: {connection}') if connection.host not in data.keys(): data[connection.host] = {'connections': {}, 'interfaces': []} if connection.neighbour not in data.keys(): data[connection.neighbour] = {'connections': {}, 'interfaces': []} # add connection from host to neighbour data[connection.host]['connections'].update({connection.local_port: [ connection.neighbour, connection.neighbour_port, connection.label ]}) data[connection.host]['interfaces'].append(connection.local_port) # add connection from neighbour to host data[connection.neighbour]['connections'].update({ connection.neighbour_port: [connection.host, connection.local_port, connection.label] }) data[connection.neighbour]['interfaces'].append(connection.neighbour_port) if debug: print(data) if data: print(f'Devices added: {len(data)}, source static') return data def create_topology( seed_devices: List[str], path_in_inventory: List[str], inv_columns: InventoryColumns, data_source: str, debug: bool = False, ) -> Dict: devices_to_go = list(set(seed_devices)) # remove duplicates for static_host in STATIC_CONNECTIONS.keys(): devices_to_go.append(static_host) devices_to_go = list(set(devices_to_go)) topology_data = {} devices_done = [] while devices_to_go: device = devices_to_go[0] if device in HOST_MAP.keys(): try: devices_to_go.remove(device) except ValueError: pass device = HOST_MAP[device] if device in devices_done: continue topo_data = get_inventory_data(host=device, path=path_in_inventory, debug=debug) topology_data.update( create_device_from_inv( host=device, inv_data=topo_data, inv_columns=inv_columns, data_source=data_source, )) devices_list = get_list_of_devices(topology_data[device]['connections']) for entry in devices_list: if entry not in devices_done: devices_to_go.append(entry) devices_to_go = list(set(devices_to_go)) devices_done.append(device) devices_to_go.remove(device) if debug: print(f'Device done: {device}, source: {data_source}') print(f'Devices added: {len(devices_done)}, source {data_source}') return topology_data if __name__ == '__main__': SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION))) if SETTINGS.version: print(f'{Path(__file__).name} version: {CREATE_TOPOLOGY_VERSION}') exit(0) if SETTINGS.check_user_data_only: user_data = get_data_from_toml(file=SETTINGS.user_data_file) print(f'Could read/parse the user data from {SETTINGS.user_data_file}') exit(0) print() print(f'Start time: {strftime(SETTINGS.time_format)}') start_time = time_ns() user_data = get_data_from_toml(file=SETTINGS.user_data_file) HOST_MAP = user_data.get('HOST_MAP', {}) DROP_HOSTS = user_data.get('DROP_HOSTS', []) STATIC_CONNECTIONS = create_static_connections(connections=user_data.get('STATIC_CONNECTIONS', [])) # not yet used, need more test data # create_mac_to_interface_table() topology = create_topology( seed_devices=list(set(SETTINGS.seed_devices + user_data.get('SEED_DEVICES', []))), path_in_inventory=SETTINGS.path_in_inventory, inv_columns=SETTINGS.inventory_columns, data_source=SETTINGS.data_source, debug=SETTINGS.debug, ) if topology: # merge topology with static connections topology = merge_topologies(topo_pri=STATIC_CONNECTIONS, topo_sec=topology) save_topology( data=topology, base_directory=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}', output_directory=SETTINGS.output_directory, topology_file_name=SETTINGS.topology_file_name, dont_compare=SETTINGS.dont_compare, make_default=SETTINGS.make_default, ) if SETTINGS.keep: remove_old_data( keep=SETTINGS.keep, min_age=SETTINGS.min_age, path=Path(f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}'), ) print(f'Time taken: {(time_ns() - start_time) / 1e9}/s') print(f'End time: {strftime(SETTINGS.time_format)}') print()