#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-08
# File  : create_topology_data.py

# 2023-10-10: initial release
# 2023-10-16: added options --keep-max and --min-age
# 2023-10-17: removed -p, -c, -d (use the long options instead: --path-in-inventory, --data-source, --inventory-columns)
#             changed option --keep-max to --keep
#             added option --check-user-data-only
#             added SEED_DEVICES to user data, option -s/--seed-devices is now optional
#             refactoring
# 2023-10-18: fixed --make-default not working
#             added protected topologies
#             changed get inventory data from file access to request via live status query
#             get interfaces from autochecks changed to live status query
# 2023-10-19: cleanup removed "all" unused functions
#             added option -u, --user-data-file to provide a user specific data file
# 2023-10-20: changed option -m/--make-default to -d/--default (-m is needed for -m/--merge)
#             added -m/--merge option

#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
#
# USAGE LLDP:
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m --lldp
#

import sys
from ast import literal_eval
from pathlib import Path
from typing import Dict, List, Any
from time import strftime, time_ns

from create_topology_utils import (
    parse_arguments,
    remove_old_data,
    get_data_from_toml,
    merge_topologies,
    save_topology,
    get_data_form_live_status,
    ExitCodes,
)
from create_topology_classes import (
    InventoryColumns,
    Settings,
    StaticConnection,
    Topologies,
)


CREATE_TOPOLOGY_VERSION = '0.0.8-202310120'

# Cache of interface service items per host ({host: [item, ...]}), filled on demand
# with the results of get_interface_items_from_lq().
ITEMS: Dict[str, List[str]] = {}
# Rewrites a neighbour name from the inventory to the matching Checkmk host name.
HOST_MAP: Dict[str, str] = {}
# Neighbour names from the inventory that are dropped.
DROP_HOSTS: List[str] = []
# Topology data created from the user defined static connections.
STATIC_CONNECTIONS: Dict = {}


def get_interface_items_from_lq(host: str, debug: bool = False) -> Dict:
    """Fetch the items of the "Interface ..." services of one host via live status.

    Args:
        host: Name of the host to query.
        debug: Unused, kept for interface compatibility.

    Returns:
        ``{host_name: [item, ...]}`` where the items are the service descriptions
        without the leading ``'Interface '``, de-duplicated and sorted. An empty
        dict if the host name is unusable or no interface service was found.
    """
    # The host name is inserted verbatim into the query. A line break inside the
    # name would add extra header lines to the query, so such names are rejected.
    if not host or '\n' in host:
        return {}

    query = (
        'GET services\n'
        'Columns: host_name description check_command\n'
        'Filter: description ~ Interface\n'
        f'Filter: host_name = {host}\n'
        'OutputFormat: python3\n'
    )
    prefix = 'Interface '
    found: Dict[str, set] = {}
    for host_name, description, _check_command in get_data_form_live_status(query=query):
        # The "~" filter matches "Interface" anywhere in the description, so only
        # descriptions that really start with the prefix may be stripped of it.
        if not host_name or not description.startswith(prefix):
            continue
        found.setdefault(host_name, set()).add(description[len(prefix):])

    return {host_name: sorted(names) for host_name, names in found.items()}


def get_list_of_devices(data) -> List[str]:
    """Return the distinct devices referenced by a connections mapping.

    Args:
        data: Mapping whose values are sequences with the device name as
            first element (e.g. ``{local_port: [neighbour, neighbour_port, source]}``).

    Returns:
        The device names without duplicates, sorted so that the result does
        not depend on set ordering.
    """
    return sorted({connection[0] for connection in data.values()})


thl-cmk's avatar
thl-cmk committed
def get_service_by_interface(host: str, interface: str, debug: bool = False) -> str | None:
thl-cmk's avatar
thl-cmk committed
    # empty host/neighbour should never happen here
thl-cmk's avatar
thl-cmk committed
    if not host:
        return interface
    items = ITEMS.get(host, [])
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    if interface in items:
        return interface
    else:
thl-cmk's avatar
thl-cmk committed
        if_padding = interface
        if '/' in interface:
            if_padding = '/'.join(interface.split('/')[:-1]) + f'/{interface.split("/")[-1]:0>2}'

thl-cmk's avatar
thl-cmk committed
        for item in items:
            # interface = Gi0/1
            # item = Gi0/1 - Access port
thl-cmk's avatar
thl-cmk committed
            # this might not catch the correct interface (fa0 and fa0/0)
            item = item.split(' ')[0]
            if item == interface:
thl-cmk's avatar
thl-cmk committed
                return item
thl-cmk's avatar
thl-cmk committed
            elif item == if_padding:
thl-cmk's avatar
thl-cmk committed
                return item

thl-cmk's avatar
thl-cmk committed
        if debug:
thl-cmk's avatar
thl-cmk committed
            print(f'Device: {host}: interface ({interface}) not found in services')

