Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-08
# File : create_topology_data.py
# 2023-10-10: initial release
# 2023-10-16: added options --keep-max and --min-age
# 2023-10-17: removed -p, -c, -d (use the long options instead: --path-in-inventory, --data-source, --inventory-columns)
# changed option --keep-max to --keep
# added option --check-user-data-only
# added SEED_DEVICES to user data, option -s/--sed-devices is now optional
# refactoring
#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m --lldp
#
from ast import literal_eval
from pathlib import Path
from create_topology_utils import (
parse_arguments,
save_data_to_file,
remove_old_data,
get_data_from_toml,
is_mac_address,
is_equal_with_default,
merge_topologies,
)
from create_topology_classes import (
InventoryColumns,
Interface,
Settings,
StaticConnection,
)
DROP_HOSTS: List[str] = []
STATIC_CONNECTIONS: Dict = {}
if host and host not in ITEMS.keys():
ITEMS[host] = []
autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk')
if autochecks_file.exists():
data: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
for service in data:
if service['check_plugin_name'] in ['if64']:
ITEMS[host].append(service['item'])
else:
def get_list_of_devices(data) -> List[str]:
devices = []
for connection in data.values():
devices.append(connection[0])
return list(set(devices))
def get_service_by_interface(host: str, interface: str, debug: bool = False) -> str | None:
if not host:
return interface
items = ITEMS.get(host, [])
if interface in items:
return interface
else:
if_padding = '/'.join(interface.split('/')[:-1]) + f'/{interface.split("/")[-1]:0>2}'
for item in items:
# interface = Gi0/1
# item = Gi0/1 - Access port
if item.startswith(interface):
return item
elif item.startswith(if_padding):
return item
print(f'Device: {host}: interface ({interface}) not found in services')
def get_if_host_by_mac(mac_address: str) -> Tuple[str, str] | None:
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
try:
interface: Interface = MAC_TABLE[mac_address]
except KeyError:
return
host = interface.host
if interface.name:
if_name = interface.name
elif interface.description:
if_name = interface.description
elif interface.alias:
if_name = interface.alias
else:
if_name = interface.index
return host, if_name
def add_static_connections(host: str, data: Dict) -> Dict:
if host in STATIC_CONNECTIONS.keys():
host_connections = data.get('connections', {})
host_connections.update(STATIC_CONNECTIONS[host].get('connections', {}))
host_interfaces = data.get('interfaces', [])
for interface in STATIC_CONNECTIONS[host].get('connections', {}).keys():
host_interfaces.append(interface)
data.update({'connections': host_connections, 'interfaces': list(set(host_interfaces))})
return data
def get_inventory_data(host: str, path: List[str], debug: bool = False, ) -> List[Dict[str, str]] | None:
inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}')
data = []
if inventory_file.exists():
data = literal_eval(inventory_file.read_text())
if data:
for m in path:
try:
data = data[m]
except KeyError:
return []
else:
if debug:
print(f'Device: {host}: no inventory data found!')
else:
if debug:
print(f'Device: {host}: not found in inventory path!')
return data
def create_mac_to_interface_table(debug: bool = False):
path = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/')
files = [str(file) for file in list(path.iterdir()) if not str(file).endswith('.gz')]
while files:
host = files[0].split('/')[-1]
files.pop(0)
if host.startswith('.'):
continue
if_table = get_inventory_data(host=host, path=SETTINGS.path_to_if_table, debug=debug)
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
print(f'host: {host}, interfaces: {len(if_table)}')
for interface in if_table:
# {
# 'index': 436756992,
# 'description': 'Ethernet2/50',
# 'alias': '',
# 'speed': 100000000000,
# 'phys_address': 'AC:7A:56:C7:37:C8',
# 'oper_status': 2,
# 'port_type': 6,
# 'admin_status': 2,
# 'available': True,
# 'name': 'Ethernet2/50'
# }
try:
MAC_TABLE[interface['phys_address'].upper()] = Interface(
host=host,
index=interface.get('index'),
name=interface.get('name'),
description=interface.get('description'),
alias=interface.get('alias'),
)
except KeyError:
continue
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def save_topology(
data: dict,
base_directory: str,
output_directory: str,
dont_compare: bool,
make_default: bool,
topology_file_name: str,
):
path = f'{base_directory}/{output_directory}'
if dont_compare:
save_data_to_file(
data=data,
path=path,
file=topology_file_name,
make_default=make_default,
)
else:
if not is_equal_with_default(
data=data,
file=f'{base_directory}/default/{topology_file_name}'
):
save_data_to_file(
data=data,
path=path,
file=topology_file_name,
make_default=make_default,
)
else:
print(
'Topology matches default topology, not saved! Use "--dont-compare" to save identical topologies.'
)
def create_device_from_inv(
host: str,
inv_data: List[Dict[str, str]],
inv_columns: InventoryColumns,
data_source: str,
data = {'connections': {}, "interfaces": []}
get_items_from_autochecks(host)
for topo_neighbour in inv_data:
if not neighbour:
continue
# try to find neighbour against known MAC addresses (LLDP)
if is_mac_address(neighbour):
try:
neighbour = MAC_TABLE[neighbour].host
except KeyError:
pass
if not SETTINGS.keep_domain:
neighbour = neighbour.split('.')[0]
if SETTINGS.uppercase:
neighbour = neighbour.upper()
if SETTINGS.lowercase:
neighbour = neighbour.lower()
# drop neighbour if inventory neighbour is invalid
if neighbour in DROP_HOSTS:
continue
# rewrite neighbour if inventory neighbour and checkmk host don't match
if neighbour in HOST_MAP.keys():
neighbour = HOST_MAP[neighbour]
# has to be done before checking interfaces
get_items_from_autochecks(neighbour)
local_port = topo_neighbour.get(inv_columns.local_port)
if is_mac_address(local_port):
try:
print(f'host: {host}, local_port: {local_port}, {MAC_TABLE[local_port]}')
except KeyError:
pass
local_port = get_service_by_interface(host, local_port)
neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
if is_mac_address(neighbour):
try:
print(f'neighbour: {neighbour}, neighbour_port: {neighbour_port}, {MAC_TABLE[neighbour_port]}')
except KeyError:
pass
neighbour_port = get_service_by_interface(neighbour, neighbour_port)
if neighbour and local_port and neighbour_port:
data['connections'].update({local_port: [neighbour, neighbour_port, data_source]})
if local_port not in data['interfaces']:
data['interfaces'].append(local_port)
def create_static_connections(connections: List[List[str]], debug: bool = False) -> Dict:
data = {}
for connection in connections:
connection = StaticConnection(*connection)
print(f'connection: {connection}')
if connection.host not in data.keys():
data[connection.host] = {'connections': {}, 'interfaces': []}
if connection.neighbour not in data.keys():
data[connection.neighbour] = {'connections': {}, 'interfaces': []}
# add connection from host to neighbour
data[connection.host]['connections'].update({connection.local_port: [
connection.neighbour, connection.neighbour_port, connection.label
]})
data[connection.host]['interfaces'].append(connection.local_port)
# add connection from neighbour to host
data[connection.neighbour]['connections'].update({
connection.neighbour_port: [connection.host, connection.local_port, connection.label]
})
data[connection.neighbour]['interfaces'].append(connection.neighbour_port)
if data:
print(f'Devices added: {len(data)}, source static')
path_in_inventory: List[str],
inv_columns: InventoryColumns,
data_source: str,
debug: bool = False,
) -> Dict:
devices_to_go = list(set(seed_devices)) # remove duplicates
for static_host in STATIC_CONNECTIONS.keys():
devices_to_go.append(static_host)
devices_to_go = list(set(devices_to_go))
topology_data = {}
devices_done = []
while devices_to_go:
device = devices_to_go[0]
if device in HOST_MAP.keys():
try:
devices_to_go.remove(device)
except ValueError:
pass
device = HOST_MAP[device]
if device in devices_done:
continue
topo_data = get_inventory_data(host=device, path=path_in_inventory, debug=debug)
topology_data.update(
create_device_from_inv(
host=device,
inv_data=topo_data,
inv_columns=inv_columns,
data_source=data_source,
))
devices_list = get_list_of_devices(topology_data[device]['connections'])
for entry in devices_list:
if entry not in devices_done:
devices_to_go.append(entry)
devices_to_go = list(set(devices_to_go))
devices_done.append(device)
devices_to_go.remove(device)
print(f'Devices added: {len(devices_done)}, source {data_source}')
if __name__ == '__main__':
SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))
if SETTINGS.version:
print(f'{Path(__file__).name} version: {CREATE_TOPOLOGY_VERSION}')
if SETTINGS.check_user_data_only:
user_data = get_data_from_toml(file=SETTINGS.user_data_file)
print(f'Could read/parse the user data from {SETTINGS.user_data_file}')
exit(0)
print()
print(f'Start time: {strftime(SETTINGS.time_format)}')
start_time = time_ns()
HOST_MAP = user_data.get('HOST_MAP', {})
DROP_HOSTS = user_data.get('DROP_HOSTS', [])
STATIC_CONNECTIONS = create_static_connections(connections=user_data.get('STATIC_CONNECTIONS', []))
topology = create_topology(
seed_devices=list(set(SETTINGS.seed_devices + user_data.get('SEED_DEVICES', []))),
path_in_inventory=SETTINGS.path_in_inventory,
inv_columns=SETTINGS.inventory_columns,
if topology:
# merge topology with static connections
topology = merge_topologies(topo_pri=STATIC_CONNECTIONS, topo_sec=topology)
save_topology(
data=topology,
base_directory=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
output_directory=SETTINGS.output_directory,
topology_file_name=SETTINGS.topology_file_name,
dont_compare=SETTINGS.dont_compare,
make_default=SETTINGS.make_default,
)
if SETTINGS.keep:
remove_old_data(
keep=SETTINGS.keep,
min_age=SETTINGS.min_age,
path=Path(f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}'),
)