#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-08
# File  : create_topology_data.py

# 2023-10-10: initial release
# 2023-10-16: added options --keep-max and --min-age
# 2023-10-17: removed -p, -c, -d (use the long options instead: --path-in-inventory, --data-source, --inventory-columns)
#             changed option --keep-max to --keep
#             added option --check-user-data-only
#             added SEED_DEVICES to user data, option -s/--seed-devices is now optional
#             refactoring
# 2023-10-18: fixed --make-default not working
#             added protected topologies
#             changed get inventory data from file access to request via live status query
#             get interfaces from autochecks changed to live status query
# 2023-10-19: cleanup removed "all" unused functions
#             added option -u, --user-data-file to provide a user specific data file
# 2023-10-20: changed option -m/--make-default to -d/--default (-m is needed for -m/--merge)
#             added -m/--merge option

#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
#
# USAGE LLDP:
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m --lldp
#
# NOTE(review): the usage examples above look older than the 2023-10-20 change
# (-m is now -m/--merge) - verify them against the current argument parser.
#

import sys
from ast import literal_eval
from pathlib import Path
from typing import Dict, List, Any
from time import strftime, time_ns

from create_topology_utils import (
    parse_arguments,
    remove_old_data,
    get_data_from_toml,
    merge_topologies,
    save_topology,
    get_data_form_live_status,
    ExitCodes,
)
from create_topology_classes import (
    InventoryColumns,
    Settings,
    StaticConnection,
    Topologies,
)

CREATE_TOPOLOGY_VERSION = '0.0.8-202310120'

# Cache: host name -> sorted list of interface service items (filled on demand).
ITEMS = {}
# Inventory neighbour name -> Checkmk host name (loaded from the user data file).
HOST_MAP: Dict[str, str] = {}
# Inventory neighbour names that must be ignored (loaded from the user data file).
DROP_HOSTS: List[str] = []
# Topology built from the user defined static connections.
STATIC_CONNECTIONS: Dict = {}


def get_interface_items_from_lq(host: str, debug: bool = False) -> Dict:
    """Fetch the interface service items of a host via a Livestatus query.

    Args:
        host: host name used in the ``host_name`` filter of the query.
        debug: accepted for signature compatibility; not used.

    Returns:
        A dict mapping each returned host name to the sorted list of its unique
        interface items (service description with the first 10 characters, the
        'Interface ' prefix, cut off). Empty if the query yields no rows.
    """
    query = (
        'GET services\n'
        'Columns: host_name description check_command\n'
        'Filter: description ~ Interface\n'
        f'Filter: host_name = {host}\n'
        'OutputFormat: python3\n'
    )
    data = get_data_form_live_status(query=query)
    found: Dict[str, set] = {}
    for host_name, description, _check_command in data or []:
        if host_name:
            # remove 'Interface ' from description
            found.setdefault(host_name, set()).add(description[10:])
    return {host_name: sorted(items) for host_name, items in found.items()}


def get_list_of_devices(data) -> List[str]:
    """Return the unique neighbour names (first element of each connection) in *data*."""
    return list({connection[0] for connection in data.values()})


def get_service_by_interface(host: str, interface: str, debug: bool = False) -> str | None:
    """Map an inventory interface name to the matching interface service item.

    Args:
        host: host whose cached items in ``ITEMS`` are searched.
        interface: interface name as found in the inventory data.
        debug: print a message if no matching item was found.

    Returns:
        The matching item if one is found, otherwise *interface* unchanged
        (which may be ``None``/empty if the inventory had no port value).
    """
    # empty host/neighbour should never happen here; a missing interface cannot
    # be matched either and would break the string handling below
    if not host or not interface:
        return interface

    items = ITEMS.get(host, [])
    if interface in items:
        return interface

    # also try the name with the last path element zero padded to two digits
    # (Gi0/1 -> Gi0/01)
    if_padding = interface
    if '/' in interface:
        prefix, _, last = interface.rpartition('/')
        if_padding = f'{prefix}/{last:0>2}'

    for item in items:
        # interface = Gi0/1
        # item = Gi0/1 - Access port
        # this might not catch the correct interface (fa0 and fa0/0)
        item = item.split(' ')[0]
        if item in (interface, if_padding):
            return item

    if debug:
        print(f'Device: {host}: interface ({interface}) not found in services')
    return interface


