Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
create_topology_utils.py 15.1 KiB
Newer Older
thl-cmk's avatar
thl-cmk committed
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-12
# File  : create_topology_utils.py

thl-cmk's avatar
thl-cmk committed
#
# options used
thl-cmk's avatar
thl-cmk committed
# -d --default
thl-cmk's avatar
thl-cmk committed
# -o --output-directory
# -s --seed-devices
thl-cmk's avatar
thl-cmk committed
# -u --user-data-file
thl-cmk's avatar
thl-cmk committed
# -v --version
# --check-user-data-only
# --data-source
# --debug
# --dont-compare
# --inventory-columns
thl-cmk's avatar
thl-cmk committed
# --keep-domain
thl-cmk's avatar
thl-cmk committed
# --keep
# --lldp
thl-cmk's avatar
thl-cmk committed
# --lowercase
thl-cmk's avatar
thl-cmk committed
# --min-age
# --path-in-inventory
# --time-format
thl-cmk's avatar
thl-cmk committed
# --uppercase
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
from os import environ
thl-cmk's avatar
thl-cmk committed
from json import dumps
import socket
thl-cmk's avatar
thl-cmk committed
from ast import literal_eval
from time import time as now_time
from tomllib import loads as toml_loads
from tomllib import TOMLDecodeError
from re import match as re_match
thl-cmk's avatar
thl-cmk committed
from pathlib import Path
from typing import List, Dict
thl-cmk's avatar
thl-cmk committed
from argparse import (
    Namespace as arg_Namespace,
    ArgumentParser,
    RawTextHelpFormatter,
)
thl-cmk's avatar
thl-cmk committed
from enum import Enum, unique


@unique
class ExitCodes(Enum):
    OK = 0
    BAD_OPTION_LIST = 1
    BAD_TOML_FORMAT = 3

thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
script = '~/local/bin/network-topology/create_topology_data.py'
sample_seeds = 'Core01 Core02'
cdp_path = 'networking,cdp_cache'
cdp_columns = 'device_id,local_port,device_port'
cdp_label = 'inv_CDP'
lldp_path = 'networking,lldp_cache'
lldp_columns = 'system_name,local_port_num,port_id'
lldp_label = 'inv_LLDP'
thl-cmk's avatar
thl-cmk committed
user_data_file = 'create_topology_data.toml'

thl-cmk's avatar
thl-cmk committed

def get_data_form_live_status(query: str):
    address = f'{environ.get("OMD_ROOT")}/tmp/run/live'
    family = socket.AF_INET if type(address) is tuple else socket.AF_UNIX
    sock = socket.socket(family, socket.SOCK_STREAM)
    sock.connect(address)
    sock.sendall(query.encode())
    sock.shutdown(socket.SHUT_WR)
    chunks = []
    while len(chunks) == 0 or chunks[-1] != "":
        chunks.append(sock.recv(4096).decode())
    sock.close()
    if len(chunks):
        reply = "".join(chunks).strip()
        reply = literal_eval(reply)
        if reply != [[b'']]:
            return reply
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed

def get_data_from_toml(file: str, debug: bool = False) -> Dict:
    data = {}
    toml_file = Path(file)
    if toml_file.exists():
        try:
            data = toml_loads(toml_file.read_text())
        except TOMLDecodeError as e:
            print(
                f'ERROR: data file {toml_file} is not in valid TOML format! ({e}), (see https://toml.io/en/)')
thl-cmk's avatar
thl-cmk committed
            exit(code=ExitCodes.BAD_TOML_FORMAT)
thl-cmk's avatar
thl-cmk committed
    else:
        print(f'WARNING: User data {file} not found.')
thl-cmk's avatar
thl-cmk committed
    if debug:
        print(f'TOML file read: {file}')
        print(f'Data from toml file: {data}')
    return data
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
def rm_tree(root: Path):
    # safety
    if not str(root).startswith(f'{environ["OMD_ROOT"]}/var/topology_data'):
        print(f"WARNING: bad path to remove, {str(root)}, don\'t delete it.")
        return
    for p in root.iterdir():
        if p.is_dir():
            rm_tree(p)
        else:
            p.unlink()
    root.rmdir()


thl-cmk's avatar
thl-cmk committed
def remove_old_data(keep: int, min_age: int, path: Path, protected: List[str], debug: bool = False):
thl-cmk's avatar
thl-cmk committed
    default_topo = path.joinpath('default')
    directories = [str(directory) for directory in list(path.iterdir())]
    # keep default top
    if str(default_topo) in directories:
        directories.remove(str(default_topo))
        keep -= 1
        if default_topo.is_symlink():
            try:
                directories.remove(str(default_topo.readlink()))
            except ValueError:
                pass

