#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-08
# File  : create_topology_data.py

#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
#
# USAGE LLDP:
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -p "networking,lldp_cache" \
# -c "system_name,local_port_num,port_id" -m
#

from re import match as re_match
from ast import literal_eval
from collections import deque
from pathlib import Path
from json import dumps
from typing import Dict, List, Optional, Any, Tuple
from time import strftime, time_ns
from tomllib import loads as toml_loads
from tomllib import TOMLDecodeError

from create_topology_utils import parse_arguments
from create_topology_classes import InventoryColumns, Interface, Settings, StaticConnection

CREATE_TOPOLOGY_VERSION = '0.0.4-202310116'

# NOTE: the functions below read the module global SETTINGS, which is only
# assigned in the ``__main__`` section at the bottom of this file.

# host name -> list of interface service items found in the host's autochecks
ITEMS = {}
# upper-case MAC address -> Interface, filled by create_mac_to_interface()
MAC_TABLE: Dict[str, Interface] = {}
# inventory neighbour name -> Checkmk host name (from the user data file)
HOST_MAP = {}
# inventory neighbour names that must be ignored (from the user data file)
DROP_HOSTS: List[str] = []
# host name -> {'connections': {...}, 'interfaces': [...]} (from the user data file)
STATIC_CONNECTIONS: Dict = {}

# six colon separated pairs of upper-case hex digits, nothing before or after
RE_MAC_PATTERN = r'([0-9A-F]{2}:){5}[0-9A-F]{2}\Z'


def is_mac_address(mac_address: str) -> bool:
    """Return True if *mac_address* is an upper-case, colon separated MAC address.

    The whole string has to be a MAC address; values with trailing characters,
    non-hex digits or a non-string type (e.g. ``None``) are not a match.
    The result is printed when SETTINGS.debug is set.
    """
    is_mac = isinstance(mac_address, str) and bool(re_match(RE_MAC_PATTERN, mac_address))
    if SETTINGS.debug:
        print(f'mac: {mac_address}, {"match" if is_mac else "no match"}')
    return is_mac


def get_inventory_data(host: str, path: List[str]) -> Optional[List[Dict[str, str]]]:
    """Read the inventory file of *host* and return the table found under *path*.

    *path* is the list of keys that is followed through the inventory tree.
    An empty list is returned when the inventory file does not exist, is
    empty, the path can not be followed, or the node found is not a list.
    """
    inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}')
    if not inventory_file.exists():
        if SETTINGS.debug:
            print(f'Device: {host}: not found in inventory path!')
        return []

    # NOTE(review): the inventory file is parsed as a Python literal; literal_eval
    # does not execute code but raises ValueError/SyntaxError on a damaged file.
    data = literal_eval(inventory_file.read_text())
    if not data:
        if SETTINGS.debug:
            print(f'Device: {host}: no inventory data found!')
        return []

    for key in path:
        try:
            data = data[key]
        except (KeyError, TypeError):
            # key is missing, or the current node is not a mapping
            return []

    # callers iterate over the result, so never hand out None or a non-list node
    return data if isinstance(data, list) else []


def get_items_from_autochecks(host: str):
    """Cache the items of the 'if64' services of *host* in ITEMS.

    Nothing is done for an empty host name or a host that is already cached.
    A host without an autochecks file is cached with an empty item list.
    """
    if not host or host in ITEMS:
        return

    ITEMS[host] = []
    autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk')
    if not autochecks_file.exists():
        if SETTINGS.debug:
            print(f'Device: {host}: not found in auto checks path!')
        return

    services: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
    ITEMS[host].extend(
        service['item'] for service in services if service['check_plugin_name'] in ['if64']
    )


def create_mac_to_interface():
    """Fill MAC_TABLE with the interfaces of all hosts in the inventory path.

    Files ending in '.gz' and files starting with a dot are skipped.
    Interfaces without a usable 'phys_address' are ignored.
    """
    inventory_dir = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/')
    for inventory_file in inventory_dir.iterdir():
        host = inventory_file.name
        if host.endswith('.gz') or host.startswith('.'):
            continue

        if_table = get_inventory_data(host, SETTINGS.path_to_if_table)
        if not if_table:
            continue

        if SETTINGS.debug:
            print(f'host: {host}, interfaces: {len(if_table)}')
        for interface in if_table:
            # {
            #  'index': 436756992,
            #  'description': 'Ethernet2/50',
            #  'alias': '',
            #  'speed': 100000000000,
            #  'phys_address': 'AC:7A:56:C7:37:C8',
            #  'oper_status': 2,
            #  'port_type': 6,
            #  'admin_status': 2,
            #  'available': True,
            #  'name': 'Ethernet2/50'
            # }
            try:
                mac_address = interface['phys_address'].upper()
            except (KeyError, AttributeError):
                # no phys_address at all, or not a string
                continue
            MAC_TABLE[mac_address] = Interface(
                host=host,
                index=interface.get('index'),
                name=interface.get('name'),
                description=interface.get('description'),
                alias=interface.get('alias'),
            )