def get_inventory_data_lq(host: str, path: List[str], debug: bool = False, ) -> List[Dict[str, str]]:
    """Read one sub tree of a host's inventory via a Livestatus query.

    Args:
        host: host name used in the ``host_name`` filter of the query.
        path: keys to follow, in order, inside the parsed inventory data.
        debug: print details if the inventory data can not be parsed.

    Returns:
        The data found at *path*, or an empty list if the query returns
        nothing, the data can not be parsed or the path does not exist.
    """
    query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
    data = get_data_form_live_status(query=query)
    if not data:
        return []

    try:
        data = literal_eval(data[0][0].decode('utf-8'))
    except (SyntaxError, ValueError) as e:
        # literal_eval raises ValueError as well for malformed input
        if debug:
            print(f'data: |{data}|')
            print(f'type: {type(data)}')
            print(f'exception: {e}')
        return []

    for m in path:
        try:
            data = data[m]
        except (KeyError, TypeError):
            # key is missing or the current node is not a mapping
            return []
    return data


def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
) -> Dict[str, Any] | None:
    """Build the topology entry of one host from its neighbour inventory data.

    Neighbour names are normalized according to the global ``SETTINGS``
    (keep_domain, uppercase, lowercase), dropped if listed in ``DROP_HOSTS``
    and rewritten via ``HOST_MAP``.

    Args:
        host: the host the inventory data belongs to.
        inv_data: inventory rows, one per neighbour.
        inv_columns: names of the neighbour, local port and neighbour port columns.
        data_source: label stored with every connection.

    Returns:
        ``{host: {'connections': {local_port: [neighbour, neighbour_port,
        data_source]}, 'interfaces': [local_port, ...]}}``.
    """
    data = {'connections': {}, 'interfaces': []}

    if host not in ITEMS:
        ITEMS.update(get_interface_items_from_lq(host=host))
        # remember hosts without interface services, so they are queried only once
        ITEMS.setdefault(host, [])

    for topo_neighbour in inv_data:
        neighbour = topo_neighbour.get(inv_columns.neighbour)
        if not neighbour:
            continue

        if not SETTINGS.keep_domain:
            neighbour = neighbour.split('.')[0]
        if SETTINGS.uppercase:
            neighbour = neighbour.upper()
        if SETTINGS.lowercase:
            neighbour = neighbour.lower()

        # drop neighbour if inventory neighbour is invalid
        if neighbour in DROP_HOSTS:
            continue

        # rewrite neighbour if inventory neighbour and checkmk host don't match
        neighbour = HOST_MAP.get(neighbour, neighbour)

        # has to be done before checking interfaces
        if neighbour not in ITEMS:
            ITEMS.update(get_interface_items_from_lq(host=neighbour))
            ITEMS.setdefault(neighbour, [])

        # getting/checking interfaces
        local_port = get_service_by_interface(host, topo_neighbour.get(inv_columns.local_port))
        neighbour_port = get_service_by_interface(neighbour, topo_neighbour.get(inv_columns.neighbour_port))

        if neighbour and local_port and neighbour_port:
            data['connections'][local_port] = [neighbour, neighbour_port, data_source]
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)

    return {host: data}


def create_static_connections(connections: List[List[str]], debug: bool = False) -> Dict:
    """Build topology data from the user defined static connections.

    Every connection is added in both directions (host -> neighbour and
    neighbour -> host).

    Args:
        connections: one list per connection; its elements are passed
            positionally to ``StaticConnection``.
        debug: print each connection and the resulting data.

    Returns:
        A dict ``{device: {'connections': {...}, 'interfaces': [...]}}``.
    """
    data = {}
    for connection in connections:
        connection = StaticConnection(*connection)
        if debug:
            print(f'connection: {connection}')

        host_data = data.setdefault(connection.host, {'connections': {}, 'interfaces': []})
        neighbour_data = data.setdefault(connection.neighbour, {'connections': {}, 'interfaces': []})

        # add connection from host to neighbour
        host_data['connections'][connection.local_port] = [
            connection.neighbour, connection.neighbour_port, connection.label
        ]
        # keep the interface list free of duplicates (as create_device_from_inv does)
        if connection.local_port not in host_data['interfaces']:
            host_data['interfaces'].append(connection.local_port)

        # add connection from neighbour to host
        neighbour_data['connections'][connection.neighbour_port] = [
            connection.host, connection.local_port, connection.label
        ]
        if connection.neighbour_port not in neighbour_data['interfaces']:
            neighbour_data['interfaces'].append(connection.neighbour_port)

    if debug:
        print(data)

    if data:
        print(f'Devices added: {len(data)}, source static')

    return data


