Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-08
# File : create_topology_data.py
#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# NOTE: the topology_data configuration (layout etc.) is saved under ~/var/check_mk/topology
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE (CDP is the default, add --lldp to use the LLDP inventory instead):
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m
# ~/local/lib/topology_data/create_topology_data.py -s CORE01 -m --lldp
from ast import literal_eval
from pathlib import Path
from json import dumps
from tomllib import loads as toml_loads
from tomllib import TOMLDecodeError
from create_topology_utils import parse_arguments, compare_dicts, rm_tree
from create_topology_classes import InventoryColumns, Interface, Settings, StaticConnection
# Neighbour names that are skipped while a device is built from its inventory data.
# Replaced at start-up by the DROP_HOSTS entry of the user TOML file.
DROP_HOSTS: List[str] = []
# Static connections keyed by host name, each value shaped like
# {'connections': {local_port: [neighbour, neighbour_port, label]}, 'interfaces': [...]}.
# Replaced at start-up by the result of create_static_connections_from_toml().
STATIC_CONNECTIONS: Dict = {}
def is_mac_address(mac_address: str) -> bool:
    """Report whether ``mac_address`` looks like a colon separated MAC address.

    The pattern is applied with ``re_match``, so it is anchored at the start of the string
    only. It accepts six colon separated groups of two characters out of ``0-9A-Z``
    (upper case only).

    Args:
        mac_address: candidate value, e.g. a neighbour name or a port id taken from an
            inventory row. Values that are not strings (for example ``None`` for a missing
            inventory column) are reported as "no MAC address" instead of raising
            ``TypeError`` inside the regex engine.

    Returns:
        ``True`` if the value starts with a MAC address, ``False`` otherwise.
    """
    # inventory columns are read with dict.get() and can be missing -> nothing to match
    if not isinstance(mac_address, str):
        return False

    re_mac_pattern = '([0-9A-Z]{2}\\:){5}[0-9A-Z]{2}'
    if re_match(re_mac_pattern, mac_address):
        if SETTINGS.debug:
            print(f'mac: {mac_address}, match')
        return True

    if SETTINGS.debug:
        print(f'mac: {mac_address}, no match')
    return False
def is_equal_with_default(data: Dict) -> bool:
    """Compare ``data`` with the topology stored in the ``default`` topology directory.

    Args:
        data: the freshly created topology data.

    Returns:
        The result of ``compare_dicts(data, default_data)`` if a default topology file
        exists, ``False`` if there is no default topology to compare with (so the caller
        saves the new topology).
    """
    default_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/default/{SETTINGS.topology_file_name}')
    if not default_file.exists():
        # explicit bool instead of the implicit None of a missing return statement
        return False

    # NOTE(review): the topology file is written with json.dumps() in save_topology_data();
    # literal_eval() only understands it as long as it contains no true/false/null.
    default_data = literal_eval(default_file.read_text())
    return compare_dicts(data, default_data)
def get_inventory_data(host: str, path: List[str]) -> List[Dict[str, str]] | None:
    """Read one sub tree from the inventory file of ``host``.

    The inventory file ``<omd_root>/<inventory_path>/<host>`` is parsed with
    ``literal_eval`` and then descended key by key along ``path``.

    Args:
        host: name of the inventory file to read.
        path: keys to follow from the root of the inventory data to the wanted table.

    Returns:
        The data found at ``path``, or ``None`` if the inventory file does not exist, is
        empty or does not contain the complete path.
    """
    inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}')
    if not inventory_file.exists():
        if SETTINGS.debug:
            print(f'Device: {host}: not found in inventory path!')
        return None

    data = literal_eval(inventory_file.read_text())
    if not data:
        if SETTINGS.debug:
            print(f'Device: {host}: no inventory data found!')
        return None

    for m in path:
        try:
            data = data[m]
        except (KeyError, TypeError):
            # stop at the first missing key instead of returning a partial sub tree;
            # TypeError covers a node on the way that cannot be indexed by name
            if SETTINGS.debug:
                print(f'Device: {host}: no inventory data found!')
            return None
    return data
