Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
create_topology_data.py 13.9 KiB
Newer Older
thl-cmk's avatar
thl-cmk committed
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-08
# File  : create_topology_data.py

#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
thl-cmk's avatar
thl-cmk committed
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology

thl-cmk's avatar
thl-cmk committed
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
thl-cmk's avatar
thl-cmk committed
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
thl-cmk's avatar
thl-cmk committed
#
# USAGE LLDP:
thl-cmk's avatar
thl-cmk committed
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -p "networking,lldp_cache" \
# -c "system_name,local_port_num,port_id" -m
thl-cmk's avatar
thl-cmk committed
#

thl-cmk's avatar
thl-cmk committed
from re import match as re_match
thl-cmk's avatar
thl-cmk committed
from ast import literal_eval
from pathlib import Path
from json import dumps
thl-cmk's avatar
thl-cmk committed
from typing import Dict, List, Optional, Any, Tuple
thl-cmk's avatar
thl-cmk committed
from time import strftime, time_ns
thl-cmk's avatar
thl-cmk committed
from tomllib import loads as toml_loads
from tomllib import TOMLDecodeError
thl-cmk's avatar
thl-cmk committed

from create_topology_utils import parse_arguments
thl-cmk's avatar
thl-cmk committed
from create_topology_classes import InventoryColumns, Interface, Settings, StaticConnection
thl-cmk's avatar
thl-cmk committed


thl-cmk's avatar
thl-cmk committed
CREATE_TOPOLOGY_VERSION = '0.0.4-202310116'
thl-cmk's avatar
thl-cmk committed

ITEMS = {}
thl-cmk's avatar
thl-cmk committed
MAC_TABLE: Dict[str, Interface] = {}
HOST_MAP = {}
DROP_HOSTS: List[str] = []
STATIC_CONNECTIONS: Dict = {}


def is_mac_address(mac_address: str) -> bool:
    re_mac_pattern = '([0-9A-Z]{2}\\:){5}[0-9A-Z]{2}'
    if re_match(re_mac_pattern, mac_address):
        if SETTINGS.debug:
            print(f'mac: {mac_address}, match')
        return True
    else:
        if SETTINGS.debug:
            print(f'mac: {mac_address}, no match')
        return False
thl-cmk's avatar
thl-cmk committed


def get_inventory_data(host: str, path: List[str]) -> Optional[List[Dict[str, str]]]:
    inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}')
thl-cmk's avatar
thl-cmk committed
    data = []
thl-cmk's avatar
thl-cmk committed
    if inventory_file.exists():
        data = literal_eval(inventory_file.read_text())
        if data:
            for m in path:
                try:
                    data = data[m]
                except KeyError:
thl-cmk's avatar
thl-cmk committed
                    return []
thl-cmk's avatar
thl-cmk committed
        else:
thl-cmk's avatar
thl-cmk committed
            if SETTINGS.debug:
                print(f'Device: {host}: no inventory data found!')
thl-cmk's avatar
thl-cmk committed
    else:
thl-cmk's avatar
thl-cmk committed
        if SETTINGS.debug:
            print(f'Device: {host}: not found in inventory path!')
    return data
thl-cmk's avatar
thl-cmk committed


def get_items_from_autochecks(host: str):
    if host and host not in ITEMS.keys():
        ITEMS[host] = []
        autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk')
        if autochecks_file.exists():
            data: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
            for service in data:
                if service['check_plugin_name'] in ['if64']:
                    ITEMS[host].append(service['item'])
        else:
thl-cmk's avatar
thl-cmk committed
            if SETTINGS.debug:
                print(f'Device: {host}: not found in auto checks path!')


def create_mac_to_interface():
    path = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/')
    files = [str(file) for file in list(path.iterdir()) if not str(file).endswith('.gz')]

    while files:
        host = files[0].split('/')[-1]
        files.pop(0)
        if host.startswith('.'):
            continue
        if_table = get_inventory_data(host, SETTINGS.path_to_if_table)
        if if_table and if_table != []:
            if SETTINGS.debug:
                print(f'host: {host}, interfaces: {len(if_table)}')
            for interface in if_table:
                # {
                #     'index': 436756992,
                #     'description': 'Ethernet2/50',
                #     'alias': '',
                #     'speed': 100000000000,
                #     'phys_address': 'AC:7A:56:C7:37:C8',
                #     'oper_status': 2,
                #     'port_type': 6,
                #     'admin_status': 2,
                #     'available': True,
                #     'name': 'Ethernet2/50'
                # }
                try:
                    MAC_TABLE[interface['phys_address'].upper()] = Interface(
                        host=host,
                        index=interface.get('index'),
                        name=interface.get('name'),
                        description=interface.get('description'),
                        alias=interface.get('alias'),
                    )
                except KeyError:
                    continue
