Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-12
# File : create_topology_classes.py
from os import environ
class InventoryColumns(NamedTuple):
neighbour: str
local_port: str
neighbour_port: str
class StaticConnection(NamedTuple):
host: str
local_port: str
neighbour_port: str
neighbour: str
label: str
class Settings:
def __init__(
self,
cli_args: Dict[str, Any],
):
self.__topology_save_path = 'var/topology_data'
self.__topology_file_name = 'network_data.json'
self.__path_to_if_table = 'networking,interfaces'
'user_data_file': f'{self.__omd_root}/local/bin/topology_data/{USER_DATA_FILE}',
'keep_domain': False,
'lowercase': False,
'uppercase': False,
'debug': False,
'version': False,
# args in the form {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ... }}
# we will remove 's, __'
self.__args = ({k.split(',')[-1].strip(' ').strip('_'): v for k, v in cli_args.items() if v})
self.__settings.update(self.__args)
if self.version:
print(f'{Path(SCRIPT).name} version: {CREATE_TOPOLOGY_VERSION}')
exit(code=ExitCodes.OK.value)
if self.check_user_data_only:
print(f'Could read/parse the user data from {self.user_data_file}')
exit(code=ExitCodes.OK.value)
if self.layers:
layers = list(set(self.layers))
if len(layers) != len(self.layers):
print(f'-l/--layers options must be unique. Don~\'t use any layer more than once.')
@property
def backend(self) -> str:
return self.__settings['backend']
@property
def version(self) -> bool:
return self.__settings['version']
def layers(self) -> List[str]:
return self.__settings['layers']
@property
def min_age(self) -> int:
if self.__settings['min_age']:
@property
def check_user_data_only(self) -> bool:
return self.__settings['check_user_data_only']
@property
def debug(self) -> bool:
return self.__settings['debug']
@property
def dont_compare(self) -> bool:
return self.__settings['dont_compare']
def default(self) -> bool:
return self.__settings['default']
@property
def keep_domain(self) -> bool:
return self.__settings['keep_domain']
@property
def uppercase(self) -> bool:
return self.__settings['uppercase']
@property
def lowercase(self) -> bool:
return self.__settings['lowercase']
@property
def time_format(self) -> str:
return self.__settings['time_format']
@property
def omd_root(self) -> str:
@property
def user_data_file(self) -> str:
return self.__settings['user_data_file']
@property
def seed_devices(self) -> List[str]:
if self.__settings['seed_devices']:
else:
return []
@property
def output_directory(self) -> str:
if not self.__settings['output_directory']:
return f'{strftime(self.__settings["time_format"])}'
else:
return self.__settings['output_directory']
self.__cache = {}
self.__inventory_pre_fetch_list: List[str] = [
PATH_INTERFACES,
]
self._count = 0
self._debug = debug
if debug:
print('init HOST_CACHE')
@abstractmethod
def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
"""
Args:
host: the host name to return the inventory data for
debug: enable debug output
Returns:
the inventory data as dictionary
"""
@abstractmethod
def get_interface_items(self, host: str, debug: bool = False) -> List:
"""
Args:
host: the host name to return the interface items
debug: enable debug output
Returns:
list of the interface items
"""
def __fill_cache(self, host: str):
# pre fill inventory data
if self._debug:
self._count += 1
_pre_query = time_ns()
if self._debug:
print(f'{(time_ns() - _pre_query) / 1e9}|{self._count:0>4}|inventory|{host}')
self.__cache[host][CacheItems.inventory.value] = {}
self.__cache[host][CacheItems.inventory.value].update({
entry: get_table_from_inventory(
inventory=inventory,
path=entry
) for entry in self.__inventory_pre_fetch_list
})
else:
self.__cache[host][CacheItems.inventory.value] = None
self.__cache[host][CacheItems.interfaces.value] = {}
if self._debug:
self._count += 1
_pre_query = time_ns()
self.__cache[host][CacheItems.interfaces.value][CACHE_INTERFACES_ITEM] = self.get_interface_items(host)
if self._debug:
print(f'{(time_ns() - _pre_query) / 1e9}|{self._count:0>4}|items|{host}')
if host not in self.__cache.keys():
self.__cache[host]: Dict[str, Any] = {}
self.__fill_cache(host=host)
try:
return None
def add_inventory_prefetch_path(self, path: str):
self.__inventory_pre_fetch_list = list(set(self.__inventory_pre_fetch_list + [path]))
class HostCacheLiveStatus(HostCache):
def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
data = get_data_form_live_status(query=query)
if data:
try:
data = literal_eval(data[0][0].decode('utf-8'))
except SyntaxError as e:
if debug:
print(f'data: |{data}|')
print(f'type: {type(data)}')
print(f'exception: {e}')
return
return data
if debug:
print(f'Device: {host}: no inventory data found!')
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def get_interface_items(self, host: str, debug: bool = False) -> List:
"""
Sample data from lq query, we keep only the item (description without "Interface" ).
[
['C9540-7-1', 'Interface Vlan-999', 'check_mk-if64'],
['C9540-7-1', 'Interface Vlan-998', 'check_mk-if64'],
['C9540-7-1', 'Interface Vlan-997', 'check_mk-if64'],
['C9540-7-1', 'Interface Vlan-996', 'check_mk-if64'],
['C9540-7-1', 'Interface Vlan-8', 'check_mk-if64'],
['C9540-7-1', 'Interface Te2/0/2', 'check_mk-if64'],
['C9540-7-1', 'Interface Te2/0/30', 'check_mk-if64']
]
Args:
host:
debug:
Returns:
"""
query = (
'GET services\n'
'Columns: host_name description check_command\n'
'Filter: description ~ Interface\n'
f'Filter: host_name = {host}\n'
'OutputFormat: python3\n'
)
data = get_data_form_live_status(query=query)
items = []
for host, description, check_command in data:
items.append(description[10:]) # remove 'Interface ' from description
if debug:
print(f'Interfaces items found: {len(items)} an host {host}')
return items
class HostCacheFileSystem(HostCache):
def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
__inventory_path = 'var/check_mk/inventory'
inventory_file = Path(f'{OMD_ROOT}/{__inventory_path}/{host}')
if inventory_file.exists():
data = literal_eval(inventory_file.read_text())
return data
else:
if debug:
print(f'Device: {host}: not found in inventory data path!')
return None
def get_interface_items(self, host: str, debug: bool = False) -> List:
"""
{'check_plugin_name': 'if64', 'item': 'Fa0', 'parameters': {'discovered_oper_status': ['2'], ...}},\n
{'check_plugin_name': 'if64', 'item': 'Fa1/0/1', 'parameters': {'discovered_oper_status': ['1'], ...}},\n
{'check_plugin_name': 'if64', 'item': 'Fa1/0/10', 'parameters': {'discovered_oper_status': ['2'], ...}},\n
{'check_plugin_name': 'if64', 'item': 'Fa1/0/11', 'parameters': {'discovered_oper_status': ['2'], ...}},\n
{'check_plugin_name': 'if64', 'item': 'Fa1/0/12', 'parameters': {'discovered_oper_status': ['1'], ...}}\n
host: name of the host object in cmk to fetch the data for
debug: output debug information
"""
__autochecks_path = 'var/check_mk/autochecks'
autochecks_file = Path(f'{OMD_ROOT}/{__autochecks_path}/{host}.mk')
__data = []
if autochecks_file.exists():
data: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
for service in data:
if service['check_plugin_name'] in ['if64']:
__data.append(service['item'])
else:
if debug:
print(f'Device: {host}: not found in auto checks path!')
return []
return __data
class HostCacheMultiSite(HostCache):
def __init__(self, debug: bool = False):
super().__init__(debug=debug)
self.__sites = {}
self.get_sites(debug=debug)
self.__c = livestatus.MultiSiteConnection(self.__sites)
# self.__c.set_prepend_site(False) # is default
# self.__c.parallelize = True # is default
self.__dead_sites = [site['site']['alias'] for site in self.__c.dead_sites().values()]
if self.__dead_sites:
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
print(f'WARNING: use of dead site(s) {self.__dead_sites} is disabled')
self.__c.set_only_sites(self.__c.alive_sites())
def get_sites(self, debug: bool = False):
sites_mk = Path(f'{OMD_ROOT}/etc/check_mk/multisite.d/sites.mk')
socket_path = f'unix:{OMD_ROOT}/tmp/run/live'
if sites_mk.exists():
# make eval() "secure"
# https://realpython.com/python-eval-function/#minimizing-the-security-issues-of-eval
_code = compile(sites_mk.read_text(), "<string>", "eval")
allowed_names = ['sites', 'update']
for name in _code.co_names:
if name not in allowed_names:
raise NameError(f'Use of {name} in {sites_mk.name} not allowed.')
sites_raw = {}
eval(sites_mk.read_text(), {"__builtins__": {}}, {"sites": sites_raw})
for site, data in sites_raw.items():
self.__sites[site] = {
'alias': data['alias'],
'timeout': data['timeout'],
'nagios_url': '/nagios/',
}
if data['socket'] == ('local', None):
self.__sites[site]['socket'] = socket_path
else:
protocol, socket = data['socket']
address, port = socket['address']
self.__sites[site]['socket'] = f'{protocol}:{address}:{port}'
self.__sites[site]['tls'] = socket['tls']
if debug:
print(f'Multisite: Site {site} found, '
f'Socket: {self.__sites[site]["socket"]}, '
f'TLS: {self.__sites[site].get("tls", "N/A")}.')
def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
data = self.__c.query(query=query)
if data:
try:
data = literal_eval(data[0][0].decode('utf-8'))
except SyntaxError as e:
if debug:
print(f'data: |{data}|')
print(f'type: {type(data)}')
print(f'exception: {e}')
return
return data
def get_interface_items(self, host: str, debug: bool = False) -> List:
query = (
'GET services\n'
'Columns: host_name description check_command\n'
'Filter: description ~ Interface\n'
f'Filter: host_name = {host}\n'
'OutputFormat: python3\n'
)
data = self.__c.query(query=query)
items = []
for host, description, check_command in data:
items.append(description[10:]) # remove 'Interface ' from description
if debug:
if items:
print(f'Interfaces items found: {len(items)} an host {host}')
else:
print(f'No Interfaces items found for host {host}')
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
class HostCacheRestApi(HostCache):
def __init__(self, debug: bool = False):
super().__init__(debug=debug)
try:
self.__secret = Path(f'{OMD_ROOT}/var/check_mk/web/automation/automation.secret').read_text().strip('\n)')
except FileNotFoundError as e:
print(f'automation.secret not found, {e}')
exit()
self.__hostname = 'localhost'
self.__site = OMD_ROOT.split('/')[-1]
self.__api_url = f"http://{self.__hostname}/{self.__site}/check_mk/api/1.0"
self.__user = 'automation'
if debug:
print('Create REST API session')
self.__session = session()
self.__session.headers['Authorization'] = f"Bearer {self.__user} {self.__secret}"
self.__session.headers['Accept'] = 'application/json'
def get_inventory_data(self, host: str, debug: bool = False) -> Dict[str, str] | None:
# self._count += 1
__query = '{"op": "=", "left": "name", "right": "' + host + '"}'
resp = self.__session.get(
f"{self.__api_url}/domain-types/host/collections/all",
params={
"query": __query,
"columns": ['mk_inventory'],
},
)
if resp.status_code == 200:
# print(f'{resp.elapsed}|{self._count:0>4}|inventory|{host}')
try:
return resp.json()['value'][0]['extensions']['mk_inventory']
except IndexError:
return None
else:
if debug:
print(f'Device: {host}: no inventory data found!')
return None
def get_interface_items(self, host: str, debug: bool = False) -> List:
# self._count += 1
__query = '{"op": "~", "left": "description", "right": "Interface "}'
resp = self.__session.get(
f"{self.__api_url}/objects/host/NX01/collections/services",
params={
"query": __query,
"columns": ['host_name', 'description', 'check_command'],
},
)
if resp.status_code == 200:
# print(f'{resp.elapsed}|{self._count:0>4}|items|{host}')
items = [service['extensions']['description'][10:] for service in resp.json()['value']]
if debug:
print(f'Interfaces items found: {len(items)} an host {host}')
return items
else:
if debug:
print(f'No Interfaces items found for host {host}')
return []