# Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is.
# Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2

# Author: thl-cmk[at]outlook[dot]com
# URL   : https://thl-cmk.hopto.org
# Date  : 2023-10-12
# File  : create_topology_classes.py


# standard library
from os import environ
from pathlib import Path
from time import strftime, time_ns
from typing import Dict, List, Any, NamedTuple
from enum import Enum, unique
from abc import abstractmethod
from ast import literal_eval

# third party / Checkmk
from requests import session
import livestatus

# local helpers and constants of the create_topology tool
from create_topology_utils import (
    CREATE_TOPOLOGY_VERSION,
    PATH_INTERFACES,
    USER_DATA_FILE,
    CACHE_INTERFACES_ITEM,
    SCRIPT,
    get_data_form_live_status,
    get_table_from_inventory,
    ExitCodes,
    get_data_from_toml,
    OMD_ROOT,
)


@unique
class CacheItems(Enum):
    """
    Names of the per-host sections kept in the host cache.

    The member values are used as dictionary keys of the cached host entry.
    """
    inventory = 'inventory'
    interfaces = 'interfaces'


class InventoryColumns(NamedTuple):
    """
    Immutable triple of strings: neighbour, local_port and neighbour_port.

    NOTE(review): presumably these are the column names to read from an inventory
    table; this type is not used in the visible part of the file, confirm against
    the callers.
    """
    neighbour: str
    local_port: str
    neighbour_port: str


class StaticConnection(NamedTuple):
    """
    Immutable record with the string fields host, local_port, neighbour_port,
    neighbour and label.

    NOTE(review): presumably one manually defined connection between two hosts
    (e.g. from the user data file); not used in the visible part of the file,
    confirm against the callers.
    """
    host: str
    local_port: str
    neighbour_port: str
    neighbour: str
    label: str


thl-cmk's avatar
thl-cmk committed
class Settings:
    def __init__(
            self,
            cli_args: Dict[str, Any],
    ):
thl-cmk's avatar
thl-cmk committed
        self.__omd_root = environ['OMD_ROOT']
thl-cmk's avatar
thl-cmk committed
        self.__topology_save_path = 'var/topology_data'
        self.__topology_file_name = 'network_data.json'
        self.__path_to_if_table = 'networking,interfaces'
thl-cmk's avatar
thl-cmk committed
        self.__settings = {
thl-cmk's avatar
thl-cmk committed
            'layers': [],
thl-cmk's avatar
thl-cmk committed
            'seed_devices': None,
            'time_format': '%Y-%m-%dT%H:%M:%S.%m',
thl-cmk's avatar
thl-cmk committed
            'user_data_file': f'{self.__omd_root}/local/bin/topology_data/{USER_DATA_FILE}',
thl-cmk's avatar
thl-cmk committed
            'output_directory': None,
thl-cmk's avatar
thl-cmk committed
            'default': False,
thl-cmk's avatar
thl-cmk committed
            'keep_domain': False,
            'lowercase': False,
            'uppercase': False,
            'debug': False,
            'version': False,
thl-cmk's avatar
thl-cmk committed
            'dont_compare': False,
thl-cmk's avatar
thl-cmk committed
            'check_user_data_only': False,
            'keep': None,
thl-cmk's avatar
thl-cmk committed
            'min_age': None,
thl-cmk's avatar
thl-cmk committed
            'backend': 'LIVESTATUS',
thl-cmk's avatar
thl-cmk committed
        }
thl-cmk's avatar
thl-cmk committed
        # args in the form {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ... }}
thl-cmk's avatar
thl-cmk committed
        # we will remove 's, __'
        self.__args = ({k.split(',')[-1].strip(' ').strip('_'): v for k, v in cli_args.items() if v})
        self.__settings.update(self.__args)
thl-cmk's avatar
thl-cmk committed

        if self.version:
            print(f'{Path(SCRIPT).name} version: {CREATE_TOPOLOGY_VERSION}')
            exit(code=ExitCodes.OK.value)

        if self.check_user_data_only:
thl-cmk's avatar
thl-cmk committed
            _user_data = get_data_from_toml(file=self.user_data_file)
thl-cmk's avatar
thl-cmk committed
            print(f'Could read/parse the user data from {self.user_data_file}')
            exit(code=ExitCodes.OK.value)

