Newer
Older
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-12
# File : create_topology_utils.py
# -v --version
# --check-user-data-only
# --data-source
# --debug
# --dont-compare
# --inventory-columns
from ast import literal_eval
from time import time as now_time
from tomllib import loads as toml_loads
from tomllib import TOMLDecodeError
from re import match as re_match
from enum import Enum, unique
@unique
class ExitCodes(Enum):
OK = 0
BAD_OPTION_LIST = 1
BAD_TOML_FORMAT = 3
# constants
CREATE_TOPOLOGY_VERSION = '0.0.9-202310127'
SCRIPT = '~/local/bin/network-topology/create_topology_data.py'
SAMPLE_SEEDS = 'Core01 Core02'
PATH_CDP = 'networking,cdp_cache'
PATH_LLDP = 'networking,lldp_cache'
PATH_INTERFACES = 'networking,interfaces'
LABEL_CDP = 'inv_CDP'
LABEL_LLDP = 'inv_LLDP'
COLUMNS_LLDP = 'system_name,local_port_num,port_id'
COLUMNS_CDP = 'device_id,local_port,device_port'
USER_DATA_FILE = 'create_topology_data.toml'
LQ_INTERFACES = 'interface_items'
def get_data_form_live_status(query: str):
address = f'{environ.get("OMD_ROOT")}/tmp/run/live'
family = socket.AF_INET if type(address) is tuple else socket.AF_UNIX
sock = socket.socket(family, socket.SOCK_STREAM)
sock.connect(address)
sock.sendall(query.encode())
sock.shutdown(socket.SHUT_WR)
chunks = []
while len(chunks) == 0 or chunks[-1] != "":
chunks.append(sock.recv(4096).decode())
sock.close()
if len(chunks):
reply = "".join(chunks).strip()
reply = literal_eval(reply)
if reply != [[b'']]:
return reply
def get_data_from_toml(file: str, debug: bool = False) -> Dict:
data = {}
toml_file = Path(file)
if toml_file.exists():
try:
data = toml_loads(toml_file.read_text())
except TOMLDecodeError as e:
print(
f'ERROR: data file {toml_file} is not in valid TOML format! ({e}), (see https://toml.io/en/)')
if debug:
print(f'TOML file read: {file}')
print(f'Data from toml file: {data}')
return data
def rm_tree(root: Path):
# safety
if not str(root).startswith(f'{environ["OMD_ROOT"]}/var/topology_data'):
print(f"WARNING: bad path to remove, {str(root)}, don\'t delete it.")
return
for p in root.iterdir():
if p.is_dir():
rm_tree(p)
else:
p.unlink()
root.rmdir()
def remove_old_data(keep: int, min_age: int, path: str, protected: List[str], debug: bool = False):
path = Path(path)
default_topo = path.joinpath('default')
directories = [str(directory) for directory in list(path.iterdir())]
# keep default top
if str(default_topo) in directories:
directories.remove(str(default_topo))
keep -= 1
if default_topo.is_symlink():
try:
directories.remove(str(default_topo.readlink()))
except ValueError:
pass
# keep protected topologies
for directory in protected:
try:
directories.remove(str(path.joinpath(directory)))
except ValueError as e:
if debug:
print(e)
print(directory)
print(str(path.joinpath(directory)))
print(directories)
else:
print(f'Protected topology: {directory}, will not be deleted.')
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
if len(directories) < keep < 1:
return
topo_by_age = {}
for directory in directories:
if Path(directory).is_dir():
topo_by_age[Path(directory).stat().st_ctime] = directory
topo_age = list(topo_by_age.keys())
topo_age.sort()
while len(topo_by_age) > keep:
if min_age * 86400 > now_time() - topo_age[0]:
print(f'Topology "{Path(topo_by_age[topo_age[0]]).name}" not older then {min_age} day(s). not deleted.')
return
print(f'delete old topology: {topo_by_age[topo_age[0]]}')
rm_tree(Path(topo_by_age[topo_age[0]]))
topo_by_age.pop(topo_age[0])
topo_age.pop(0)
def save_data_to_file(data: Dict, path: str, file: str, make_default: bool):
"""
Save the data as json file.
Args:
data: the topology data
path: the path were to save the dat
file: the file name to save data in
make_default: if True, create the symlink "default" with path as target
Returns:
None
"""
path_file = f'{path}/{file}'
save_file = Path(f'{path_file}')
save_file.parent.mkdir(exist_ok=True, parents=True)
save_file.write_text(dumps(data))
if make_default:
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
Path(f'{parent_path}/default').symlink_to(target=Path(path), target_is_directory=True)
def save_topology(
data: dict,
base_directory: str,
output_directory: str,
dont_compare: bool,
make_default: bool,
topology_file_name: str,
):
path = f'{base_directory}/{output_directory}'
def _save():
save_data_to_file(
data=data,
path=path,
file=topology_file_name,
make_default=make_default,
)
if dont_compare:
_save()
else:
if not is_equal_with_default(
data=data,
file=f'{base_directory}/default/{topology_file_name}'
):
_save()
else:
print(
'Topology matches default topology, not saved! Use "--dont-compare" to save identical topologies.'
)
def is_mac_address(mac_address: str, debug: bool = False) -> bool:
"""
Checks if mac_address is a valid MAC address. Will only accept MAC address in the form "AA:BB:CC:DD:EE:FF"
(lower case is also ok).
Args:
mac_address: the MAC address to check
debug: optional. If True the result of the function will be printed to stdout
Returns:
True if mac_address is a valid MAC address
False if mac_address not a valid MAC address
"""
re_mac_pattern = '([0-9A-Z]{2}\\:){5}[0-9A-Z]{2}'
if re_match(re_mac_pattern, mac_address.upper()):
if debug:
print(f'mac: {mac_address}, match')
return True
else:
if debug:
print(f'mac: {mac_address}, no match')
return False
def is_list_of_str_equal(list1: List[str], list2: List[str]) -> bool:
"""
Compares two list of strings. Before compared the list will internal sorted.
Args:
list1:
list2:
Returns:
True if both lists match
False if they don't match
"""
tmp_list1 = list1.copy()
tmp_list2 = list2.copy()
tmp_list1.sort()
tmp_list2.sort()
return tmp_list1 == tmp_list2
def merge_topologies(topo_pri: Dict, topo_sec: Dict) -> Dict:
"""
Merge dict_prim into dict_sec
Args:
topo_pri: data of dict_pri will overwrite the data in dict_sec
topo_sec: dict where the data of dict_pri will be merged to
Returns:
Dict: topo_sec that contains merged data from top_sec and top_pri
"""
keys_pri = list(topo_pri.keys())
# first transfer all completely missing items from dict_prim to dict_sec
for key in keys_pri:
if key not in topo_sec.keys():
topo_sec[key] = topo_pri[key]
else:
topo_sec[key]['connections'].update(topo_pri[key].get('connections', {}))
topo_sec[key]['interfaces'] = list(set((topo_sec[key]['interfaces'] + topo_pri[key].get('interfaces', []))))
topo_pri.pop(key)
return topo_sec
def compare_dicts(dict1: Dict, dict2: Dict) -> bool:
# check top level keys
if not is_list_of_str_equal(list(dict1.keys()), list(dict2.keys())):
# print('top level dont match')
# print(f'dict1: {list(dict1.keys())}')
# print(f'dict1: {list(dict2.keys())}')
return False
for key, value in dict1.items():
_type = type(value)
if _type == dict:
if not compare_dicts(value, dict2[key]):
return False
elif _type == list:
if not is_list_of_str_equal(value, dict2[key]):
# print(f'list1: {value}')
# print(f'list2: {dict2[key]}')
return False
elif _type == str:
if not value == dict2[key]:
# print('value dont match')
# print(f'value1: {value}')
# print(f'value2 {dict2[key]}')
return False
else:
return False
return True
def is_equal_with_default(data: Dict, file: str) -> bool:
default_file = Path(file)
if default_file.exists():
default_data = literal_eval(default_file.read_text())
return compare_dicts(data, default_data)
def get_inventory_data(host: str, debug: bool = False, ) -> Dict[str, str] | None:
query = f'GET hosts\nColumns: mk_inventory\nOutputFormat: python3\nFilter: host_name = {host}\n'
data = get_data_form_live_status(query=query)
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
if data:
try:
data = literal_eval(data[0][0].decode('utf-8'))
except SyntaxError as e:
if debug:
print(f'data: |{data}|')
print(f'type: {type(data)}')
print(f'exception: {e}')
return
return data
def get_table_from_inventory(inventory: Dict[str, Any], path: List[str]) -> List | None:
path = ('Nodes,' + ',Nodes,'.join(path.split(',')) + ',Table,Rows').split(',')
table = inventory.copy()
for m in path:
try:
table = table[m]
except KeyError:
return
return table
def get_interface_items_from_lq(host: str, debug: bool = False) -> List:
query = (
'GET services\n'
'Columns: host_name description check_command\n'
'Filter: description ~ Interface\n'
f'Filter: host_name = {host}\n'
'OutputFormat: python3\n'
data = get_data_form_live_status(query=query)
items = []
for host, description, check_command in data:
items.append(description[10:]) # remove 'Interface ' from description
if debug:
print(f'Interfaces items found: {len(items)} an host {host}')
return items