thl-cmk's avatar
thl-cmk committed


thl-cmk's avatar
thl-cmk committed
def get_service_by_interface(host: str, interface: str) -> Optional[str]:
    # empty host/neighbour should never happen here
thl-cmk's avatar
thl-cmk committed
    if not host:
        return interface
    items = ITEMS.get(host, [])
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    if interface in items:
        return interface
    else:
        if_padding = '/'.join(interface.split('/')[:-1]) + f'/{interface.split("/")[-1]:0>2}'
        for item in items:
            # interface = Gi0/1
            # item = Gi0/1 - Access port
            if item.startswith(interface):
                return item
            elif item.startswith(if_padding):
                return item

thl-cmk's avatar
thl-cmk committed
        if SETTINGS.debug:
            print(f'Device: {host}: interface ({interface}) not found in services')
thl-cmk's avatar
thl-cmk committed
        return interface


thl-cmk's avatar
thl-cmk committed
def get_if_host_by_mac(mac_address: str) -> Optional[Tuple[str, str]]:
    try:
        interface: Interface = MAC_TABLE[mac_address]
    except KeyError:
        return

    host = interface.host
    if interface.name:
        if_name = interface.name
    elif interface.description:
        if_name = interface.description
    elif interface.alias:
        if_name = interface.alias
    else:
        if_name = interface.index

    return host, if_name


def add_static_connections(host: str, data: Dict) -> Dict:
    if host in STATIC_CONNECTIONS.keys():
        host_connections = data.get('connections', {})
        host_connections.update(STATIC_CONNECTIONS[host].get('connections', {}))
        host_interfaces = data.get('interfaces', [])
        for interface in STATIC_CONNECTIONS[host].get('connections', {}).keys():
            host_interfaces.append(interface)
        data.update({'connections': host_connections, 'interfaces': list(set(host_interfaces))})
    return data


thl-cmk's avatar
thl-cmk committed
def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
) -> Optional[Dict[str, Any]]:
    data = {'connections': {}, "interfaces": []}
    get_items_from_autochecks(host)
    for topo_neighbour in inv_data:
thl-cmk's avatar
thl-cmk committed
        neighbour = topo_neighbour.get(inv_columns.neighbour)
thl-cmk's avatar
thl-cmk committed
        if not neighbour:
            continue

        # try to find neighbour against known MAC addresses (LLDP)
        if is_mac_address(neighbour):
            try:
                neighbour = MAC_TABLE[neighbour].host
            except KeyError:
                pass
thl-cmk's avatar
thl-cmk committed
        if not SETTINGS.keep_domain:
            neighbour = neighbour.split('.')[0]
        if SETTINGS.uppercase:
            neighbour = neighbour.upper()
        if SETTINGS.lowercase:
            neighbour = neighbour.lower()
thl-cmk's avatar
thl-cmk committed
        # drop neighbour if inventory neighbour is invalid
        if neighbour in DROP_HOSTS:
            continue
        # rewrite neighbour if inventory neighbour and checkmk host don't match
        if neighbour in HOST_MAP.keys():
            neighbour = HOST_MAP[neighbour]
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
        # has to be done before checking interfaces
        get_items_from_autochecks(neighbour)
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
        # getting/checking interfaces
thl-cmk's avatar
thl-cmk committed
        local_port = topo_neighbour.get(inv_columns.local_port)
        if is_mac_address(local_port):
            try:
                print(f'host: {host}, local_port: {local_port}, {MAC_TABLE[local_port]}')
            except KeyError:
                pass

        local_port = get_service_by_interface(host, local_port)

        neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
        if is_mac_address(neighbour):
            try:
                print(f'neighbour: {neighbour}, neighbour_port: {neighbour_port}, {MAC_TABLE[neighbour_port]}')
            except KeyError:
                pass

        neighbour_port = get_service_by_interface(neighbour, neighbour_port)

thl-cmk's avatar
thl-cmk committed
        if neighbour and local_port and neighbour_port:
            data['connections'].update({local_port: [neighbour, neighbour_port, data_source]})
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)
thl-cmk's avatar
thl-cmk committed
    # add static connections
    data = add_static_connections(host, data)
thl-cmk's avatar
thl-cmk committed
    return {host: data}


def get_list_of_devices(data) -> List[str]:
    devices = []
    for connection in data.values():
        devices.append(connection[0])
    return list(set(devices))