thl-cmk's avatar
thl-cmk committed
    # keep protected topologies
    for directory in protected:
        try:
            directories.remove(str(path.joinpath(directory)))
        except ValueError as e:
            if debug:
                print(e)
                print(directory)
                print(str(path.joinpath(directory)))
                print(directories)
        else:
            print(f'Protected topology: {directory}, will not be deleted.')

thl-cmk's avatar
thl-cmk committed
    if len(directories) < keep < 1:
        return

    topo_by_age = {}

    for directory in directories:
        if Path(directory).is_dir():
            topo_by_age[Path(directory).stat().st_ctime] = directory

    topo_age = list(topo_by_age.keys())
    topo_age.sort()

    while len(topo_by_age) > keep:
        if min_age * 86400 > now_time() - topo_age[0]:
            print(f'Topology "{Path(topo_by_age[topo_age[0]]).name}" not older then {min_age} day(s). not deleted.')
            return
        print(f'delete old topology: {topo_by_age[topo_age[0]]}')
        rm_tree(Path(topo_by_age[topo_age[0]]))
        topo_by_age.pop(topo_age[0])
        topo_age.pop(0)


def save_data_to_file(data: Dict, path: str, file: str, make_default: bool):
    """
    Save the data as json file.
    Args:
        data: the topology data
        path: the path were to save the dat
        file: the file name to save  data in
        make_default: if True, create the symlink "default" with path as target

    Returns:
        None
    """
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    path_file = f'{path}/{file}'
    save_file = Path(f'{path_file}')
    save_file.parent.mkdir(exist_ok=True, parents=True)
    save_file.write_text(dumps(data))
    if make_default:
thl-cmk's avatar
thl-cmk committed
        parent_path = Path(f'{path}').parent
thl-cmk's avatar
thl-cmk committed
        Path(f'{parent_path}/default').unlink(missing_ok=True)
thl-cmk's avatar
thl-cmk committed
        Path(f'{parent_path}/default').symlink_to(target=Path(path), target_is_directory=True)


def save_topology(
        data: dict,
        base_directory: str,
        output_directory: str,
        dont_compare: bool,
        make_default: bool,
        topology_file_name: str,
):
    path = f'{base_directory}/{output_directory}'

    def _save():
        save_data_to_file(
            data=data,
            path=path,
            file=topology_file_name,
            make_default=make_default,
        )

    if dont_compare:
        _save()
    else:
        if not is_equal_with_default(
                data=data,
                file=f'{base_directory}/default/{topology_file_name}'
        ):
            _save()
        else:
            print(
                'Topology matches default topology, not saved! Use "--dont-compare" to save identical topologies.'
            )
thl-cmk's avatar
thl-cmk committed


def is_mac_address(mac_address: str, debug: bool = False) -> bool:
    """
    Checks if mac_address is a valid MAC address. Will only accept MAC address in the form "AA:BB:CC:DD:EE:FF"
    (lower case is also ok).
    Args:
        mac_address: the MAC address to check
        debug: optional. If True the result of the function will be printed to stdout

    Returns:
        True if mac_address is a valid MAC address
        False if mac_address not a valid MAC address
    """
    re_mac_pattern = '([0-9A-Z]{2}\\:){5}[0-9A-Z]{2}'
    if re_match(re_mac_pattern, mac_address.upper()):
        if debug:
            print(f'mac: {mac_address}, match')
        return True
    else:
        if debug:
            print(f'mac: {mac_address}, no match')
        return False


thl-cmk's avatar
thl-cmk committed
def is_list_of_str_equal(list1: List[str], list2: List[str]) -> bool:
thl-cmk's avatar
thl-cmk committed
    """
    Compares two list of strings. Before compared the list will internal sorted.
    Args:
        list1:
        list2:
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    Returns:
        True if both lists match
        False if they don't match
    """
    tmp_list1 = list1.copy()
    tmp_list2 = list2.copy()
    tmp_list1.sort()
    tmp_list2.sort()
    return tmp_list1 == tmp_list2
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed

def merge_topologies(topo_pri: Dict, topo_sec: Dict) -> Dict:
    """
    Merge dict_prim into dict_sec
    Args:
        topo_pri: data of dict_pri will overwrite the data in dict_sec
        topo_sec: dict where the data of dict_pri will be merged to

    Returns:
        Dict: topo_sec that contains merged data from top_sec and top_pri
    """
    keys_pri = list(topo_pri.keys())

    # first transfer all completely missing items from dict_prim to dict_sec
    for key in keys_pri:
        if key not in topo_sec.keys():
            topo_sec[key] = topo_pri[key]
        else:
            topo_sec[key]['connections'].update(topo_pri[key].get('connections', {}))
            topo_sec[key]['interfaces'] = list(set((topo_sec[key]['interfaces'] + topo_pri[key].get('interfaces', []))))
        topo_pri.pop(key)
    return topo_sec
thl-cmk's avatar
thl-cmk committed


def compare_dicts(dict1: Dict, dict2: Dict) -> bool:
    # check top level keys
thl-cmk's avatar
thl-cmk committed
    if not is_list_of_str_equal(list(dict1.keys()), list(dict2.keys())):
        # print('top level dont match')
        # print(f'dict1: {list(dict1.keys())}')
        # print(f'dict1: {list(dict2.keys())}')
thl-cmk's avatar
thl-cmk committed
        return False

    for key, value in dict1.items():
        _type = type(value)
        if _type == dict:
            if not compare_dicts(value, dict2[key]):
                return False
        elif _type == list:
thl-cmk's avatar
thl-cmk committed
            if not is_list_of_str_equal(value, dict2[key]):
                # print(f'list1: {value}')
                # print(f'list2: {dict2[key]}')
thl-cmk's avatar
thl-cmk committed
                return False
        elif _type == str:
            if not value == dict2[key]:
thl-cmk's avatar
thl-cmk committed
                # print('value dont match')
                # print(f'value1: {value}')
                # print(f'value2 {dict2[key]}')
thl-cmk's avatar
thl-cmk committed
                return False
        else:
            return False

    return True


thl-cmk's avatar
thl-cmk committed
def is_equal_with_default(data: Dict, file: str) -> bool:
    default_file = Path(file)
    if default_file.exists():
        default_data = literal_eval(default_file.read_text())
        return compare_dicts(data, default_data)


thl-cmk's avatar
thl-cmk committed
def parse_arguments(create_topology_version: str) -> arg_Namespace:
thl-cmk's avatar
thl-cmk committed
    parser = ArgumentParser(
thl-cmk's avatar
thl-cmk committed
        prog='create_topology_data.py',
thl-cmk's avatar
thl-cmk committed
        description='This script creates the topology data file needed for the Checkmk "network_visualization"\n'
                    'plugin by Andreas Boesl and schnetz. For more information see\n'
                    'the announcement from schnetz: https://forum.checkmk.com/t/network-visualization/41680\n'
                    'and the plugin on the Exchange: https://exchange.checkmk.com/p/network-visualization .\n'
thl-cmk's avatar
thl-cmk committed
                    '\n'
thl-cmk's avatar
thl-cmk committed
                    'The required inventory data can be created with my inventory plugins:\n'
thl-cmk's avatar
thl-cmk committed
                    'CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\n'
                    'LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\n'
                    '\n'
thl-cmk's avatar
thl-cmk committed
                    f'\nVersion: {create_topology_version} | Written by: thl-cmk\n'
                    f'for more information see: https://thl-cmk.hopto.org',
thl-cmk's avatar
thl-cmk committed
        formatter_class=RawTextHelpFormatter,
        epilog='Usage:\n'
               'for CDP (the default):\n'
thl-cmk's avatar
thl-cmk committed
               f'{script} -s {sample_seeds} -d\n'
thl-cmk's avatar
thl-cmk committed
               'for LLDP:\n'
thl-cmk's avatar
thl-cmk committed
               f'{script} -s {sample_seeds} -d --lldp\n',
thl-cmk's avatar
thl-cmk committed
    )
thl-cmk's avatar
thl-cmk committed
    command_group = parser.add_mutually_exclusive_group()
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
thl-cmk's avatar
thl-cmk committed
        '-d', '--default', default=False, action='store_const', const=True,
thl-cmk's avatar
thl-cmk committed
        help='Set the created topology data as default',
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
        '-m', '--merge',
        nargs=2,
        choices=['CDP', 'LLDP'],
        help=f'Merge topologies. This runs the topology creation for CDP and LLDP.\n'
             f'The topologies are then merged in the specified order.\n'
             f'I.e. -m CDP LLDP merges the CDP topology into the LLDP topology, overwriting\n'
             f'the LLDP data in case there are conflicts.\n'
             f'NOTE: static connection data from the user file will always merged with the\n'
             f' highest priority',
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
        '-o', '--output-directory', type=str,
