#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-12
# File : create_topology_classes.py
from enum import Enum, unique
from os import environ
from pathlib import Path
from time import strftime
from typing import Any, Dict, List, NamedTuple

from create_topology_utils import (
    CREATE_TOPOLOGY_VERSION,
    PATH_CDP,
    COLUMNS_CDP,
    LABEL_CDP,
    PATH_LLDP,
    COLUMNS_LLDP,
    LABEL_LLDP,
    PATH_INTERFACES,
    USER_DATA_FILE,
    LQ_INTERFACES,
    SCRIPT,
    get_inventory_data,
    get_table_from_inventory,
    get_interface_items_from_lq,
    ExitCodes,
    get_data_from_toml,
)
class TopologyParams(NamedTuple):
    """Lookup parameters for one topology data source.

    ``Settings.set_topology_param`` copies the three fields into the active
    settings (``path_in_inventory``, ``inventory_columns``, ``data_source``).
    """

    # stored as the 'path_in_inventory' setting
    path: str
    # comma separated column names; Settings.inventory_columns splits them
    # into neighbour, local_port and neighbour_port (in that order)
    columns: str
    # stored as the 'data_source' setting
    label: str
@unique
class Topologies(Enum):
    """Supported topology data sources.

    Each member's value is a ``TopologyParams`` tuple built from the
    matching PATH_*/COLUMNS_*/LABEL_* constants, i.e. the type that
    ``Settings.set_topology_param`` accepts.
    """

    CDP = TopologyParams(path=PATH_CDP, columns=COLUMNS_CDP, label=LABEL_CDP)
    LLDP = TopologyParams(path=PATH_LLDP, columns=COLUMNS_LLDP, label=LABEL_LLDP)
@unique
class CacheSources(Enum):
    """Names of the data sources kept per host in ``HostCache``.

    The member *value* is used as the key of the per-host cache dictionary.
    """

    inventory = 'inventory'
    lq = 'lq'
class InventoryColumns(NamedTuple):
    """Column names used to read one neighbour entry from an inventory table.

    ``Settings.inventory_columns`` builds this from the first three entries
    of the comma separated 'inventory_columns' setting, in field order.
    """

    neighbour: str
    local_port: str
    neighbour_port: str
class Interface(NamedTuple):
    """Immutable record describing one interface of a host.

    NOTE(review): the producer of these records is not part of this section;
    presumably they are built from the interface inventory table or the live
    status interface items -- confirm against the callers.
    """

    host: str
    index: str
    name: str
    description: str
    alias: str
class StaticConnection(NamedTuple):
    """Immutable record describing one statically defined connection.

    NOTE(review): presumably filled from the user data file -- the code that
    creates these records is not part of this section, confirm there.
    """

    host: str
    local_port: str
    neighbour_port: str
    neighbour: str
    label: str