def save_topology_data(data: Dict, path: str):
    save_file = Path(f'{path}/{SETTINGS.topology_file_name}')
    save_file.parent.mkdir(exist_ok=True, parents=True)
    save_file.write_text(dumps(data))
    parent_path = Path(f'{path}').parent
    if SETTINGS.make_default:
        Path(f'{parent_path}/default').unlink(missing_ok=True)
        Path(f'{parent_path}/default').symlink_to(target=Path(path), target_is_directory=True)


def create_topology(
        seed_devicees: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        output_directory: str,
        data_source: str
):
    devices_to_go = list(set(seed_devicees))  # remove duplicates
thl-cmk's avatar
thl-cmk committed
    for static_host in STATIC_CONNECTIONS.keys():
        devices_to_go.append(static_host)
    devices_to_go = list(set(devices_to_go))
thl-cmk's avatar
thl-cmk committed
    topology_data = {}
    devices_done = []

    while devices_to_go:
        device = devices_to_go[0]

        if device in HOST_MAP.keys():
            try:
                devices_to_go.remove(device)
            except ValueError:
                pass
            device = HOST_MAP[device]
            if device in devices_done:
                continue

        topo_data = get_inventory_data(device, path_in_inventory)
thl-cmk's avatar
thl-cmk committed

        topology_data.update(
            create_device_from_inv(
                host=device,
                inv_data=topo_data,
                inv_columns=inv_columns,
                data_source=data_source,
            ))
        devices_list = get_list_of_devices(topology_data[device]['connections'])
        for entry in devices_list:
            if entry not in devices_done:
                devices_to_go.append(entry)
thl-cmk's avatar
thl-cmk committed

        devices_to_go = list(set(devices_to_go))
        devices_done.append(device)
        devices_to_go.remove(device)
thl-cmk's avatar
thl-cmk committed
        if SETTINGS.debug:
            print(f'Device done: {device}, source: {data_source}')
thl-cmk's avatar
thl-cmk committed

    if topology_data:
        save_topology_data(topology_data, f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/{output_directory}')

thl-cmk's avatar
thl-cmk committed
    print(f'Devices added: {len(devices_done)}, source {data_source}')


def get_user_data_from_toml() -> Dict:
    data = {}
    toml_file = Path(SETTINGS.user_data_file)
    if toml_file.exists():
        try:
            data = toml_loads(toml_file.read_text())
        except TOMLDecodeError as e:
            print(
                f'ERROR: User data file {toml_file} is not in valid TOML format! ({e}), (see https://toml.io/en/)')
            exit(2)
    if SETTINGS.debug:
        print(f'data from TOML: {data}')
    return data


def create_static_connections_from_toml(connections: List[List[str]]) -> Dict:
    data = {}
    for connection in connections:
        connection = StaticConnection(*connection)
        if SETTINGS.debug:
            print(f'connection: {connection}')
        if connection.host not in data.keys():
            data[connection.host] = {'connections': {}, 'interfaces': []}
        if connection.neighbour not in data.keys():
            data[connection.neighbour] = {'connections': {}, 'interfaces': []}

        # add connection host to neighbour
        data[connection.host]['connections'].update({connection.local_port: [
            connection.neighbour, connection.neighbour_port, connection.label
        ]})
        data[connection.host]['interfaces'].append(connection.local_port)

        # add connection from neighbour to host
        data[connection.neighbour]['connections'].update({
            connection.neighbour_port: [connection.host, connection.local_port, connection.label]
        })
        data[connection.neighbour]['interfaces'].append(connection.neighbour_port)
    if SETTINGS.debug:
        print(data)

    return data
thl-cmk's avatar
thl-cmk committed


if __name__ == '__main__':
    SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))
    if SETTINGS.version:
        print(f'create_topology_data.py version: {CREATE_TOPOLOGY_VERSION}')
        exit(0)
    if not SETTINGS.seed_devices:
        print('make-topology.py: error: the following arguments are required: -s, --seed-devices')
        print('see make-topology.py -h')
        exit(1)

    print(f'Start time: {strftime(SETTINGS.time_format)}')
    start_time = time_ns()

thl-cmk's avatar
thl-cmk committed
    user_data = get_user_data_from_toml()
    HOST_MAP = user_data.get('HOST_MAP', {})
    DROP_HOSTS = user_data.get('DROP_HOSTS', [])
    STATIC_CONNECTIONS = create_static_connections_from_toml(user_data.get('STATIC_CONNECTIONS', []))

    # not yet used, need more test data
    # create_mac_to_interface()

thl-cmk's avatar
thl-cmk committed
    create_topology(
        seed_devicees=SETTINGS.seed_devices,
        path_in_inventory=SETTINGS.path_in_inventory,
        inv_columns=SETTINGS.inventory_columns,
        output_directory=SETTINGS.output_directory,
        data_source=SETTINGS.data_source
    )

    print(f'time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')