Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-08
# File : create_topology_data.py
# 2023-10-10: initial release
# 2023-10-16: added options --keep-max and --min-age
# 2023-10-17: removed -p, -c, -d (use the long options instead: --path-in-inventory, --data-source, --inventory-columns)
# changed option --keep-max to --keep
# added option --check-user-data-only
# added SEED_DEVICES to user data, option -s/--seed-devices is now optional
# refactoring
# changed get inventory data from file access to request via live status query
# get interfaces from autochecks changed to live status query
# 2023-10-19: cleanup removed "all" unused functions
# added option -u, --user-data-file to provide a user specific data file
# 2023-10-20: changed option -m/--make-default to -d/--default (-m is needed for -m/--merge)
# added -m/--merge option
# 2023-10-27: improved matching of port name from CDP/LLDP data to interface (item) name of service
# improved internal data handling (HostCache)
# 2023-10-28: reworked handling of layers (see option -l/--layers)
# added option -l/--layers {CDP,CUSTOM,LLDP,STATIC} [{CDP,CUSTOM,LLDP,STATIC} ...]
# added section CUSTOM_LAYERS to user data file
# removed option -m/--merge, included in option -l/--layers
# removed option --lldp, included in option -l/--layers
# removed option --data-source, now handled in CUSTOM_LAYERS
# removed option --inventory-columns, now handled in CUSTOM_LAYERS
# removed option --path-in-inventory, now handled in CUSTOM_LAYERS
# 2023-10-29: fixed missing path from custom layer in host prefetch
# 2023-11-16: added option -b/--backend [LIVESTATUS, FILESYSTEM],
# LIVESTATUS is the default for performance reasons -> for now only local site
# FILESYSTEM fetches the data directly from the inventory files -> use in distributed environments
#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# https://forum.checkmk.com/t/network-visualization/41680
# https://exchange.checkmk.com/p/network-visualization
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
#
# USAGE:
# ~/local/lib/topology_data/create_topology_data.py --help
from create_topology_utils import (
remove_old_data,
get_data_from_toml,
merge_topologies,
from create_topology_classes import (
InventoryColumns,
Settings,
StaticConnection,
# Neighbour names to ignore: a neighbour found in this list is skipped when a device is built from inventory.
# The empty default is replaced by the 'DROP_HOSTS' entry of the user data in the script body.
DROP_HOSTS: List[str] = []
# Static connection data; its keys are added to the queue of devices to process when a topology is built.
# NOTE(review): no assignment other than this empty default is visible in this section -- confirm where it is filled.
STATIC_CONNECTIONS: Dict = {}
def get_list_of_devices(data) -> List[str]:
    """
    Collect the distinct device names referenced by a connections mapping.

    Args:
        data: mapping whose values are sequences carrying the device name
              as their first element

    Returns:
        The unique device names as a list, in no particular order.
    """
    return list({link[0] for link in data.values()})
# Resolve an interface name of a host to the item name of the matching interface service.
#   host:      host whose cached interface items and interface inventory are searched
#   interface: interface name to resolve; it is also the fallback result of the inner matcher
#   debug:     NOTE(review): presumably gates the "not found" message at the end -- the guarding
#              statement is not visible in this section, confirm against the full file
# NOTE(review): parts of this function (the loop over the inventory entries and the return path)
# are missing in this section; the comments only describe the statements that are visible.
def get_service_by_interface(host: str, interface: str, debug: bool = False) -> str:
# try to find the item for an interface
# Inner helper: returns the service item that matches one inventory entry, or `interface` if none matches.
def _match_entry_with_item(_entry: Dict[str, str]):
# candidate identifiers of the entry, tried in this order: name, description, alias
values = [_entry.get('name'), _entry.get('description'), _entry.get('alias')]
for value in values:
if value in items:
return value
index = str(_entry.get('index'))
# for index try with padding
# the index is right-aligned and filled with '0' to a width of 1 up to 5 characters
for i in range(1, 6):
index_padded = f'{index:0>{i}}'
if index_padded in items:
return index_padded
# still not found try values + index
# i.e. "<name|description|alias> <padded index>" separated by a single space
for value in values:
if f'{value} {index_padded}' in items:
return f'{value} {index_padded}'
# nothing matched: hand back the interface name unchanged
return interface
# service items known for the host, read from the host cache
items = HOST_CACHE.get_data(host=host, item=CacheItems.interfaces, path=CACHE_INTERFACES_ITEM)
# try to find the interface in the host interface inventory list
inventory = HOST_CACHE.get_data(host=host, item=CacheItems.inventory, path=PATH_INTERFACES)
# NOTE(review): the next five lines look like the elements of a list of entry fields
# (name, description, alias, index, phys_address) the interface is compared against;
# the enclosing statement is not visible here -- confirm.
_entry.get('name'),
_entry.get('description'),
_entry.get('alias'),
str(_entry.get('index')),
_entry.get('phys_address'),
print(f'Device: {host}: service for interface {interface} not found')
# Build the 'connections' and 'interfaces' data of one host from its inventory rows.
# NOTE(review): the end of the signature, the initialisation of `data`, the loop over the inventory
# rows and the return statement are not visible in this section; the comments below only describe
# the visible statements.
def create_device_from_inv(
host: str,
inv_data: List[Dict[str, str]],
inv_columns: InventoryColumns,
# unless the domain is to be kept, cut the neighbour name at the first dot
if not SETTINGS.keep_domain:
neighbour = neighbour.split('.')[0]
# optional case normalisation; when both settings are set the lower-case conversion is applied last
if SETTINGS.uppercase:
neighbour = neighbour.upper()
if SETTINGS.lowercase:
neighbour = neighbour.lower()
# drop neighbour if inventory neighbour is invalid
if neighbour in DROP_HOSTS:
continue
# rewrite neighbour if inventory neighbour and checkmk host don't match
if neighbour in HOST_MAP.keys():
neighbour = HOST_MAP[neighbour]
# keep the port name as reported so a changed name can be shown in debug mode
_local_port = local_port
local_port = get_service_by_interface(host, local_port, debug=debug)
if debug and local_port != _local_port:
print(f'host: {host}, local_port: {local_port}, _local_port: {_local_port}')
# same translation for the port on the neighbour side
neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
_neighbour_port = neighbour_port
neighbour_port = get_service_by_interface(neighbour, neighbour_port, debug=debug)
if debug and neighbour_port != _neighbour_port:
print(f'neighbour: {neighbour}, neighbour_port {neighbour_port}, _neighbour_port {_neighbour_port}')
# one connection per local port: [neighbour, neighbour port, label]; a later row for the same port overwrites it
data['connections'].update({local_port: [neighbour, neighbour_port, label]})
# list every local port only once
if local_port not in data['interfaces']:
data['interfaces'].append(local_port)
def create_static_connections(connections: List[List[str]], debug: bool = False) -> Dict:
    """
    Build topology data from the statically configured connections.

    Every connection is registered in both directions: from the host to the
    neighbour and from the neighbour back to the host.

    Args:
        connections: one list per connection; its elements are passed
                     positionally to StaticConnection
        debug: print every parsed connection

    Returns:
        Mapping of device name to
        {'connections': {port: [peer, peer_port, label]}, 'interfaces': [port, ...]}.
        Empty if no connections were given.
    """
    data: Dict = {}

    def _add_link(device, port, peer, peer_port, label) -> None:
        # register one direction of a connection, creating the device entry on first use
        entry = data.setdefault(device, {'connections': {}, 'interfaces': []})
        entry['connections'][port] = [peer, peer_port, label]
        # list every port only once, as create_device_from_inv does
        if port not in entry['interfaces']:
            entry['interfaces'].append(port)

    for raw_connection in connections:
        connection = StaticConnection(*raw_connection)
        if debug:
            print(f'connection: {connection}')
        # add connection from host to neighbour
        _add_link(
            connection.host,
            connection.local_port,
            connection.neighbour,
            connection.neighbour_port,
            connection.label,
        )
        # add connection from neighbour to host
        _add_link(
            connection.neighbour,
            connection.neighbour_port,
            connection.host,
            connection.local_port,
            connection.label,
        )

    if data:
        print(f'Devices added: {len(data)}, source static')

    # the caller merges the result into the final topology, so it must always get a dict
    return data
# NOTE(review): the opening of this definition is not visible in this section. Judging by the
# create_topology(...) call in the script body, this is the tail of the parameter list of
# create_topology followed by its body -- confirm against the full file. No return statement is
# visible either.
path_in_inventory: List[str],
inv_columns: InventoryColumns,
debug: bool = False,
) -> Dict:
# queue of devices that still have to be processed, starting with the seed devices
devices_to_go = list(set(seed_devices)) # remove duplicates
# hosts with static connections are queued as well
for static_host in STATIC_CONNECTIONS.keys():
devices_to_go.append(static_host)
devices_to_go = list(set(devices_to_go))
# collected result, extended via update() with the output of create_device_from_inv
topology_data = {}
# devices already handled; used to avoid queueing them again
devices_done = []
while devices_to_go:
device = devices_to_go[0]
# a name listed in HOST_MAP is taken off the queue and replaced by its mapped name
if device in HOST_MAP.keys():
try:
devices_to_go.remove(device)
except ValueError:
pass
device = HOST_MAP[device]
if device in devices_done:
continue
# topo_data = get_inventory_data_lq(host=device, path=path_in_inventory, debug=debug)
# inventory data of the device below the given inventory path, read from the host cache
topo_data = HOST_CACHE.get_data(host=device, item=CacheItems.inventory, path=path_in_inventory)
if topo_data:
topology_data.update(
create_device_from_inv(
host=device,
inv_data=topo_data,
inv_columns=inv_columns,
))
# queue every neighbour of this device that has not been handled yet
devices_list = get_list_of_devices(topology_data[device]['connections'])
for _entry in devices_list:
if _entry not in devices_done:
devices_to_go.append(_entry)
devices_to_go = list(set(devices_to_go))
devices_done.append(device)
# NOTE(review): after a HOST_MAP rewrite the mapped name may not be in the queue, in which case
# list.remove() raises ValueError; a guard may exist in lines not visible here -- confirm.
devices_to_go.remove(device)
# Script body: select the data backend, turn the requested layers into jobs, build one topology per
# job, merge them and save the result.
# NOTE(review): several statements appear to be missing in this section -- user_data, LAYERS, columns
# and final_topology are used without a visible assignment, and the create_topology(...) call is not
# closed. The comments below only describe the visible statements.
start_time = time_ns()
SETTINGS = Settings(vars(parse_arguments()))
# pick the host cache implementation that matches the configured backend
match SETTINGS.backend:
case 'LIVESTATUS':
HOST_CACHE = HostCacheLiveStatus()
case 'FILESYSTEM':
HOST_CACHE = HostCacheFileSystem()
case _:
# any other backend name: report it and stop
print(f'Backend {SETTINGS.backend} not (yet) implemented')
exit()
# values taken from the user data, each falling back to an empty default
HOST_MAP = user_data.get('HOST_MAP', {})
DROP_HOSTS = user_data.get('DROP_HOSTS', [])
CUSTOM_LAYERS = user_data.get('CUSTOM_LAYERS', [])
# translate the requested layers into jobs: the marker 'STATIC', the LAYERS entry of a known layer,
# or every entry of CUSTOM_LAYERS
jobs = []
for layer in SETTINGS.layers:
if layer == 'STATIC':
jobs.append('STATIC')
elif layer in LAYERS.keys():
jobs.append(LAYERS[layer])
# register the inventory path of the layer for prefetching by the host cache
HOST_CACHE.add_inventory_prefetch_path(path=LAYERS[layer]['path'])
elif layer == 'CUSTOM':
for entry in CUSTOM_LAYERS:
jobs.append(entry)
# build one topology per job and merge it into the final topology
for job in jobs:
topology = {}
if job == 'STATIC':
topology = create_static_connections(connections=user_data.get('STATIC_CONNECTIONS', []))
else:
topology = create_topology(
# seed devices from the settings and from the user data, without duplicates
seed_devices=list(set(SETTINGS.seed_devices + user_data.get('SEED_DEVICES', []))),
path_in_inventory=job['path'],
# columns holds, in this order: neighbour, local port, neighbour port
inv_columns=InventoryColumns(
neighbour=columns[0],
local_port=columns[1],
neighbour_port=columns[2]
),
label=job['label'],
# only a non-empty topology is merged; the new topology is passed as topo_pri
if topology:
final_topology = merge_topologies(topo_pri=topology, topo_sec=final_topology)
# write the merged topology below <omd_root>/<topology_save_path>
save_topology(
data=final_topology,
base_directory=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
output_directory=SETTINGS.output_directory,
topology_file_name=SETTINGS.topology_file_name,
dont_compare=SETTINGS.dont_compare,
make_default=SETTINGS.default,
)
if SETTINGS.keep:
remove_old_data(
keep=SETTINGS.keep,