def get_service_by_interface(host: str, interface: str) -> Optional[str]:
    """Return the service item of *host* that belongs to *interface*.

    An item matches if it is equal to the interface name, or starts with the
    interface name or with the interface name whose last '/' separated part is
    zero padded to two characters (Gi0/1 -> Gi0/01). If nothing matches (or
    *host* / *interface* is empty) *interface* is returned unchanged.
    """
    # empty host/neighbour should never happen here; an empty interface means
    # the inventory row has no port and there is nothing to look up
    if not host or not interface:
        return interface

    items = ITEMS.get(host, [])
    if interface in items:
        return interface

    head, _, tail = interface.rpartition('/')
    if_padding = f'{head}/{tail:0>2}'
    for item in items:
        # interface = Gi0/1
        # item = Gi0/1 - Access port
        if item.startswith((interface, if_padding)):
            return item

    if SETTINGS.debug:
        print(f'Device: {host}: interface ({interface}) not found in services')
    return interface


def get_if_host_by_mac(mac_address: str) -> Optional[Tuple[str, str]]:
    """Return (host, interface name) for *mac_address*, or None if it is unknown.

    The interface name is the first non-empty value of name, description and
    alias; the interface index is used if all of them are empty.
    """
    interface: Optional[Interface] = MAC_TABLE.get(mac_address)
    if interface is None:
        return None

    if_name = interface.name or interface.description or interface.alias or interface.index
    return interface.host, if_name


def add_static_connections(host: str, data: Dict) -> Dict:
    """Merge the static connections of *host* into *data* and return *data*.

    Static connections replace inventory connections that use the same local
    port. The local ports of the static connections are added to the
    interface list, which is de-duplicated while keeping its order.
    """
    static = STATIC_CONNECTIONS.get(host)
    if static is None:
        return data

    static_connections = static.get('connections', {})
    host_connections = data.get('connections', {})
    host_connections.update(static_connections)

    host_interfaces = [*data.get('interfaces', []), *static_connections]
    data.update({
        'connections': host_connections,
        # dict.fromkeys removes duplicates without shuffling the order
        'interfaces': list(dict.fromkeys(host_interfaces)),
    })
    return data


def _normalize_neighbour(neighbour: str) -> Optional[str]:
    """Turn an inventory neighbour name into a Checkmk host name.

    Returns None if the neighbour is listed in DROP_HOSTS.
    """
    # try to find neighbour against known MAC addresses (LLDP)
    if is_mac_address(neighbour) and neighbour in MAC_TABLE:
        neighbour = MAC_TABLE[neighbour].host

    if not SETTINGS.keep_domain:
        neighbour = neighbour.split('.')[0]
    if SETTINGS.uppercase:
        neighbour = neighbour.upper()
    if SETTINGS.lowercase:
        neighbour = neighbour.lower()

    # drop neighbour if inventory neighbour is invalid
    if neighbour in DROP_HOSTS:
        return None

    # rewrite neighbour if inventory neighbour and checkmk host don't match
    return HOST_MAP.get(neighbour, neighbour)


def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
) -> Optional[Dict[str, Any]]:
    """Build the topology entry of *host* from its neighbour inventory table.

    Every row of *inv_data* that yields a neighbour, a local port and a
    neighbour port becomes a connection
    ``{local_port: [neighbour, neighbour_port, data_source]}``. Static
    connections of the host are merged in at the end.

    Returns ``{host: {'connections': {...}, 'interfaces': [...]}}``.
    """
    data = {'connections': {}, "interfaces": []}
    get_items_from_autochecks(host)

    for topo_neighbour in inv_data:
        neighbour = topo_neighbour.get(inv_columns.neighbour)
        if not neighbour:
            continue

        neighbour = _normalize_neighbour(neighbour)
        if neighbour is None:
            continue

        # has to be done before checking interfaces
        get_items_from_autochecks(neighbour)

        # getting/checking interfaces
        local_port = topo_neighbour.get(inv_columns.local_port)
        if is_mac_address(local_port) and local_port in MAC_TABLE:
            print(f'host: {host}, local_port: {local_port}, {MAC_TABLE[local_port]}')
        local_port = get_service_by_interface(host, local_port)

        neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
        # the port (not the neighbour name) is the value looked up in MAC_TABLE
        if is_mac_address(neighbour_port) and neighbour_port in MAC_TABLE:
            print(f'neighbour: {neighbour}, neighbour_port: {neighbour_port}, {MAC_TABLE[neighbour_port]}')
        neighbour_port = get_service_by_interface(neighbour, neighbour_port)

        if neighbour and local_port and neighbour_port:
            data['connections'].update({local_port: [neighbour, neighbour_port, data_source]})
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)

    # add static connections
    data = add_static_connections(host, data)

    return {host: data}


def get_list_of_devices(data) -> List[str]:
    """Return the neighbour names of the connections in *data* without duplicates.

    *data* maps a local port to ``[neighbour, neighbour_port, label]``; the
    neighbours are returned in the order in which they first appear.
    """
    return list(dict.fromkeys(connection[0] for connection in data.values()))