# Settings: keeps the options of one run in the private `__settings` mapping
# and exposes them through properties (plus a setter for `merge`). The
# constructor also handles the options that end the program right away
# (version, check_user_data_only, non-unique merge list).
#
# NOTE(review): this class looks like it lost lines on the way here: there is
# no indentation, the defaults below have no enclosing `self.__settings = {`
# / `}` lines, and some `if`/`def` headers have no body. Every gap that is
# visible is marked with a NOTE(review) comment; confirm against the original
# file before relying on the code as shown.
class Settings:
# cli_args: mapping of option names to values, see the format comment further
# down.
def __init__(
self,
cli_args: Dict[str, Any],
):
self.__topology_save_path = 'var/topology_data'
self.__topology_file_name = 'network_data.json'
# comma separated path; expanded to a list by the path_to_if_table property
self.__path_to_if_table = 'networking,interfaces'
# self.__inventory_path = 'var/check_mk/inventory'
# self.__autochecks_path = 'var/check_mk/autochecks'
# Default values, keyed by setting name.
# NOTE(review): the opening line of this mapping (presumably
# `self.__settings = {`) and its closing `}` are not visible, and
# `self.__omd_root` is read below without a visible assignment. The
# properties also read keys that have no default here ('merge', 'keep',
# 'min_age', 'check_user_data_only', 'dont_compare', 'default',
# 'time_format', 'seed_devices', 'output_directory') -- confirm.
'path_in_inventory': PATH_CDP,
'inventory_columns': COLUMNS_CDP,
'data_source': LABEL_CDP,
'user_data_file': f'{self.__omd_root}/local/bin/topology_data/{USER_DATA_FILE}',
'keep_domain': False,
'lowercase': False,
'uppercase': False,
'debug': False,
'lldp': False,
'version': False,
# args in the form {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ... }}
# we will remove 's, __'
# Only options with a truthy value are kept. The new key is the text after
# the last comma, with surrounding spaces and leading/trailing underscores
# removed ('s, __seed_devices' -> 'seed_devices').
self.__args = ({k.split(',')[-1].strip(' ').strip('_'): v for k, v in cli_args.items() if v})
# NOTE(review): no statement that merges `self.__args` into the settings is
# visible, and the three LLDP assignments below have no visible condition
# (presumably `if self.lldp:`) -- confirm.
self.__settings['data_source'] = LABEL_LLDP
self.__settings['path_in_inventory'] = PATH_LLDP
self.__settings['inventory_columns'] = COLUMNS_LLDP
# version option: print script name and version, then exit with ExitCodes.OK
if self.version:
print(f'{Path(SCRIPT).name} version: {CREATE_TOPOLOGY_VERSION}')
exit(code=ExitCodes.OK.value)
# check_user_data_only option: parse the user data file, report, exit OK.
# `user_data` is not used afterwards; the message is printed whenever
# get_data_from_toml returns (its failure handling is not visible here).
if self.check_user_data_only:
user_data = get_data_from_toml(file=self.user_data_file)
print(f'Could read/parse the user data from {self.user_data_file}')
exit(code=ExitCodes.OK.value)
# merge option: the list must not contain duplicates. The list is
# de-duplicated through a set and the lengths are compared; on a mismatch the
# program exits with ExitCodes.BAD_OPTION_LIST.
# NOTE(review): "oder" in the message is German for "or" (likely a typo), and
# the f-string has no placeholders.
if self.merge:
_merge = list(set(self.merge.copy()))
if len(_merge) != len(self.merge):
print(f'-m/--merge options must be unique. Use "-m CDP LLDP" oder "-m LLDP CDP"')
exit(code=ExitCodes.BAD_OPTION_LIST.value)
# NOTE(review): the body of the `if self.lldp:` below is not visible.
else:
if self.lldp:
# Overwrite the three source dependent settings with the values of `topology`.
def set_topology_param(self, topology: TopologyParams):
self.__settings['path_in_inventory'] = topology.path
self.__settings['inventory_columns'] = topology.columns
self.__settings['data_source'] = topology.label
# Plain accessors: each property returns the setting of the same name.
@property
def version(self) -> bool:
return self.__settings['version']
@property
def merge(self) -> List[str] | None:
return self.__settings['merge']
@merge.setter
def merge(self, topologies: List[str]):
self.__settings['merge'] = topologies
# Returns the 'keep' setting raised to at least 1; returns None (implicitly)
# when the setting is falsy.
# NOTE(review): no `@property` is visible above this def although the
# neighbouring accessors have one -- confirm.
def keep(self) -> int | None:
if self.__settings['keep']:
return max(self.__settings['keep'], 1)
# NOTE(review): as shown this returns 1 when 'min_age' is truthy and None
# otherwise, which does not match the `-> int` annotation; lines are
# presumably missing here -- confirm.
@property
def min_age(self) -> int:
if self.__settings['min_age']:
return 1
@property
def check_user_data_only(self) -> bool:
return self.__settings['check_user_data_only']
@property
def lldp(self) -> bool:
return self.__settings['lldp']
@property
def debug(self) -> bool:
return self.__settings['debug']
@property
def dont_compare(self) -> bool:
return self.__settings['dont_compare']
# NOTE(review): no `@property` is visible above this def -- confirm.
def default(self) -> bool:
return self.__settings['default']
@property
def keep_domain(self) -> bool:
return self.__settings['keep_domain']
@property
def uppercase(self) -> bool:
return self.__settings['uppercase']
@property
def lowercase(self) -> bool:
return self.__settings['lowercase']
@property
def time_format(self) -> str:
return self.__settings['time_format']
# NOTE(review): the body of omd_root is not visible (presumably it returns
# `self.__omd_root`) -- confirm.
@property
def omd_root(self) -> str:
# replaced by live status query
# @property
# def inventory_path(self) -> str:
# return self.__inventory_path
# @property
# def autochecks_path(self) -> str:
# return self.__autochecks_path
@property
def user_data_file(self) -> str:
return self.__settings['user_data_file']
# NOTE(review): the `return` of 'path_in_inventory' below has no visible
# `def` header (presumably a `path_in_inventory` property) -- confirm.
# path = ('Nodes,' + ',Nodes,'.join(self.__settings['path_in_inventory'].split(',')) + ',Table,Rows').split(',')
# return path
return self.__settings['path_in_inventory']
# Expands the comma separated interface table path into a list of keys:
# 'networking,interfaces' ->
# ['Nodes', 'networking', 'Nodes', 'interfaces', 'Table', 'Rows']
@property
def path_to_if_table(self) -> List[str]:
path = ('Nodes,' + ',Nodes,'.join(self.__path_to_if_table.split(',')) + ',Table,Rows').split(',')
return path
# Splits the comma separated 'inventory_columns' setting; the first three
# entries become neighbour, local_port and neighbour_port. Fewer than three
# entries raise IndexError.
@property
def inventory_columns(self) -> InventoryColumns:
columns = self.__settings['inventory_columns'].split(',')
return InventoryColumns(
neighbour=columns[0],
local_port=columns[1],
neighbour_port=columns[2],
)
# Returns an empty list when no seed devices are configured.
# NOTE(review): the body of the `if` branch is not visible -- confirm.
@property
def seed_devices(self) -> List[str]:
if self.__settings['seed_devices']:
else:
return []
@property
def data_source(self) -> str:
return self.__settings['data_source']
# Returns the configured output directory; when none is set, the name is
# generated by passing the 'time_format' setting to strftime.
@property
def output_directory(self) -> str:
if not self.__settings['output_directory']:
return f'{strftime(self.__settings["time_format"])}'
else:
return self.__settings['output_directory']
class HostCache:
    """Per-host cache for inventory tables and live status interface data.

    The data of a host is fetched once, on the first ``get_data`` call for
    that host, and is answered from memory afterwards.
    """

    def __init__(self):
        # host -> {source name -> {path -> data}}; the inventory entry of a
        # host is None when no inventory data was returned for it
        self.__cache: Dict[str, Dict[str, Any]] = {}
        # inventory paths that are read for every host when it is cached
        self.__inventory_pre_fetch_list: List[str] = [
            PATH_CDP,
            PATH_LLDP,
            PATH_INTERFACES,
        ]

    def __fill_cache(self, host: str) -> None:
        """Fetch all pre-fetch data for *host* and store it in the cache."""
        # setdefault keeps this method usable even if the caller did not
        # create the host entry beforehand
        host_cache = self.__cache.setdefault(host, {})

        # pre fill inventory data
        inventory = get_inventory_data(host=host)
        if inventory:
            host_cache[CacheSources.inventory.value] = {
                entry: get_table_from_inventory(inventory=inventory, path=entry)
                for entry in self.__inventory_pre_fetch_list
            }
        else:
            host_cache[CacheSources.inventory.value] = None

        # prefill live status data
        host_cache[CacheSources.lq.value] = {
            LQ_INTERFACES: get_interface_items_from_lq(host),
        }

    def get_data(self, host: str, source: 'CacheSources', path: str):
        """Return the cached data of *host* for *source* and *path*.

        The host is fetched first if it is not cached yet.

        Args:
            host: name of the host.
            source: data source; its ``value`` selects the per-host section.
            path: key inside that section (inventory path or LQ_INTERFACES).

        Returns:
            The cached entry, or None when the section or the path is not
            available (including hosts without inventory data).
        """
        if host not in self.__cache:
            self.__fill_cache(host=host)
        try:
            return self.__cache[host][source.value][path]
        except (KeyError, TypeError):
            # KeyError: unknown source/path; TypeError: the section is None
            return None

    def add_inventory_prefetch_path(self, path: str) -> None:
        """Add *path* to the inventory paths read for each newly cached host.

        A path that is already listed is ignored and the order of the list
        is kept. Hosts that are already cached are not fetched again.
        """
        if path not in self.__inventory_pre_fetch_list:
            self.__inventory_pre_fetch_list.append(path)