Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-08
# File : create_topology_data.py
#
# PoC for creating topology_data.json from inventory data
#
# This script creates the topology data file needed for the Checkmk "network_visualization" plugin by
# Andreas Boesl and schnetz. See
# https://forum.checkmk.com/t/network-visualization/41680
# and
# https://exchange.checkmk.com/p/network-visualization
# for more information.
#
# The inventory data could be created with my CDP/LLDP inventory plugins:
#
# CDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache
# CDP path-in-inventory: 'networking,cdp_cache'
# CDP inventory-columns: 'device_id,local_port,device_port'
#
# LLDP: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache
# LLDP path-in-inventory: 'networking,lldp_cache'
# LLDP inventory-columns: 'system_name,local_port_num,port_id'
#
# USAGE CDP (is the default):
# create_topology_data.py -s CORE01 -m
#
# USAGE LLDP:
# create_topology_data.py -s CORE01 -p "networking,lldp_cache" -c "system_name,local_port_num,port_id" -m
#
from ast import literal_eval
from pathlib import Path
from json import dumps
from typing import Dict, List, Optional, Any
from time import strftime, time_ns
from create_topology_classes import InventoryColumns, Settings
from create_topology_utils import parse_arguments
# Version string of this script; passed to the argument parser and printed
# when the version option is set.
CREATE_TOPOLOGY_VERSION = '0.0.2-20231012'

# Map inventory host name to CMK host name.
# Applied to neighbour names read from the inventory and to the device names
# taken from the work queue before their inventory data is looked up.
HOST_MAP = {
    'inventory-host': 'cmkhost',
    'nexus01': 'NX01',
    'nexus02': 'NX02',
    'nexus03': 'NX03',
}

# Drop hosts with invalid names.
# Inventory rows whose (shortened) neighbour name is listed here are skipped.
DROP_HOSTS = [
    'not advertised',
]

# Cache: host name -> list of service items of "if64" checks, filled on demand
# by get_items_from_autochecks() and read by check_interface_name().
ITEMS = {}
def get_inventory_data(host: str, path: List[str]) -> Optional[List[Dict[str, str]]]:
    """
    Read the inventory file of a host and return the sub tree addressed by path.

    The file "<omd_root>/<inventory_path>/<host>" (both taken from SETTINGS) is
    parsed as a Python literal and then descended key by key.

    Args:
        host: name of the host, used as the file name below the inventory path
        path: keys to follow from the root of the inventory tree

    Returns:
        The object found at the end of path, or None if the inventory file does
        not exist, the file holds no data, or one of the keys is missing.
    """
    inventory_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.inventory_path}/{host}')
    if not inventory_file.exists():
        print(f'Device: {host}: not found in inventory path!')
        return None

    tree = literal_eval(inventory_file.read_text())
    if not tree:
        print(f'Device: {host}: no inventory data found!')
        return None

    # walk down the tree, one key per path element
    node = tree
    for key in path:
        try:
            node = node[key]
        except KeyError:
            return None
    return node
def get_items_from_autochecks(host: str):
    """
    Cache the "if64" service items of a host in ITEMS.

    Reads "<omd_root>/<autochecks_path>/<host>.mk" (both taken from SETTINGS),
    parses it as a Python literal and stores the item of every service whose
    check plugin is "if64" in ITEMS[host].

    Nothing is done for an empty host name or a host that is already cached.
    A host without an autochecks file is cached with an empty item list, so
    the warning is printed only once per host.

    Args:
        host: name of the host, used as file name (plus ".mk")

    Returns:
        Always None, the result is stored in ITEMS.
    """
    if not host or host in ITEMS:
        return None

    # register the host first, so it counts as handled even without a file
    host_items = []
    ITEMS[host] = host_items

    autochecks_file = Path(f'{SETTINGS.omd_root}/{SETTINGS.autochecks_path}/{host}.mk')
    if not autochecks_file.exists():
        print(f'Device: {host}: not found in auto checks path!')
        return None

    services: List[Dict[str, str]] = literal_eval(autochecks_file.read_text())
    for service in services:
        if service['check_plugin_name'] == 'if64':
            host_items.append(service['item'])
    return None
def check_interface_name(host: str, interface: str) -> str:
    """
    Map an interface name from the inventory to the matching service item.

    The items cached for the host in ITEMS are searched in this order:
      1. an item equal to the interface name
      2. an item starting with the interface name, or with the interface name
         whose last "/" separated part is zero padded to two characters
         (Gi0/1 -> Gi0/01), where the name ends on a boundary, i.e. the item
         ends there or continues with a non alphanumeric character
         (interface "Gi0/1" -> item "Gi0/1 - Access port")
      3. the first item that merely starts with one of the two names
         (previous behaviour, kept as fallback)

    Step 2 keeps "Gi0/1" from being resolved to "Gi0/10 ..." just because that
    item happens to come first.

    Args:
        host: host the interface belongs to, key into ITEMS
        interface: interface name as found in the inventory data

    Returns:
        The matching service item, or the interface unchanged if host or
        interface is empty/None or no item matches (a message is printed in
        the latter case).
    """
    # without host or interface there is nothing to look up; this also keeps
    # None from reaching split() and '' from matching every item
    if not host or not interface:
        return interface

    items = ITEMS.get(host, [])
    if interface in items:
        return interface

    *parents, last_part = interface.split('/')
    if_padding = '/'.join(parents) + f'/{last_part:0>2}'

    prefix_only_match = None
    for item in items:
        for candidate in (interface, if_padding):
            if not item.startswith(candidate):
                continue
            remainder = item[len(candidate):]
            if not remainder or not remainder[0].isalnum():
                return item
            # e.g. Gi0/10 for Gi0/1: remember it, but keep looking for better
            if prefix_only_match is None:
                prefix_only_match = item

    if prefix_only_match is not None:
        return prefix_only_match

    print(f'Device: {host}: interface ({interface}) not found in services')
    return interface
def create_device_from_inv(
        host: str,
        inv_data: List[Dict[str, str]],
        inv_columns: InventoryColumns,
        data_source: str,
) -> Optional[Dict[str, Any]]:
    """
    Build the topology entry of one host from its inventory rows.

    For every row the neighbour name is cut at the first "." and rewritten via
    HOST_MAP, then the local and the neighbour port are matched against the
    service items of the respective host (check_interface_name).

    Rows are skipped if the neighbour name or one of the ports is missing or
    empty, or if the neighbour is listed in DROP_HOSTS.

    Args:
        host: host the inventory rows belong to
        inv_data: inventory rows, one dict per neighbour
        inv_columns: names of the columns holding neighbour, local port and
            neighbour port
        data_source: label stored with every connection

    Returns:
        {host: {'connections': {local_port: [neighbour, neighbour_port,
        data_source]}, 'interfaces': [local_port, ...]}}
    """
    data = {'connections': {}, "interfaces": []}
    get_items_from_autochecks(host)

    for topo_neighbour in inv_data:
        raw_neighbour = topo_neighbour.get(inv_columns.neighbour)
        raw_local_port = topo_neighbour.get(inv_columns.local_port)
        raw_neighbour_port = topo_neighbour.get(inv_columns.neighbour_port)
        # incomplete rows can not describe a connection, skip them before any
        # string handling is done on a missing (None) value
        if not (raw_neighbour and raw_local_port and raw_neighbour_port):
            continue

        neighbour = raw_neighbour.split('.')[0]
        # drop neighbour if inventory neighbour is invalid
        if not neighbour or neighbour in DROP_HOSTS:
            continue
        # rewrite neighbour if inventory neighbour and checkmk host don't match
        neighbour = HOST_MAP.get(neighbour, neighbour)

        # has to be done before checking interfaces
        get_items_from_autochecks(neighbour)

        # getting/checking interfaces
        local_port = check_interface_name(host, raw_local_port)
        neighbour_port = check_interface_name(neighbour, raw_neighbour_port)

        if neighbour and local_port and neighbour_port:
            data['connections'][local_port] = [neighbour, neighbour_port, data_source]
            if local_port not in data['interfaces']:
                data['interfaces'].append(local_port)

    return {host: data}
def get_list_of_devices(data) -> List[str]:
    """
    Return the distinct neighbour names of a connections mapping.

    Args:
        data: mapping whose values are sequences with the neighbour name as
            first element

    Returns:
        List of the first elements of all values, duplicates removed. The order
        of the list is not defined.
    """
    return list({connection[0] for connection in data.values()})