def get_items_from_autochecks(host: str):
    """Fill the ``ITEMS`` cache with the ``if64`` service items of ``host``.

    Nothing happens for an empty host name or a host that is already cached. Otherwise
    the host gets an (initially empty) entry and every service of its autochecks file
    ``<omd_root>/<autochecks_path>/<host>.mk`` whose check plugin is ``if64`` contributes
    its item to that entry.
    """
    if not host or host in ITEMS:
        return

    ITEMS[host] = []
    autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk')
    if not autochecks_file.exists():
        if SETTINGS.debug:
            print(f'Device: {host}: not found in auto checks path!')
        return

    services: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
    for service in services:
        if service['check_plugin_name'] == 'if64':
            ITEMS[host].append(service['item'])
def get_list_of_devices(data) -> List[str]:
    """Return the distinct first elements of all values of ``data``.

    ``data`` is a mapping whose values are sequences; for a ``connections`` mapping the
    first element is the neighbour device. The order of the result is not defined.
    """
    unique_devices = {connection[0] for connection in data.values()}
    return list(unique_devices)
def get_service_by_interface(host: str, interface: str) -> str | None:
if not host:
return interface
items = ITEMS.get(host, [])
if interface in items:
return interface
else:
if_padding = '/'.join(interface.split('/')[:-1]) + f'/{interface.split("/")[-1]:0>2}'
for item in items:
# interface = Gi0/1
# item = Gi0/1 - Access port
if item.startswith(interface):
return item
elif item.startswith(if_padding):
return item
if SETTINGS.debug:
print(f'Device: {host}: interface ({interface}) not found in services')
def get_if_host_by_mac(mac_address: str) -> Tuple[str, str] | None:
    """Look up the host and interface that own ``mac_address`` in ``MAC_TABLE``.

    Returns:
        ``(host, interface)`` where the interface is the first non-empty value out of
        name, description and alias, falling back to the interface index. ``None`` if the
        MAC address is not in the table.
    """
    try:
        known_interface: Interface = MAC_TABLE[mac_address]
    except KeyError:
        return

    if_name = (
        known_interface.name
        or known_interface.description
        or known_interface.alias
        or known_interface.index
    )
    return known_interface.host, if_name
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def get_user_data_from_toml() -> Dict:
    """Load the user data file named in ``SETTINGS.user_data_file``.

    Returns:
        The parsed TOML data, or an empty dict if the file does not exist. A file that is
        not valid TOML ends the program with exit code 2 after printing an error.
    """
    toml_file = Path(SETTINGS.user_data_file)
    user_data = {}

    if toml_file.exists():
        toml_text = toml_file.read_text()
        try:
            user_data = toml_loads(toml_text)
        except TOMLDecodeError as e:
            print(
                f'ERROR: User data file {toml_file} is not in valid TOML format! ({e}), (see https://toml.io/en/)')
            exit(2)

    if SETTINGS.debug:
        print(f'data from TOML: {user_data}')
    return user_data
def remove_old_topology_data(keep: int, min_age: int, path: Path):
    """Delete the oldest topology directories below ``path``.

    The ``default`` entry and the directory it points to are never deleted; if ``default``
    exists it counts as one of the topologies to keep. The remaining directories are
    deleted oldest first (by ``st_ctime``) until only ``keep`` of them are left, but a
    directory younger than ``min_age`` days is never deleted.

    Args:
        keep: number of topologies to keep, including the default topology.
        min_age: minimum age in days a topology must have before it is deleted.
        path: directory that contains one sub directory per saved topology.
    """
    default_topo = path.joinpath('default')
    entries = list(path.iterdir())

    # keep default topology
    default_target = None
    if default_topo in entries:
        keep -= 1
        # resolve() also covers a relative symlink target, which would never compare
        # equal to the entries listed below ``path``
        default_target = default_topo.resolve()

    # (ctime, directory) pairs, oldest first; a list keeps directories with equal ctime
    candidates = sorted(
        (entry.stat().st_ctime, str(entry))
        for entry in entries
        if entry.is_dir() and entry.resolve() != default_target
    )

    keep = max(keep, 0)
    if len(candidates) <= keep:
        # nothing to delete, this includes an empty topology directory
        return

    min_age_seconds = min_age * 86400
    now = now_time()
    if min_age_seconds > now - candidates[0][0]:
        print(f'No topology older then {min_age} days found. Keep all topologies.')
        return

    for ctime, directory in candidates[:len(candidates) - keep]:
        if min_age_seconds > now - ctime:
            # sorted by age: everything from here on is younger than min_age as well
            break
        print(f'delete old topology: {directory}')
        rm_tree(Path(directory))
def save_topology_data(data: Dict, path: str):
    """Write ``data`` as JSON to ``<path>/<topology_file_name>``.

    Missing parent directories are created. If ``SETTINGS.make_default`` is set, the
    ``default`` entry next to ``path`` is replaced by a symlink pointing to ``path``.
    """
    topology_file = Path(f'{path}/{SETTINGS.topology_file_name}')
    topology_file.parent.mkdir(parents=True, exist_ok=True)
    topology_file.write_text(dumps(data))

    if not SETTINGS.make_default:
        return

    parent_path = Path(f'{path}').parent
    default_link = Path(f'{parent_path}/default')
    default_link.unlink(missing_ok=True)
    default_link.symlink_to(target=Path(path), target_is_directory=True)
def add_static_connections(host: str, data: Dict) -> Dict:
    """Merge the static connections of ``host`` into its topology ``data``.

    If ``host`` has an entry in ``STATIC_CONNECTIONS``, its connections overwrite the
    ones in ``data`` with the same local port and their local ports are added to the
    (de-duplicated) interface list. ``data`` is changed in place and returned.
    """
    if host not in STATIC_CONNECTIONS:
        return data

    static_connections = STATIC_CONNECTIONS[host].get('connections', {})

    merged_connections = data.get('connections', {})
    merged_connections.update(static_connections)

    merged_interfaces = data.get('interfaces', [])
    # iterating the dict yields its keys, i.e. the local ports
    merged_interfaces.extend(static_connections)

    data['connections'] = merged_connections
    data['interfaces'] = list(set(merged_interfaces))
    return data
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def create_mac_to_interface_table():
    """Fill ``MAC_TABLE`` with one ``Interface`` per MAC address found in the inventory.

    Every file in the inventory directory whose path does not end with ``.gz`` and whose
    name does not start with a dot is treated as a host. The interface table of the host
    is read from ``SETTINGS.path_to_if_table`` and every interface with a
    ``phys_address`` is stored under the upper case form of that address.
    """
    inventory_dir = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/')
    inventory_files = [
        str(entry) for entry in inventory_dir.iterdir() if not str(entry).endswith('.gz')
    ]

    for inventory_file in inventory_files:
        host = inventory_file.split('/')[-1]
        if host.startswith('.'):
            continue

        if_table = get_inventory_data(host, SETTINGS.path_to_if_table)
        if not if_table:
            continue

        if SETTINGS.debug:
            print(f'host: {host}, interfaces: {len(if_table)}')

        for if_entry in if_table:
            # example of an entry:
            # {
            #     'index': 436756992,
            #     'description': 'Ethernet2/50',
            #     'alias': '',
            #     'speed': 100000000000,
            #     'phys_address': 'AC:7A:56:C7:37:C8',
            #     'oper_status': 2,
            #     'port_type': 6,
            #     'admin_status': 2,
            #     'available': True,
            #     'name': 'Ethernet2/50'
            # }
            try:
                MAC_TABLE[if_entry['phys_address'].upper()] = Interface(
                    host=host,
                    index=if_entry.get('index'),
                    name=if_entry.get('name'),
                    description=if_entry.get('description'),
                    alias=if_entry.get('alias'),
                )
            except KeyError:
                # interface without a MAC address
                continue
def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
) -> Dict[str, Dict]:
    """Build the topology entry of ``host`` from its CDP/LLDP inventory rows.

    NOTE(review): in the reviewed text the end of the signature, the assignment of
    ``neighbour`` and the return statement were missing. They are restored from the way
    the function is used: create_topology() passes the result to ``dict.update()`` and
    then reads ``topology_data[device]['connections']``. Confirm against the original.

    Args:
        host: the device the inventory rows belong to.
        inv_data: rows of the neighbour table; ``None`` is treated like an empty table.
        inv_columns: names of the columns that hold the local port and the neighbour port
            (and, presumably, the neighbour name -- see the note in the loop).
        data_source: label stored with every connection.

    Returns:
        ``{host: {'connections': {local_port: [neighbour, neighbour_port, data_source]},
        'interfaces': [local_port, ...]}}`` including the static connections of the host.
    """
    data = {'connections': {}, "interfaces": []}
    get_items_from_autochecks(host)

    for topo_neighbour in inv_data or []:
        # NOTE(review): assumes InventoryColumns has a ``neighbour`` attribute next to
        # ``local_port`` and ``neighbour_port`` -- TODO confirm
        neighbour = topo_neighbour.get(inv_columns.neighbour)
        if not neighbour:
            continue

        # try to find neighbour against known MAC addresses (LLDP)
        if is_mac_address(neighbour):
            try:
                neighbour = MAC_TABLE[neighbour].host
            except KeyError:
                pass

        if not SETTINGS.keep_domain:
            neighbour = neighbour.split('.')[0]
        if SETTINGS.uppercase:
            neighbour = neighbour.upper()
        if SETTINGS.lowercase:
            neighbour = neighbour.lower()

        # drop neighbour if inventory neighbour is invalid
        if neighbour in DROP_HOSTS:
            continue

        # rewrite neighbour if inventory neighbour and checkmk host don't match
        if neighbour in HOST_MAP.keys():
            neighbour = HOST_MAP[neighbour]

        # has to be done before checking interfaces
        get_items_from_autochecks(neighbour)

        local_port = topo_neighbour.get(inv_columns.local_port)
        neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
        if not local_port or not neighbour_port:
            # a row without both ports cannot become a connection; skipping it here also
            # keeps None away from the MAC check and the service lookup
            continue

        if is_mac_address(local_port):
            try:
                print(f'host: {host}, local_port: {local_port}, {MAC_TABLE[local_port]}')
            except KeyError:
                pass
        local_port = get_service_by_interface(host, local_port)

        # the MAC table is indexed with the port, so the port is what has to be checked
        if is_mac_address(neighbour_port):
            try:
                print(f'neighbour: {neighbour}, neighbour_port: {neighbour_port}, {MAC_TABLE[neighbour_port]}')
            except KeyError:
                pass
        neighbour_port = get_service_by_interface(neighbour, neighbour_port)

        if neighbour and local_port and neighbour_port:
            data['connections'][local_port] = [neighbour, neighbour_port, data_source]
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)

    # add static connections
    data = add_static_connections(host, data)
    return {host: data}
def create_static_connections_from_toml(connections: List[List[str]]) -> Dict:
    """Turn the static connection rows of the user data file into topology entries.

    Every row is unpacked into a ``StaticConnection`` and stored in both directions:
    from host to neighbour under the local port and from neighbour to host under the
    neighbour port.

    Returns:
        ``{host: {'connections': {port: [peer, peer_port, label]}, 'interfaces': [port, ...]}}``
        with one entry for every host and neighbour named in ``connections``.
    """
    data = {}
    for row in connections:
        link = StaticConnection(*row)
        if SETTINGS.debug:
            print(f'connection: {link}')

        host_entry = data.setdefault(link.host, {'connections': {}, 'interfaces': []})
        neighbour_entry = data.setdefault(link.neighbour, {'connections': {}, 'interfaces': []})

        # add connection from host to neighbour
        host_entry['connections'][link.local_port] = [link.neighbour, link.neighbour_port, link.label]
        host_entry['interfaces'].append(link.local_port)

        # add connection from neighbour to host
        neighbour_entry['connections'][link.neighbour_port] = [link.host, link.local_port, link.label]
        neighbour_entry['interfaces'].append(link.neighbour_port)

    if SETTINGS.debug:
        print(data)
    return data
