Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
create_topology_data.py 12.2 KiB
Newer Older
thl-cmk's avatar
thl-cmk committed
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-08
# File  : create_topology_data.py

thl-cmk's avatar
thl-cmk committed
# 2023-10-10: initial release
# 2023-10-16: added options  --keep-max  and --min-age
# 2023-10-17: removed -p, -c, -d (use the long options instead: --path-in-inventory, --data-source, --inventory-columns)
#             changed option --keep-max to --keep
#             added option --check-user-data-only
#             added SEED_DEVICES to user data, option -s/--sed-devices is now optional
#             refactoring
thl-cmk's avatar
thl-cmk committed
# 2023-10-18: fixed --make-default not working
thl-cmk's avatar
thl-cmk committed
#             added protected topologies
thl-cmk's avatar
thl-cmk committed
#             changed get inventory data from file access to request via live status query
#             get interfaces from autochecks changed to live status query
# 2023-10-19: cleanup removed "all" unused functions
#             added option -u, --user-data-file to provide a user specific data file
thl-cmk's avatar
thl-cmk committed
# 2023-10-20: changed option -m/--make-default to -d/--default (-m is needed for -m/--merge
#             added -m/--merge option
thl-cmk's avatar
thl-cmk committed
# 2023-10-27: improved matching of port name from CDP/LLDP data to interface (item) name of service
#             improved internal data handling (HostCache)
thl-cmk's avatar
thl-cmk committed
# 2023-10-28: reworked handling of layers (see option -l/--layers)
#             added option -l/-layers {CDP,CUSTOM,LLDP,STATIC} [{CDP,CUSTOM,LLDP,STATIC} ...]
#             added section CUSTOM_LAYERS to user data file
#             removed option -m/--merge, included in option -l/--layers
#             removed option --lldp, included in option -l/--layers
#             removed option --data-source, now handled in CUSTOM_LAYERS
#             removed option --inventory-columns, now handled in CUSTOM_LAYERS
#             removed option --path-in-inventory, now handled in CUSTOM_LAYERS
thl-cmk's avatar
thl-cmk committed
#             removed lower limits for --keep and --min-age
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
thl-cmk's avatar
thl-cmk committed
# Andreas Boesl and schnetz. For more information see
thl-cmk's avatar
thl-cmk committed
# https://forum.checkmk.com/t/network-visualization/41680
# https://exchange.checkmk.com/p/network-visualization
thl-cmk's avatar
thl-cmk committed
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
thl-cmk's avatar
thl-cmk committed
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
#
thl-cmk's avatar
thl-cmk committed
# USAGE:
# ~/local/lib/topology_data/create_topology_data.py --help
thl-cmk's avatar
thl-cmk committed
#

thl-cmk's avatar
thl-cmk committed
from typing import Dict, List, Any
thl-cmk's avatar
thl-cmk committed
from time import strftime, time_ns

thl-cmk's avatar
thl-cmk committed
from create_topology_utils import (
    remove_old_data,
    get_data_from_toml,
    merge_topologies,
thl-cmk's avatar
thl-cmk committed
    save_topology,
thl-cmk's avatar
thl-cmk committed
    LQ_INTERFACES,
    PATH_INTERFACES,
thl-cmk's avatar
thl-cmk committed
    LAYERS,
thl-cmk's avatar
thl-cmk committed
)
thl-cmk's avatar
thl-cmk committed
from create_topology_args import parse_arguments
thl-cmk's avatar
thl-cmk committed
from create_topology_classes import (
    InventoryColumns,
    Settings,
    StaticConnection,
thl-cmk's avatar
thl-cmk committed
    HostCache,
    CacheSources,
thl-cmk's avatar
thl-cmk committed
)
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
HOST_MAP: Dict[str, str] = {}
thl-cmk's avatar
thl-cmk committed
DROP_HOSTS: List[str] = []
STATIC_CONNECTIONS: Dict = {}


