Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit 27104d7c authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

added option --include-l3-hosts, changed default layers to None, reworked static topology

parent bd0166e3
No related branches found
No related tags found
No related merge requests found
[PACKAGE]: ../../raw/master/mkp/nvdct-0.9.3-20241209.mkp "nvdct-0.9.3-20241209.mkp"
[PACKAGE]: ../../raw/master/mkp/nvdct-0.9.4-20241210.mkp "nvdct-0.9.4-20241210.mkp"
# Network Visualization Data Creation Tool (NVDCT)
This script creates the topology data file needed for the [Checkmk Exchange Network visualization](https://exchange.checkmk.com/p/network-visualization) plugin.\
......
File added
......@@ -13,6 +13,7 @@
#
# list of (additional to -s/--seed-devices) seed devices
# [0-9-a-zA-Z\.\_\-]{1,253} -> host
L2_SEED_DEVICES = [
# "CORE01",
# "LOCATION01",
......@@ -26,6 +27,7 @@ L2_DROP_HOSTS = [
]
# hosts will be ignored in L3v4 topology
# [0-9-a-zA-Z\.\_\-]{1,253} -> host
L3V4_IGNORE_HOSTS = [
# "host1",
# "host2",
......@@ -64,29 +66,39 @@ PROTECTED_TOPOLOGIES = [
]
# user defined static connections
# these connections will be added from host to neighbour and in reverese
# hosts/neighbours in this section will be added to SEED_DEVICES
# [0-9-a-zA-Z\.\_\-]{1,253} -> host
STATIC_CONNECTIONS = [
# ["cmk_host1", "local-port1", "neighbour-port1", "neighbour1", "label"],
# ["cmk_host1", "local-port2", "neighbour-port2", "neighbour2", "label"],
# valid entry formats
# ["left_host", "left_service", "right_service", "right_host"],
# connection: "left_host"<->"left_service"<->"right_service"<->"right_host"
# ["left_host", "", "right_service", "right_host"],
# connection: "left_host"<->"right_service"<->"right_host"
# ["left_host", "left_service", "", "right_host"],
# connection: "left_host"<->"left_service"<->"right_host"
# ["left_host", "", "", "right_host"],
# connection: "left_host"<->"right_host"
]
# optional custom layers use option -l/--layers CUSTOM to include this layers
# THIS OPTION IS DEPRECATED
# optional custom layers use option -l/--layers CUSTOM to include these layers
# don't use --pre-fetch without a host_label that matches all host you want to add
# THIS OPTION IS DEPRECATED
CUSTOM_LAYERS = [
# { path = "path,in,inventory", columns = "columns from inventory", label = "label for the layer", host_label = "CMK host label to find matching hosts" },
# { path = "networking,lldp_cache,neighbours", columns = "neighbour_name,local_port,neighbour_port", label = "custom_LLDP", host_label = "nvdct/has_lldp_neighbours" },
# { path = "networking,cdp_cache,neighbours", columns = "neighbour_name,local_port,neighbour_port", label = "custom_CDP", host_label = "nvdct/has_cdp_neighbours" },
]
# list customers so include/excluse, use option --filter-costumers INCLUDE/EXCLUDE
# list customers to include/excluse, use option --filter-costumers INCLUDE/EXCLUDE
# [0-9-a-zA-Z\.\_\-]{1,16} -> customer
CUSTOMERS = [
# "customer1",
# "customer2",
# "customer3",
]
# list site so include/excluse, use option --filter-sites INCLUDE/EXCLUDE
# list site to include/excluse, use option --filter-sites INCLUDE/EXCLUDE
# [0-9-a-zA-Z\.\_\-]{1,16} -> site
SITES = [
# "site1",
# "site2",
......@@ -94,6 +106,7 @@ SITES = [
]
# map inventory neighbour name to Checkmk host name
# [0-9-a-zA-Z\.\_\-]{1,253} -> host
[L2_HOST_MAP]
# inventory_neighbour1 = "cmk_host1"
# inventory_neighbour2 = "cmk_host2"
......@@ -106,6 +119,7 @@ SITES = [
# "^Meraki.*\\s-\\s" = ""
# replace network objects (takes place after summarize)
# [0-9-a-zA-Z\.\_\-]{1,253} -> host
[L3V4_REPLACE]
# "10.193.172.0/24" = "MPLS"
# "10.194.8.0/23" = "MPLS"
......@@ -127,29 +141,30 @@ SITES = [
# must be sorted from slower to faster speed
# use only one entry to have all conections with the same thickness
# bits per second = thickness
# 2000000 = 1 # 2 mbit
# 5000000 = 2 # 5 mbit
# 1e7 = 3 # 10 mbit
# 51e7 = 4 # 51 mbit
1e8 = 1 # 100 mbit
1e9 = 3 # 1 gbit
1e10 = 5 # 10 gbit
# "2000000" = 1 # 2 mbit
# "5000000" = 2 # 5 mbit
# "1e7" = 3 # 10 mbit
# "51e7" = 4 # 51 mbit
"1e8" = 1 # 100 mbit
"1e9" = 3 # 1 gbit
"1e10" = 5 # 10 gbit
[SETTINGS]
# api_port = 80
# api_port = 5001
# backend = "MULTISITE" | "RESTAPI" | "LIVESTATUS"
# case = "LOWER" | "UPPER"
# default = false
# dont_compare = false
# filter_customers = "INCLUDE" |"EXCLUDE"
# filter_sites = "INCLUDE" | "EXCLUDE"
# include_l3_hosts = false
# keep = 0
# layers = ["LLDP", "CDP", "STATIC", "CUSTOM", "L3v4"]
# layers = ["LLDP", "CDP", L3v4, "STATIC", "CUSTOM"]
# log_file = "~/var/log/nvdct.log"
# log_level = "WARNING"
# log_to_stdout = false
# min_age = 0
# output_directory = ''
# output_directory = '' #
# pre_fetch = false
# prefix = ""
# quiet = true
......
......@@ -23,6 +23,7 @@
# --dont-compare
# --filter-customers
# --filter-sites
# --include-l3-hosts
# --keep
# --log-file
# --log-level
......@@ -43,18 +44,19 @@ from argparse import (
)
from pathlib import Path
from lib.utils import (
ExitCodes,
from lib.constants import (
HOME_URL,
MIN_CDP_VERSION,
MIN_LINUX_IP_ADDRESSES,
MIN_SNMP_IP_ADDRESSES,
MIN_WINDOWS_IP_ADDRESSES,
MIN_LLDP_VERSION,
MIN_IP_ADDRESSES,
NVDCT_VERSION,
# SAMPLE_SEEDS,
SCRIPT,
TIME_FORMAT_ARGPARSER,
USER_DATA_FILE,
)
from lib.utils import ExitCodes
def parse_arguments() -> arg_Namespace:
......@@ -83,7 +85,7 @@ def parse_arguments() -> arg_Namespace:
f' {ExitCodes.BACKEND_NOT_IMPLEMENTED.value} - Backend not implemented\n'
f' {ExitCodes.AUTOMATION_SECRET_NOT_FOUND.value} - Automation secret not found\n'
'\nUsage:\n'
f'{SCRIPT} -u ~/local/bin/nvdct/conf/{USER_DATA_FILE} \n\n'
f'{SCRIPT} -u ~/local/bin/nvdct/conf/my_{USER_DATA_FILE} \n\n'
)
parser.add_argument(
......@@ -122,12 +124,13 @@ def parse_arguments() -> arg_Namespace:
choices=['CDP', 'CUSTOM', 'LLDP', 'STATIC', 'L3v4'],
# default=['CDP'],
help=(
f' - CDP : needs inv_cdp_cache package at least in version {MIN_CDP_VERSION}\n'
f' - LLDP : needs inv_lldp_cache package at least in version {MIN_LLDP_VERSION}\n'
f' - L3v4 : needs inv_ip_address package at least in version {MIN_IP_ADDRESSES}\n'
f' adds, layer 3 topology fpr IPv4\n'
f' - STATIC (deprecated)\n'
f' - CUSTOM (deprecated)\n'
f' - CDP : needs inv_cdp_cache package at least in version {MIN_CDP_VERSION}\n'
f' - LLDP : needs inv_lldp_cache package at least in version {MIN_LLDP_VERSION}\n'
f' - L3v4 : needs inv_ip_address package at least in version {MIN_SNMP_IP_ADDRESSES} for SNMP based hosts\n'
f' for Linux based hosts inv_lnx_ip_if in version {MIN_LINUX_IP_ADDRESSES}\n'
f' for Windows based hosts inv_win_ip_if in version {MIN_WINDOWS_IP_ADDRESSES}\n'
f' - STATIC: creates a topology base on the "STATIC_CONNECTIONS" in the toml file\n'
f' - CUSTOM: (deprecated)\n'
)
)
parser.add_argument(
......@@ -192,6 +195,10 @@ def parse_arguments() -> arg_Namespace:
help='INCLUDE/EXCLUDE site list from config file.\n'
'Note: MULTISITE backend only.',
)
parser.add_argument(
'--include-l3-hosts', action='store_const', const=True, # default=False,
help='Include hosts (single IP objects) in layer 3 topology',
)
parser.add_argument(
'--remove-domain', action='store_const', const=True, # default=False,
help='Remove the domain name from the neighbor name',
......
......@@ -23,14 +23,17 @@ from sys import exit as sys_exit
from livestatus import MultiSiteConnection, SiteConfigurations, SiteId
from lib.utils import (
from lib.constants import (
CACHE_INTERFACES_DATA,
OMD_ROOT,
PATH_INTERFACES,
)
from lib.utils import (
ExitCodes,
get_data_form_live_status,
get_table_from_inventory,
LOGGER,
OMD_ROOT,
PATH_INTERFACES,
)
......@@ -410,8 +413,8 @@ class HostCacheMultiSite(HostCacheLiveStatus):
'local site only. Try -b RESTAPI if you have a distributed environment.'
)
def filter_sites(self, filter: str| None, sites:List[str]):
match filter:
def filter_sites(self, filter_: str | None, sites: List[str]):
match filter_:
case 'INCLUDE':
self.sites = {site: data for site, data in self.sites.items() if site in sites}
case 'EXCLUDE':
......@@ -419,8 +422,8 @@ class HostCacheMultiSite(HostCacheLiveStatus):
case _:
return
def filter_costumers(self, filter: str | None, costumers:List[str]):
match filter:
def filter_costumers(self, filter_: str | None, costumers: List[str]):
match filter_:
case 'INCLUDE':
self.sites = {
site: data for site, data in self.sites.items() if data.get('customer') in costumers
......@@ -437,10 +440,17 @@ class HostCacheMultiSite(HostCacheLiveStatus):
class HostCacheRestApi(HostCache):
def __init__(self, pre_fetch: bool, api_port: int):
def __init__(
self,
pre_fetch: bool,
api_port: int,
filter_sites: str | None = None,
sites: List[str] = [],
):
super().__init__(pre_fetch, '[RESTAPI]')
LOGGER.debug(f'{self.backend} init backend')
self._api_port = api_port
self.sites = []
try:
self.__secret = Path(
f'{OMD_ROOT}/var/check_mk/web/automation/automation.secret'
......@@ -458,9 +468,33 @@ class HostCacheRestApi(HostCache):
self.__session.headers['Authorization'] = f"Bearer {self.__user} {self.__secret}"
self.__session.headers['Accept'] = 'application/json'
self.get_sites()
self.filter_sites(filter_=filter_sites, sites=sites)
LOGGER.info(f'{self.backend} filtered sites : {self.sites}')
if self.pre_fetch:
self.pre_fetch_hosts()
def get_sites(self):
LOGGER.debug(f'{self.backend} get_sites')
resp = self.__session.get(url=f"{self.__api_url}/domain-types/site_connection/collections/all")
if resp.status_code == 200:
sites = resp.json().get("value")
self.sites = [site.get('id') for site in sites]
LOGGER.debug(f'{self.backend} sites : {self.sites}')
else:
LOGGER.warning(f'{self.backend} got no site information! status code {resp.status_code}')
LOGGER.debug(f'{self.backend} response text: {resp.text}')
def filter_sites(self, filter_: str | None, sites: List[str]):
match filter_:
case 'INCLUDE':
self.sites = [site for site in self.sites if site in sites]
case 'EXCLUDE':
self.sites = [site for site in self.sites if site not in sites]
case _:
return
def get_inventory_data(self, hosts: List[str]) -> Dict[str, Dict | None]:
LOGGER.debug(f'{self.backend} get_inventory_data {hosts}')
host_data: Dict[str, Dict | None] = {}
......@@ -475,8 +509,9 @@ class HostCacheRestApi(HostCache):
resp = self.__session.get(
url=f"{self.__api_url}/domain-types/host/collections/all",
params={
"query": query,
"columns": ['name', 'mk_inventory'],
'query': query,
'columns': ['name', 'mk_inventory'],
'sites': self.sites,
},
)
if resp.status_code == 200:
......@@ -487,7 +522,7 @@ class HostCacheRestApi(HostCache):
if host:
host_data[host] = raw_host['extensions'].get('mk_inventory')
else:
LOGGER.info(
LOGGER.warning(
f'{self.backend} got no inventory data found!, status code {resp.status_code}'
)
LOGGER.debug(f'{self.backend} response query: {query}')
......@@ -511,10 +546,11 @@ class HostCacheRestApi(HostCache):
query = f'{{"op": "and", "expr": [{query_item},{query_host}]}}'
resp = self.__session.get(
url=f"{self.__api_url}/domain-types/service/collections/all",
url=f'{self.__api_url}/domain-types/service/collections/all',
params={
"query": query,
"columns": ['host_name', 'description', 'long_plugin_output'],
'query': query,
'columns': ['host_name', 'description', 'long_plugin_output'],
'sites': self.sites,
},
)
......@@ -556,10 +592,11 @@ class HostCacheRestApi(HostCache):
# return False
query = '{"op": "=", "left": "name", "right": "' + host + '"}'
resp = self.__session.get(
url=f"{self.__api_url}/domain-types/host/collections/all",
url=f'{self.__api_url}/domain-types/host/collections/all',
params={
"query": query,
"columns": ['name'],
'query': query,
'columns': ['name'],
'sites': self.sites,
},
)
if resp.status_code == 200:
......@@ -579,14 +616,14 @@ class HostCacheRestApi(HostCache):
def get_hosts_by_label(self, label: str) -> List[str] | None:
LOGGER.debug(f'{self.backend} get_hosts_by_label {label}')
# query = '{"op": "=", "left": "label_names", "right": "' + label + '"}'
query = '{"op": "=", "left": "labels", "right": "' + label + '"}'
resp = self.__session.get(
url=f"{self.__api_url}/domain-types/host/collections/all",
url=f'{self.__api_url}/domain-types/host/collections/all',
params={
"query": query,
"columns": ['name', 'labels'],
'columns': ['name', 'labels'],
'query': query,
'sites': self.sites,
},
)
if resp.status_code == 200:
......@@ -608,11 +645,12 @@ class HostCacheRestApi(HostCache):
def pre_fetch_hosts(self):
LOGGER.debug(f'{self.backend} pre_fetch_hosts')
LOGGER.critical(f'{self.backend} pre_fetch_hosts sites {self.sites}')
resp = self.__session.get(
url=f"{self.__api_url}/domain-types/host/collections/all",
url=f'{self.__api_url}/domain-types/host/collections/all',
params={
"columns": ['name'],
'columns': ['name'],
'sites': self.sites,
},
)
if resp.status_code == 200:
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2024-12-11
# File : nvdct/lib/constants.py
from logging import getLogger
from os import environ
from typing import Final
NVDCT_VERSION: Final[str] = '0.9.4-20241210'
#
OMD_ROOT: Final[str] = environ["OMD_ROOT"]
#
API_PORT: Final[int] = 5001
CACHE_INTERFACES_DATA: Final[str] = 'interface_data'
CMK_SITE_CONF: Final[str] = f'{OMD_ROOT}/etc/omd/site.conf'
COLUMNS_CDP: Final[str] = 'neighbour_name,local_port,neighbour_port'
COLUMNS_L3v4: Final[str] = 'address,device,cidr,network,type'
COLUMNS_LLDP: Final[str] = 'neighbour_name,local_port,neighbour_port'
DATAPATH: Final[str] = f'{OMD_ROOT}/var/check_mk/topology/data'
HOME_URL: Final[str] = 'https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/nvdct'
HOST_LABEL_CDP: Final[str] = "'nvdct/has_cdp_neighbours' 'yes'"
HOST_LABEL_L3V4_HOSTS: Final[str] = "'nvdct/l3v4_topology' 'host'"
HOST_LABEL_L3V4_ROUTER: Final[str] = "'nvdct/l3v4_topology' 'router'"
HOST_LABEL_LLDP: Final[str] = "'nvdct/has_lldp_neighbours' 'yes'"
LABEL_CDP: Final[str] = 'CDP'
LABEL_L3v4: Final[str] = 'LAYER3v4'
LABEL_LLDP: Final[str] = 'LLDP'
LOGGER: Final[str] = getLogger('root)')
LOG_FILE: Final[str] = f'{OMD_ROOT}/var/log/nvdct.log'
MIN_CDP_VERSION: Final[str] = '0.7.1-20240320'
MIN_LINUX_IP_ADDRESSES: Final[str] = '0.0.4-20241210'
MIN_SNMP_IP_ADDRESSES: Final[str] = '0.0.6-20241210'
MIN_WINDOWS_IP_ADDRESSES: Final[str] = '0.0.3-20241210'
MIN_LLDP_VERSION: Final[str] = '0.9.3-20240320'
PATH_CDP: Final[str] = 'networking,cdp_cache,neighbours'
PATH_INTERFACES: Final[str] = 'networking,interfaces'
PATH_L3v4: Final[str] = 'networking,addresses'
PATH_LLDP: Final[str] = 'networking,lldp_cache,neighbours'
SCRIPT: Final[str] = '~/local/bin/nvdct/nvdct.py'
TIME_FORMAT: Final[str] = '%Y-%m-%dT%H:%M:%S.%m'
TIME_FORMAT_ARGPARSER: Final[str] = '%%Y-%%m-%%dT%%H:%%M:%%S.%%m'
USER_DATA_FILE: Final[str] = 'nvdct.toml'
......@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
from cmk_addons.plugins.bgp_topology.lib.utils import OMD_ROOT
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-10-12
......@@ -13,20 +13,29 @@
from collections.abc import Mapping
from ipaddress import AddressValueError, IPv4Address, IPv4Network, NetmaskValueError
from logging import CRITICAL, FATAL, ERROR, WARNING, INFO, DEBUG
from os import environ
from sys import exit as sys_exit
from time import strftime
from typing import Dict, List, NamedTuple, Tuple
from pathlib import Path
from lib.constants import (
API_PORT,
LOGGER,
LOG_FILE,
OMD_ROOT,
TIME_FORMAT,
USER_DATA_FILE,
)
from lib.utils import (
ExitCodes,
Layer,
get_data_from_toml,
get_local_cmk_api_port,
# get_local_cmk_version,
LOGGER,
TIME_FORMAT,
USER_DATA_FILE,
Layer
is_valid_customer_name,
is_valid_hostname,
is_valid_log_file,
is_valid_output_directory,
is_valid_site_name,
)
......@@ -45,11 +54,10 @@ class Thickness(NamedTuple):
class StaticConnection(NamedTuple):
host: str
local_port: str
neighbour_port: str
neighbour: str
label: str
right_host: str
right_service: str
left_service: str
left_host: str
class Wildcard(NamedTuple):
......@@ -65,11 +73,6 @@ class Settings:
self,
cli_args: Mapping[str, object],
):
self.__omd_root = environ['OMD_ROOT']
self.__path_to_if_table = 'networking,interfaces'
self.__topology_file_name = 'network_data.json'
# self.__topology_save_path = 'var/topology_data'
self.__topology_save_path_cmk_2_3 = 'var/check_mk/topology/data'
# cli defaults
self.__settings = {
# 'api_port': 80,
......@@ -80,10 +83,11 @@ class Settings:
'dont_compare': False,
'filter_customers': None,
'filter_sites': None,
'include_l3_hosts': False,
'keep': False,
'remove_domain': False,
'layers': ['CDP'],
'log_file': f'{self.omd_root}/var/log/nvdct.log',
'layers': [],
'log_file': LOG_FILE,
'log_level': 'WARNING',
'log_to_stdout': False,
'min_age': 0,
......@@ -91,11 +95,10 @@ class Settings:
'prefix': None,
'quiet': False,
'pre_fetch': False,
# 'seed_devices': [],
'skip_l3_if': False,
'skip_l3_ip': False,
'time_format': TIME_FORMAT,
'user_data_file': f'{self.omd_root}/local/bin/nvdct/conf/{USER_DATA_FILE}',
'user_data_file': f'{OMD_ROOT}/local/bin/nvdct/conf/{USER_DATA_FILE}',
}
# args in the form {'s, __seed_devices': 'CORE01', 'p, __path_in_inventory': None, ... }}
# we will remove 's, __'
......@@ -128,7 +131,6 @@ class Settings:
self.__api_port: int | None = None
# init user data with defaults
# self.__drop_host_regex: List[str] | None = None
self.__custom_layers: List[StaticConnection] | None = None
self.__customers: List[str] | None = None
self.__emblems: Emblems | None = None
......@@ -157,7 +159,7 @@ class Settings:
else:
self.__api_port = get_local_cmk_api_port()
if self.__api_port is None:
self.__api_port = 80
self.__api_port = API_PORT
return self.__api_port
......@@ -217,6 +219,10 @@ class Settings:
)
return None
@property # --include-l3-hosts
def include_l3_hosts(self) -> bool:
return bool(self.__settings['include_l3_hosts'])
@property # --keep
def keep(self) -> int | None:
if isinstance(self.__settings['keep'], int):
......@@ -233,27 +239,25 @@ class Settings:
@property # --log-file
def log_file(self) -> str:
return str(self.__settings['log_file'])
raw_log_file = str(Path(str(self.__settings['log_file'])).expanduser())
if is_valid_log_file(raw_log_file):
return raw_log_file
else:
LOGGER.error(f'Falling back to {LOG_FILE}')
return LOG_FILE
@property # --log-level
def loglevel(self) -> int:
match self.__settings['log_level']:
case 'DEBUG':
return DEBUG
case 'INFO':
return INFO
case 'WARNING':
return WARNING
case 'ERROR':
return ERROR
case 'FATAL':
return FATAL
case 'CRITICAL':
return CRITICAL
case 'OFF':
return -1
case _:
return WARNING
log_levels = {
'DEBUG': DEBUG,
'INFO': INFO,
'WARNING': WARNING,
'ERROR': ERROR,
'FATAL': FATAL,
'CRITICAL': CRITICAL,
'OFF': -1,
}
return log_levels.get(self.__settings['log_level'], WARNING)
@property # --log-to-stdout
def log_to_stdtout(self) -> bool:
......@@ -271,7 +275,11 @@ class Settings:
# init output directory with current time if not set
if not self.__settings['output_directory']:
self.__settings['output_directory'] = f'{strftime(self.__settings["time_format"])}'
return str(self.__settings['output_directory'])
if is_valid_output_directory(str(self.__settings['output_directory'])):
return str(self.__settings['output_directory'])
else:
LOGGER.error('Falling back to "nvdct"')
return 'nvdct'
@property # --prefix
def prefix(self) -> str | None:
......@@ -311,7 +319,7 @@ class Settings:
if self.__customers is None:
self.__customers = [
str(customer) for customer in set(self.__user_data.get('CUSTOMERS', []))
]
if is_valid_customer_name(customer)]
LOGGER.info(f'Found {len(self.__customers)} to filter on')
return self.__customers
......@@ -362,9 +370,7 @@ class Settings:
def l2_seed_devices(self) -> List[str]:
if self.__l2_seed_devices is None:
self.__l2_seed_devices = list(set(str(host) for host in (
# self.__user_data.get('L2_SEED_DEVICES', []) + self.__settings.get('seed_devices', [])
self.__user_data.get('L2_SEED_DEVICES', [])
)))
self.__user_data.get('L2_SEED_DEVICES', [])) if is_valid_hostname(host)))
return self.__l2_seed_devices
@property
......@@ -373,7 +379,7 @@ class Settings:
self.__l2_host_map = {
str(host): str(replace_host) for host, replace_host in self.__user_data.get(
'L2_HOST_MAP', {}
).items()
).items() if is_valid_hostname(host)
}
return self.__l2_host_map
......@@ -392,7 +398,7 @@ class Settings:
if self.__l3v4_ignore_hosts is None:
self.__l3v4_ignore_hosts = [str(host) for host in set(self.__user_data.get(
'L3V4_IGNORE_HOSTS', []
))]
)) if is_valid_hostname(host)]
return self.__l3v4_ignore_hosts
@property
......@@ -460,9 +466,12 @@ class Settings:
_ip_network = IPv4Network(ip_network) # noqa: F841
except (AddressValueError, NetmaskValueError):
LOGGER.error(
f'Invalid entry in L3V4_REPLACE found: {ip_network} -> ignored'
f'Invalid entry in L3V4_REPLACE found: {ip_network} -> line ignored'
)
continue
if not is_valid_hostname(node):
LOGGER.error(f'Invalid node name found: {node} -> line ignored ')
continue
self.__l3v4_replace[ip_network] = str(node)
LOGGER.info(
f'Valid entries in L3V4_REPLACE found: {len(self.__l3v4_replace)}/'
......@@ -526,18 +535,22 @@ class Settings:
self.__static_connections = []
for connection in self.__user_data.get('STATIC_CONNECTIONS', []):
try:
host, local_port, neighbour_port, neighbour, label = connection
left_host, left_service, right_service, right_host = connection
except ValueError:
LOGGER.error(
f'Worng entry in STATIC_CONNECTIONS -> {connection} -> ignored'
f'Wrong entry in STATIC_CONNECTIONS -> {connection} -> ignored'
)
continue
if not right_host or not left_host:
LOGGER.warning(f'Both hosts must be set, got {connection}')
continue
if not is_valid_hostname(right_host) or not is_valid_hostname(left_host):
continue
self.__static_connections.append(StaticConnection(
host=str(host),
local_port=str(local_port),
neighbour_port=str(neighbour_port),
neighbour=str(neighbour),
label=str(label),
right_host=str(right_host),
right_service=str(right_service),
left_service=str(left_service),
left_host=str(left_host),
))
LOGGER.info(
f'Valid entries in STATIC_CONNECTIONS found: {len(self.__static_connections)}/'
......@@ -548,21 +561,6 @@ class Settings:
@property
def sites(self) -> List[str]:
if self.__sites is None:
self.__sites = [str(site) for site in set(self.__user_data.get('SITES', []))]
LOGGER.info(f'fFound {len(self.__sites)} to filter on')
self.__sites = [str(site) for site in set(self.__user_data.get('SITES', [])) if is_valid_site_name(site)]
LOGGER.info(f'Found {len(self.__sites)} to filter on')
return self.__sites
#
# all other settings
#
@property
def omd_root(self) -> str:
return self.__omd_root
@property
def topology_save_path(self) -> str:
return self.__topology_save_path_cmk_2_3
@property
def topology_file_name(self) -> str:
return self.__topology_file_name
......@@ -10,11 +10,32 @@
from collections.abc import Mapping, Sequence
from ipaddress import IPv4Address, IPv4Network
from typing import Dict, List
from lib.backends import CACHE_INTERFACES_DATA, CacheItems, HostCache, PATH_INTERFACES
from lib.settings import Thickness, Wildcard
from lib.utils import LOGGER
from typing import Dict, List, Tuple
from re import sub as re_sub
from lib.backends import (
CacheItems,
HostCache,
)
from lib.constants import (
CACHE_INTERFACES_DATA,
HOST_LABEL_L3V4_HOSTS,
HOST_LABEL_L3V4_ROUTER,
LOGGER,
PATH_INTERFACES,
PATH_L3v4,
)
from lib.settings import (
Emblems,
StaticConnection,
Thickness,
Wildcard,
)
from lib.utils import (
InventoryColumns,
Ipv4Info,
is_valid_hostname,
)
class NvObjects:
......@@ -23,12 +44,15 @@ class NvObjects:
self.host_count: int = 0
self.host_list: List[str] = []
def add_host_object(
def add_host(
self,
host: str,
host_cache: HostCache,
emblem: str | None = None
) -> None:
if not is_valid_hostname(host):
LOGGER.error(f'host not added! Invalid name {host}')
return
if host not in self.nv_objects:
self.host_count += 1
self.host_list.append(host)
......@@ -57,7 +81,45 @@ class NvObjects:
}
LOGGER.debug(f'host: {host}, link: {link}, metadata: {metadata}')
def add_service_object(
def add_service(
self,
host: str,
service: str,
host_cache: HostCache,
emblem: str | None = None,
metadata: Dict | None = None,
name: str | None = None,
) -> None:
if metadata is None:
metadata = {}
if name is None:
name = service
self.add_host(host=host, host_cache=host_cache)
service_object = f'{service}@{host}'
if service_object not in self.nv_objects:
link: Dict = {}
if host_cache.host_exists(host=host):
link = {'core': [host, service]}
elif emblem is not None:
metadata.update({
'images': {
'emblem': emblem, # node image
# 'icon': 'icon_tick', # status image, top left from emblem
},
**({'tooltip': {'quickinfo': [{'name': 'Service node', 'value': service}]}} if not link else {})
})
self.nv_objects[service_object] = {
'name': name,
'link': link,
'metadata': metadata,
}
elif metadata is not {}:
self.nv_objects[service_object]['metadata'].update(metadata)
def add_interface(
self,
host: str,
service: str,
......@@ -73,7 +135,7 @@ class NvObjects:
name = service
speed = None
self.add_host_object(host=host, host_cache=host_cache)
self.add_host(host=host, host_cache=host_cache)
service_object = f'{service}@{host}'
if service_object not in self.nv_objects:
link: Dict = {}
......@@ -106,7 +168,7 @@ class NvObjects:
elif metadata is not {}:
self.nv_objects[service_object]['metadata'].update(metadata)
def add_ipv4_address_object(
def add_ipv4_address(
self,
host: str,
ipv4_address: str,
......@@ -135,7 +197,7 @@ class NvObjects:
}
}
def add_ipv4_network_object(self, network: str, emblem: str, ) -> None:
def add_ipv4_network(self, network: str, emblem: str, ) -> None:
if network not in self.nv_objects:
self.nv_objects[network] = {
'name': network,
......@@ -339,19 +401,19 @@ def get_service_by_interface(host: str, interface: str, host_cache: HostCache) -
# 'management': 'Ma',
}
def _get_short_if_name(interface: str) -> str:
def _get_short_if_name(interface_: str) -> str:
"""
returns short interface name from long interface name
interface: is the long interface name
:type interface: str
:type interface_: str
"""
if not interface:
return interface
if not interface_:
return interface_
for interface_prefix in _alternate_if_name.keys():
if interface.lower().startswith(interface_prefix.lower()):
if interface_.lower().startswith(interface_prefix.lower()):
interface_short = _alternate_if_name[interface_prefix]
return interface.lower().replace(interface_prefix.lower(), interface_short, 1)
return interface
return interface_.lower().replace(interface_prefix.lower(), interface_short, 1)
return interface_
# try to find the item for an interface
def _match_entry_with_item(_entry: Mapping[str, str], services: Sequence[str]) -> str | None:
......@@ -552,3 +614,387 @@ def get_list_of_devices(data: Mapping) -> List[str]:
for connection in data.values():
devices.append(connection[0])
return list(set(devices))
def create_static_connections(
connections_: Sequence[StaticConnection],
emblems: Emblems,
host_cache: HostCache,
nv_connections: NvConnections,
nv_objects: NvObjects,
):
for connection in connections_:
LOGGER.info(msg=f'connection: {connection}')
nv_objects.add_host(
host=connection.right_host,
host_cache=host_cache,
emblem=emblems.host_node
)
nv_objects.add_host(
host=connection.left_host,
host_cache=host_cache,
emblem=emblems.host_node
)
if connection.right_service:
nv_objects.add_service(
host=connection.right_host,
host_cache=host_cache,
emblem=emblems.service_node,
service=connection.right_service
)
nv_connections.add_connection(
left=connection.right_host,
right=f'{connection.right_service}@{connection.right_host}',
)
if connection.left_service:
nv_objects.add_service(
host=connection.left_host,
host_cache=host_cache,
emblem=emblems.service_node,
service=connection.left_service
)
nv_connections.add_connection(
left=connection.left_host,
right=f'{connection.left_service}@{connection.left_host}',
)
if connection.right_service and connection.left_service:
nv_connections.add_connection(
left=f'{connection.right_service}@{connection.right_host}',
right=f'{connection.left_service}@{connection.left_host}',
)
elif connection.right_service: # connect right_service with left_host
nv_connections.add_connection(
left=f'{connection.right_service}@{connection.right_host}',
right=f'{connection.left_host}',
)
elif connection.left_service: # connect left_service with right_host
nv_connections.add_connection(
left=f'{connection.right_host}',
right=f'{connection.left_service}@{connection.left_host}',
)
else: # connect right_host with left_host
nv_connections.add_connection(
left=f'{connection.right_host}',
right=f'{connection.left_host}',
)
def create_l2_device_from_inv(
case: str,
host: str,
host_cache: HostCache,
inv_columns: InventoryColumns,
inv_data: Sequence[Mapping[str, str]],
l2_drop_hosts: List,
l2_host_map: Dict[str, str],
l2_neighbour_replace_regex: List[Tuple[str, str]] | None,
nv_connections: NvConnections,
nv_objects: NvObjects,
prefix: str,
remove_domain: bool,
) -> None:
for topo_neighbour in inv_data:
# check if required data are not empty
if not (neighbour := topo_neighbour.get(inv_columns.neighbour)):
LOGGER.warning(f'incomplete data, neighbour missing {topo_neighbour}')
continue
if not (raw_local_port := topo_neighbour.get(inv_columns.local_port)):
LOGGER.warning(f'incomplete data, local port missing {topo_neighbour}')
continue
if not (raw_neighbour_port := topo_neighbour.get(inv_columns.neighbour_port)):
LOGGER.warning(f'incomplete data, neighbour port missing {topo_neighbour}')
continue
# drop neighbour before domain split
if neighbour in l2_drop_hosts:
LOGGER.info(msg=f'drop neighbour: {neighbour}')
continue
if l2_neighbour_replace_regex:
for re_str, replace_str in l2_neighbour_replace_regex:
re_neighbour = re_sub(re_str, replace_str, neighbour)
if re_neighbour != neighbour:
LOGGER.info(f'regex changed Neighbor |{neighbour}| to |{re_neighbour}|')
neighbour = re_neighbour
if not neighbour:
LOGGER.info(f'Neighbour removed by regex (|{neighbour}|, |{re_str}|, |{replace_str}|)')
break
if not neighbour:
continue
if remove_domain:
neighbour = neighbour.split('.')[0]
# drop neighbour after domain split
if neighbour in l2_drop_hosts:
LOGGER.info(msg=f'drop neighbour: {neighbour}')
continue
if case == 'UPPER':
neighbour = neighbour.upper()
LOGGER.debug(f'Changed neighbour to upper case: {neighbour}')
elif case == 'LOWER':
neighbour = neighbour.lower()
LOGGER.debug(f'Changed neighbour to lower case: {neighbour}')
if prefix:
neighbour = f'{prefix}{neighbour}'
# rewrite neighbour if inventory neighbour and checkmk host don't match
if neighbour in l2_host_map.keys():
neighbour = l2_host_map[neighbour]
# getting/checking interfaces
local_port = get_service_by_interface(host, raw_local_port, host_cache)
if not local_port:
local_port = raw_local_port
LOGGER.warning(msg=f'service not found: host: {host}, raw_local_port: {raw_local_port}')
elif local_port != raw_local_port:
# local_port = raw_local_port # don't reset local_port
LOGGER.info(
msg=f'host: {host}, raw_local_port: {raw_local_port} -> local_port: {local_port}'
)
neighbour_port = get_service_by_interface(neighbour, raw_neighbour_port, host_cache)
if not neighbour_port:
neighbour_port = raw_neighbour_port
LOGGER.warning(
msg=f'service not found: neighbour: {neighbour}, '
f'raw_neighbour_port: {raw_neighbour_port}'
)
elif neighbour_port != raw_neighbour_port:
# neighbour_port = raw_neighbour_port # don't reset neighbour_port
LOGGER.info(
msg=f'neighbour: {neighbour}, raw_neighbour_port {raw_neighbour_port} '
f'-> neighbour_port {neighbour_port}'
)
metadata = {
'duplex': topo_neighbour.get('duplex'),
'native_vlan': topo_neighbour.get('native_vlan'),
}
nv_objects.add_host(host=host, host_cache=host_cache)
nv_objects.add_host(host=neighbour, host_cache=host_cache)
nv_objects.add_interface(
host=host,
service=local_port,
host_cache=host_cache,
metadata=metadata,
name=raw_local_port,
item=local_port
)
nv_objects.add_interface(
host=neighbour,
service=neighbour_port,
host_cache=host_cache,
name=raw_neighbour_port,
item=neighbour_port
)
nv_connections.add_connection(
left=host,
right=f'{local_port}@{host}',
)
nv_connections.add_connection(
left=neighbour,
right=f'{neighbour_port}@{neighbour}',
)
nv_connections.add_connection(
left=f'{local_port}@{host}',
right=f'{neighbour_port}@{neighbour}',
)
def create_l2_topology(
case: str,
host_cache: HostCache,
inv_columns: InventoryColumns,
l2_drop_hosts: List[str],
l2_host_map: Dict[str, str],
l2_neighbour_replace_regex: List[Tuple[str, str]],
label_: str,
nv_connections: NvConnections,
nv_objects: NvObjects,
path_in_inventory: str,
prefix: str,
remove_domain: bool,
seed_devices: Sequence[str],
) -> None:
devices_to_go = list(set(seed_devices)) # remove duplicates
devices_done = []
while devices_to_go:
device = devices_to_go[0]
if device in l2_host_map.keys():
try:
devices_to_go.remove(device)
except ValueError:
pass
device = l2_host_map[device]
if device in devices_done:
continue
topo_data = host_cache.get_data(
host=device, item=CacheItems.inventory, path=path_in_inventory
)
if topo_data:
create_l2_device_from_inv(
host=device,
inv_data=topo_data,
inv_columns=inv_columns,
l2_host_map=l2_host_map,
l2_drop_hosts=l2_drop_hosts,
l2_neighbour_replace_regex=l2_neighbour_replace_regex,
host_cache=host_cache,
nv_objects=nv_objects,
nv_connections=nv_connections,
case=case,
prefix=prefix,
remove_domain=remove_domain
)
for _entry in nv_objects.host_list:
if _entry not in devices_done:
devices_to_go.append(_entry)
devices_to_go = list(set(devices_to_go))
devices_done.append(device)
devices_to_go.remove(device)
LOGGER.info(msg=f'Device done: {device}, source: {label_}')
def create_l3v4_topology(
emblems: Emblems,
host_cache: HostCache,
ignore_hosts: Sequence[str],
ignore_ips: Sequence[IPv4Network],
ignore_wildcard: Sequence[Wildcard],
include_hosts: bool,
nv_connections: NvConnections,
nv_objects: NvObjects,
replace: Mapping[str, str],
skip_if: bool,
skip_ip: bool,
summarize: Sequence[IPv4Network],
) -> None:
host_list: Sequence[str] = host_cache.get_hosts_by_label(HOST_LABEL_L3V4_ROUTER)
if include_hosts:
host_list += host_cache.get_hosts_by_label(HOST_LABEL_L3V4_HOSTS)
LOGGER.debug(f'host list: {host_list}')
if not host_list:
LOGGER.warning(
msg='No (routing capable) host found. Check if "inv_ip_addresses.mkp" '
'added/enabled and inventory and host label discovery has run.'
)
return
LOGGER.debug(f'L3v4 ignore hosts: {ignore_hosts}')
for raw_host in host_list:
host = raw_host
if host in ignore_hosts:
LOGGER.info(f'L3v4 host {host} ignored')
continue
if not (ipv4_addresses := host_cache.get_data(
host=host, item=CacheItems.inventory, path=PATH_L3v4)
):
LOGGER.warning(f'No IPv4 address inventory found for host: {host}')
continue
nv_objects.add_host(host=host, host_cache=host_cache)
for _entry in ipv4_addresses:
emblem = emblems.ip_network
try:
ipv4_info = Ipv4Info(**_entry)
except TypeError: # as e
LOGGER.warning(f'Drop IPv4 address data for host: {host}, data: {_entry}')
continue
if ipv4_info.address.startswith('127.'): # drop loopback addresses
LOGGER.info(f'host: {host} dropped loopback address: {ipv4_info.address}')
continue
if ipv4_info.cidr == 32: # drop host addresses
LOGGER.info(
f'host: {host} dropped host address: {ipv4_info.address}/{ipv4_info.cidr}'
)
continue
if ipv4_info.type.lower() != 'ipv4': # drop if not ipv4
LOGGER.warning(
f'host: {host} dropped non ipv4 address: {ipv4_info.address},'
' type: {ipv4_info.type}'
)
continue
if is_ignore_ipv4(ipv4_info.address, ignore_ips):
LOGGER.info(f'host: {host} dropped ignore address: {ipv4_info.address}')
continue
if is_ignore_wildcard(ipv4_info.address, ignore_wildcard):
LOGGER.info(f'host: {host} dropped wildcard address: {ipv4_info.address}')
continue
if network := get_network_summary(
ipv4_address=ipv4_info.address,
summarize=summarize,
):
emblem = emblems.l3v4_summarize
LOGGER.info(
f'Network summarized: {ipv4_info.network}/{ipv4_info.cidr} -> {network}'
)
else:
network = f'{ipv4_info.network}/{ipv4_info.cidr}'
if network in replace.keys():
LOGGER.info(f'Replaced network {network} with {replace[network]}')
network = replace[network]
emblem = emblems.l3v4_replace
nv_objects.add_ipv4_network(network=network, emblem=emblem)
if skip_if is True and skip_ip is True:
nv_connections.add_connection(left=host, right=network)
elif skip_if is True and skip_ip is False:
nv_objects.add_ipv4_address(
host=host,
interface=None,
ipv4_address=ipv4_info.address,
emblem=emblems.ip_address,
)
nv_objects.add_tooltip_quickinfo(
'{ipv4_info.address}@{host}', 'Interface', ipv4_info.device
)
nv_connections.add_connection(left=f'{host}', right=f'{ipv4_info.address}@{host}')
nv_connections.add_connection(left=network, right=f'{ipv4_info.address}@{host}')
elif skip_if is False and skip_ip is True:
nv_objects.add_interface(
host=host, service=ipv4_info.device, host_cache=host_cache
)
nv_objects.add_tooltip_quickinfo(
f'{ipv4_info.device}@{host}', 'IP-address', ipv4_info.address
)
nv_connections.add_connection(left=f'{host}', right=f'{ipv4_info.device}@{host}')
nv_connections.add_connection(left=network, right=f'{ipv4_info.device}@{host}')
else:
nv_objects.add_ipv4_address(
host=host,
interface=ipv4_info.device,
ipv4_address=ipv4_info.address,
emblem=emblems.ip_address,
)
nv_objects.add_interface(
host=host, service=ipv4_info.device, host_cache=host_cache,
)
nv_connections.add_connection(
left=host, right=f'{ipv4_info.device}@{host}')
nv_connections.add_connection(
left=f'{ipv4_info.device}@{host}',
right=f'{ipv4_info.address}@{ipv4_info.device}@{host}',
)
nv_connections.add_connection(
left=network, right=f'{ipv4_info.address}@{ipv4_info.device}@{host}',
)
......@@ -14,7 +14,6 @@ from enum import Enum, unique
from json import dumps
from logging import disable as log_off, Formatter, getLogger, StreamHandler
from logging.handlers import RotatingFileHandler
from os import environ
from pathlib import Path
from re import match as re_match
from socket import socket, AF_UNIX, AF_INET, SOCK_STREAM, SHUT_WR
......@@ -23,8 +22,23 @@ from time import time as now_time
from tomllib import loads as toml_loads, TOMLDecodeError
from typing import List, Dict, TextIO
NVDCT_VERSION = '0.9.3-20241209'
from lib.constants import (
CMK_SITE_CONF,
COLUMNS_CDP,
COLUMNS_LLDP,
HOST_LABEL_CDP,
HOST_LABEL_L3V4_ROUTER,
HOST_LABEL_LLDP,
LABEL_CDP,
LABEL_L3v4,
LABEL_LLDP,
LOGGER,
OMD_ROOT,
PATH_CDP,
PATH_L3v4,
PATH_LLDP,
DATAPATH,
)
@unique
class ExitCodes(Enum):
......@@ -33,15 +47,7 @@ class ExitCodes(Enum):
BAD_TOML_FORMAT = 2
BACKEND_NOT_IMPLEMENTED = 3
AUTOMATION_SECRET_NOT_FOUND = 4
@dataclass(frozen=True)
class Layer:
path: str
columns: str
label: str
host_label: str
NO_LAYER_CONFIGURED = 5
@dataclass(frozen=True)
class Ipv4Info:
......@@ -53,44 +59,18 @@ class Ipv4Info:
network: str
type: str
@dataclass(frozen=True)
class InventoryColumns:
neighbour: str
local_port: str
neighbour_port: str
# constants
OMD_ROOT = environ["OMD_ROOT"]
CACHE_INTERFACES_DATA = 'interface_data'
CMK_SITE_CONF = f'{OMD_ROOT}/etc/omd/site.conf'
COLUMNS_CDP = 'neighbour_name,local_port,neighbour_port'
COLUMNS_L3v4 = 'address,device,cidr,network,type'
COLUMNS_LLDP = 'neighbour_name,local_port,neighbour_port'
HOME_URL = 'https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/nvdct'
HOST_LABEL_CDP = "'nvdct/has_cdp_neighbours' 'yes'"
HOST_LABEL_L3V4 = "'nvdct/l3v4_topology' 'router'"
HOST_LABEL_LLDP = "'nvdct/has_lldp_neighbours' 'yes'"
LABEL_CDP = 'CDP'
LABEL_L3v4 = 'LAYER3v4'
LABEL_LLDP = 'LLDP'
LOG_FILE = f'{OMD_ROOT}/var/log/nvdct.log'
LOGGER = getLogger('root)')
MIN_CDP_VERSION = '0.7.1-20240320'
MIN_IP_ADDRESSES = '0.0.5-2024120'
MIN_LLDP_VERSION = '0.9.3-20240320'
PATH_CDP = 'networking,cdp_cache,neighbours'
PATH_INTERFACES = 'networking,interfaces'
PATH_L3v4 = 'networking,addresses'
PATH_LLDP = 'networking,lldp_cache,neighbours'
SAMPLE_SEEDS = 'Core01 Core02'
SCRIPT = '~/local/bin/nvdct/nvdct.py'
TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%m'
TIME_FORMAT_ARGPARSER = '%%Y-%%m-%%dT%%H:%%M:%%S.%%m'
USER_DATA_FILE = 'nvdct.toml'
@dataclass(frozen=True)
class Layer:
path: str
columns: str
label: str
host_label: str
LAYERS = {
'CDP': Layer(
......@@ -109,7 +89,7 @@ LAYERS = {
path=PATH_L3v4,
columns='',
label=LABEL_L3v4,
host_label=HOST_LABEL_L3V4,
host_label=HOST_LABEL_L3V4_ROUTER,
),
}
......@@ -164,9 +144,7 @@ def get_data_from_toml(file: str) -> Dict:
def rm_tree(root: Path) -> None:
# safety
if not str(root).startswith(
f'{OMD_ROOT}/var/topology_data'
) and not str(root).startswith(f'{OMD_ROOT}/var/check_mk/topology'):
if not str(root).startswith(DATAPATH):
LOGGER.warning(msg=f"WARNING: bad path to remove, {str(root)}, don\'t delete it.")
return
for p in root.iterdir():
......@@ -323,6 +301,47 @@ def is_list_of_str_equal(list1: List[str], list2: List[str]) -> bool:
return tmp_list1 == tmp_list2
def is_valid_hostname(host: str) -> bool:
re_host_pattern = r'^[0-9a-z-A-Z\.\-\_]{1,253}$'
if re_match(re_host_pattern, host):
return True
else:
LOGGER.error(f'Invalid hostname found: {host}')
return False
def is_valid_site_name(site: str) -> bool:
re_host_pattern = r'^[0-9a-z-A-Z\.\-\_]{1,16}$'
if re_match(re_host_pattern, site):
return True
else:
LOGGER.error(f'Invalid site name found: {site}')
return False
def is_valid_customer_name(customer: str) -> bool:
re_host_pattern = r'^[0-9a-z-A-Z\.\-\_]{1,16}$'
if re_match(re_host_pattern, customer):
return True
else:
LOGGER.error(f'Invalid customer name found: {customer}')
return False
def is_valid_output_directory(directory: str) -> bool:
# 2024-12-11T17:35:08.12
re_host_pattern = r'^[0-9a-z-A-Z\.\-\_\:]{1,30}$'
if re_match(re_host_pattern, directory):
return True
else:
LOGGER.error(f'Invalid output directory name found: {directory}')
return False
def is_valid_log_file(log_file: str) -> bool:
if not log_file.startswith(f'{OMD_ROOT}/var/log/'):
LOGGER.error(f'Logg file needs to be under "{OMD_ROOT}/var/log/"! Got {Path(log_file).absolute()}')
return False
return True
# not used in cmk 2.3.x format
def merge_topologies(topo_pri: Dict, topo_sec: Dict) -> Dict:
"""
......@@ -410,8 +429,7 @@ def configure_logger(log_file: str, log_level: int, log_to_console: bool) -> Non
log = getLogger()
log_formatter = Formatter(
fmt='%(asctime)s :: %(levelname)s :: %(module)s :'
': %(funcName)s() :: %(lineno)s :: %(message)s',
fmt='%(asctime)s :: %(levelname)s :: %(module)s :: %(funcName)s() :: %(lineno)s :: %(message)s',
)
log.setLevel(log_level)
......
......@@ -133,7 +133,14 @@
# incompatible changed the option keep-domain to remove-domain -> don't mess with neighbor names by default
# 2024-12-08: incompatible: changed hostlabel for L3v4 topology to nvdct/l3v4_topology
# needs at least inv_ip_address inv_ip_address-0.0.5-20241209.mkp
# 2024-12-09: added option --include-l3-hosts
# added site filter for RESTAPI backend
# enabled customer filter for MULTISITE backend
# 2024-12-10: refactoring: moved topology code to topologies, removed all global variables, created main() function
# 2024-12-11: incompatible: changed default layers to None -> use the CLI option -l CDP or the configfile instead
# incompatible: reworked static topology -> can now be used for each service, host/service name has to be
# exactly like in CMK. See ~/local/bin/nvdct/conf/nfdct.toml
# moved string constants to lib/constants.py
# creating topology data json from inventory data
#
......@@ -147,7 +154,9 @@
# The inventory data could be created with my CDP/LLDP/IP Address/Interface nane inventory plugins:
# CDP.....: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache
# LLDP....: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache
# L3v4....: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_ipv4_addresses
# L3v4....: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_ip_addresses
# : https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lnx_if_ip
# : https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_win_if_ip
# IF Name.: https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_ifname
#
# USAGE:
......@@ -229,554 +238,233 @@ __data = {
"""
import sys
from collections.abc import Mapping, Sequence
from ipaddress import IPv4Network
from logging import DEBUG
from re import sub as re_sub
from time import strftime, time_ns
from typing import Dict, List, Tuple
from typing import List
from lib.args import parse_arguments
from lib.backends import (
CacheItems,
HostCache,
HostCacheLiveStatus,
HostCacheMultiSite,
HostCacheRestApi,
)
from lib.constants import (
HOME_URL,
LOGGER,
NVDCT_VERSION,
DATAPATH,
)
from lib.settings import Settings
from lib.topologies import (
NvConnections,
NvObjects,
# get_list_of_devices,
get_network_summary,
get_service_by_interface,
is_ignore_ipv4,
is_ignore_wildcard,
create_l2_topology,
create_l3v4_topology,
create_static_connections,
)
from lib.utils import (
configure_logger,
ExitCodes,
HOME_URL,
HOST_LABEL_L3V4,
InventoryColumns,
Ipv4Info,
Layer,
LAYERS,
LOGGER,
# merge_topologies,
NVDCT_VERSION,
PATH_L3v4,
Layer,
StdoutQuiet,
configure_logger,
remove_old_data,
save_data_to_file,
# save_topology,
StdoutQuiet,
)
from lib.settings import (
Emblems,
Settings,
StaticConnection,
Thickness,
Wildcard,
)
EMBLEMS: Emblems
HOST_CACHE: HostCache
L2_DROP_HOSTS: List[str] = []
L2_HOST_MAP: Dict[str, str] = {}
L2_NEIGHBOUR_REPLACE_REGEX: List[Tuple[str, str]] | None = None
MAP_SPEED_TO_THICKNESS: List[Thickness] = []
NV_CONNECTIONS = NvConnections()
NV_OBJECTS = NvObjects()
SETTINGS: Settings
def create_l2_device_from_inv(
host: str,
inv_data: Sequence[Mapping[str, str]],
inv_columns: InventoryColumns,
# label: str,
) -> None:
for topo_neighbour in inv_data:
# check if required data are not empty
if not (neighbour := topo_neighbour.get(inv_columns.neighbour)):
LOGGER.warning(f'incomplete data, neighbour missing {topo_neighbour}')
continue
if not (raw_local_port := topo_neighbour.get(inv_columns.local_port)):
LOGGER.warning(f'incomplete data, local port missing {topo_neighbour}')
continue
if not (raw_neighbour_port := topo_neighbour.get(inv_columns.neighbour_port)):
LOGGER.warning(f'incomplete data, neighbour port missing {topo_neighbour}')
continue
# drop neighbour before domain split
if neighbour in L2_DROP_HOSTS:
LOGGER.info(msg=f'drop neighbour: {neighbour}')
continue
if L2_NEIGHBOUR_REPLACE_REGEX:
for re_str, replace_str in L2_NEIGHBOUR_REPLACE_REGEX:
re_neighbour = re_sub(re_str, replace_str, neighbour)
if re_neighbour != neighbour:
LOGGER.info(f'regex changed Neighbor |{neighbour}| to |{re_neighbour}|')
neighbour = re_neighbour
if not neighbour:
LOGGER.info(f'Neighbour removed by regex (|{neighbour}|, |{re_str}|, |{replace_str}|)')
break
if not neighbour:
continue
if SETTINGS.remove_domain:
neighbour = neighbour.split('.')[0]
# drop neighbour after domain split
if neighbour in L2_DROP_HOSTS:
LOGGER.info(msg=f'drop neighbour: {neighbour}')
continue
if SETTINGS.case == 'UPPER':
neighbour = neighbour.upper()
LOGGER.debug(f'Changed neighbour to upper case: {neighbour}')
elif SETTINGS.case == 'LOWER':
neighbour = neighbour.lower()
LOGGER.debug(f'Changed neighbour to lower case: {neighbour}')
if SETTINGS.prefix:
neighbour = f'{SETTINGS.prefix}{neighbour}'
# rewrite neighbour if inventory neighbour and checkmk host don't match
if neighbour in L2_HOST_MAP.keys():
neighbour = L2_HOST_MAP[neighbour]
# getting/checking interfaces
local_port = get_service_by_interface(host, raw_local_port, HOST_CACHE)
if not local_port:
local_port = raw_local_port
LOGGER.warning(msg=f'service not found: host: {host}, raw_local_port: {raw_local_port}')
elif local_port != raw_local_port:
# local_port = raw_local_port # don't reset local_port
LOGGER.info(
msg=f'host: {host}, raw_local_port: {raw_local_port} -> local_port: {local_port}'
)
neighbour_port = get_service_by_interface(neighbour, raw_neighbour_port, HOST_CACHE)
if not neighbour_port:
neighbour_port = raw_neighbour_port
LOGGER.warning(
msg=f'service not found: neighbour: {neighbour}, '
f'raw_neighbour_port: {raw_neighbour_port}'
)
elif neighbour_port != raw_neighbour_port:
# neighbour_port = raw_neighbour_port # don't reset neighbour_port
LOGGER.info(
msg=f'neighbour: {neighbour}, raw_neighbour_port {raw_neighbour_port} '
f'-> neighbour_port {neighbour_port}'
)
metadata = {
'duplex': topo_neighbour.get('duplex'),
'native_vlan': topo_neighbour.get('native_vlan'),
}
NV_OBJECTS.add_host_object(host=host, host_cache=HOST_CACHE)
NV_OBJECTS.add_host_object(host=neighbour, host_cache=HOST_CACHE)
NV_OBJECTS.add_service_object(
host=host,
service=local_port,
host_cache=HOST_CACHE,
metadata=metadata,
name=raw_local_port,
item=local_port
)
NV_OBJECTS.add_service_object(
host=neighbour,
service=neighbour_port,
host_cache=HOST_CACHE,
name=raw_neighbour_port,
item=neighbour_port
)
NV_CONNECTIONS.add_connection(
left=host,
right=f'{local_port}@{host}',
)
NV_CONNECTIONS.add_connection(
left=neighbour,
right=f'{neighbour_port}@{neighbour}',
)
NV_CONNECTIONS.add_connection(
left=f'{local_port}@{host}',
right=f'{neighbour_port}@{neighbour}',
)
def create_static_connections(connections: Sequence[StaticConnection]):
for connection in connections:
LOGGER.info(msg=f'connection: {connection}')
NV_OBJECTS.add_host_object(
host=connection.host,
host_cache=HOST_CACHE,
emblem=EMBLEMS.host_node
)
NV_OBJECTS.add_host_object(
host=connection.neighbour,
host_cache=HOST_CACHE,
emblem=EMBLEMS.host_node
)
NV_OBJECTS.add_service_object(
host=connection.host,
host_cache=HOST_CACHE,
emblem=EMBLEMS.service_node,
service=connection.local_port
)
NV_OBJECTS.add_service_object(
host=connection.neighbour,
host_cache=HOST_CACHE,
emblem=EMBLEMS.service_node,
service=connection.neighbour_port
)
NV_CONNECTIONS.add_connection(
left=connection.host,
right=f'{connection.local_port}@{connection.host}',
)
NV_CONNECTIONS.add_connection(
left=connection.neighbour,
right=f'{connection.neighbour_port}@{connection.neighbour}',
)
NV_CONNECTIONS.add_connection(
left=f'{connection.local_port}@{connection.host}',
right=f'{connection.neighbour_port}@{connection.neighbour}',
)
def create_l2_topology(
seed_devices: Sequence[str],
path_in_inventory: str,
inv_columns: InventoryColumns,
label: str,
) -> None:
devices_to_go = list(set(seed_devices)) # remove duplicates
devices_done = []
while devices_to_go:
device = devices_to_go[0]
if device in L2_HOST_MAP.keys():
try:
devices_to_go.remove(device)
except ValueError:
pass
device = L2_HOST_MAP[device]
if device in devices_done:
continue
topo_data = HOST_CACHE.get_data(
host=device, item=CacheItems.inventory, path=path_in_inventory
)
if topo_data:
create_l2_device_from_inv(
host=device,
inv_data=topo_data,
inv_columns=inv_columns,
# label=label,
)
for _entry in NV_OBJECTS.host_list:
if _entry not in devices_done:
devices_to_go.append(_entry)
devices_to_go = list(set(devices_to_go))
devices_done.append(device)
devices_to_go.remove(device)
LOGGER.info(msg=f'Device done: {device}, source: {label}')
def create_l3v4_topology(
ignore_hosts: Sequence[str],
ignore_ips: Sequence[IPv4Network],
ignore_wildcard: Sequence[Wildcard],
summarize: Sequence[IPv4Network],
replace: Mapping[str, str],
skip_if: bool,
skip_ip: bool,
) -> None:
host_list: Sequence[str] = HOST_CACHE.get_hosts_by_label(HOST_LABEL_L3V4)
LOGGER.debug(f'host list: {host_list}')
if not host_list:
LOGGER.warning(
msg='No routing capable host found. Check if "inv_ipv4_addresses.mkp" '
'added/enabled and inventory has run.'
)
return
LOGGER.debug(f'L3v4 ignore hosts: {ignore_hosts}')
for raw_host in host_list:
host = raw_host
if host in ignore_hosts:
LOGGER.info(f'L3v4 host {host} ignored')
continue
if not (ipv4_addresses := HOST_CACHE.get_data(
host=host, item=CacheItems.inventory, path=PATH_L3v4)
):
LOGGER.warning(f'No IPv4 address inventory found for host: {host}')
continue
NV_OBJECTS.add_host_object(host=host, host_cache=HOST_CACHE)
for _entry in ipv4_addresses:
emblem = EMBLEMS.ip_network
try:
ipv4_info = Ipv4Info(**_entry)
except TypeError: # as e
LOGGER.warning(f'Drop IPv4 address data for host: {host}, data: {_entry}')
continue
if ipv4_info.address.startswith('127.'): # drop loopback addresses
LOGGER.info(f'host: {host} dropped loopback address: {ipv4_info.address}')
continue
if ipv4_info.cidr == 32: # drop host addresses
LOGGER.info(
f'host: {host} dropped host address: {ipv4_info.address}/{ipv4_info.cidr}'
)
continue
if ipv4_info.type.lower() != 'ipv4': # drop if not ipv4
LOGGER.warning(
f'host: {host} dropped non ipv4 address: {ipv4_info.address},'
' type: {ipv4_info.type}'
)
continue
if is_ignore_ipv4(ipv4_info.address, ignore_ips):
LOGGER.info(f'host: {host} dropped ignore address: {ipv4_info.address}')
continue
if is_ignore_wildcard(ipv4_info.address, ignore_wildcard):
LOGGER.info(f'host: {host} dropped wildcard address: {ipv4_info.address}')
continue
if network := get_network_summary(
ipv4_address=ipv4_info.address,
summarize=summarize,
):
emblem = EMBLEMS.l3v4_summarize
LOGGER.info(
f'Network summarized: {ipv4_info.network}/{ipv4_info.cidr} -> {network}'
)
else:
network = f'{ipv4_info.network}/{ipv4_info.cidr}'
if network in replace.keys():
LOGGER.info(f'Replaced network {network} with {replace[network]}')
network = replace[network]
emblem = EMBLEMS.l3v4_replace
NV_OBJECTS.add_ipv4_network_object(network=network, emblem=emblem)
if skip_if is True and skip_ip is True:
NV_CONNECTIONS.add_connection(left=host, right=network)
elif skip_if is True and skip_ip is False:
NV_OBJECTS.add_ipv4_address_object(
host=host,
interface=None,
ipv4_address=ipv4_info.address,
emblem=EMBLEMS.ip_address,
)
NV_OBJECTS.add_tooltip_quickinfo(
'{ipv4_info.address}@{host}', 'Interface', ipv4_info.device
)
NV_CONNECTIONS.add_connection(left=f'{host}', right=f'{ipv4_info.address}@{host}')
NV_CONNECTIONS.add_connection(left=network, right=f'{ipv4_info.address}@{host}')
elif skip_if is False and skip_ip is True:
NV_OBJECTS.add_service_object(
host=host, service=ipv4_info.device, host_cache=HOST_CACHE
)
NV_OBJECTS.add_tooltip_quickinfo(
f'{ipv4_info.device}@{host}', 'IP-address', ipv4_info.address
)
NV_CONNECTIONS.add_connection(left=f'{host}', right=f'{ipv4_info.device}@{host}')
NV_CONNECTIONS.add_connection(left=network, right=f'{ipv4_info.device}@{host}')
else:
NV_OBJECTS.add_ipv4_address_object(
host=host,
interface=ipv4_info.device,
ipv4_address=ipv4_info.address,
emblem=EMBLEMS.ip_address,
)
NV_OBJECTS.add_service_object(
host=host, service=ipv4_info.device, host_cache=HOST_CACHE,
)
NV_CONNECTIONS.add_connection(
left=host, right=f'{ipv4_info.device}@{host}')
NV_CONNECTIONS.add_connection(
left=f'{ipv4_info.device}@{host}',
right=f'{ipv4_info.address}@{ipv4_info.device}@{host}',
)
NV_CONNECTIONS.add_connection(
left=network, right=f'{ipv4_info.address}@{ipv4_info.device}@{host}',
)
if __name__ == '__main__':
def main():
start_time = time_ns()
SETTINGS = Settings(vars(parse_arguments()))
sys.stdout = StdoutQuiet(quiet=SETTINGS.quiet)
nv_connections = NvConnections()
nv_objects = NvObjects()
settings: Settings = Settings(vars(parse_arguments()))
sys.stdout = StdoutQuiet(quiet=settings.quiet)
configure_logger(
log_file=SETTINGS.log_file,
log_to_console=SETTINGS.log_to_stdtout,
log_level=SETTINGS.loglevel,
log_file=settings.log_file,
log_to_console=settings.log_to_stdtout,
log_level=settings.loglevel,
)
LOGGER.info(msg='Data creation started')
# always logg start and end of a session (except --log-level OFF)
LOGGER.critical(msg='Data creation started')
print()
print('')
print(
f'Network Visualisation Data Creation Tool (NVDCT)\n'
f'by thl-cmk[at]outlook[dot]com, version {NVDCT_VERSION}\n'
f'see {HOME_URL}'
)
print()
print(f'Start time....: {strftime(SETTINGS.time_format)}')
print('')
print(f'Start time....: {strftime(settings.time_format)}')
match SETTINGS.backend:
match settings.backend:
case 'RESTAPI':
HOST_CACHE = HostCacheRestApi(
pre_fetch=SETTINGS.pre_fetch,
api_port=SETTINGS.api_port
host_cache: HostCache = HostCacheRestApi(
pre_fetch=settings.pre_fetch,
api_port=settings.api_port,
filter_sites=settings.filter_sites,
sites=settings.sites
)
case 'MULTISITE':
HOST_CACHE = HostCacheMultiSite(
pre_fetch=SETTINGS.pre_fetch,
filter_sites=SETTINGS.filter_sites,
sites=SETTINGS.sites,
host_cache: HostCache = HostCacheMultiSite(
pre_fetch=settings.pre_fetch,
filter_sites=settings.filter_sites,
sites=settings.sites,
filter_customers=settings.filter_customers,
customers=settings.customers,
)
case 'LIVESTATUS':
HOST_CACHE = HostCacheLiveStatus(
pre_fetch=SETTINGS.pre_fetch,
host_cache: HostCache = HostCacheLiveStatus(
pre_fetch=settings.pre_fetch,
)
case _:
LOGGER.error(msg=f'Backend {SETTINGS.backend} not (yet) implemented')
LOGGER.error(msg=f'Backend {settings.backend} not (yet) implemented')
host_cache: HostCache | None = None # to keep linter happy
sys.exit(ExitCodes.BACKEND_NOT_IMPLEMENTED.value)
EMBLEMS = SETTINGS.emblems
L2_DROP_HOSTS = SETTINGS.l2_drop_hosts
L2_HOST_MAP = SETTINGS.l2_host_map
L2_NEIGHBOUR_REPLACE_REGEX = SETTINGS.l2_neighbour_replace_regex
MAP_SPEED_TO_THICKNESS = SETTINGS.map_speed_to_thickness
jobs: List[Layer] = []
final_topology: Dict = {}
pre_fetch_layers: List[str] = []
pre_fetch_host_list: List[str] = []
for layer in SETTINGS.layers:
for layer in settings.layers:
if layer == 'STATIC':
jobs.append(layer)
if layer == 'L3v4':
jobs.append(layer)
HOST_CACHE.add_inventory_prefetch_path(path=LAYERS[layer].path)
host_cache.add_inventory_prefetch_path(path=LAYERS[layer].path)
pre_fetch_layers.append(LAYERS[layer].host_label)
elif layer in LAYERS:
jobs.append(LAYERS[layer])
HOST_CACHE.add_inventory_prefetch_path(path=LAYERS[layer].path)
host_cache.add_inventory_prefetch_path(path=LAYERS[layer].path)
pre_fetch_layers.append(LAYERS[layer].host_label)
elif layer == 'CUSTOM':
for entry in SETTINGS.custom_layers:
for entry in settings.custom_layers:
jobs.append(entry)
HOST_CACHE.add_inventory_prefetch_path(entry.path)
host_cache.add_inventory_prefetch_path(entry.path)
if not jobs:
message = ('No layer to work on. Please configura at least one layer (i.e. CLI option "-l CDP")\n'
'See ~/local/bin/nvdct/conf/nvdct.toml -> SETTINGS -> layers')
LOGGER.warning(message)
print(message)
sys.exit(ExitCodes.NO_LAYER_CONFIGURED.value)
if SETTINGS.pre_fetch:
if settings.pre_fetch:
LOGGER.info('Pre fill cache...')
for host_label in pre_fetch_layers:
if _host_list := HOST_CACHE.get_hosts_by_label(host_label):
if _host_list := host_cache.get_hosts_by_label(host_label):
pre_fetch_host_list = list(set(pre_fetch_host_list + _host_list))
LOGGER.info(f'Fetching data for {len(pre_fetch_host_list)} hosts start')
print(f'Prefetch start: {strftime(SETTINGS.time_format)}')
print(f'Prefetch hosts: {len(pre_fetch_host_list)} of {len(HOST_CACHE.cache.keys())}')
HOST_CACHE.pre_fetch_cache(pre_fetch_host_list)
print(f'Prefetch start: {strftime(settings.time_format)}')
print(f'Prefetch hosts: {len(pre_fetch_host_list)} of {len(host_cache.cache.keys())}')
host_cache.pre_fetch_cache(pre_fetch_host_list)
LOGGER.info(f'Fetching data for {len(pre_fetch_host_list)} hosts end')
print(f'Prefetch end..: {strftime(SETTINGS.time_format)}')
print(f'Prefetch end..: {strftime(settings.time_format)}')
for job in jobs:
match job:
case 'STATIC':
label = 'static'
label = 'STATIC'
create_static_connections(
connections=SETTINGS.static_connections
connections_=settings.static_connections,
emblems=settings.emblems,
host_cache=host_cache,
nv_objects=nv_objects,
nv_connections=nv_connections,
)
case 'L3v4':
topology = None
label = 'l3v4'
label = 'L3v4'
create_l3v4_topology(
ignore_hosts=SETTINGS.l3v4_ignore_hosts,
ignore_ips=SETTINGS.l3v4_ignore_ips,
ignore_wildcard=SETTINGS.l3v4_ignore_wildcard,
summarize=SETTINGS.l3v4_summarize,
replace=SETTINGS.l3v4_replace,
skip_if=SETTINGS.skip_l3_if,
skip_ip=SETTINGS.skip_l3_ip
ignore_hosts=settings.l3v4_ignore_hosts,
ignore_ips=settings.l3v4_ignore_ips,
ignore_wildcard=settings.l3v4_ignore_wildcard,
include_hosts=settings.include_l3_hosts,
replace=settings.l3v4_replace,
skip_if=settings.skip_l3_if,
skip_ip=settings.skip_l3_ip,
summarize=settings.l3v4_summarize,
emblems=settings.emblems,
host_cache=host_cache,
nv_objects=nv_objects,
nv_connections=nv_connections,
)
case _:
label = job.label.lower()
label = job.label.upper()
columns = job.columns.split(',')
create_l2_topology(
seed_devices=SETTINGS.l2_seed_devices,
seed_devices=settings.l2_seed_devices,
path_in_inventory=job.path,
inv_columns=InventoryColumns(
neighbour=columns[0],
local_port=columns[1],
neighbour_port=columns[2]
),
label=label,
label_=label,
l2_drop_hosts=settings.l2_drop_hosts,
l2_host_map=settings.l2_host_map,
l2_neighbour_replace_regex=settings.l2_neighbour_replace_regex,
host_cache=host_cache,
nv_objects=nv_objects,
nv_connections=nv_connections,
case=settings.case,
prefix=settings.prefix,
remove_domain=settings.remove_domain,
)
NV_CONNECTIONS.add_meta_data_to_connections(
nv_objects=NV_OBJECTS,
speed_map=MAP_SPEED_TO_THICKNESS,
nv_connections.add_meta_data_to_connections(
nv_objects=nv_objects,
speed_map=settings.map_speed_to_thickness,
)
if not SETTINGS.loglevel == DEBUG:
connections = NV_CONNECTIONS.nv_connections
else:
connections = sorted(NV_CONNECTIONS.nv_connections)
_data = {
'version': 1,
'name': label,
'objects': NV_OBJECTS.nv_objects if not SETTINGS.loglevel == DEBUG else dict(
sorted(NV_OBJECTS.nv_objects.items())
'objects': nv_objects.nv_objects if not settings.loglevel == DEBUG else dict(
sorted(nv_objects.nv_objects.items())
),
'connections': connections
'connections': nv_connections.nv_connections if not settings.loglevel == DEBUG else sorted(
nv_connections.nv_connections
)
}
save_data_to_file(
data=_data,
path=(
f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}/'
f'{SETTINGS.output_directory}'
f'{DATAPATH}/{settings.output_directory}'
),
file=f'data_{label}.json',
make_default=SETTINGS.default,
make_default=settings.default,
)
message = (
f'Source {label:.<7s}: Devices/Objects/Connections added {NV_OBJECTS.host_count}/'
f'{len(NV_OBJECTS.nv_objects)}/{len(NV_CONNECTIONS.nv_connections)}'
f'Layer {label:.<8s}: Devices/Objects/Connections added {nv_objects.host_count}/'
f'{len(nv_objects.nv_objects)}/{len(nv_connections.nv_connections)}'
)
LOGGER.info(msg=message)
print(message)
NV_OBJECTS = NvObjects()
NV_CONNECTIONS = NvConnections()
nv_objects = NvObjects()
nv_connections = NvConnections()
if SETTINGS.keep:
if settings.keep:
remove_old_data(
keep=SETTINGS.keep,
min_age=SETTINGS.min_age,
raw_path=f'{SETTINGS.omd_root}/{SETTINGS.topology_save_path}',
protected=SETTINGS.protected_topologies,
keep=settings.keep,
min_age=settings.min_age,
raw_path=DATAPATH,
protected=settings.protected_topologies,
)
print(f'Time taken....: {(time_ns() - start_time) / 1e9}/s')
print(f'End time......: {strftime(SETTINGS.time_format)}')
print()
print(f'End time......: {strftime(settings.time_format)}')
print('')
LOGGER.critical('Data creation finished')
LOGGER.info('Data creation finished')
if __name__ == '__main__':
main()
......@@ -17,8 +17,7 @@
'environments\n'
' - Add custom connections (STATIC) for connections that are '
'not in the inventory\n'
' - Optimized for my CDP, LLDP and IPv4 inventory plugins\n'
' - Can also be used with custom inventory plugins\n'
' - Optimized for my CDP, LLDP and IP inventory plugins\n'
'\n'
'For more information about the network visualization plugin '
'see: \n'
......@@ -28,14 +27,7 @@
'\n'
'The inventory data could be created with my inventory '
'plugins:\n'
'CDP: '
'https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_cdp_cache\n'
'LLDP: '
'https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_lldp_cache\n'
'L3v4: '
'https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_ipv4_addresses\n'
'IF_name: '
'https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/inventory/inv_ifname\n'
'https://thl-cmk.hopto.org/gitlab/explore/projects/topics/Network%20Visualization\n'
'\n'
'For the latest version and documentation see:\n'
'https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/nvdct\n',
......@@ -47,14 +39,15 @@
'nvdct/lib/utils.py',
'nvdct/lib/__init__.py',
'nvdct/conf/nvdct.toml',
'nvdct/lib/topologies.py'],
'nvdct/lib/topologies.py',
'nvdct/lib/constants.py'],
'web': ['htdocs/images/icons/cloud_80.png',
'htdocs/images/icons/ip-address_80.png',
'htdocs/images/icons/ip-network_80.png',
'htdocs/images/icons/location_80.png']},
'name': 'nvdct',
'title': 'Network Visualization Data Creation Tool (NVDCT)',
'version': '0.9.3-20241209',
'version': '0.9.4-20241210',
'version.min_required': '2.3.0b1',
'version.packaged': 'cmk-mkp-tool 0.2.0',
'version.usable_until': '2.4.0p1'}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment