#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is.
# Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-08
# File  : create_topology_data.py

# 2023-10-10: initial release
# 2023-10-16: added options  --keep-max  and --min-age
# 2023-10-17: removed -p, -c, -d (use the long options instead: --path-in-inventory, --data-source, --inventory-columns)
#             changed option --keep-max to --keep
#             added option --check-user-data-only
#             added SEED_DEVICES to user data, option -s/--seed-devices is now optional
#             refactoring

#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
#
# The inventory data can be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
#
# USAGE LLDP:
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m --lldp
#

from ast import literal_eval
from collections import deque
from pathlib import Path
from time import strftime, time_ns
from typing import Any, Dict, List, Tuple

from create_topology_utils import (
    parse_arguments,
    save_data_to_file,
    remove_old_data,
    get_data_from_toml,
    is_mac_address,
    is_equal_with_default,
    merge_topologies,
)
from create_topology_classes import (
    InventoryColumns,
    Interface,
    Settings,
    StaticConnection,
)


# version string printed for the --version option
# NOTE(review): the date suffix has nine digits, '20231017' was probably intended -- confirm before changing
CREATE_TOPOLOGY_VERSION = '0.0.6-202310117'

# host -> list of interface service items, filled by get_items_from_autochecks()
ITEMS: Dict[str, List[str]] = {}
# MAC address (upper case) -> Interface, filled by create_mac_to_interface_table()
MAC_TABLE: Dict[str, Interface] = {}
# neighbour name as found in the inventory -> Checkmk host name (from the user data file)
HOST_MAP: Dict[str, str] = {}
# neighbour names that must not show up in the topology (from the user data file)
DROP_HOSTS: List[str] = []
# host -> {'connections': {...}, 'interfaces': [...]}, built by create_static_connections()
STATIC_CONNECTIONS: Dict = {}


def get_items_from_autochecks(host: str, debug: bool = False):
    """
    Cache the interface service items of a host in ITEMS.

    Reads '<omd_root>/<autochecks_path>/<host>.mk' and stores the 'item' of every service whose
    'check_plugin_name' is 'if64' in ITEMS[host]. A host is only read once; a host without an
    autochecks file is cached with an empty list.

    Args:
        host: Checkmk host name, an empty name is ignored
        debug: print a message if the host has no autochecks file
    """
    if not host or host in ITEMS:
        return

    ITEMS[host] = []
    autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk')
    if not autochecks_file.exists():
        if debug:
            print(f'Device: {host}: not found in auto checks path!')
        return

    # the file content is a Python literal (list of dicts); literal_eval() does not execute code
    data: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
    # .get() so that an entry without plugin name or item is skipped instead of raising KeyError
    ITEMS[host] = [
        service['item'] for service in data
        if service.get('check_plugin_name') == 'if64' and service.get('item') is not None
    ]


def get_list_of_devices(data) -> List[str]:
    """
    Return the unique neighbour names of a 'connections' mapping.

    Args:
        data: mapping local port -> [neighbour, neighbour_port, label]

    Returns:
        The neighbour names (first element of every value) without duplicates,
        in order of first appearance.
    """
    # dict.fromkeys() drops duplicates and keeps the insertion order, so the result is reproducible
    return list(dict.fromkeys(connection[0] for connection in data.values()))


thl-cmk's avatar
thl-cmk committed
def get_service_by_interface(host: str, interface: str, debug: bool = False) -> str | None:
thl-cmk's avatar
thl-cmk committed
    # empty host/neighbour should never happen here
thl-cmk's avatar
thl-cmk committed
    if not host:
        return interface
    items = ITEMS.get(host, [])
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    if interface in items:
        return interface
    else:
        if_padding = '/'.join(interface.split('/')[:-1]) + f'/{interface.split("/")[-1]:0>2}'
        for item in items:
            # interface = Gi0/1
            # item = Gi0/1 - Access port
            if item.startswith(interface):
                return item
            elif item.startswith(if_padding):
                return item

thl-cmk's avatar
thl-cmk committed
        if debug:
thl-cmk's avatar
thl-cmk committed
            print(f'Device: {host}: interface ({interface}) not found in services')
thl-cmk's avatar
thl-cmk committed
        return interface


thl-cmk's avatar
thl-cmk committed
def get_if_host_by_mac(mac_address: str) -> Tuple[str, str] | None:
thl-cmk's avatar
thl-cmk committed
    try:
        interface: Interface = MAC_TABLE[mac_address]
    except KeyError:
        return

    host = interface.host
    if interface.name:
        if_name = interface.name
    elif interface.description:
        if_name = interface.description
    elif interface.alias:
        if_name = interface.alias
    else:
        if_name = interface.index

    return host, if_name


def add_static_connections(host: str, data: Dict) -> Dict:
    """
    Merge the static connections of a host into its topology data.

    If the host has an entry in STATIC_CONNECTIONS, the static connections overwrite/extend
    data['connections'] and their local ports are added to data['interfaces'] (duplicates
    removed). Without such an entry the data is handed back untouched.

    Args:
        host: Checkmk host name
        data: topology data of the host ('connections' and 'interfaces'), changed in place

    Returns:
        The (updated) data
    """
    if host not in STATIC_CONNECTIONS.keys():
        return data

    static_links = STATIC_CONNECTIONS[host].get('connections', {})

    merged_connections = data.get('connections', {})
    merged_connections.update(static_links)

    merged_interfaces = data.get('interfaces', [])
    merged_interfaces.extend(static_links.keys())

    data['connections'] = merged_connections
    data['interfaces'] = list(set(merged_interfaces))
    return data


def get_inventory_data(host: str, path: List[str], debug: bool = False, ) -> List[Dict[str, str]] | None:
    """
    Read a sub tree of the inventory of a host.

    The inventory file '<omd_root>/<inventory_path>/<host>' is parsed as Python literal and then
    walked down key by key along path.

    Args:
        host: Checkmk host name (name of the inventory file)
        path: keys leading to the wanted data
        debug: print a message if the host has no inventory file or the inventory is empty

    Returns:
        The data found at path. An empty list if the file is missing or empty, the inventory
        holds no data, or the path does not exist in the inventory.
    """
    inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}')
    if not inventory_file.exists():
        if debug:
            print(f'Device: {host}: not found in inventory path!')
        return []

    raw_data = inventory_file.read_text()
    # literal_eval() raises SyntaxError on empty input, treat an empty file as "no data"
    data = literal_eval(raw_data) if raw_data.strip() else None
    if not data:
        if debug:
            print(f'Device: {host}: no inventory data found!')
        return []

    for key in path:
        try:
            data = data[key]
        except (KeyError, TypeError, IndexError):
            # key is missing or the node reached so far can not be indexed with it
            return []
    return data


def create_mac_to_interface_table(debug: bool = False):
    """
    Fill MAC_TABLE with the interfaces of all hosts found in the inventory path.

    For every file in '<omd_root>/<inventory_path>/' (files ending in '.gz' and hidden files are
    skipped) the interface table is read via SETTINGS.path_to_if_table. Each interface with a
    'phys_address' is stored as Interface under its upper-cased MAC address. A later interface
    with the same MAC address replaces an earlier one.

    Args:
        debug: print the number of interfaces found per host
    """
    inventory_dir = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/')

    for file in inventory_dir.iterdir():
        host = file.name
        if host.endswith('.gz') or host.startswith('.'):
            continue

        if_table = get_inventory_data(host=host, path=SETTINGS.path_to_if_table, debug=debug)
        if not if_table:
            continue
        if debug:
            print(f'host: {host}, interfaces: {len(if_table)}')

        for interface in if_table:
            # {
            #     'index': 436756992,
            #     'description': 'Ethernet2/50',
            #     'alias': '',
            #     'speed': 100000000000,
            #     'phys_address': 'AC:7A:56:C7:37:C8',
            #     'oper_status': 2,
            #     'port_type': 6,
            #     'admin_status': 2,
            #     'available': True,
            #     'name': 'Ethernet2/50'
            # }
            mac_address = interface.get('phys_address')
            # interfaces without a usable MAC address can not be looked up later
            if not mac_address or not isinstance(mac_address, str):
                continue
            MAC_TABLE[mac_address.upper()] = Interface(
                host=host,
                index=interface.get('index'),
                name=interface.get('name'),
                description=interface.get('description'),
                alias=interface.get('alias'),
            )