thl-cmk's avatar
thl-cmk committed
def get_list_of_devices(data) -> List[str]:
    devices = []
    for connection in data.values():
        devices.append(connection[0])
    return list(set(devices))
thl-cmk's avatar
thl-cmk committed


thl-cmk's avatar
thl-cmk committed
def get_service_by_interface(host: str, interface: str, debug: bool = False) -> str:
    # try to find the item for an interface
    def _match_entry_with_item(_entry: Dict[str, str]):
        values = [_entry.get('name'), _entry.get('description'), entry.get('alias')]
        for value in values:
            if value in items:
                return value

        index = str(_entry.get('index'))
        # for index try with padding
        for i in range(1, 6):
            index_padded = f'{index:0>{i}}'
            if index_padded in items:
                return index_padded
            # still not found try values + index
            for value in values:
                if f'{value} {index_padded}' in items:
                    return f'{value} {index_padded}'

        return interface

thl-cmk's avatar
thl-cmk committed
    # empty host/neighbour should never happen here
thl-cmk's avatar
thl-cmk committed
    if not host:
        return interface
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    # get list of interface items
    items = HOST_CACHE.get_data(host=host, source=CacheSources.lq, path=LQ_INTERFACES)

    # the easy case
thl-cmk's avatar
thl-cmk committed
    if interface in items:
        return interface
    else:
thl-cmk's avatar
thl-cmk committed
        # try to find the interface in the host interface inventory list
        inventory = HOST_CACHE.get_data(host=host, source=CacheSources.inventory, path=PATH_INTERFACES)
        if inventory:
            for entry in inventory:
                if interface in [
                    entry.get('name'),
                    entry.get('description'),
                    entry.get('alias'),
                    str(entry.get('index')),
                    entry.get('phys_address'),
                ]:
                    return _match_entry_with_item(entry)
thl-cmk's avatar
thl-cmk committed
        if debug:
thl-cmk's avatar
thl-cmk committed
            print(f'Device: {host}: service for interface {interface} not found')
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
        return interface
thl-cmk's avatar
thl-cmk committed


thl-cmk's avatar
thl-cmk committed
def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
thl-cmk's avatar
thl-cmk committed
        label: str,
thl-cmk's avatar
thl-cmk committed
        debug: bool = False,
thl-cmk's avatar
thl-cmk committed
) -> Dict[str, Any] | None:
thl-cmk's avatar
thl-cmk committed
    data = {'connections': {}, "interfaces": []}
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    for topo_neighbour in inv_data:
thl-cmk's avatar
thl-cmk committed
        neighbour = topo_neighbour.get(inv_columns.neighbour)
thl-cmk's avatar
thl-cmk committed
        if not neighbour:
            continue

thl-cmk's avatar
thl-cmk committed
        if not SETTINGS.keep_domain:
            neighbour = neighbour.split('.')[0]
        if SETTINGS.uppercase:
            neighbour = neighbour.upper()
        if SETTINGS.lowercase:
            neighbour = neighbour.lower()
thl-cmk's avatar
thl-cmk committed
        # drop neighbour if inventory neighbour is invalid
        if neighbour in DROP_HOSTS:
            continue
        # rewrite neighbour if inventory neighbour and checkmk host don't match
        if neighbour in HOST_MAP.keys():
            neighbour = HOST_MAP[neighbour]
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
        # getting/checking interfaces
thl-cmk's avatar
thl-cmk committed
        local_port = topo_neighbour.get(inv_columns.local_port)
thl-cmk's avatar
thl-cmk committed
        _local_port = local_port
        local_port = get_service_by_interface(host, local_port, debug=debug)
        if debug and local_port != _local_port:
            print(f'host: {host}, local_port: {local_port}, _local_port: {_local_port}')
thl-cmk's avatar
thl-cmk committed

        neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
