Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-08
# File : create_topology_data.py
#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -p "networking,lldp_cache" \
# -c "system_name,local_port_num,port_id" -m
from ast import literal_eval
from pathlib import Path
from json import dumps
from tomllib import loads as toml_loads
from tomllib import TOMLDecodeError
from create_topology_classes import InventoryColumns, Interface, Settings, StaticConnection
MAC_TABLE: Dict[str, Interface] = {}
HOST_MAP = {}
DROP_HOSTS: List[str] = []
STATIC_CONNECTIONS: Dict = {}
def is_mac_address(mac_address: str) -> bool:
re_mac_pattern = '([0-9A-Z]{2}\\:){5}[0-9A-Z]{2}'
if re_match(re_mac_pattern, mac_address):
if SETTINGS.debug:
print(f'mac: {mac_address}, match')
return True
else:
if SETTINGS.debug:
print(f'mac: {mac_address}, no match')
return False
def get_inventory_data(host: str, path: List[str]) -> Optional[List[Dict[str, str]]]:
inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}')
if inventory_file.exists():
data = literal_eval(inventory_file.read_text())
if data:
for m in path:
try:
data = data[m]
except KeyError:
if SETTINGS.debug:
print(f'Device: {host}: no inventory data found!')
if SETTINGS.debug:
print(f'Device: {host}: not found in inventory path!')
return data
def get_items_from_autochecks(host: str):
if host and host not in ITEMS.keys():
ITEMS[host] = []
autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk')
if autochecks_file.exists():
data: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
for service in data:
if service['check_plugin_name'] in ['if64']:
ITEMS[host].append(service['item'])
else:
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
if SETTINGS.debug:
print(f'Device: {host}: not found in auto checks path!')
def create_mac_to_interface():
path = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/')
files = [str(file) for file in list(path.iterdir()) if not str(file).endswith('.gz')]
while files:
host = files[0].split('/')[-1]
files.pop(0)
if host.startswith('.'):
continue
if_table = get_inventory_data(host, SETTINGS.path_to_if_table)
if if_table and if_table != []:
if SETTINGS.debug:
print(f'host: {host}, interfaces: {len(if_table)}')
for interface in if_table:
# {
# 'index': 436756992,
# 'description': 'Ethernet2/50',
# 'alias': '',
# 'speed': 100000000000,
# 'phys_address': 'AC:7A:56:C7:37:C8',
# 'oper_status': 2,
# 'port_type': 6,
# 'admin_status': 2,
# 'available': True,
# 'name': 'Ethernet2/50'
# }
try:
MAC_TABLE[interface['phys_address'].upper()] = Interface(
host=host,
index=interface.get('index'),
name=interface.get('name'),
description=interface.get('description'),
alias=interface.get('alias'),
)
except KeyError:
continue
def get_service_by_interface(host: str, interface: str) -> Optional[str]:
# empty host/neighbour should never happen here
if not host:
return interface
items = ITEMS.get(host, [])
if interface in items:
return interface
else:
if_padding = '/'.join(interface.split('/')[:-1]) + f'/{interface.split("/")[-1]:0>2}'
for item in items:
# interface = Gi0/1
# item = Gi0/1 - Access port
if item.startswith(interface):
return item
elif item.startswith(if_padding):
return item
if SETTINGS.debug:
print(f'Device: {host}: interface ({interface}) not found in services')
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def get_if_host_by_mac(mac_address: str) -> Optional[Tuple[str, str]]:
try:
interface: Interface = MAC_TABLE[mac_address]
except KeyError:
return
host = interface.host
if interface.name:
if_name = interface.name
elif interface.description:
if_name = interface.description
elif interface.alias:
if_name = interface.alias
else:
if_name = interface.index
return host, if_name
def add_static_connections(host: str, data: Dict) -> Dict:
if host in STATIC_CONNECTIONS.keys():
host_connections = data.get('connections', {})
host_connections.update(STATIC_CONNECTIONS[host].get('connections', {}))
host_interfaces = data.get('interfaces', [])
for interface in STATIC_CONNECTIONS[host].get('connections', {}).keys():
host_interfaces.append(interface)
data.update({'connections': host_connections, 'interfaces': list(set(host_interfaces))})
return data
def create_device_from_inv(
host: str,
inv_data: List[Dict[str, str]],
inv_columns: InventoryColumns,
data_source: str,
) -> Optional[Dict[str, Any]]:
data = {'connections': {}, "interfaces": []}
get_items_from_autochecks(host)
for topo_neighbour in inv_data:
if not neighbour:
continue
# try to find neighbour against known MAC addresses (LLDP)
if is_mac_address(neighbour):
try:
neighbour = MAC_TABLE[neighbour].host
except KeyError:
pass
if not SETTINGS.keep_domain:
neighbour = neighbour.split('.')[0]
if SETTINGS.uppercase:
neighbour = neighbour.upper()
if SETTINGS.lowercase:
neighbour = neighbour.lower()
# drop neighbour if inventory neighbour is invalid
if neighbour in DROP_HOSTS:
continue
# rewrite neighbour if inventory neighbour and checkmk host don't match
if neighbour in HOST_MAP.keys():
neighbour = HOST_MAP[neighbour]
# has to be done before checking interfaces
get_items_from_autochecks(neighbour)
local_port = topo_neighbour.get(inv_columns.local_port)
if is_mac_address(local_port):
try:
print(f'host: {host}, local_port: {local_port}, {MAC_TABLE[local_port]}')
except KeyError:
pass
local_port = get_service_by_interface(host, local_port)
neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
if is_mac_address(neighbour):
try:
print(f'neighbour: {neighbour}, neighbour_port: {neighbour_port}, {MAC_TABLE[neighbour_port]}')
except KeyError:
pass
neighbour_port = get_service_by_interface(neighbour, neighbour_port)
if neighbour and local_port and neighbour_port:
data['connections'].update({local_port: [neighbour, neighbour_port, data_source]})
if local_port not in data['interfaces']:
data['interfaces'].append(local_port)
# add static connections
data = add_static_connections(host, data)
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
return {host: data}
def get_list_of_devices(data) -> List[str]:
devices = []
for connection in data.values():
devices.append(connection[0])
return list(set(devices))
def save_topology_data(data: Dict, path: str):
save_file = Path(f'{path}/{SETTINGS.topology_file_name}')
save_file.parent.mkdir(exist_ok=True, parents=True)
save_file.write_text(dumps(data))
parent_path = Path(f'{path}').parent
if SETTINGS.make_default:
Path(f'{parent_path}/default').unlink(missing_ok=True)
Path(f'{parent_path}/default').symlink_to(target=Path(path), target_is_directory=True)
def create_topology(
seed_devicees: List[str],
path_in_inventory: List[str],
inv_columns: InventoryColumns,
output_directory: str,
data_source: str
):
devices_to_go = list(set(seed_devicees)) # remove duplicates
for static_host in STATIC_CONNECTIONS.keys():
devices_to_go.append(static_host)
devices_to_go = list(set(devices_to_go))
topology_data = {}
devices_done = []
while devices_to_go:
device = devices_to_go[0]
if device in HOST_MAP.keys():
try:
devices_to_go.remove(device)
except ValueError:
pass
device = HOST_MAP[device]
if device in devices_done:
continue
topo_data = get_inventory_data(device, path_in_inventory)
topology_data.update(
create_device_from_inv(
host=device,
inv_data=topo_data,
inv_columns=inv_columns,
data_source=data_source,
))
devices_list = get_list_of_devices(topology_data[device]['connections'])
for entry in devices_list:
if entry not in devices_done:
devices_to_go.append(entry)
devices_to_go = list(set(devices_to_go))
devices_done.append(device)
devices_to_go.remove(device)
if SETTINGS.debug:
print(f'Device done: {device}, source: {data_source}')
if topology_data:
save_topology_data(topology_data, f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/{output_directory}')
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
print(f'Devices added: {len(devices_done)}, source {data_source}')
def get_user_data_from_toml() -> Dict:
data = {}
toml_file = Path(SETTINGS.user_data_file)
if toml_file.exists():
try:
data = toml_loads(toml_file.read_text())
except TOMLDecodeError as e:
print(
f'ERROR: User data file {toml_file} is not in valid TOML format! ({e}), (see https://toml.io/en/)')
exit(2)
if SETTINGS.debug:
print(f'data from TOML: {data}')
return data
def create_static_connections_from_toml(connections: List[List[str]]) -> Dict:
data = {}
for connection in connections:
connection = StaticConnection(*connection)
if SETTINGS.debug:
print(f'connection: {connection}')
if connection.host not in data.keys():
data[connection.host] = {'connections': {}, 'interfaces': []}
if connection.neighbour not in data.keys():
data[connection.neighbour] = {'connections': {}, 'interfaces': []}
# add connection host to neighbour
data[connection.host]['connections'].update({connection.local_port: [
connection.neighbour, connection.neighbour_port, connection.label
]})
data[connection.host]['interfaces'].append(connection.local_port)
# add connection from neighbour to host
data[connection.neighbour]['connections'].update({
connection.neighbour_port: [connection.host, connection.local_port, connection.label]
})
data[connection.neighbour]['interfaces'].append(connection.neighbour_port)
if SETTINGS.debug:
print(data)
return data
if __name__ == '__main__':
SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))
if SETTINGS.version:
print(f'create_topology_data.py version: {CREATE_TOPOLOGY_VERSION}')
exit(0)
if not SETTINGS.seed_devices:
print('make-topology.py: error: the following arguments are required: -s, --seed-devices')
print('see make-topology.py -h')
exit(1)
print(f'Start time: {strftime(SETTINGS.time_format)}')
start_time = time_ns()
user_data = get_user_data_from_toml()
HOST_MAP = user_data.get('HOST_MAP', {})
DROP_HOSTS = user_data.get('DROP_HOSTS', [])
STATIC_CONNECTIONS = create_static_connections_from_toml(user_data.get('STATIC_CONNECTIONS', []))
# not yet used, need more test data
# create_mac_to_interface()
create_topology(
seed_devicees=SETTINGS.seed_devices,
path_in_inventory=SETTINGS.path_in_inventory,
inv_columns=SETTINGS.inventory_columns,
output_directory=SETTINGS.output_directory,
data_source=SETTINGS.data_source
)
print(f'time taken: {(time_ns() - start_time) / 1e9}/s')
print(f'End time: {strftime(SETTINGS.time_format)}')