thl-cmk's avatar
thl-cmk committed
        return interface
thl-cmk's avatar
thl-cmk committed


def get_inventory_data_lq(host: str, path: List[str], debug: bool = False, ) -> List[Dict[str, str]]:
    """Read one branch of the inventory of a host via live status.

    Args:
        host: Name of the host to query.
        path: Keys to follow, in order, inside the evaluated inventory data.
        debug: Print details if the inventory data could not be evaluated.

    Returns:
        The data found below ``path``. An empty list if the host name is
        unusable, the query returned nothing, the data could not be evaluated
        or the path does not exist.
    """
    # The host name is inserted verbatim into the query. A line break inside the
    # name would add extra header lines to the query, so such names are rejected.
    if not host or '\n' in host:
        return []

    query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
    data = get_data_form_live_status(query=query)
    if not data:
        return []

    try:
        raw = data[0][0]
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        inventory = literal_eval(raw)
    # literal_eval() raises more than SyntaxError on malformed input;
    # UnicodeDecodeError is a subclass of ValueError
    except (SyntaxError, ValueError, TypeError) as e:
        if debug:
            print(f'data: |{data}|')
            print(f'type: {type(data)}')
            print(f'exception: {e}')
        return []

    for key in path:
        try:
            inventory = inventory[key]
        # TypeError: the current level is not a mapping
        except (KeyError, TypeError):
            return []
    return inventory


def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
) -> Dict[str, Any] | None:
    """Create the topology entry of one host from its inventory neighbour data.

    Reads the module level ``SETTINGS``, ``DROP_HOSTS`` and ``HOST_MAP`` and
    fills the ``ITEMS`` cache for the host and each of its neighbours.

    Args:
        host: Host the inventory data belongs to.
        inv_data: One dict per neighbour, as found in the inventory.
        inv_columns: Names of the inventory columns holding the neighbour,
            the local port and the neighbour port.
        data_source: Label stored with every connection.

    Returns:
        ``{host: {'connections': {local_port: [neighbour, neighbour_port,
        data_source]}, 'interfaces': [local_port, ...]}}``.
    """
    data = {'connections': {}, "interfaces": []}
    if host not in ITEMS:
        ITEMS.update(get_interface_items_from_lq(host=host))

    for topo_neighbour in inv_data:
        neighbour = topo_neighbour.get(inv_columns.neighbour)
        if not neighbour:
            continue

        if not SETTINGS.keep_domain:
            neighbour = neighbour.split('.')[0]
        if SETTINGS.uppercase:
            neighbour = neighbour.upper()
        if SETTINGS.lowercase:
            neighbour = neighbour.lower()
        # drop neighbour if inventory neighbour is invalid
        if neighbour in DROP_HOSTS:
            continue
        # rewrite neighbour if inventory neighbour and checkmk host don't match
        neighbour = HOST_MAP.get(neighbour, neighbour)

        # has to be done before checking interfaces
        if neighbour not in ITEMS:
            ITEMS.update(get_interface_items_from_lq(host=neighbour))

        # getting/checking interfaces
        local_port = topo_neighbour.get(inv_columns.local_port)
        neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
        # without neighbour and both ports there is no connection to add; skipping
        # here also keeps a missing port (None) away from the interface matching
        if not (neighbour and local_port and neighbour_port):
            continue

        local_port = get_service_by_interface(host, local_port)
        neighbour_port = get_service_by_interface(neighbour, neighbour_port)

        if local_port and neighbour_port:
            data['connections'][local_port] = [neighbour, neighbour_port, data_source]
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)

    return {host: data}


def create_static_connections(connections: List[List[str]], debug: bool = False) -> Dict:
    """Create topology data from user defined static connections.

    Every connection is added in both directions: from the host to the
    neighbour and from the neighbour back to the host.

    Args:
        connections: One list per connection; the values are passed as
            positional arguments to ``StaticConnection``.
        debug: Print every connection and the resulting data.

    Returns:
        ``{device: {'connections': {port: [peer, peer_port, label]},
        'interfaces': [port, ...]}}`` for every host and neighbour.
    """
    data = {}
    for connection in connections:
        connection = StaticConnection(*connection)
        if debug:
            print(f'connection: {connection}')

        host_entry = data.setdefault(connection.host, {'connections': {}, 'interfaces': []})
        neighbour_entry = data.setdefault(connection.neighbour, {'connections': {}, 'interfaces': []})

        # add connection from host to neighbour
        host_entry['connections'][connection.local_port] = [
            connection.neighbour, connection.neighbour_port, connection.label
        ]
        # list every interface only once, as done for the inventory based data
        if connection.local_port not in host_entry['interfaces']:
            host_entry['interfaces'].append(connection.local_port)

        # add connection from neighbour to host
        neighbour_entry['connections'][connection.neighbour_port] = [
            connection.host, connection.local_port, connection.label
        ]
        if connection.neighbour_port not in neighbour_entry['interfaces']:
            neighbour_entry['interfaces'].append(connection.neighbour_port)

    if debug:
        print(data)

    if data:
        print(f'Devices added: {len(data)}, source static')

    return data