def save_topology_data(data: Dict, path: str):
    """
    Write the topology data as JSON below path.

    The file name is taken from SETTINGS.topology_file_name, missing
    directories are created. If SETTINGS.make_default is set, the entry
    "default" next to path is replaced by a symlink pointing to path.

    Args:
        data: topology data, has to be JSON serializable
        path: directory the topology file is written to
    """
    target_file = Path(f'{path}/{SETTINGS.topology_file_name}')
    target_file.parent.mkdir(parents=True, exist_ok=True)
    target_file.write_text(dumps(data))

    if not SETTINGS.make_default:
        return

    # (re)create the "default" link in the directory that contains path
    default_link = Path(f'{path}').parent / 'default'
    default_link.unlink(missing_ok=True)
    default_link.symlink_to(target=Path(path), target_is_directory=True)
def create_topology(
        seed_devicees: List[str],
        path_in_inventory: List[str],
        inv_columns: InventoryColumns,
        output_directory: str,
        data_source: str
):
    """
    Walk the network starting at the seed devices and save the topology found.

    Every device taken from the work queue is renamed via HOST_MAP, its
    inventory data is read and turned into a topology entry, and all of its
    neighbours not handled so far are queued. Each device is handled once.
    If any topology data was collected it is saved below
    "<omd_root>/<topology_save_path>/<output_directory>".

    Args:
        seed_devicees: devices to start the discovery with (parameter name
            kept as is, callers pass it by keyword)
        path_in_inventory: keys leading to the neighbour table in the inventory
        inv_columns: column names of the neighbour table
        output_directory: sub directory the topology file is written to
        data_source: label stored with every connection
    """
    devices_to_go = set(seed_devicees)  # a set removes duplicates
    devices_done = set()
    topology_data = {}
    number_of_devices = 0

    while devices_to_go:
        # pop() takes the name out of the queue as it was queued. Removing
        # the device by its HOST_MAP name afterwards would fail, as the
        # mapped name is usually not in the queue.
        device = devices_to_go.pop()
        device = HOST_MAP.get(device, device)
        if device in devices_done:
            continue

        devices_done.add(device)
        number_of_devices += 1

        topo_data = get_inventory_data(device, path_in_inventory)
        if not topo_data:
            continue

        topology_data.update(
            create_device_from_inv(
                host=device,
                inv_data=topo_data,
                inv_columns=inv_columns,
                data_source=data_source,
            ))

        for entry in get_list_of_devices(topology_data[device]['connections']):
            if entry not in devices_done:
                devices_to_go.add(entry)

    if topology_data:
        save_topology_data(topology_data, f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/{output_directory}')
    print(f'Devices added: {number_of_devices}, source {data_source}')
if __name__ == '__main__':
    # SETTINGS is a module level global, it is read by the functions above
    SETTINGS = Settings(vars(parse_arguments(CREATE_TOPOLOGY_VERSION)))

    if SETTINGS.version:
        print(f'create_topology_data.py version: {CREATE_TOPOLOGY_VERSION}')
        # SystemExit instead of exit(): exit() is only available if the site
        # module is loaded
        raise SystemExit(0)

    if not SETTINGS.seed_devices:
        print('create_topology_data.py: error: the following arguments are required: -s, --seed-devices')
        print('see create_topology_data.py -h')
        raise SystemExit(1)

    print(f'Start time: {strftime(SETTINGS.time_format)}')
    start_time = time_ns()

    create_topology(
        seed_devicees=SETTINGS.seed_devices,
        path_in_inventory=SETTINGS.path_in_inventory,
        inv_columns=SETTINGS.inventory_columns,
        output_directory=SETTINGS.output_directory,
        data_source=SETTINGS.data_source
    )

    # example invocations:
    # create_topology_data.py -s CORE01 -p "networking,lldp_cache" -c "system_name,local_port_num,port_id" -d inv_LLDP -o KLNL_LLDP
    # create_topology_data.py -s CORE01 -p "networking,cdp_cache" -c "device_id,local_port,device_port" -d inv_CDP

    # time_ns() counts nanoseconds, 1e9 converts the difference to seconds
    print(f'time taken: {(time_ns() - start_time) / 1e9}/s')
    print(f'End time: {strftime(SETTINGS.time_format)}')