def create_topology(
        seed_devicees: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        output_directory: str,
        data_source: str
):
    """Walk the network from the seed devices and save the resulting topology.

    Starting with the seed devices and all hosts that have static connections, every
    device is built from its inventory data and its neighbours are queued until no
    unvisited device is left. The topology is written to
    ``<omd_root>/<topology_save_path>/<output_directory>`` unless it equals the default
    topology and ``SETTINGS.dont_compare`` is not set.

    Args:
        seed_devicees: devices to start the discovery with.
        path_in_inventory: path to the neighbour table inside the inventory data.
        inv_columns: column names of the neighbour table.
        output_directory: name of the directory the topology is saved in.
        data_source: label stored with every discovered connection.
    """
    from collections import deque

    # seeds first, then the hosts known from static connections; dict.fromkeys() removes
    # duplicates and keeps the order
    devices_to_go = deque(dict.fromkeys([*seed_devicees, *STATIC_CONNECTIONS]))
    topology_data = {}
    devices_done = set()

    while devices_to_go:
        # always take the device out of the queue, so an already finished or renamed
        # device can neither block the loop nor be missing when it is removed later
        device = devices_to_go.popleft()
        device = HOST_MAP.get(device, device)
        if device in devices_done:
            continue

        topo_data = get_inventory_data(device, path_in_inventory)
        topology_data.update(
            create_device_from_inv(
                host=device,
                # a device without inventory data still gets its static connections
                inv_data=topo_data or [],
                inv_columns=inv_columns,
                data_source=data_source,
            ))
        devices_done.add(device)

        devices_list = get_list_of_devices(topology_data[device]['connections'])
        devices_to_go.extend(entry for entry in devices_list if entry not in devices_done)

        if SETTINGS.debug:
            print(f'Device done: {device}, source: {data_source}')

    path = f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/{output_directory}'
    if SETTINGS.dont_compare or not is_equal_with_default(topology_data):
        save_topology_data(data=topology_data, path=path)
    else:
        print(
            'Topology matches default topology, not saved! Use "--dont-compare" to save identical topologies.'
        )
    print(f'Devices added: {len(devices_done)}, source {data_source}')
# Script entry point: read the settings from the command line, load the user data,
# create the topology and optionally remove old topologies.
if __name__ == '__main__':
    # SETTINGS is read as a module level global by the functions of this file
    SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))

    if SETTINGS.version:
        print(f'create_topology_data.py version: {CREATE_TOPOLOGY_VERSION}')
        raise SystemExit(0)

    if not SETTINGS.seed_devices:
        # use the real script name in the message, like the version output above
        print('create_topology_data.py: error: the following arguments are required: -s, --seed-devices')
        print('see create_topology_data.py -h')
        raise SystemExit(1)

    print(f'Start time: {strftime(SETTINGS.time_format)}')
    start_time = time_ns()

    # user data replaces the module level tables used while the topology is built
    user_data = get_user_data_from_toml()
    HOST_MAP = user_data.get('HOST_MAP', {})
    DROP_HOSTS = user_data.get('DROP_HOSTS', [])
    STATIC_CONNECTIONS = create_static_connections_from_toml(user_data.get('STATIC_CONNECTIONS', []))

    # not yet used, need more test data
    # NOTE(review): presumably refers to create_mac_to_interface_table(), which is not
    # called here -- confirm

    create_topology(
        seed_devicees=SETTINGS.seed_devices,
        path_in_inventory=SETTINGS.path_in_inventory,
        inv_columns=SETTINGS.inventory_columns,
        output_directory=SETTINGS.output_directory,
        data_source=SETTINGS.data_source
    )

    if SETTINGS.keep_max:
        remove_old_topology_data(
            keep=SETTINGS.keep_max,
            min_age=SETTINGS.min_age,
            path=Path(f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}'),
        )

    print(f'time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')