Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
create_topology_data.py 8.11 KiB
Newer Older
thl-cmk's avatar
thl-cmk committed
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-08
# File  : create_topology_data.py

#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# create_topology_data.py -s CORE01 -m
#
# USAGE LLDP:
# create_topology_data.py -s CORE01 -p "networking,lldp_cache" -c "system_name,local_port_num,port_id" -m
#

from ast import literal_eval
from pathlib import Path
from json import dumps
from typing import Dict, List, Optional, Any
from time import strftime, time_ns

from create_topology_classes import InventoryColumns, Settings
from create_topology_utils import parse_arguments

CREATE_TOPOLOGY_VERSION = '0.0.2-20231012'

# map inventory host name to CMK host name
HOST_MAP = {
    'inventory-host': 'cmkhost',
    'nexus01': 'NX01',
    'nexus02': 'NX02',
    'nexus03': 'NX03',
}

# drop hosts with invalid names
DROP_HOSTS = [
    'not advertised',
]

ITEMS = {}


def get_inventory_data(host: str, path: List[str]) -> Optional[List[Dict[str, str]]]:
    inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}')
    if inventory_file.exists():
        data = literal_eval(inventory_file.read_text())
        if data:
            for m in path:
                try:
                    data = data[m]
                except KeyError:
                    return None
            return data
        else:
            print(f'Device: {host}: no inventory data found!')
    else:
        print(f'Device: {host}: not found in inventory path!')


def get_items_from_autochecks(host: str):
    if host and host not in ITEMS.keys():
        ITEMS[host] = []
        autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk')
        if autochecks_file.exists():
            data: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
            for service in data:
                if service['check_plugin_name'] in ['if64']:
                    ITEMS[host].append(service['item'])
        else:
            print(f'Device: {host}: not found in auto checks path!')
            return None


def check_interface_name(host: str, interface: str) -> str:
    if not host:
        return interface
    items = ITEMS.get(host, [])
    if interface in items:
        return interface
    else:
        if_padding = '/'.join(interface.split('/')[:-1]) + f'/{interface.split("/")[-1]:0>2}'
        for item in items:
            # interface = Gi0/1
            # item = Gi0/1 - Access port
            if item.startswith(interface):
                return item
            elif item.startswith(if_padding):
                return item

        print(f'Device: {host}: interface ({interface}) not found in services')
        return interface


def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
) -> Optional[Dict[str, Any]]:
    data = {'connections': {}, "interfaces": []}
    get_items_from_autochecks(host)
    for topo_neighbour in inv_data:
        neighbour = topo_neighbour.get(inv_columns.neighbour).split('.')[0]
        # drop neighbour if inventory neighbour is invalid
        if neighbour in DROP_HOSTS:
            continue
        # rewrite neighbour if inventory neighbour and checkmk host don't match
        if neighbour in HOST_MAP.keys():
            neighbour = HOST_MAP[neighbour]
        # has to be done before checking interfaces
        get_items_from_autochecks(neighbour)
        # getting/checking interfaces
        local_port = check_interface_name(host, topo_neighbour.get(inv_columns.local_port))
        neighbour_port = check_interface_name(neighbour, topo_neighbour.get(inv_columns.neighbour_port))
        if neighbour and local_port and neighbour_port:
            data['connections'].update({local_port: [neighbour, neighbour_port, data_source]})
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)
    return {host: data}


def get_list_of_devices(data) -> List[str]:
    devices = []
    for connection in data.values():
        devices.append(connection[0])
    return list(set(devices))


def save_topology_data(data: Dict, path: str):
    save_file = Path(f'{path}/{SETTINGS.topology_file_name}')
    save_file.parent.mkdir(exist_ok=True, parents=True)
    save_file.write_text(dumps(data))
    parent_path = Path(f'{path}').parent
    if SETTINGS.make_default:
        Path(f'{parent_path}/default').unlink(missing_ok=True)
        Path(f'{parent_path}/default').symlink_to(target=Path(path), target_is_directory=True)


def create_topology(
        seed_devicees: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        output_directory: str,
        data_source: str
):
    devices_to_go = list(set(seed_devicees))  # remove duplicates
    topology_data = {}
    number_of_devices = 0
    devices_done = []

    while devices_to_go:
        device = devices_to_go[0]

        if device in HOST_MAP.keys():
            try:
                devices_to_go.remove(device)
            except ValueError:
                pass
            device = HOST_MAP[device]
            if device in devices_done:
                continue

        number_of_devices += 1
        topo_data = get_inventory_data(device, path_in_inventory)
        if topo_data:
            topology_data.update(
                create_device_from_inv(
                    host=device,
                    inv_data=topo_data,
                    inv_columns=inv_columns,
                    data_source=data_source,
                ))
            devices_list = get_list_of_devices(topology_data[device]['connections'])
            for entry in devices_list:
                if entry not in devices_done:
                    devices_to_go.append(entry)

        devices_to_go = list(set(devices_to_go))
        devices_done.append(device)
        devices_to_go.remove(device)
        # print(f'Device done: {device}, source: {data_source}')

    if topology_data:
        save_topology_data(topology_data, f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/{output_directory}')

    print(f'Devices added: {number_of_devices}, source {data_source}')


if __name__ == '__main__':
    SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))
    if SETTINGS.version:
        print(f'create_topology_data.py version: {CREATE_TOPOLOGY_VERSION}')
        exit(0)
    if not SETTINGS.seed_devices:
        print('make-topology.py: error: the following arguments are required: -s, --seed-devices')
        print('see make-topology.py -h')
        exit(1)

    print(f'Start time: {strftime(SETTINGS.time_format)}')
    start_time = time_ns()

    create_topology(
        seed_devicees=SETTINGS.seed_devices,
        path_in_inventory=SETTINGS.path_in_inventory,
        inv_columns=SETTINGS.inventory_columns,
        output_directory=SETTINGS.output_directory,
        data_source=SETTINGS.data_source
    )
    # print('KLNL LLDP Topology')
    # create_topology_data.py -s CORE01 -p "networking,lldp_cache" -c "system_name,local_port_num,port_id" -d inv_LLDP -o KLNL_LLDP
    # create_topology_data.py -s CORE01 -p "networking,cdp_cache" -c "device_id,local_port,device_port" -d inv_CDP

    print(f'time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')