def create_topology(
        seed_devices: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        data_source: str,
        debug: bool = False,
) -> Dict:
    """Walk the network starting at the seed devices and collect the topology.

    Starting with *seed_devices* (plus the hosts in ``STATIC_CONNECTIONS``)
    the inventory of every device is read, and every neighbour found there
    is visited as well until no unvisited device is left.

    Args:
        seed_devices: host names to start the walk with.
        path_in_inventory: path to the neighbour table inside the inventory.
        inv_columns: names of the neighbour, local port and neighbour port columns.
        data_source: label stored with every connection.
        debug: print progress information.

    Returns:
        A dict ``{device: {'connections': {...}, 'interfaces': [...]}}``.
    """
    # a set removes duplicates
    devices_to_go = list(set(seed_devices) | set(STATIC_CONNECTIONS))

    topology_data = {}
    devices_done = []

    while devices_to_go:
        device = devices_to_go[0]

        if device in HOST_MAP:
            # continue with the mapped name, the original one is finished
            devices_to_go.remove(device)
            device = HOST_MAP[device]

        if device in devices_done:
            # make sure a finished device can not keep the loop alive
            if device in devices_to_go:
                devices_to_go.remove(device)
            continue

        topo_data = get_inventory_data_lq(host=device, path=path_in_inventory, debug=debug)
        topology_data.update(
            create_device_from_inv(
                host=device,
                inv_data=topo_data,
                inv_columns=inv_columns,
                data_source=data_source,
            ))

        for entry in get_list_of_devices(topology_data[device]['connections']):
            if entry not in devices_done:
                devices_to_go.append(entry)

        devices_done.append(device)
        # remove duplicates and the device just done; a name coming from
        # HOST_MAP is not necessarily in the list, so list.remove() could fail
        devices_to_go = [entry for entry in set(devices_to_go) if entry != device]

        if debug:
            print(f'Device done: {device}, source: {data_source}')

    print(f'Devices added: {len(devices_done)}, source {data_source}')

    return topology_data


if __name__ == '__main__':
    SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))

    if SETTINGS.version:
        print(f'{Path(__file__).name} version: {CREATE_TOPOLOGY_VERSION}')
        sys.exit(ExitCodes.OK)

    if SETTINGS.check_user_data_only:
        # only called to verify that the file can be read and parsed
        get_data_from_toml(file=SETTINGS.user_data_file)
        print(f'Could read/parse the user data from {SETTINGS.user_data_file}')
        sys.exit(ExitCodes.OK)

    if SETTINGS.merge:
        if len(set(SETTINGS.merge)) != len(SETTINGS.merge):
            print(f'-m/--merge options must be unique. Use "-m cdp lldp" oder "-m lldp cdp"')
            sys.exit(ExitCodes.BAD_OPTION_LIST)

    print()
    print(f'Start time: {strftime(SETTINGS.time_format)}')
    start_time = time_ns()

    user_data = get_data_from_toml(file=SETTINGS.user_data_file)

    HOST_MAP = user_data.get('HOST_MAP', {})
    DROP_HOSTS = user_data.get('DROP_HOSTS', [])

    final_topology = {}

    # work through the list back to front; every topology created later is
    # passed as topo_pri to merge_topologies()
    while SETTINGS.merge:
        SETTINGS.set_topology_param(Topologies[SETTINGS.merge[-1]].value)
        topology = create_topology(
            seed_devices=list(set(SETTINGS.seed_devices + user_data.get('SEED_DEVICES', []))),
            path_in_inventory=SETTINGS.path_in_inventory,
            inv_columns=SETTINGS.inventory_columns,
            data_source=SETTINGS.data_source,
            debug=SETTINGS.debug,
        )
        if topology:
            final_topology = merge_topologies(topo_pri=topology, topo_sec=final_topology)
        # the merge entries are unique (checked above), so this drops the one just handled
        SETTINGS.merge.pop()

    # NOTE(review): create_topology() also reads STATIC_CONNECTIONS, but it is
    # still empty while the loop above runs because it is only filled here.
    # Confirm whether the static hosts are meant to be used as seed devices.
    STATIC_CONNECTIONS = create_static_connections(connections=user_data.get('STATIC_CONNECTIONS', []))
    final_topology = merge_topologies(topo_pri=STATIC_CONNECTIONS, topo_sec=final_topology)

    save_topology(
        data=final_topology,
        base_directory=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
        output_directory=SETTINGS.output_directory,
        topology_file_name=SETTINGS.topology_file_name,
        dont_compare=SETTINGS.dont_compare,
        make_default=SETTINGS.default,
    )

    if SETTINGS.keep:
        remove_old_data(
            keep=SETTINGS.keep,
            min_age=SETTINGS.min_age,
            path=Path(f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}'),
            protected=user_data.get('PROTECTED_TOPOLOGIES', []),
        )

    print(f'Time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')
    print()