def save_topology_data(data: Dict, path: str):
    """Write *data* as JSON to SETTINGS.topology_file_name below *path*.

    Missing directories are created. If SETTINGS.make_default is set, the
    'default' entry next to *path* is replaced by a symlink to *path*.
    """
    save_file = Path(f'{path}/{SETTINGS.topology_file_name}')
    save_file.parent.mkdir(exist_ok=True, parents=True)
    save_file.write_text(dumps(data))

    if SETTINGS.make_default:
        default_link = Path(f'{Path(f"{path}").parent}/default')
        default_link.unlink(missing_ok=True)
        default_link.symlink_to(target=Path(path), target_is_directory=True)


def create_topology(
        seed_devicees: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        output_directory: str,
        data_source: str
):
    """Walk the network starting at the seed devices and save the topology.

    The seed devices and all hosts with static connections are processed
    first; every neighbour found is queued and processed once. Device names
    that are keys of HOST_MAP are replaced by the mapped host name before
    they are processed. The result is saved below
    SETTINGS.topology_save_path/*output_directory* if any device was added.
    """
    # seeds first, then hosts with static connections, without duplicates
    devices_to_go = deque(dict.fromkeys([*seed_devicees, *STATIC_CONNECTIONS]))

    topology_data = {}
    devices_done: List[str] = []
    seen = set()  # same content as devices_done, for fast membership tests

    while devices_to_go:
        # always take the device off the queue first, so neither a mapped
        # name nor an already processed device can stall or break the loop
        device = devices_to_go.popleft()
        device = HOST_MAP.get(device, device)
        if device in seen:
            continue

        topo_data = get_inventory_data(device, path_in_inventory)
        topology_data.update(
            create_device_from_inv(
                host=device,
                inv_data=topo_data,
                inv_columns=inv_columns,
                data_source=data_source,
            ))
        seen.add(device)
        devices_done.append(device)

        for entry in get_list_of_devices(topology_data[device]['connections']):
            if entry not in seen:
                devices_to_go.append(entry)

        if SETTINGS.debug:
            print(f'Device done: {device}, source: {data_source}')

    if topology_data:
        save_topology_data(topology_data, f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/{output_directory}')
    print(f'Devices added: {len(devices_done)}, source {data_source}')


def get_user_data_from_toml() -> Dict:
    """Return the content of the user data file (TOML) as a dictionary.

    A missing file results in an empty dictionary. A file that is not valid
    TOML is reported and ends the script with exit code 2.
    """
    data = {}
    toml_file = Path(SETTINGS.user_data_file)
    if toml_file.exists():
        try:
            data = toml_loads(toml_file.read_text())
        except TOMLDecodeError as e:
            print(
                f'ERROR: User data file {toml_file} is not in valid TOML format! '
                f'({e}), (see https://toml.io/en/)'
            )
            exit(2)

    if SETTINGS.debug:
        print(f'data from TOML: {data}')
    return data


def create_static_connections_from_toml(connections: List[List[str]]) -> Dict:
    """Build the static connection table from the entries of the user data file.

    Each entry is unpacked into a StaticConnection and registered in both
    directions (host -> neighbour and neighbour -> host).

    Returns ``{host: {'connections': {port: [peer, peer_port, label]}, 'interfaces': [...]}}``.
    """
    data = {}
    for entry in connections:
        connection = StaticConnection(*entry)
        if SETTINGS.debug:
            print(f'connection: {connection}')

        host_data = data.setdefault(connection.host, {'connections': {}, 'interfaces': []})
        neighbour_data = data.setdefault(connection.neighbour, {'connections': {}, 'interfaces': []})

        # add connection host to neighbour
        host_data['connections'].update({connection.local_port: [
            connection.neighbour, connection.neighbour_port, connection.label
        ]})
        host_data['interfaces'].append(connection.local_port)

        # add connection from neighbour to host
        neighbour_data['connections'].update({connection.neighbour_port: [
            connection.host, connection.local_port, connection.label
        ]})
        neighbour_data['interfaces'].append(connection.neighbour_port)

    if SETTINGS.debug:
        print(data)

    return data


if __name__ == '__main__':
    SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))

    if SETTINGS.version:
        print(f'create_topology_data.py version: {CREATE_TOPOLOGY_VERSION}')
        exit(0)

    if not SETTINGS.seed_devices:
        print('make-topology.py: error: the following arguments are required: -s, --seed-devices')
        print('see make-topology.py -h')
        exit(1)

    print(f'Start time: {strftime(SETTINGS.time_format)}')
    start_time = time_ns()

    # user supplied corrections: host name mapping, hosts to drop, extra connections
    user_data = get_user_data_from_toml()
    HOST_MAP = user_data.get('HOST_MAP', {})
    DROP_HOSTS = user_data.get('DROP_HOSTS', [])
    STATIC_CONNECTIONS = create_static_connections_from_toml(user_data.get('STATIC_CONNECTIONS', []))

    # not yet used, need more test data
    # create_mac_to_interface()

    create_topology(
        seed_devicees=SETTINGS.seed_devices,
        path_in_inventory=SETTINGS.path_in_inventory,
        inv_columns=SETTINGS.inventory_columns,
        output_directory=SETTINGS.output_directory,
        data_source=SETTINGS.data_source
    )
    print(f'time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')