Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-08
# File : create_topology_data.py
# 2023-10-10: initial release
# 2023-10-16: added options --keep-max and --min-age
# 2023-10-17: removed -p, -c, -d (use the long options instead: --path-in-inventory, --data-source, --inventory-columns)
# changed option --keep-max to --keep
# added option --check-user-data-only
# added SEED_DEVICES to user data, option -s/--sed-devices is now optional
# refactoring
# changed get inventory data from file access to request via live status query
# get interfaces from autochecks changed to live status query
# 2023-10-19: cleanup removed "all" unused functions
# added option -u, --user-data-file to provide a user specific data file
# 2023-10-20: changed option -m/--make-default to -d/--default (-m is needed for -m/--merge
# added -m/--merge option
#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m --lldp
#
from ast import literal_eval
from pathlib import Path
from create_topology_utils import (
parse_arguments,
remove_old_data,
get_data_from_toml,
merge_topologies,
)
from create_topology_classes import (
InventoryColumns,
Settings,
StaticConnection,
DROP_HOSTS: List[str] = []
STATIC_CONNECTIONS: Dict = {}
def get_interface_items_from_lq(host: str, debug: bool = False) -> Dict:
query = (
'GET services\n'
'Columns: host_name description check_command\n'
'Filter: description ~ Interface\n'
f'Filter: host_name = {host}\n'
'OutputFormat: python3\n'
)
data = get_data_form_live_status(query=query)
items = {}
for host, description, check_command in data:
if host:
if host not in items.keys():
items[host] = []
items[host].append(description[10:]) # remove 'Interface ' from description
interfaces = 0
for host in items.keys():
items[host] = list(set(items[host]))
items[host].sort()
interfaces += len(items[host])
# print(f'Interfaces found: {interfaces}')
return items
def get_list_of_devices(data) -> List[str]:
devices = []
for connection in data.values():
devices.append(connection[0])
return list(set(devices))
def get_service_by_interface(host: str, interface: str, debug: bool = False) -> str | None:
if not host:
return interface
items = ITEMS.get(host, [])
if_padding = interface
if '/' in interface:
if_padding = '/'.join(interface.split('/')[:-1]) + f'/{interface.split("/")[-1]:0>2}'
for item in items:
# interface = Gi0/1
# item = Gi0/1 - Access port
# this might not catch the correct interface (fa0 and fa0/0)
item = item.split(' ')[0]
if item == interface:
print(f'Device: {host}: interface ({interface}) not found in services')
def get_inventory_data_lq(host: str, path: List[str], debug: bool = False, ) -> List[Dict[str, str]]:
query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
data = get_data_form_live_status(query=query)
if data:
try:
data = literal_eval(data[0][0].decode('utf-8'))
except SyntaxError as e:
print(f'data: |{data}|')
print(f'type: {type(data)}')
print(f'exception: {e}')
return []
for m in path:
try:
data = data[m]
except KeyError:
return []
return data
return []
def create_device_from_inv(
host: str,
inv_data: List[Dict[str, str]],
inv_columns: InventoryColumns,
data_source: str,
if not host in ITEMS.keys():
ITEMS.update(get_interface_items_from_lq(host=host))
if not SETTINGS.keep_domain:
neighbour = neighbour.split('.')[0]
if SETTINGS.uppercase:
neighbour = neighbour.upper()
if SETTINGS.lowercase:
neighbour = neighbour.lower()
# drop neighbour if inventory neighbour is invalid
if neighbour in DROP_HOSTS:
continue
# rewrite neighbour if inventory neighbour and checkmk host don't match
if neighbour in HOST_MAP.keys():
neighbour = HOST_MAP[neighbour]
if not neighbour in ITEMS.keys():
ITEMS.update(get_interface_items_from_lq(host=neighbour))
local_port = topo_neighbour.get(inv_columns.local_port)
local_port = get_service_by_interface(host, local_port)
neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
neighbour_port = get_service_by_interface(neighbour, neighbour_port)
if neighbour and local_port and neighbour_port:
data['connections'].update({local_port: [neighbour, neighbour_port, data_source]})
if local_port not in data['interfaces']:
data['interfaces'].append(local_port)
def create_static_connections(connections: List[List[str]], debug: bool = False) -> Dict:
data = {}
for connection in connections:
connection = StaticConnection(*connection)
print(f'connection: {connection}')
if connection.host not in data.keys():
data[connection.host] = {'connections': {}, 'interfaces': []}
if connection.neighbour not in data.keys():
data[connection.neighbour] = {'connections': {}, 'interfaces': []}
# add connection from host to neighbour
data[connection.host]['connections'].update({connection.local_port: [
connection.neighbour, connection.neighbour_port, connection.label
]})
data[connection.host]['interfaces'].append(connection.local_port)
# add connection from neighbour to host
data[connection.neighbour]['connections'].update({
connection.neighbour_port: [connection.host, connection.local_port, connection.label]
})
data[connection.neighbour]['interfaces'].append(connection.neighbour_port)
if data:
print(f'Devices added: {len(data)}, source static')
path_in_inventory: List[str],
inv_columns: InventoryColumns,
data_source: str,
debug: bool = False,
) -> Dict:
devices_to_go = list(set(seed_devices)) # remove duplicates
for static_host in STATIC_CONNECTIONS.keys():
devices_to_go.append(static_host)
devices_to_go = list(set(devices_to_go))
topology_data = {}
devices_done = []
while devices_to_go:
device = devices_to_go[0]
if device in HOST_MAP.keys():
try:
devices_to_go.remove(device)
except ValueError:
pass
device = HOST_MAP[device]
if device in devices_done:
continue
topo_data = get_inventory_data_lq(host=device, path=path_in_inventory, debug=debug)
topology_data.update(
create_device_from_inv(
host=device,
inv_data=topo_data,
inv_columns=inv_columns,
data_source=data_source,
))
devices_list = get_list_of_devices(topology_data[device]['connections'])
for entry in devices_list:
if entry not in devices_done:
devices_to_go.append(entry)
devices_to_go = list(set(devices_to_go))
devices_done.append(device)
devices_to_go.remove(device)
print(f'Devices added: {len(devices_done)}, source {data_source}')
if __name__ == '__main__':
SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))
print(f'{Path(__file__).name} version: {CREATE_TOPOLOGY_VERSION}')
if SETTINGS.check_user_data_only:
user_data = get_data_from_toml(file=SETTINGS.user_data_file)
print(f'Could read/parse the user data from {SETTINGS.user_data_file}')
exit(code=ExitCodes.OK)
if SETTINGS.merge:
_merge = list(set(SETTINGS.merge.copy()))
if len(_merge) != len(SETTINGS.merge):
print(f'-m/--merge options must be unique. Use "-m cdp lldp" oder "-m lldp cdp"')
exit(code=ExitCodes.BAD_OPTION_LIST)
print(f'Start time: {strftime(SETTINGS.time_format)}')
start_time = time_ns()
HOST_MAP = user_data.get('HOST_MAP', {})
DROP_HOSTS = user_data.get('DROP_HOSTS', [])
final_topology = {}
while SETTINGS.merge:
SETTINGS.set_topology_param(Topologies[SETTINGS.merge[-1]].value)
topology = create_topology(
seed_devices=list(set(SETTINGS.seed_devices + user_data.get('SEED_DEVICES', []))),
path_in_inventory=SETTINGS.path_in_inventory,
inv_columns=SETTINGS.inventory_columns,
data_source=SETTINGS.data_source,
debug=SETTINGS.debug,
)
if topology:
final_topology = merge_topologies(topo_pri=topology, topo_sec=final_topology)
SETTINGS.merge.remove(SETTINGS.merge[-1])
STATIC_CONNECTIONS = create_static_connections(connections=user_data.get('STATIC_CONNECTIONS', []))
final_topology = merge_topologies(topo_pri=STATIC_CONNECTIONS, topo_sec=final_topology)
save_topology(
data=final_topology,
base_directory=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
output_directory=SETTINGS.output_directory,
topology_file_name=SETTINGS.topology_file_name,
dont_compare=SETTINGS.dont_compare,
make_default=SETTINGS.default,
)
if SETTINGS.keep:
remove_old_data(
keep=SETTINGS.keep,
min_age=SETTINGS.min_age,
path=Path(f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}'),