Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
create_topology_classes.py 6.3 KiB
Newer Older
thl-cmk's avatar
thl-cmk committed
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-12
# File  : create_topology_classes.py


from os import environ
thl-cmk's avatar
thl-cmk committed
from pathlib import Path
thl-cmk's avatar
thl-cmk committed
from time import strftime
thl-cmk's avatar
thl-cmk committed
from typing import Dict, List, Any, NamedTuple
thl-cmk's avatar
thl-cmk committed
from enum import Enum, unique
from create_topology_utils import (
thl-cmk's avatar
thl-cmk committed
    CREATE_TOPOLOGY_VERSION,
    PATH_CDP,
    PATH_LLDP,
    PATH_INTERFACES,
    USER_DATA_FILE,
    LQ_INTERFACES,
    SCRIPT,
    get_inventory_data,
    get_table_from_inventory,
    get_interface_items_from_lq,
    ExitCodes,
    get_data_from_toml,
thl-cmk's avatar
thl-cmk committed
)


thl-cmk's avatar
thl-cmk committed
@unique
class CacheSources(Enum):
    inventory = 'inventory'
    lq = 'lq'
thl-cmk's avatar
thl-cmk committed


class InventoryColumns(NamedTuple):
    neighbour: str
    local_port: str
    neighbour_port: str


thl-cmk's avatar
thl-cmk committed
class StaticConnection(NamedTuple):
    host: str
    local_port: str
    neighbour_port: str
    neighbour: str
    label: str


thl-cmk's avatar
thl-cmk committed
class Settings:
    def __init__(
            self,
            cli_args: Dict[str, Any],
    ):
thl-cmk's avatar
thl-cmk committed
        self.__omd_root = environ['OMD_ROOT']
thl-cmk's avatar
thl-cmk committed
        self.__topology_save_path = 'var/topology_data'
        self.__topology_file_name = 'network_data.json'
        self.__path_to_if_table = 'networking,interfaces'
thl-cmk's avatar
thl-cmk committed
        # self.__inventory_path = 'var/check_mk/inventory'
        # self.__autochecks_path = 'var/check_mk/autochecks'
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
        self.__settings = {
thl-cmk's avatar
thl-cmk committed
            'layers': [],
thl-cmk's avatar
thl-cmk committed
            'seed_devices': None,
            'time_format': '%Y-%m-%dT%H:%M:%S.%m',
thl-cmk's avatar
thl-cmk committed
            'user_data_file': f'{self.__omd_root}/local/bin/topology_data/{USER_DATA_FILE}',
thl-cmk's avatar
thl-cmk committed
            'output_directory': None,
thl-cmk's avatar
thl-cmk committed
            'default': False,
thl-cmk's avatar
thl-cmk committed
            'keep_domain': False,
            'lowercase': False,
            'uppercase': False,
            'debug': False,
            'version': False,
thl-cmk's avatar
thl-cmk committed
            'dont_compare': False,
thl-cmk's avatar
thl-cmk committed
            'check_user_data_only': False,
            'keep': None,
thl-cmk's avatar
thl-cmk committed
            'min_age': None,
thl-cmk's avatar
thl-cmk committed
        }
thl-cmk's avatar
thl-cmk committed
        # args in the form {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ... }}
thl-cmk's avatar
thl-cmk committed
        # we will remove 's, __'
        self.__args = ({k.split(',')[-1].strip(' ').strip('_'): v for k, v in cli_args.items() if v})
        self.__settings.update(self.__args)
thl-cmk's avatar
thl-cmk committed

        if self.version:
            print(f'{Path(SCRIPT).name} version: {CREATE_TOPOLOGY_VERSION}')
            exit(code=ExitCodes.OK.value)

        if self.check_user_data_only:
thl-cmk's avatar
thl-cmk committed
            _user_data = get_data_from_toml(file=self.user_data_file)
thl-cmk's avatar
thl-cmk committed
            print(f'Could read/parse the user data from {self.user_data_file}')
            exit(code=ExitCodes.OK.value)

thl-cmk's avatar
thl-cmk committed
        if self.layers:
            layers = list(set(self.layers))
            if len(layers) != len(self.layers):
                print(f'-l/--layers options must be unique. Don~\'t use any layer more than once.')
thl-cmk's avatar
thl-cmk committed
                exit(code=ExitCodes.BAD_OPTION_LIST.value)
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
    def version(self) -> bool:
        return self.__settings['version']