def save_topology(
        data: dict,
        base_directory: str,
        output_directory: str,
        dont_compare: bool,
        make_default: bool,
        topology_file_name: str,
):
    """
    Save the topology below '<base_directory>/<output_directory>'.

    Unless dont_compare is set, the topology is first compared with the file
    '<base_directory>/default/<topology_file_name>' and only saved if it differs.

    Args:
        data: the topology to save
        base_directory: directory holding all saved topologies
        output_directory: sub directory for this topology
        dont_compare: save without comparing with the default topology
        make_default: handed over to save_data_to_file()
        topology_file_name: name of the topology file
    """
    target_path = f'{base_directory}/{output_directory}'

    # the comparison is only executed if dont_compare is not set
    matches_default = not dont_compare and is_equal_with_default(
        data=data,
        file=f'{base_directory}/default/{topology_file_name}'
    )
    if matches_default:
        print(
            'Topology matches default topology, not saved! Use "--dont-compare" to save identical topologies.'
        )
        return

    save_data_to_file(
        data=data,
        path=target_path,
        file=topology_file_name,
        make_default=make_default,
    )

def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
) -> Dict[str, Any] | None:
    """
    Create the topology entry of a host from its neighbour inventory (CDP/LLDP).

    For every inventory row the neighbour name is normalized (MAC address lookup, domain
    stripped unless SETTINGS.keep_domain, upper/lower case per SETTINGS, DROP_HOSTS, HOST_MAP)
    and the local and neighbour port are mapped to the interface service items of the hosts.

    Args:
        host: Checkmk host name the inventory data belongs to
        inv_data: neighbour rows from the inventory, None is handled like an empty list
        inv_columns: names of the columns holding neighbour, local port and neighbour port
        data_source: label stored with every connection

    Returns:
        {host: {'connections': {local_port: [neighbour, neighbour_port, data_source]},
                'interfaces': [local_port, ...]}}
    """
    data = {'connections': {}, "interfaces": []}
    get_items_from_autochecks(host)
    for topo_neighbour in inv_data or []:
        neighbour = topo_neighbour.get(inv_columns.neighbour)
        if not neighbour:
            continue

        # try to find neighbour against known MAC addresses (LLDP)
        # NOTE(review): create_mac_to_interface_table() stores upper-case keys, the lookups in
        # this function use the value as found in the inventory
        if is_mac_address(neighbour):
            try:
                neighbour = MAC_TABLE[neighbour].host
            except KeyError:
                pass

        if not SETTINGS.keep_domain:
            neighbour = neighbour.split('.')[0]
        if SETTINGS.uppercase:
            neighbour = neighbour.upper()
        if SETTINGS.lowercase:
            neighbour = neighbour.lower()

        # drop neighbour if inventory neighbour is invalid
        if neighbour in DROP_HOSTS:
            continue
        # rewrite neighbour if inventory neighbour and checkmk host don't match
        if neighbour in HOST_MAP:
            neighbour = HOST_MAP[neighbour]

        # has to be done before checking interfaces
        get_items_from_autochecks(neighbour)

        # getting/checking interfaces
        local_port = topo_neighbour.get(inv_columns.local_port)
        neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
        # a connection needs both ends, rows with a missing port can not be used
        if not local_port or not neighbour_port:
            continue

        if is_mac_address(local_port):
            try:
                print(f'host: {host}, local_port: {local_port}, {MAC_TABLE[local_port]}')
            except KeyError:
                pass
        local_port = get_service_by_interface(host, local_port)

        # the port (not the neighbour name) is the key used for the MAC table lookup
        if is_mac_address(neighbour_port):
            try:
                print(f'neighbour: {neighbour}, neighbour_port: {neighbour_port}, {MAC_TABLE[neighbour_port]}')
            except KeyError:
                pass
        neighbour_port = get_service_by_interface(neighbour, neighbour_port)

        if neighbour and local_port and neighbour_port:
            data['connections'].update({local_port: [neighbour, neighbour_port, data_source]})
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)

    # add static connections
    # data = add_static_connections(host, data)
    return {host: data}


def create_static_connections(connections: List[List[str]], debug: bool = False) -> Dict:
    """
    Build topology data from the user-defined static connections.

    Every connection is added in both directions, from host to neighbour and from neighbour
    to host. A port is listed only once in 'interfaces', even if it is used by more than one
    static connection (the last connection of a port wins).

    Args:
        connections: list of connections, each entry holds the fields of a StaticConnection
        debug: print every connection and the resulting data

    Returns:
        {device: {'connections': {port: [peer, peer_port, label]}, 'interfaces': [port, ...]}}
    """
    data = {}
    for connection in connections or []:
        connection = StaticConnection(*connection)
        if debug:
            print(f'connection: {connection}')

        # first the connection from host to neighbour, then from neighbour to host
        both_directions = (
            (connection.host, connection.local_port, connection.neighbour, connection.neighbour_port),
            (connection.neighbour, connection.neighbour_port, connection.host, connection.local_port),
        )
        for device, port, peer, peer_port in both_directions:
            entry = data.setdefault(device, {'connections': {}, 'interfaces': []})
            entry['connections'][port] = [peer, peer_port, connection.label]
            if port not in entry['interfaces']:
                entry['interfaces'].append(port)

    if debug:
        print(data)

    if data:
        print(f'Devices added: {len(data)}, source static')

    return data