thl-cmk's avatar
thl-cmk committed
        _neighbour_port = neighbour_port
        neighbour_port = get_service_by_interface(neighbour, neighbour_port, debug=debug)
        if debug and neighbour_port != _neighbour_port:
            print(f'neighbour: {neighbour}, neighbour_port {neighbour_port}, _neighbour_port {_neighbour_port}')
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
        if neighbour and local_port and neighbour_port:
thl-cmk's avatar
thl-cmk committed
            data['connections'].update({local_port: [neighbour, neighbour_port, label]})
thl-cmk's avatar
thl-cmk committed
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    return {host: data}


thl-cmk's avatar
thl-cmk committed
def create_static_connections(connections: List[List[str]], debug: bool = False) -> Dict:
thl-cmk's avatar
thl-cmk committed
    data = {}
    for connection in connections:
        connection = StaticConnection(*connection)
thl-cmk's avatar
thl-cmk committed
        if debug:
thl-cmk's avatar
thl-cmk committed
            print(f'connection: {connection}')
        if connection.host not in data.keys():
            data[connection.host] = {'connections': {}, 'interfaces': []}
        if connection.neighbour not in data.keys():
            data[connection.neighbour] = {'connections': {}, 'interfaces': []}
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
        # add connection from host to neighbour
        data[connection.host]['connections'].update({connection.local_port: [
            connection.neighbour, connection.neighbour_port, connection.label
        ]})
        data[connection.host]['interfaces'].append(connection.local_port)
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
        # add connection from neighbour to host
        data[connection.neighbour]['connections'].update({
            connection.neighbour_port: [connection.host, connection.local_port, connection.label]
        })
        data[connection.neighbour]['interfaces'].append(connection.neighbour_port)
thl-cmk's avatar
thl-cmk committed
    if debug:
thl-cmk's avatar
thl-cmk committed
        print(data)

thl-cmk's avatar
thl-cmk committed
    if data:
        print(f'Devices added: {len(data)}, source static')

thl-cmk's avatar
thl-cmk committed
    return data
thl-cmk's avatar
thl-cmk committed


def create_topology(
thl-cmk's avatar
thl-cmk committed
        seed_devices: List[str],
thl-cmk's avatar
thl-cmk committed
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
thl-cmk's avatar
thl-cmk committed
        label: str,
thl-cmk's avatar
thl-cmk committed
        debug: bool = False,
) -> Dict:
    devices_to_go = list(set(seed_devices))  # remove duplicates
thl-cmk's avatar
thl-cmk committed
    for static_host in STATIC_CONNECTIONS.keys():
        devices_to_go.append(static_host)
    devices_to_go = list(set(devices_to_go))
thl-cmk's avatar
thl-cmk committed
    topology_data = {}
    devices_done = []

    while devices_to_go:
        device = devices_to_go[0]

        if device in HOST_MAP.keys():
            try:
                devices_to_go.remove(device)
            except ValueError:
                pass
            device = HOST_MAP[device]
            if device in devices_done:
                continue

thl-cmk's avatar
thl-cmk committed
        # topo_data = get_inventory_data_lq(host=device, path=path_in_inventory, debug=debug)
        topo_data = HOST_CACHE.get_data(host=device, source=CacheSources.inventory, path=path_in_inventory)
        if topo_data:
            topology_data.update(
                create_device_from_inv(
                    host=device,
                    inv_data=topo_data,
                    inv_columns=inv_columns,
thl-cmk's avatar
thl-cmk committed
                    label=label,
thl-cmk's avatar
thl-cmk committed
                ))
            devices_list = get_list_of_devices(topology_data[device]['connections'])
            for entry in devices_list:
                if entry not in devices_done:
                    devices_to_go.append(entry)
thl-cmk's avatar
thl-cmk committed

        devices_to_go = list(set(devices_to_go))
        devices_done.append(device)
        devices_to_go.remove(device)
thl-cmk's avatar
thl-cmk committed
        if debug:
thl-cmk's avatar
thl-cmk committed
            print(f'Device done: {device}, source: {label}')
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    print(f'Devices added: {len(devices_done)}, source {label}')
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    return topology_data

thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
if __name__ == '__main__':
thl-cmk's avatar
thl-cmk committed
    start_time = time_ns()
    SETTINGS = Settings(vars(parse_arguments()))
    HOST_CACHE = HostCache()
thl-cmk's avatar
thl-cmk committed

    print()
thl-cmk's avatar
thl-cmk committed
    print(f'Start time: {strftime(SETTINGS.time_format)}')

thl-cmk's avatar
thl-cmk committed
    user_data = get_data_from_toml(file=SETTINGS.user_data_file)
thl-cmk's avatar
thl-cmk committed
    HOST_MAP = user_data.get('HOST_MAP', {})
    DROP_HOSTS = user_data.get('DROP_HOSTS', [])
thl-cmk's avatar
thl-cmk committed
    CUSTOM_LAYERS = user_data.get('CUSTOM_LAYERS', [])
    jobs = []
thl-cmk's avatar
thl-cmk committed
    final_topology = {}
thl-cmk's avatar
thl-cmk committed

    for layer in SETTINGS.layers:
        if layer == 'STATIC':
            jobs.append('STATIC')
        elif layer in LAYERS.keys():
            jobs.append(LAYERS[layer])
thl-cmk's avatar
thl-cmk committed
            HOST_CACHE.add_inventory_prefetch_path(LAYERS[layer]['path'])
thl-cmk's avatar
thl-cmk committed
        elif layer == 'CUSTOM':
            for entry in CUSTOM_LAYERS:
                jobs.append(entry)
thl-cmk's avatar
thl-cmk committed
                HOST_CACHE.add_inventory_prefetch_path(entry['path'])
thl-cmk's avatar
thl-cmk committed

    for job in jobs:
        topology = {}
        if job == 'STATIC':
            topology = create_static_connections(connections=user_data.get('STATIC_CONNECTIONS', []))
        else:
thl-cmk's avatar
thl-cmk committed
            columns = job['columns'].split(',')
thl-cmk's avatar
thl-cmk committed
            topology = create_topology(
                seed_devices=list(set(SETTINGS.seed_devices + user_data.get('SEED_DEVICES', []))),
thl-cmk's avatar
thl-cmk committed
                path_in_inventory=job['path'],
                inv_columns=InventoryColumns(
                    neighbour=columns[0],
                    local_port=columns[1],
                    neighbour_port=columns[2]
                ),
                label=job['label'],
thl-cmk's avatar
thl-cmk committed
                debug=SETTINGS.debug,
            )
thl-cmk's avatar
thl-cmk committed
        if topology:
            final_topology = merge_topologies(topo_pri=topology, topo_sec=final_topology)
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    save_topology(
        data=final_topology,
        base_directory=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
        output_directory=SETTINGS.output_directory,
        topology_file_name=SETTINGS.topology_file_name,
        dont_compare=SETTINGS.dont_compare,
        make_default=SETTINGS.default,
    )
thl-cmk's avatar
thl-cmk committed

    if SETTINGS.keep:
        remove_old_data(
            keep=SETTINGS.keep,
thl-cmk's avatar
thl-cmk committed
            min_age=SETTINGS.min_age,
thl-cmk's avatar
thl-cmk committed
            path=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
thl-cmk's avatar
thl-cmk committed
            protected=user_data.get('PROTECTED_TOPOLOGIES', []),
thl-cmk's avatar
thl-cmk committed
        )

thl-cmk's avatar
thl-cmk committed
    print(f'Time taken: {(time_ns() - start_time) / 1e9}/s')
thl-cmk's avatar
thl-cmk committed
    print(f'End time: {strftime(SETTINGS.time_format)}')
thl-cmk's avatar
thl-cmk committed
    print()