thl-cmk's avatar
thl-cmk committed
        help='Directory name where to save the topology data.\n'
             'I.e.: my_topology. Default is the actual date/time\n'
thl-cmk's avatar
thl-cmk committed
             'in "--time-format" format.\n'
             'NOTE: the directory is a sub directory under "~/var/topology_data/"',
    )
    parser.add_argument(
        '-s', '--seed-devices', type=str, nargs='+',
thl-cmk's avatar
thl-cmk committed
        help=f'List of devices to start the topology discovery from.\n'
             f'I.e. {sample_seeds}',
thl-cmk's avatar
thl-cmk committed
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
        '-u', '--user-data-file', type=str,
        help='Set the name uf the user provided data file\n'
             'Default is ~local/bin/topology_data/create_topology_data.toml\n',
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
thl-cmk's avatar
thl-cmk committed
        '-v', '--version', default=False, action='store_const', const=True,
        help='Print version of this script and exit',
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
        '--check-user-data-only', default=False, action='store_const', const=True,
        help=f'Only tries to read/parse the user data from {user_data_file} and exits.',
    )
    parser.add_argument(
        '--data-source', type=str,
        help=f'The source from which the topology data originates.\n'
             f'I.e. {cdp_label} for CDP data from the inventory.\n'
             'NOTE: right now this only an unused label.',
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
        '--debug', default=False, action='store_const', const=True,
        help='Print debug information',
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
        '--dont-compare', default=False, action='store_const', const=True,
thl-cmk's avatar
thl-cmk committed
        help='Do not compare the actual topology data with the default topology\n'
             'data. By default, the actual topology is compared with the default\n'
             'topology. If the data matches, the actual topology is not saved.\n'
             'So, if you run this tool in a cron job, a new topology will be\n'
             'created only if there was a change, unless you use "--dont-compare".'
    )
    parser.add_argument(
        '--inventory-columns', type=str,
        help=f'Columns used from the inventory data.\n'
             f'I.e. "{cdp_columns}"\n'
             'NOTE: the columns must be in the order: neighbour, local_port,\n'
             'neighbour_port',
thl-cmk's avatar
thl-cmk committed
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
        '--keep-domain', default=False, action='store_const', const=True,
        help='Do not remove the domain name from the neighbor name',
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
thl-cmk's avatar
thl-cmk committed
        '--keep', type=int,
        help=f'Number of topologies to keep. The oldest topologies above keep\n'
             f'max will be deleted. The minimum value for --keep is 1.\n'
             f'NOTE: The default topologies will be always kept.\n'
thl-cmk's avatar
thl-cmk committed
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
        '--lldp', default=False, action='store_const', const=True,
thl-cmk's avatar
thl-cmk committed
        help=f'Sets data source to {lldp_label}, inventory path \n'
             f'to "{lldp_path}" and columns\n'
             f'to "{lldp_columns}"',
thl-cmk's avatar
thl-cmk committed
    )
    command_group.add_argument(
thl-cmk's avatar
thl-cmk committed
        '--lowercase', default=False, action='store_const', const=True,
        help='Change neighbour names to all lower case',
thl-cmk's avatar
thl-cmk committed
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
        '--min-age', type=int,
thl-cmk's avatar
thl-cmk committed
        help=f'The minimum number of days before a topology is deleted\n'
             f'by "--keep".\n'
             f'NOTE: Topologies that are not older than 1 days are always kept.',
    )
    parser.add_argument(
        '--path-in-inventory', type=str,
        help=f'Checkmk inventory path to the topology data.\n'
             f'I.e. "{cdp_path}"',
thl-cmk's avatar
thl-cmk committed
    )
thl-cmk's avatar
thl-cmk committed
    parser.add_argument(
thl-cmk's avatar
thl-cmk committed
        '--time-format', type=str,
        help='Format string to render the time. (default: %%Y-%%m-%%dT%%H:%%M:%%S.%%m)',
    )
    command_group.add_argument(
        '--uppercase', default=False, action='store_const', const=True,
        help='Change neighbour names to all upper case',
thl-cmk's avatar
thl-cmk committed
    )
thl-cmk's avatar
thl-cmk committed
    return parser.parse_args()