thl-cmk's avatar
thl-cmk committed
        if self.layers:
            layers = list(set(self.layers))
            if len(layers) != len(self.layers):
                print(f'-l/--layers options must be unique. Don~\'t use any layer more than once.')
thl-cmk's avatar
thl-cmk committed
                exit(code=ExitCodes.BAD_OPTION_LIST.value)
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
    def backend(self) -> str:
        return self.__settings['backend']

thl-cmk's avatar
thl-cmk committed
    @property
    def version(self) -> bool:
        return self.__settings['version']

thl-cmk's avatar
thl-cmk committed
    @property
thl-cmk's avatar
thl-cmk committed
    def layers(self) -> List[str]:
        return self.__settings['layers']
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
thl-cmk's avatar
thl-cmk committed
    def keep(self) -> int | None:
        if self.__settings['keep']:
thl-cmk's avatar
thl-cmk committed
            return max(self.__settings['keep'], 0)
thl-cmk's avatar
thl-cmk committed

    @property
    def min_age(self) -> int:
        if self.__settings['min_age']:
thl-cmk's avatar
thl-cmk committed
            return max(self.__settings['min_age'], 0)
thl-cmk's avatar
thl-cmk committed
        else:
thl-cmk's avatar
thl-cmk committed
            return 0
thl-cmk's avatar
thl-cmk committed

    @property
    def check_user_data_only(self) -> bool:
        return self.__settings['check_user_data_only']
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
    def debug(self) -> bool:
        return self.__settings['debug']

thl-cmk's avatar
thl-cmk committed
    @property
    def dont_compare(self) -> bool:
        return self.__settings['dont_compare']

thl-cmk's avatar
thl-cmk committed
    @property
thl-cmk's avatar
thl-cmk committed
    def default(self) -> bool:
        return self.__settings['default']
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
    def keep_domain(self) -> bool:
        return self.__settings['keep_domain']

    @property
    def uppercase(self) -> bool:
        return self.__settings['uppercase']

    @property
    def lowercase(self) -> bool:
        return self.__settings['lowercase']

thl-cmk's avatar
thl-cmk committed
    @property
    def time_format(self) -> str:
        return self.__settings['time_format']

    @property
    def omd_root(self) -> str:
thl-cmk's avatar
thl-cmk committed
        return self.__omd_root
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @property
    def user_data_file(self) -> str:
        return self.__settings['user_data_file']
thl-cmk's avatar
thl-cmk committed

    @property
    def topology_save_path(self) -> str:
thl-cmk's avatar
thl-cmk committed
        return self.__topology_save_path
thl-cmk's avatar
thl-cmk committed

    @property
    def topology_file_name(self) -> str:
thl-cmk's avatar
thl-cmk committed
        return self.__topology_file_name
thl-cmk's avatar
thl-cmk committed

    @property
    def seed_devices(self) -> List[str]:
        if self.__settings['seed_devices']:
thl-cmk's avatar
thl-cmk committed
            return self.__settings['seed_devices']
thl-cmk's avatar
thl-cmk committed
        else:
            return []

    @property
    def output_directory(self) -> str:
        if not self.__settings['output_directory']:
            return f'{strftime(self.__settings["time_format"])}'
        else:
            return self.__settings['output_directory']
thl-cmk's avatar
thl-cmk committed


class HostCache:
thl-cmk's avatar
thl-cmk committed
    def __init__(self, debug: bool = False):
thl-cmk's avatar
thl-cmk committed
        self.__cache = {}
        self.__inventory_pre_fetch_list: List[str] = [
            PATH_INTERFACES,
        ]