def create_topology(
        seed_devices: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        data_source: str,
        debug: bool = False,
) -> Dict:
    """Crawl the inventory neighbour data, starting at the seed devices.

    The hosts found in the module level ``STATIC_CONNECTIONS`` are used as
    additional seed devices. Every neighbour found for a device is visited in
    turn until no unvisited device is left.

    Args:
        seed_devices: Devices to start with.
        path_in_inventory: Path to the neighbour data inside the inventory.
        inv_columns: Names of the inventory columns to read.
        data_source: Label stored with every connection.
        debug: Print a line for every finished device.

    Returns:
        The topology entries of all visited devices, keyed by device name.
    """
    # sorted, so the order in which devices are visited is reproducible
    devices_to_go = sorted(set(seed_devices) | set(STATIC_CONNECTIONS))
    devices_done = set()
    topology_data = {}

    while devices_to_go:
        device = devices_to_go.pop(0)
        # The queued name is taken off the list before it is rewritten, so the
        # mapped name never has to be removed from the list afterwards.
        device = HOST_MAP.get(device, device)
        if device in devices_done:
            continue

        topo_data = get_inventory_data_lq(host=device, path=path_in_inventory, debug=debug)
        topology_data.update(
            create_device_from_inv(
                host=device,
                inv_data=topo_data,
                inv_columns=inv_columns,
                data_source=data_source,
            ))
        devices_done.add(device)

        for entry in sorted(get_list_of_devices(topology_data[device]['connections'])):
            if entry not in devices_done and entry not in devices_to_go:
                devices_to_go.append(entry)

        if debug:
            print(f'Device done: {device}, source: {data_source}')

    print(f'Devices added: {len(devices_done)}, source {data_source}')

    return topology_data

if __name__ == '__main__':
    # Script entry point: read the settings and user data, create one topology per
    # requested data source, merge them with the static connections, save the
    # result and optionally remove old topologies.
    SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))

    if SETTINGS.version:
        print(f'{Path(__file__).name} version: {CREATE_TOPOLOGY_VERSION}')
        sys.exit(ExitCodes.OK)

    if SETTINGS.check_user_data_only:
        # only checks that the user data file can be read/parsed
        get_data_from_toml(file=SETTINGS.user_data_file)
        print(f'Could read/parse the user data from {SETTINGS.user_data_file}')
        sys.exit(ExitCodes.OK)

    if SETTINGS.merge:
        if len(set(SETTINGS.merge)) != len(SETTINGS.merge):
            print('-m/--merge options must be unique. Use "-m cdp lldp" oder "-m lldp cdp"')
            sys.exit(ExitCodes.BAD_OPTION_LIST)

    print()
    print(f'Start time: {strftime(SETTINGS.time_format)}')
    start_time = time_ns()

    user_data = get_data_from_toml(file=SETTINGS.user_data_file)
    HOST_MAP = user_data.get('HOST_MAP', {})
    DROP_HOSTS = user_data.get('DROP_HOSTS', [])
    # Has to be created before the topologies: create_topology() uses the hosts
    # with static connections as additional seed devices.
    STATIC_CONNECTIONS = create_static_connections(connections=user_data.get('STATIC_CONNECTIONS', []))

    final_topology = {}
    # work through the requested topologies from last to first; each new
    # topology is merged as primary over the result so far
    while SETTINGS.merge:
        SETTINGS.set_topology_param(Topologies[SETTINGS.merge[-1]].value)
        topology = create_topology(
            seed_devices=list(set(SETTINGS.seed_devices + user_data.get('SEED_DEVICES', []))),
            path_in_inventory=SETTINGS.path_in_inventory,
            inv_columns=SETTINGS.inventory_columns,
            data_source=SETTINGS.data_source,
            debug=SETTINGS.debug,
        )
        if topology:
            final_topology = merge_topologies(topo_pri=topology, topo_sec=final_topology)
        SETTINGS.merge.pop()

    final_topology = merge_topologies(topo_pri=STATIC_CONNECTIONS, topo_sec=final_topology)

    save_topology(
        data=final_topology,
        base_directory=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
        output_directory=SETTINGS.output_directory,
        topology_file_name=SETTINGS.topology_file_name,
        dont_compare=SETTINGS.dont_compare,
        make_default=SETTINGS.default,
    )

    if SETTINGS.keep:
        remove_old_data(
            keep=SETTINGS.keep,
            min_age=SETTINGS.min_age,
            path=Path(f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}'),
            protected=user_data.get('PROTECTED_TOPOLOGIES', []),
        )

    print(f'Time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')
    print()