thl-cmk's avatar
thl-cmk committed
    @property
thl-cmk's avatar
thl-cmk committed
    def layers(self) -> List[str]:
        return self.__settings['layers']
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
thl-cmk's avatar
thl-cmk committed
    def keep(self) -> int | None:
        if self.__settings['keep']:
thl-cmk's avatar
thl-cmk committed
            return max(self.__settings['keep'], 0)
thl-cmk's avatar
thl-cmk committed

    @property
    def min_age(self) -> int:
        if self.__settings['min_age']:
thl-cmk's avatar
thl-cmk committed
            return max(self.__settings['min_age'], 0)
thl-cmk's avatar
thl-cmk committed
        else:
thl-cmk's avatar
thl-cmk committed
            return 0
thl-cmk's avatar
thl-cmk committed

    @property
    def check_user_data_only(self) -> bool:
        return self.__settings['check_user_data_only']
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
    def debug(self) -> bool:
        return self.__settings['debug']

thl-cmk's avatar
thl-cmk committed
    @property
    def dont_compare(self) -> bool:
        return self.__settings['dont_compare']

thl-cmk's avatar
thl-cmk committed
    @property
thl-cmk's avatar
thl-cmk committed
    def default(self) -> bool:
        return self.__settings['default']
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
    def keep_domain(self) -> bool:
        return self.__settings['keep_domain']

    @property
    def uppercase(self) -> bool:
        return self.__settings['uppercase']

    @property
    def lowercase(self) -> bool:
        return self.__settings['lowercase']

thl-cmk's avatar
thl-cmk committed
    @property
    def time_format(self) -> str:
        return self.__settings['time_format']

    @property
    def omd_root(self) -> str:
thl-cmk's avatar
thl-cmk committed
        return self.__omd_root
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
    def user_data_file(self) -> str:
        return self.__settings['user_data_file']
thl-cmk's avatar
thl-cmk committed

    @property
    def topology_save_path(self) -> str:
thl-cmk's avatar
thl-cmk committed
        return self.__topology_save_path
thl-cmk's avatar
thl-cmk committed

    @property
    def topology_file_name(self) -> str:
thl-cmk's avatar
thl-cmk committed
        return self.__topology_file_name
thl-cmk's avatar
thl-cmk committed

    @property
    def seed_devices(self) -> List[str]:
        if self.__settings['seed_devices']:
thl-cmk's avatar
thl-cmk committed
            return self.__settings['seed_devices']
thl-cmk's avatar
thl-cmk committed
        else:
            return []

    @property
    def output_directory(self) -> str:
        if not self.__settings['output_directory']:
            return f'{strftime(self.__settings["time_format"])}'
        else:
            return self.__settings['output_directory']
thl-cmk's avatar
thl-cmk committed


class HostCache:
    def __init__(self):
        self.__cache = {}
        self.__inventory_pre_fetch_list: List[str] = [
thl-cmk's avatar
thl-cmk committed
            # PATH_CDP,
            # PATH_LLDP,
thl-cmk's avatar
thl-cmk committed
            PATH_INTERFACES,
        ]

    def __fill_cache(self, host: str):
        # pre fill inventory data
        inventory = get_inventory_data(host=host)
        if inventory:
            self.__cache[host][CacheSources.inventory.value] = {}
            self.__cache[host][CacheSources.inventory.value].update({
                entry: get_table_from_inventory(
                    inventory=inventory,
                    path=entry
                ) for entry in self.__inventory_pre_fetch_list
            })
        else:
            self.__cache[host][CacheSources.inventory.value] = None
        # prefill live status data
        self.__cache[host][CacheSources.lq.value] = {}
        self.__cache[host][CacheSources.lq.value][LQ_INTERFACES] = get_interface_items_from_lq(host)

    def get_data(self, host: str, source: CacheSources, path: str):
        if host not in self.__cache.keys():
            self.__cache[host]: Dict[str, Any] = {}
            self.__fill_cache(host=host)
        try:
            return self.__cache[host][source.value][path]
thl-cmk's avatar
thl-cmk committed
        except (KeyError, TypeError) as _e:
thl-cmk's avatar
thl-cmk committed
            return None

    def add_inventory_prefetch_path(self, path: str):
        self.__inventory_pre_fetch_list = list(set(self.__inventory_pre_fetch_list + [path]))