thl-cmk's avatar
thl-cmk committed
        self._count = 0
        self._debug = debug
        if debug:
            print('init HOST_CACHE')
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    @abstractmethod
    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the inventory data as dictionary
        """
thl-cmk's avatar
thl-cmk committed
        raise NotImplementedError()
thl-cmk's avatar
thl-cmk committed

    @abstractmethod
    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """

        Args:
            host:  the host name to return the interface items
            debug: enable debug output

        Returns:
            list of the interface items
        """
thl-cmk's avatar
thl-cmk committed
        raise NotImplementedError()
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
    def __fill_cache(self, host: str):
        # pre fill inventory data
thl-cmk's avatar
thl-cmk committed
        if self._debug:
            self._count += 1
            _pre_query = time_ns()

thl-cmk's avatar
thl-cmk committed
        inventory = self.get_inventory_data(host=host)
thl-cmk's avatar
thl-cmk committed

        if self._debug:
            print(f'{(time_ns() - _pre_query) / 1e9}|{self._count:0>4}|inventory|{host}')

thl-cmk's avatar
thl-cmk committed
        if inventory:
thl-cmk's avatar
thl-cmk committed
            self.__cache[host][CacheItems.inventory.value] = {}
            self.__cache[host][CacheItems.inventory.value].update({
thl-cmk's avatar
thl-cmk committed
                entry: get_table_from_inventory(
                    inventory=inventory,
                    path=entry
                ) for entry in self.__inventory_pre_fetch_list
            })
        else:
thl-cmk's avatar
thl-cmk committed
            self.__cache[host][CacheItems.inventory.value] = None
        self.__cache[host][CacheItems.interfaces.value] = {}
thl-cmk's avatar
thl-cmk committed

        if self._debug:
            self._count += 1
            _pre_query = time_ns()

thl-cmk's avatar
thl-cmk committed
        self.__cache[host][CacheItems.interfaces.value][CACHE_INTERFACES_ITEM] = self.get_interface_items(host)
thl-cmk's avatar
thl-cmk committed

thl-cmk's avatar
thl-cmk committed
        if self._debug:
            print(f'{(time_ns() - _pre_query) / 1e9}|{self._count:0>4}|items|{host}')

thl-cmk's avatar
thl-cmk committed
    def get_data(self, host: str, item: CacheItems, path: str):
thl-cmk's avatar
thl-cmk committed
        if host not in self.__cache.keys():
            self.__cache[host]: Dict[str, Any] = {}
            self.__fill_cache(host=host)
        try:
thl-cmk's avatar
thl-cmk committed
            return self.__cache[host][item.value][path]
thl-cmk's avatar
thl-cmk committed
        except (KeyError, TypeError) as _e:
thl-cmk's avatar
thl-cmk committed
            return None

    def add_inventory_prefetch_path(self, path: str):
        self.__inventory_pre_fetch_list = list(set(self.__inventory_pre_fetch_list + [path]))
thl-cmk's avatar
thl-cmk committed


class HostCacheLiveStatus(HostCache):
    """Host cache that reads its data with livestatus queries via get_data_form_live_status()."""

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Read the mk_inventory column of the host.

        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the evaluated inventory data, or None if the query returned nothing or
            the data could not be evaluated
        """
        query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
        data = get_data_form_live_status(query=query)
        if not data:
            if debug:
                print(f'Device: {host}: no inventory data found!')
            return None
        try:
            # first column of the first row holds the inventory as bytes
            return literal_eval(data[0][0].decode('utf-8'))
        except (SyntaxError, ValueError) as e:
            # literal_eval raises ValueError for input that parses but is not a plain literal
            if debug:
                print(f'data: |{data}|')
                print(f'type: {type(data)}')
                print(f'exception: {e}')
            return None

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Sample data from lq query, we keep only the item (description without "Interface" ).
        [
         ['C9540-7-1', 'Interface Vlan-999', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Vlan-998', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Vlan-997', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Vlan-996', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Vlan-8', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Te2/0/2', 'check_mk-if64'],
         ['C9540-7-1', 'Interface Te2/0/30', 'check_mk-if64']
         ]
        Args:
            host: the host name to return the interface items for
            debug: enable debug output

        Returns:
            list of the interface items, empty if the query returned nothing
        """
        query = (
            'GET services\n'
            'Columns: host_name description check_command\n'
            'Filter: description ~ Interface\n'
            f'Filter: host_name = {host}\n'
            'OutputFormat: python3\n'
        )
        data = get_data_form_live_status(query=query)
        # 'or []': tolerate a None result - assumes the helper may return None, TODO confirm
        # [10:] removes the 10 characters of 'Interface ' from the description
        items = [description[10:] for _host_name, description, _check_command in data or []]

        if debug:
            print(f'Interfaces items found: {len(items)} an host {host}')
        return items

class HostCacheFileSystem(HostCache):
    """Host cache that reads the inventory and autochecks files below OMD_ROOT."""

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Read and evaluate the file var/check_mk/inventory/<host> below OMD_ROOT.

        Args:
            host: name of the host object in cmk to fetch the data for
            debug: output debug information

        Returns:
            the evaluated file content, or None if the file does not exist
        """
        inventory_path = 'var/check_mk/inventory'
        inventory_file = Path(f'{OMD_ROOT}/{inventory_path}/{host}')
        if not inventory_file.exists():
            if debug:
                print(f'Device: {host}: not found in inventory data path!')
            return None
        return literal_eval(inventory_file.read_text())

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Sample autochecks data, we keep only the item
        [
         {'check_plugin_name': 'if64', 'item': 'Fa0', 'parameters': {'discovered_oper_status': ['2'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/1', 'parameters': {'discovered_oper_status': ['1'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/10', 'parameters': {'discovered_oper_status': ['2'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/11', 'parameters': {'discovered_oper_status': ['2'], ...}},
         {'check_plugin_name': 'if64', 'item': 'Fa1/0/12', 'parameters': {'discovered_oper_status': ['1'], ...}}
        ]

        Args:
            host: name of the host object in cmk to fetch the data for
            debug: output debug information

        Returns:
            List of interface service items, empty if the host has no autochecks file

        """
        autochecks_path = 'var/check_mk/autochecks'
        autochecks_file = Path(f'{OMD_ROOT}/{autochecks_path}/{host}.mk')
        if not autochecks_file.exists():
            if debug:
                print(f'Device: {host}: not found in auto checks path!')
            return []

        services: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
        # only services of the if64 check plugin are interface items
        return [service['item'] for service in services if service['check_plugin_name'] in ('if64',)]


class HostCacheMultiSite(HostCache):
    """
    Host cache that queries all sites found in etc/check_mk/multisite.d/sites.mk
    through one livestatus.MultiSiteConnection.
    """

    def __init__(self, debug: bool = False):
        """
        Read the site configuration and open the livestatus connection(s).

        Sites reported as dead by the connection are excluded from further queries
        and a warning is printed.

        Args:
            debug: enable debug output
        """
        super().__init__(debug=debug)
        self.__sites = {}
        self.get_sites(debug=debug)
        if debug:
            print('Create livestatus connection(s)')
        self.__c = livestatus.MultiSiteConnection(self.__sites)
        # self.__c.set_prepend_site(False)  # is default
        # self.__c.parallelize = True   # is default
        self.__dead_sites = [site['site']['alias'] for site in self.__c.dead_sites().values()]
        if self.__dead_sites:
            dead_sites = ', '.join(self.__dead_sites)
            print(f'WARNING: use of dead site(s) {dead_sites} is disabled')
            self.__c.set_only_sites(self.__c.alive_sites())

    def get_sites(self, debug: bool = False):
        """
        Fill the site table from sites.mk; does nothing if the file does not exist.

        Args:
            debug: print the socket and TLS setting of every site found

        Raises:
            NameError: if sites.mk uses a name other than 'sites' and 'update'
        """
        sites_mk = Path(f'{OMD_ROOT}/etc/check_mk/multisite.d/sites.mk')
        socket_path = f'unix:{OMD_ROOT}/tmp/run/live'
        if not sites_mk.exists():
            return

        # make eval() "secure"
        # https://realpython.com/python-eval-function/#minimizing-the-security-issues-of-eval
        # NOTE(review): eval() on file content - only the names used are restricted
        # and the builtins are removed
        code = compile(sites_mk.read_text(), "<string>", "eval")
        allowed_names = ['sites', 'update']
        for name in code.co_names:
            if name not in allowed_names:
                raise NameError(f'Use of {name} in {sites_mk.name} not allowed.')

        sites_raw = {}
        # evaluate the code object that was checked above instead of reading the file again
        eval(code, {"__builtins__": {}}, {"sites": sites_raw})

        for site, data in sites_raw.items():
            site_entry = {
                'alias': data['alias'],
                'timeout': data['timeout'],
                'nagios_url': '/nagios/',
            }
            if data['socket'] == ('local', None):
                site_entry['socket'] = socket_path
            else:
                protocol, socket = data['socket']
                address, port = socket['address']
                site_entry['socket'] = f'{protocol}:{address}:{port}'
                site_entry['tls'] = socket['tls']
            self.__sites[site] = site_entry
            if debug:
                print(f'Multisite: Site {site} found, '
                      f'Socket: {site_entry["socket"]}, '
                      f'TLS: {site_entry.get("tls", "N/A")}.')

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Read the mk_inventory column of the host.

        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the evaluated inventory data, or None if the query returned nothing or
            the data could not be evaluated
        """
        query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
        data = self.__c.query(query=query)
        if not data:
            return None
        try:
            # first column of the first row holds the inventory as bytes
            return literal_eval(data[0][0].decode('utf-8'))
        except (SyntaxError, ValueError) as e:
            # literal_eval raises ValueError for input that parses but is not a plain literal
            if debug:
                print(f'data: |{data}|')
                print(f'type: {type(data)}')
                print(f'exception: {e}')
            return None

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Return the items of all services of the host whose description matches 'Interface'.

        Args:
            host: the host name to return the interface items for
            debug: enable debug output

        Returns:
            list of the interface items (service description without the leading 'Interface ')
        """
        query = (
            'GET services\n'
            'Columns: host_name description check_command\n'
            'Filter: description ~ Interface\n'
            f'Filter: host_name = {host}\n'
            'OutputFormat: python3\n'
        )
        data = self.__c.query(query=query)
        # [10:] removes the 10 characters of 'Interface ' from the description
        items = [description[10:] for _host_name, description, _check_command in data]
        if debug:
            if items:
                print(f'Interfaces items found: {len(items)} an host {host}')
            else:
                print(f'No Interfaces items found for host {host}')
        return items


class HostCacheRestApi(HostCache):
    """
    Host cache that reads its data from the REST API of the local site
    (http://localhost/<site>/check_mk/api/1.0) as user 'automation'.
    """

    def __init__(self, debug: bool = False):
        """
        Read the automation secret and prepare the HTTP session.

        Args:
            debug: enable debug output

        Raises:
            SystemExit: with status 1 if the automation.secret file does not exist
        """
        super().__init__(debug=debug)
        try:
            # strip only white space; the secret itself must not be altered
            self.__secret = Path(f'{OMD_ROOT}/var/check_mk/web/automation/automation.secret').read_text().strip()
        except FileNotFoundError as e:
            print(f'automation.secret not found, {e}')
            # non-zero status: without the secret no request can be made
            raise SystemExit(1)
        self.__hostname = 'localhost'
        self.__site = OMD_ROOT.split('/')[-1]  # last path component of OMD_ROOT is used as site name
        self.__api_url = f"http://{self.__hostname}/{self.__site}/check_mk/api/1.0"
        self.__user = 'automation'
        if debug:
            print('Create REST API session')
        self.__session = session()
        self.__session.headers['Authorization'] = f"Bearer {self.__user} {self.__secret}"
        self.__session.headers['Accept'] = 'application/json'

    def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
        """
        Read the mk_inventory column of the host.

        Args:
            host: the host name to return the inventory data for
            debug: enable debug output

        Returns:
            the inventory data, or None if the request failed or the response
            contains no inventory for the host
        """
        query = '{"op": "=", "left": "name", "right": "' + host + '"}'
        resp = self.__session.get(
            f"{self.__api_url}/domain-types/host/collections/all",
            params={
                "query": query,
                "columns": ['mk_inventory'],
            },
        )
        if resp.status_code != 200:
            if debug:
                print(f'Device: {host}: no inventory data found!')
            return None
        try:
            return resp.json()['value'][0]['extensions']['mk_inventory']
        except (IndexError, KeyError):
            # IndexError: no host matched, KeyError: expected field missing in the response
            return None

    def get_interface_items(self, host: str, debug: bool = False) -> List:
        """
        Return the items of all services of the host whose description matches 'Interface '.

        Args:
            host: the host name to return the interface items for
            debug: enable debug output

        Returns:
            list of the interface items (service description without the leading
            'Interface '), empty if the request failed
        """
        query = '{"op": "~", "left": "description", "right": "Interface "}'
        resp = self.__session.get(
            # query the services of the requested host
            f"{self.__api_url}/objects/host/{host}/collections/services",
            params={
                "query": query,
                "columns": ['host_name', 'description', 'check_command'],
            },
        )

        if resp.status_code != 200:
            if debug:
                print(f'No Interfaces items found for host {host}')
            return []

        # [10:] removes the 10 characters of 'Interface ' from the description
        items = [service['extensions']['description'][10:] for service in resp.json()['value']]
        if debug:
            print(f'Interfaces items found: {len(items)} an host {host}')
        return items