Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-12
# File : create_topology_classes.py
from os import environ
from enum import Enum, unique
from create_topology_utils import (
CREATE_TOPOLOGY_VERSION,
PATH_CDP,
COLUMNS_CDP,
LABEL_CDP,
PATH_LLDP,
PATH_INTERFACES,
USER_DATA_FILE,
LQ_INTERFACES,
SCRIPT,
get_inventory_data,
get_table_from_inventory,
get_interface_items_from_lq,
ExitCodes,
get_data_from_toml,
)
class TopologyParams(NamedTuple):
path: str
columns: str
label: str
@unique
class CacheSources(Enum):
inventory = 'inventory'
lq = 'lq'
class InventoryColumns(NamedTuple):
neighbour: str
local_port: str
neighbour_port: str
class Interface(NamedTuple):
host: str
index: str
name: str
description: str
alias: str
class StaticConnection(NamedTuple):
host: str
local_port: str
neighbour_port: str
neighbour: str
label: str
class Settings:
def __init__(
self,
cli_args: Dict[str, Any],
):
self.__topology_save_path = 'var/topology_data'
self.__topology_file_name = 'network_data.json'
self.__path_to_if_table = 'networking,interfaces'
# self.__inventory_path = 'var/check_mk/inventory'
# self.__autochecks_path = 'var/check_mk/autochecks'
'path_in_inventory': PATH_CDP,
'inventory_columns': COLUMNS_CDP,
'data_source': LABEL_CDP,
'user_data_file': f'{self.__omd_root}/local/bin/topology_data/{USER_DATA_FILE}',
'keep_domain': False,
'lowercase': False,
'uppercase': False,
'debug': False,
'version': False,
# args in the form {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ... }}
# we will remove 's, __'
self.__args = ({k.split(',')[-1].strip(' ').strip('_'): v for k, v in cli_args.items() if v})
self.__settings.update(self.__args)
if self.version:
print(f'{Path(SCRIPT).name} version: {CREATE_TOPOLOGY_VERSION}')
exit(code=ExitCodes.OK.value)
if self.check_user_data_only:
print(f'Could read/parse the user data from {self.user_data_file}')
exit(code=ExitCodes.OK.value)
if self.layers:
layers = list(set(self.layers))
if len(layers) != len(self.layers):
print(f'-l/--layers options must be unique. Don~\'t use any layer more than once.')
def set_topology_param(self, topology: Dict[str, str]):
self.__settings['path_in_inventory'] = topology['path']
self.__settings['inventory_columns'] = topology['columns']
self.__settings['data_source'] = topology['label']
@property
def version(self) -> bool:
return self.__settings['version']
def layers(self) -> List[str]:
return self.__settings['layers']
def keep(self) -> int | None:
if self.__settings['keep']:
return max(self.__settings['keep'], 1)
@property
def min_age(self) -> int:
if self.__settings['min_age']:
return 1
@property
def check_user_data_only(self) -> bool:
return self.__settings['check_user_data_only']
@property
def debug(self) -> bool:
return self.__settings['debug']
@property
def dont_compare(self) -> bool:
return self.__settings['dont_compare']
def default(self) -> bool:
return self.__settings['default']
@property
def keep_domain(self) -> bool:
return self.__settings['keep_domain']
@property
def uppercase(self) -> bool:
return self.__settings['uppercase']
@property
def lowercase(self) -> bool:
return self.__settings['lowercase']
@property
def time_format(self) -> str:
return self.__settings['time_format']
@property
def omd_root(self) -> str:
@property
def user_data_file(self) -> str:
return self.__settings['user_data_file']
# @property
# def path_to_if_table(self) -> List[str]:
# path = ('Nodes,' + ',Nodes,'.join(self.__path_to_if_table.split(',')) + ',Table,Rows').split(',')
# return path
@property
def inventory_columns(self) -> InventoryColumns:
columns = self.__settings['inventory_columns'].split(',')
return InventoryColumns(
neighbour=columns[0],
local_port=columns[1],
neighbour_port=columns[2],
)
@property
def seed_devices(self) -> List[str]:
if self.__settings['seed_devices']:
else:
return []
@property
def data_source(self) -> str:
return self.__settings['data_source']
@property
def output_directory(self) -> str:
if not self.__settings['output_directory']:
return f'{strftime(self.__settings["time_format"])}'
else:
return self.__settings['output_directory']
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class HostCache:
def __init__(self):
self.__cache = {}
self.__inventory_pre_fetch_list: List[str] = [
PATH_CDP,
PATH_LLDP,
PATH_INTERFACES,
]
def __fill_cache(self, host: str):
# pre fill inventory data
inventory = get_inventory_data(host=host)
if inventory:
self.__cache[host][CacheSources.inventory.value] = {}
self.__cache[host][CacheSources.inventory.value].update({
entry: get_table_from_inventory(
inventory=inventory,
path=entry
) for entry in self.__inventory_pre_fetch_list
})
else:
self.__cache[host][CacheSources.inventory.value] = None
# prefill live status data
self.__cache[host][CacheSources.lq.value] = {}
self.__cache[host][CacheSources.lq.value][LQ_INTERFACES] = get_interface_items_from_lq(host)
def get_data(self, host: str, source: CacheSources, path: str):
if host not in self.__cache.keys():
self.__cache[host]: Dict[str, Any] = {}
self.__fill_cache(host=host)
try:
return self.__cache[host][source.value][path]