def create_topology(
        seed_devices: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        data_source: str,
        debug: bool = False,
) -> Dict:
    """
    Create the topology by walking the neighbour inventory, starting at the seed devices.

    The walk starts with the seed devices and all hosts that have static connections. Every
    neighbour found in the inventory of a device is visited as well, each device only once.
    Device names found in HOST_MAP are replaced by the mapped name before they are visited.

    Args:
        seed_devices: Checkmk host names to start with
        path_in_inventory: keys leading to the neighbour table in the inventory
        inv_columns: names of the inventory columns to use
        data_source: label stored with every connection
        debug: print a message for every finished device

    Returns:
        {device: {'connections': {...}, 'interfaces': [...]}} for every visited device
    """
    # dict.fromkeys() removes duplicates and keeps the order
    devices_to_go = deque(dict.fromkeys([*(seed_devices or []), *STATIC_CONNECTIONS]))
    devices_done = set()
    topology_data = {}

    while devices_to_go:
        # the device leaves the queue under the name it was queued with, so a name that is
        # rewritten by HOST_MAP can never be "missing" from the queue afterwards
        device = devices_to_go.popleft()
        device = HOST_MAP.get(device, device)
        if device in devices_done:
            continue

        topo_data = get_inventory_data(host=device, path=path_in_inventory, debug=debug)

        topology_data.update(
            create_device_from_inv(
                host=device,
                inv_data=topo_data or [],
                inv_columns=inv_columns,
                data_source=data_source,
            ))
        devices_done.add(device)

        for entry in get_list_of_devices(topology_data[device]['connections']):
            if entry not in devices_done:
                devices_to_go.append(entry)

        if debug:
            print(f'Device done: {device}, source: {data_source}')

    print(f'Devices added: {len(devices_done)}, source {data_source}')

    return topology_data

if __name__ == '__main__':
    # SETTINGS is a module global, the functions above read it directly
    SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))
    if SETTINGS.version:
        print(f'{Path(__file__).name} version: {CREATE_TOPOLOGY_VERSION}')
        # exit() is a helper of the site module, SystemExit works without it
        raise SystemExit(0)

    if SETTINGS.check_user_data_only:
        # reading the file is the check, the data itself is not needed here
        # NOTE(review): presumably get_data_from_toml() reports/aborts on broken files -- verify
        get_data_from_toml(file=SETTINGS.user_data_file)
        print(f'Could read/parse the user data from {SETTINGS.user_data_file}')
        raise SystemExit(0)

    print()
    print(f'Start time: {strftime(SETTINGS.time_format)}')
    start_time = time_ns()

    user_data = get_data_from_toml(file=SETTINGS.user_data_file)
    HOST_MAP = user_data.get('HOST_MAP', {})
    DROP_HOSTS = user_data.get('DROP_HOSTS', [])
    STATIC_CONNECTIONS = create_static_connections(connections=user_data.get('STATIC_CONNECTIONS', []))

    # not yet used, need more test data
    # create_mac_to_interface_table()

    topology = create_topology(
        # NOTE(review): "or []" assumes seed_devices may be None if -s is not given -- confirm in Settings
        seed_devices=list(set((SETTINGS.seed_devices or []) + user_data.get('SEED_DEVICES', []))),
        path_in_inventory=SETTINGS.path_in_inventory,
        inv_columns=SETTINGS.inventory_columns,
        data_source=SETTINGS.data_source,
        debug=SETTINGS.debug,
    )

    if topology:
        # merge topology with static connections
        topology = merge_topologies(topo_pri=STATIC_CONNECTIONS, topo_sec=topology)

        save_topology(
            data=topology,
            base_directory=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
            output_directory=SETTINGS.output_directory,
            topology_file_name=SETTINGS.topology_file_name,
            dont_compare=SETTINGS.dont_compare,
            make_default=SETTINGS.make_default,
        )

    if SETTINGS.keep:
        remove_old_data(
            keep=SETTINGS.keep,
            min_age=SETTINGS.min_age,
            path=Path(f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}'),
        )

    print(f'Time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')
    print()