Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit 8b7a2243 authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

update project

parent 7b260fec
No related branches found
No related tags found
No related merge requests found
Showing
with 1166 additions and 46 deletions
[PACKAGE]: ../../raw/master/packagee-0.1.2-20230706.mkp "package-0.1.2-20230706.mkp"
[PACKAGE]: ../../raw/master/mkp/bgp_topology-0.0.1-20240722.mkp "bgp_topology-0.0.1-20240722.mkp"
# Title
A short description about the plugin
......
File added
title: Dummy check man page - used as template for new check manuals
agents: linux, windows, aix, solaris, hpux, vms, freebsd, snmp
catalog: see modules/catalog.py for possible values
license: GPL
distribution: check_mk
description:
Describe here: (1) what the check actually does, (2) under which
circumstances it goes warning/critical, (3) which devices are supported
by the check, (4) if the check requires a separated plugin or
tool or separate configuration on the target host.
item:
Describe the syntax and meaning of the check's item here. Provide all
information one needs if coding a manual check with {checks +=} in {main.mk}.
Give an example. If the check uses {None} as sole item,
then leave out this section.
examples:
# Give examples for configuration in {main.mk} here. If the check has
# configuration variable, then give example for them here.
# set default levels to 40 and 60 percent:
foo_default_values = (40, 60)
# another configuration variable here:
inventory_foo_filter = [ "superfoo", "superfoo2" ]
perfdata:
Describe precisely the number and meaning of performance variables
the check sends. If it outputs no performance data, then leave out this
section.
inventory:
Describe how the inventory for the check works. Which items
will it find? Describe the influence of check specific
configuration parameters to the inventory.
[parameters]
foofirst(int): describe the first parameter here (if parameters are grouped
as tuple)
fooother(string): describe another parameter here.
[configuration]
foo_default_levels(int, int): Describe global configuration variable of
foo here. Important: also tell the user how they are preset.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2024-07-16
# File : bgp_topology/constants.py
# 2024-07-16: copied from bgp_topology/lib/ruleset_names.py
from typing import Final
ARG_PARSER_ANCHOR: Final[str] = '--bgp-anchor'
ARG_PARSER_ANCHOR_AS: Final[str] = 'bgp-as'
ARG_PARSER_ANCHOR_BOTH: Final[str] = 'both'
ARG_PARSER_ANCHOR_ID: Final[str] = 'bgp-id'
ARG_PARSER_EMBLEM_AS: Final[str] = '--bgp-emblem-as'
ARG_PARSER_EMBLEM_ID: Final[str] = '--bgp-emblem-id'
ARG_PARSER_HOST: Final[str] = '--host'
ARG_PARSER_MAKE_DEFAULT: Final[str] = '--make-default'
ARG_PARSER_NONE: Final[str] = 'none'
ARG_PARSER_SITES_EXCLUDE: Final[str] = '--exclude-sites'
ARG_PARSER_SITES_INCLUDE: Final[str] = '--include-sites'
BGP_PEER_LOCAL_ADDR: Final[str] = 'local_addr'
BGP_PEER_LOCAL_AS: Final[str] = 'local_as'
BGP_PEER_LOCAL_ID: Final[str] = 'local_id'
BGP_PEER_REMOTE_ADDR: Final[str] = 'remote_addr'
BGP_PEER_REMOTE_AS: Final[str] = 'remote_as'
BGP_PEER_REMOTE_ID: Final[str] = 'remote_id'
BGP_PEER_STATE: Final[str] = 'state'
BGP_PEER_UPTIME: Final[str] = 'uptime'
EMBLEM_BGP_ID: Final[str] = 'icon_topic_system' # icon_plugins_hw
EMBLEM_BGP_AS: Final[str] = 'icon_cloud'
METRIC_TIME_TAKEN: Final[str] = 'topology_time_taken'
PARAM_BGP_AS: Final[str] = 'bgp_as'
PARAM_BGP_EXT_ANCHOR: Final[str] = 'bgp_ext_anchor'
PARAM_BGP_EXT_ANCHOR_AS: Final[str] = 'bgp_ext_anchor_as'
PARAM_BGP_EXT_ANCHOR_BOTH: Final[str] = 'bgp_ext_anchor_both'
PARAM_BGP_EXT_ANCHOR_ID: Final[str] = 'bgp_ext_anchor_id'
PARAM_BGP_EXT_ANCHOR_NONE: Final[str] = 'bgp_ext_anchor_none'
PARAM_BGP_ID: Final[str] = 'bgp_id'
PARAM_BGP_SITES_EXCLUDE: Final[str] = 'exclude_sites'
PARAM_BGP_SITES_FILTER: Final[str] = 'bgp_filter_sites'
PARAM_BGP_SITES_INCLUDE: Final[str] = 'include_sites'
PARAM_EMBLEM_CUSTOM: Final[str] = 'custom_emblem'
PARAM_EMBLEM_DEFAULT: Final[str] = 'default_emblem'
PARAM_EMBLEM_NO_EMBLEM: Final[str] = 'no_emblem'
PARAM_MAKE_DEFAULT: Final[str] = 'make_default'
PICTURE_TYPE_EMBLEM: Final[str] = 'emblem'
PICTURE_TYPE_ICON: Final[str] = 'icon'
RULE_SET_NAME_BGP_TOPOLOGY: Final[str] = 'bgp_topology'
TOPOLOGY_NAME: Final[str] = 'BGP'
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2024-07-16
# File : bgp_topology/graphing/bgp_topology.py
from cmk.graphing.v1 import Title
from cmk.graphing.v1.graphs import Graph
from cmk.graphing.v1.metrics import Color, Metric, Unit, TimeNotation, AutoPrecision
from cmk.graphing.v1.perfometers import Closed, FocusRange, Open, Perfometer
from cmk_addons.plugins.bgp_topology.constants import METRIC_TIME_TAKEN
metric_topology_time_taken = Metric(
name=METRIC_TIME_TAKEN,
title=Title('Time taken'),
unit=Unit(TimeNotation(), AutoPrecision(4)),
color=Color.BLUE,
)
graph_topology_time_taken = Graph(
name=METRIC_TIME_TAKEN,
title=Title('Time taken'),
compound_lines=[METRIC_TIME_TAKEN],
)
perfometer_topology_time_taken = Perfometer(
name=METRIC_TIME_TAKEN,
focus_range=FocusRange(Closed(0), Open(1)),
segments=[METRIC_TIME_TAKEN]
)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2024-07-16
# File : bgp_topology/lib/bgp_topology.py
# 2024-07-20: moved to lib -> is now an active check
__AUTHOR__ = 'thl-cmk[at]outlook[dot]com'
__URL__ = 'https://thl-cmk.hopto.org/gitlab/checkmk/vendor-independent/bgp_topology'
__USAGE__ = '/local/lib/python3/cmk_addons/plugins/bgp_topology/libexec/check_bgp_topology --make-default'
__VERSION__ = '0.0.1-20240721'
from argparse import ArgumentParser, Namespace, RawTextHelpFormatter
from collections.abc import MutableMapping, MutableSequence, Sequence
from dataclasses import dataclass
from time import time_ns
from cmk.agent_based.v2 import render
from cmk_addons.plugins.bgp_topology.constants import (
ARG_PARSER_ANCHOR,
ARG_PARSER_ANCHOR_AS,
ARG_PARSER_ANCHOR_BOTH,
ARG_PARSER_ANCHOR_ID,
ARG_PARSER_EMBLEM_AS,
ARG_PARSER_EMBLEM_ID,
ARG_PARSER_HOST,
ARG_PARSER_MAKE_DEFAULT,
ARG_PARSER_NONE,
ARG_PARSER_SITES_EXCLUDE,
ARG_PARSER_SITES_INCLUDE,
BGP_PEER_LOCAL_ADDR,
BGP_PEER_LOCAL_AS,
BGP_PEER_LOCAL_ID,
BGP_PEER_REMOTE_ADDR,
BGP_PEER_REMOTE_AS,
BGP_PEER_REMOTE_ID,
BGP_PEER_STATE,
BGP_PEER_UPTIME,
EMBLEM_BGP_AS,
EMBLEM_BGP_ID,
METRIC_TIME_TAKEN,
PARAM_BGP_EXT_ANCHOR_AS,
PARAM_BGP_EXT_ANCHOR_BOTH,
PARAM_BGP_EXT_ANCHOR_ID,
TOPOLOGY_NAME,
)
from cmk_addons.plugins.bgp_topology.lib.utils import (
BASE_TOPO_PATH,
LiveStatusConnection,
Metric,
OMD_ROOT,
TopoConnections,
TopoObjects,
add_dummy_topologies,
get_anchor,
get_bgp_peer_clean_key,
get_bgp_peer_clean_value,
get_emblem,
get_service,
make_topo_default,
save_topology,
)
class Params(Namespace):
bgp_anchor: str
make_default: bool = False
bgp_emblem_as: str = EMBLEM_BGP_AS
bgp_emblem_id: str = EMBLEM_BGP_ID
include_sites: Sequence[str] | None = None
exclude_sites: Sequence[str] | None = None
host: str = ''
@dataclass(frozen=True)
class BgpPeer:
host: str
service: str
state: int
state_str: str
remote_address: str
uptime: str | None = None
local_as: str | None = None
local_address: str | None = None
local_id: str | None = None
remote_as: str | None = None
remote_id: str | None = None
@classmethod
def parse(cls, host: str, state: int, servie: str, raw_peer_data: str):
raw_peer_attributes: Sequence[str] = raw_peer_data.split('\\n')
bgp_peer: MutableMapping[str, str] = {}
for entry in raw_peer_attributes:
try:
key, value = entry.split(':', 1)
except ValueError:
key = None
value = None
key: str | None = get_bgp_peer_clean_key(key)
value: str | None = get_bgp_peer_clean_value(key, value)
if key is not None and value is not None:
bgp_peer[key] = value
return cls(
host=host,
local_address=bgp_peer.get(BGP_PEER_LOCAL_ADDR),
local_as=bgp_peer.get(BGP_PEER_LOCAL_AS),
local_id=bgp_peer.get(BGP_PEER_LOCAL_ID),
remote_address=bgp_peer.get(BGP_PEER_REMOTE_ADDR),
remote_as=bgp_peer.get(BGP_PEER_REMOTE_AS),
remote_id=bgp_peer.get(BGP_PEER_REMOTE_ID),
service=servie,
state=state,
state_str=bgp_peer.get(BGP_PEER_STATE),
uptime=bgp_peer.get(BGP_PEER_UPTIME),
)
def create_bgp_topology(params: Params | None) -> int:
start_time = time_ns()
summary: list[str] = []
details: list[str] = []
objects = TopoObjects()
connections = TopoConnections()
sub_directory = TOPOLOGY_NAME
ls_connection = LiveStatusConnection()
if params.include_sites is not None:
ls_connection.filter_sites(include=True, sites=params.include_sites)
sites_str: str = ', '.join(params.include_sites)
details.append(f'Site(s) included: {sites_str}')
sub_directory = f'{TOPOLOGY_NAME}_{params.host}'
elif params.exclude_sites is not None:
ls_connection.filter_sites(include=False, sites=params.exclude_sites)
sites_str: str = ', '.join(params.exclude_sites)
details.append(f'Site(s) excluded: {sites_str}')
sub_directory = f'{TOPOLOGY_NAME}_{params.host}'
bgp_anchor = get_anchor(params.bgp_anchor)
emblem_as = get_emblem(params.bgp_emblem_as)
emblem_id = get_emblem(params.bgp_emblem_id)
query: str = (
'GET services\n'
'Columns: host_name state description long_plugin_output\n'
'Filter: description ~ BGP peer\n'
'OutputFormat: python3\n'
)
if (raw_bgp_peers := ls_connection.query(query=query)) is not None:
bgp_peers: MutableSequence[BgpPeer] = [
BgpPeer.parse(host, state, service, raw_peer_data) for host, state, service, raw_peer_data in raw_bgp_peers
]
# create index by local address, add host and service objects
bgp_peers_by_local_addr: MutableMapping[str, MutableSequence[BgpPeer]] = {'0.0.0.0': []}
for bgp_peer in bgp_peers:
objects.add_host(bgp_peer.host)
objects.add_service(bgp_peer.host, bgp_peer.service)
connections.add_connection(
right=bgp_peer.host,
left=get_service(bgp_peer.host, bgp_peer.service),
left_state=bgp_peer.state
)
if bgp_peer.local_address is not None:
if bgp_peers_by_local_addr.get(bgp_peer.local_address) is None:
bgp_peers_by_local_addr[bgp_peer.local_address]: MutableSequence[BgpPeer] = []
bgp_peers_by_local_addr[bgp_peer.local_address].append(bgp_peer)
else:
bgp_peers_by_local_addr['0.0.0.0'].append(bgp_peer)
# find connections
for bgp_peer in bgp_peers:
if (peer_list := bgp_peers_by_local_addr.get(bgp_peer.remote_address)) is not None:
for peer in peer_list:
if bgp_peer.local_as == peer.remote_as and bgp_peer.local_as is not None:
if bgp_peer.local_id == peer.remote_id and bgp_peer.local_id is not None:
if bgp_peer.local_address == peer.remote_address and bgp_peer.local_address is not None:
connections.add_connection(
left=get_service(bgp_peer.host, bgp_peer.service),
right=get_service(peer.host, peer.service),
right_state=bgp_peer.state,
left_state=peer.state,
)
# if there is no peer_list the peer is either external (to checkmk) or the connection is not
# established, so we have no local address or remote id
else:
ext_bgp_host_as: str | None = None
ext_bgp_host_id: str | None = None
if bgp_anchor in [
PARAM_BGP_EXT_ANCHOR_BOTH,
PARAM_BGP_EXT_ANCHOR_AS,
] and bgp_peer.remote_as is not None:
ext_bgp_host_as: str = f'BGP-AS: {bgp_peer.remote_as}'
objects.add_host(
host=ext_bgp_host_as,
link2core=False,
emblem=emblem_as,
)
if bgp_anchor in [
PARAM_BGP_EXT_ANCHOR_BOTH,
PARAM_BGP_EXT_ANCHOR_ID
] and bgp_peer.remote_id is not None:
ext_bgp_host_id: str = f'BGP-ID: {bgp_peer.remote_id}'
objects.add_host(
host=ext_bgp_host_id,
link2core=False,
emblem=emblem_id
)
connections.add_connection(
right=ext_bgp_host_id,
left=get_service(bgp_peer.host, bgp_peer.service),
left_state=bgp_peer.state
)
if ext_bgp_host_as is not None:
connections.add_connection(ext_bgp_host_id, ext_bgp_host_as)
if ext_bgp_host_as is not None and ext_bgp_host_id is None:
connections.add_connection(
right=get_service(bgp_peer.host, bgp_peer.service),
left=ext_bgp_host_as,
right_state=bgp_peer.state,
)
connections.topo_connections.sort()
data_set = {
'version': 1,
'name': TOPOLOGY_NAME,
'objects': dict(sorted(objects.topo_objects.items())),
'connections': connections.topo_connections,
}
save_topology(data=data_set, sub_directory=sub_directory)
# workaround for backend is only picking up topologies from default folder
add_dummy_topologies(sub_directory=sub_directory)
# end workaround
make_topo_default(sub_directory=sub_directory, make_default=params.make_default)
summary.append(f'Objects: {len(objects.topo_objects)}')
details.append(f'Objects: {len(objects.topo_objects)}')
summary.append(f'Connections: {len(connections.topo_connections)}')
details.append(f'Connections: {len(connections.topo_connections)}')
details.append(f'Written to: {BASE_TOPO_PATH}/{sub_directory}/data_{TOPOLOGY_NAME.lower()}.json')
value = (time_ns() - start_time) / 1e9
summary.append(f'Time taken: {render.timespan(value)}')
details.append(f'Time taken: {render.timespan(value)}')
perf_data = Metric(
name=METRIC_TIME_TAKEN,
value=value,
# levels=None,
# boundaries=None,
)
all_summary: str = ', '.join(summary)
all_details: str = '\n'.join(details)
print(f'{all_summary}\n{all_details}|{perf_data}')
return 0
def parse_arguments(argv: Sequence[str]) -> Params:
parser = ArgumentParser(
prog='bgp_topology',
formatter_class=RawTextHelpFormatter,
description=f"""Create BGP peer network topology for Checkmk""",
epilog=f"""
Example usage:
{OMD_ROOT}/{__USAGE__}
Version: {__VERSION__} | Written by {__AUTHOR__}
for more information see: {__URL__}
"""
)
parser.add_argument(
ARG_PARSER_ANCHOR,
choices=[
ARG_PARSER_ANCHOR_BOTH,
ARG_PARSER_ANCHOR_AS,
ARG_PARSER_ANCHOR_ID,
ARG_PARSER_NONE,
],
default=ARG_PARSER_ANCHOR_BOTH,
help='Anchor for external BGP objects (default: %(default)s).',
)
parser.add_argument(
ARG_PARSER_EMBLEM_AS,
type=str,
default=EMBLEM_BGP_AS,
help='Emblem to use for BGP-AS objects (default: %(default)s).',
)
parser.add_argument(
ARG_PARSER_EMBLEM_ID,
type=str,
default=EMBLEM_BGP_ID,
help='Emblem to use for BGP-ID objects (default: %(default)s).',
)
parser.add_argument(
ARG_PARSER_MAKE_DEFAULT,
action='store_const', const=True,
default=False,
help='Make this topology the default (default: %(default)s).',
)
parser.add_argument(
ARG_PARSER_HOST,
type=str,
help="""The name of the Checkmk host to which the plugin is attached. This is set
automatically by Checkmk. If a site filter is active, the host name is
appended to the subdirectory where the topology is stored (“BGP” becomes
“BGP_host_name”). This way we can have more than one BGP topology without
overwriting each other.""",
)
site_filter = parser.add_mutually_exclusive_group()
site_filter.add_argument(
ARG_PARSER_SITES_INCLUDE,
type=str,
nargs='+',
help=f"""List of Checkmk site names to include in the topology creation.
Can not used together with {ARG_PARSER_SITES_EXCLUDE}""",
)
site_filter.add_argument(
ARG_PARSER_SITES_EXCLUDE,
type=str,
nargs='+',
help=f"""List of Checkmk site names to exclude from the topology creation.
Can not used together with {ARG_PARSER_SITES_INCLUDE}""",
)
return parser.parse_args(argv)
def main(argv: Sequence[str] | None = None) -> int:
return create_bgp_topology(parse_arguments(argv=argv))
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2024-07-16
# File : bgp_topology/lib/utils.py
# 2024-07-16: copied from vsphere_topology/lib/utils.py
from collections.abc import Mapping, MutableMapping, MutableSequence, Sequence
from dataclasses import dataclass
from json import dumps as json_dunps, loads as json_loads
from os import environ
from pathlib import Path
from typing import Final, Tuple
from livestatus import MultiSiteConnection, SiteConfigurations, SiteId
from cmk_addons.plugins.bgp_topology.constants import (
ARG_PARSER_NONE,
BGP_PEER_LOCAL_ADDR,
BGP_PEER_LOCAL_AS,
BGP_PEER_LOCAL_ID,
BGP_PEER_REMOTE_ADDR,
BGP_PEER_REMOTE_AS,
BGP_PEER_REMOTE_ID,
BGP_PEER_STATE,
BGP_PEER_UPTIME,
PARAM_BGP_EXT_ANCHOR_AS,
PARAM_BGP_EXT_ANCHOR_BOTH,
PARAM_BGP_EXT_ANCHOR_ID,
TOPOLOGY_NAME,
)
OMD_ROOT = environ['OMD_ROOT']
BASE_TOPO_PATH: Final[str] = f'{OMD_ROOT}/var/check_mk/topology/data'
def get_bgp_peer_clean_key(raw_key: str) -> str | None:
key_map: Mapping = {
'Local AS': BGP_PEER_LOCAL_AS,
'Local address': BGP_PEER_LOCAL_ADDR,
'Local identifier': BGP_PEER_LOCAL_ID,
'Peer state': BGP_PEER_STATE,
'Remote AS': BGP_PEER_REMOTE_AS,
'Remote address': BGP_PEER_REMOTE_ADDR,
'Remote identifier': BGP_PEER_REMOTE_ID,
'Uptime': BGP_PEER_UPTIME,
}
return key_map.get(raw_key)
def get_bgp_peer_clean_value(key: str, value: str) -> str | None:
class MatchValues:
local_addr = BGP_PEER_LOCAL_ADDR
remote_addr = BGP_PEER_REMOTE_ADDR
remote_id = BGP_PEER_REMOTE_ID
if value is not None:
value = value.strip()
match key:
case MatchValues.local_addr | MatchValues.remote_addr | MatchValues.remote_id:
if value.strip() in ['0.0.0.0', 'N/A']:
return
if value:
return value
def get_service(host: str, servie: str) -> str:
return f'{servie}@{host}'
def get_anchor(raw_anchor: str) -> str | None:
anchor_map = {
'both': PARAM_BGP_EXT_ANCHOR_BOTH,
'as': PARAM_BGP_EXT_ANCHOR_AS,
'id': PARAM_BGP_EXT_ANCHOR_ID,
'none': ARG_PARSER_NONE
}
return anchor_map.get(raw_anchor)
def get_emblem(emblem: str) -> str | None:
if emblem != ARG_PARSER_NONE:
return emblem
def save_topology(data: Mapping, sub_directory: str) -> None:
"""
Save the topology as json file under $OMD_ROOT/var/check_mk/topology/data/{sub_directory}.data_{data['name']}.json
the filename will be changed to lower case.
Args:
data: the topology data
sub_directory: the subdirectory were to save the data under ~/var/check_mk/topology/data/
Returns:
None
"""
file_name = f'data_{data["name"]}.json'.lower()
save_file = Path(f'{BASE_TOPO_PATH}/{sub_directory}/{file_name}')
save_file.parent.mkdir(exist_ok=True, parents=True)
save_file.write_text(json_dunps(data))
def make_topo_default(sub_directory: str, make_default: bool) -> None:
"""
Create the symlink "default" to $OMD_ROOT/var/check_mk/topology/data/{sub_directory} in
$OMD_ROOT/var/check_mk/topology/data/ if it don't exist or mage_default is True
Args:
sub_directory: the subdirectory under ~/var/check_mk/topology/data/ thaht become default
make_default: if True, create the symlink "default" with path as target
Returns:
None
"""
target_path = f'{BASE_TOPO_PATH}/{sub_directory}'
if not Path(f'{BASE_TOPO_PATH}/default').exists():
make_default = True
if make_default:
Path(f'{BASE_TOPO_PATH}/default').unlink(missing_ok=True)
Path(f'{BASE_TOPO_PATH}/default').symlink_to(target=Path(target_path), target_is_directory=True)
def get_topologies() -> Sequence[str | None]:
"""
Returns a list of topology names form the default typology directory.
Returns:
List of str ie: ['CDP', 'LLDP']
"""
path: str = f'{BASE_TOPO_PATH}/default'
if not Path(path).exists():
return []
files = [f for f in Path(path).glob('*.json') if f.is_file()]
return [
json_loads(Path(file).read_text())['name'] for file in files if
json_loads(Path(file).read_text()).get('name') is not None
]
def add_dummy_topologies(sub_directory: str):
path: str = f'{BASE_TOPO_PATH}/default'
# don't overwrite existing topology
if Path(path).exists() and not Path(f'{path}/data_{TOPOLOGY_NAME.lower()}.json').exists():
dummy_topology = {'version': 1, 'name': TOPOLOGY_NAME, 'objects': {}, 'connections': []}
save_topology(
data=dummy_topology,
sub_directory='default',
)
for topology in get_topologies():
if not Path(f'{BASE_TOPO_PATH}/{sub_directory}/data_{topology.lower()}.json').exists():
save_topology(
data={'version': 1, 'name': topology, 'objects': {}, 'connections': []},
sub_directory=sub_directory
)
#
# live status
#
class LiveStatusConnection:
def __init__(self):
self.sites: SiteConfigurations = SiteConfigurations({})
self.sites_mk = Path(f'{OMD_ROOT}/etc/check_mk/multisite.d/sites.mk')
self.socket_path = f'unix:{OMD_ROOT}/tmp/run/live'
if self.sites_mk.exists():
# make eval() "secure"
# https://realpython.com/python-eval-function/#minimizing-the-security-issues-of-eval
_code = compile(self.sites_mk.read_text(), '<string>', 'eval')
allowed_names = ['sites', 'update']
for name in _code.co_names:
if name not in allowed_names:
raise NameError(f'Use of {name} in {self.sites_mk.name} not allowed.')
sites_raw: MutableMapping = {}
eval(self.sites_mk.read_text(), {'__builtins__': {}}, {'sites': sites_raw})
for site, data in sites_raw.items():
self.sites.update({site: {
'alias': data['alias'],
'timeout': data['timeout'],
}})
if data['socket'] == ('local', None):
self.sites[site]['socket'] = self.socket_path
else:
protocol, socket = data['socket']
address, port = socket['address']
self.sites[site]['socket'] = f'{protocol}:{address}:{port}'
self.sites[site]['tls'] = socket['tls']
else:
self.sites.update({SiteId('local'): {
'alias': 'Local site',
'timeout': 5,
'socket': self.socket_path
}})
self.c = MultiSiteConnection(self.sites)
dead_sites = [site['site']['alias'] for site in self.c.dead_sites().values()]
if dead_sites:
self.c.set_only_sites(self.c.alive_sites())
def query(self, query: str):
data: MutableSequence[Tuple[str, str]] = self.c.query(query=query)
if data:
return data
else:
return None
def filter_sites(self, include: bool, sites: Sequence[str]):
if include is True:
site_list = [site for site in self.c.sites if site in sites]
else:
site_list = [site for site in self.c.sites if site not in sites]
self.c.set_only_sites(site_list)
class TopoObjects:
def __init__(self) -> None:
self.topo_objects: MutableMapping[str, object] = {}
def add_host(
self,
host: str,
emblem: str | None = None,
icon: str | None = None,
link2core: bool = True,
obj_id_prefix: str = '',
):
if self.topo_objects.get(f'{obj_id_prefix}{host}') is not None:
return
metadata = {}
if emblem or icon:
metadata = {'images': {}}
if emblem is not None:
metadata['images'].update({'emblem': emblem}) # node image
if icon is not None:
metadata['images'].update({'icon': icon}) # node icon
self.topo_objects[f'{obj_id_prefix}{host}'] = {
'name': host,
'link': {'core': host} if link2core else {},
'metadata': metadata,
}
def add_service(
self,
host: str,
service: str,
emblem: str | None = None,
link2core: bool = True,
):
obj_id = f'{service}@{host}'
if self.topo_objects.get(obj_id) is not None:
return
metadata = {}
if emblem is not None:
metadata = {
'images': {
'emblem': emblem, # node image
},
}
self.topo_objects[obj_id] = {
'name': service,
'link': {'core': [host, service]} if link2core else {},
'metadata': metadata,
}
def get_connection_color(left_state: int, right_state:int) -> str:
state_to_color = {
0: 'white',
1: 'yellow',
2: 'red',
3: 'orange',
}
return state_to_color.get(max(left_state, right_state), 'orange')
class TopoConnections:
def __init__(self) -> None:
self.topo_connections: MutableSequence = []
self.clean_connections: MutableSequence[Sequence[str]] = []
def add_connection(
self,
left: str,
right: str,
left_state: int = 0,
right_state: int = 0,
):
connection = [left, right]
connection.sort()
if connection not in self.clean_connections:
self.clean_connections.append(connection)
self.topo_connections.append([
connection, {
'line_config': {
'css_styles': {
'stroke-dasharray': 'unset'
},
'color': get_connection_color(left_state, right_state),
}
}
])
# taken from https://github.com/Checkmk/checkmk/blob/master/cmk/plugins/smb/lib/check_disk_smb.py
# Change-Id: I7426f6553a906c5ac50a8306157931a10640a526
@dataclass
class Metric:
name: str
value: float
levels: tuple[float, float] | None = None
boundaries: tuple[float, float] | None = None
def __str__(self) -> str:
l = f"{self.levels[0]};{self.levels[1]};" if self.levels else ";;"
b = f"{self.boundaries[0]};{self.boundaries[1]}" if self.boundaries else ";"
# I'm not too sure about the single quotes here, but keeping it for now
return f"'{self.name}'={self.value}B;{l}{b}"
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2024-07-20
# File : bgp_topology/lib_exec/bgp_topology
import sys
from cmk_addons.plugins.bgp_topology.lib.bgp_topology import main
if __name__ == "__main__":
sys.exit(main())
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2024-07-06
# File : bgp_topology/rulesets/bgp_topology.py
from collections.abc import Sequence
from cmk.rulesets.v1 import Help, Label, Message, Title
from cmk.rulesets.v1.form_specs import (
CascadingSingleChoice,
CascadingSingleChoiceElement,
DefaultValue,
DictElement,
Dictionary,
FixedValue,
SingleChoice,
SingleChoiceElement,
String,
List
)
from cmk.rulesets.v1.form_specs.validators import LengthInRange, ValidationError
from cmk.rulesets.v1.rule_specs import ActiveCheck, Topic
from cmk_addons.plugins.bgp_topology.constants import (
EMBLEM_BGP_AS,
EMBLEM_BGP_ID,
PARAM_BGP_AS,
PARAM_BGP_EXT_ANCHOR,
PARAM_BGP_EXT_ANCHOR_AS,
PARAM_BGP_EXT_ANCHOR_BOTH,
PARAM_BGP_EXT_ANCHOR_ID,
PARAM_BGP_EXT_ANCHOR_NONE,
PARAM_BGP_ID,
PARAM_BGP_SITES_EXCLUDE,
PARAM_BGP_SITES_FILTER,
PARAM_BGP_SITES_INCLUDE,
PARAM_EMBLEM_CUSTOM,
PARAM_EMBLEM_DEFAULT,
PARAM_EMBLEM_NO_EMBLEM,
PARAM_MAKE_DEFAULT,
PICTURE_TYPE_EMBLEM,
RULE_SET_NAME_BGP_TOPOLOGY,
)
class DuplicateInList: # pylint: disable=too-few-public-methods
""" Custom validator that ensures the validated list has no duplicate entries. """
def __init__(
self,
) -> None:
pass
@staticmethod
def _get_default_errmsg(_duplicates: Sequence) -> Message:
return Message(f"Duplicate element in list. Duplicate elements: {', '.join(_duplicates)}")
def __call__(self, value: List[str] | None) -> None:
if not isinstance(value, list):
return
_duplicates = [value[i] for i, x in enumerate(value) if value.count(x) > 1]
_duplicates = list(set(_duplicates))
if _duplicates:
raise ValidationError(message=self._get_default_errmsg(_duplicates))
def get_emblem_element(default_emblem: str, picture_type: str) -> Sequence[CascadingSingleChoiceElement]:
return [
CascadingSingleChoiceElement(
name=PARAM_EMBLEM_NO_EMBLEM,
title=Title(f'No custom {picture_type}'),
parameter_form=FixedValue(
value=True,
label=Label(f'No custom {picture_type} will be used')
)),
CascadingSingleChoiceElement(
name=PARAM_EMBLEM_DEFAULT,
title=Title(f'Use default {picture_type}'),
parameter_form=FixedValue(
value=True,
label=Label(f'"{default_emblem}" will be used as {picture_type}')
)),
CascadingSingleChoiceElement(
name=PARAM_EMBLEM_CUSTOM,
title=Title(f'Use custom {picture_type}'),
parameter_form=String(
custom_validate=(LengthInRange(min_value=1),),
prefill=DefaultValue(default_emblem),
))
]
def _parameter_form() -> Dictionary:
return Dictionary(
elements={
PARAM_BGP_AS: DictElement(
parameter_form=CascadingSingleChoice(
title=Title('BGP AS emblem'),
elements=get_emblem_element(EMBLEM_BGP_AS, PICTURE_TYPE_EMBLEM),
prefill=DefaultValue(PARAM_EMBLEM_DEFAULT),
help_text=Help(
'Here you can change the picture for the BGP-AS object. '
'If you use the built-in icons prefix the name with "icon_"'
),
)),
PARAM_BGP_ID: DictElement(
parameter_form=CascadingSingleChoice(
title=Title('BGP ID emblem'),
elements=get_emblem_element(EMBLEM_BGP_ID, PICTURE_TYPE_EMBLEM),
prefill=DefaultValue(PARAM_EMBLEM_DEFAULT),
help_text=Help(
'Here you can change the picture for the BGP-ID object. '
'If you use the built-in icons prefix the name with "icon_"'
),
)),
PARAM_BGP_EXT_ANCHOR: DictElement(
parameter_form=SingleChoice(
title=Title('Anchor for external BGP objects'),
elements=[
SingleChoiceElement(name=PARAM_BGP_EXT_ANCHOR_BOTH, title=Title('Use BGP AS and ID')),
SingleChoiceElement(name=PARAM_BGP_EXT_ANCHOR_AS, title=Title('Use BGP AS only')),
SingleChoiceElement(name=PARAM_BGP_EXT_ANCHOR_ID, title=Title('Use BGP ID only')),
SingleChoiceElement(name=PARAM_BGP_EXT_ANCHOR_NONE, title=Title(f'No anchor object'))
],
prefill=DefaultValue(PARAM_BGP_EXT_ANCHOR_BOTH),
help_text=Help('Select how BGP elements external to Checkmk will be created'),
)),
PARAM_MAKE_DEFAULT: DictElement(
parameter_form=FixedValue(
title=Title('Make default'),
label=Label('This will be the default topology'),
help_text=Help(
'Makes the topology the default topology. If there no default topology, this '
'topology becomes always the default.'
),
value=True
)),
PARAM_BGP_SITES_FILTER: DictElement(
parameter_form=CascadingSingleChoice(
title=Title('Filter Checkmk sites'),
elements=[
CascadingSingleChoiceElement(
name=PARAM_BGP_SITES_INCLUDE,
title=Title('Include checkmk sites'),
parameter_form=List(element_template=String(
title=Title('checkmk site name'),
custom_validate=(DuplicateInList(),),
))
),
CascadingSingleChoiceElement(
name=PARAM_BGP_SITES_EXCLUDE,
title=Title('Exclude checkmk sites'),
parameter_form=List(element_template=String(
title=Title('Checkmk site name'),
custom_validate=(DuplicateInList(),),
))
)
],
prefill=DefaultValue(PARAM_BGP_SITES_INCLUDE),
help_text=Help(''),
)),
}
)
rule_spec_bgp_topo = ActiveCheck(
name=RULE_SET_NAME_BGP_TOPOLOGY,
topic=Topic.NETWORKING,
parameter_form=_parameter_form,
title=Title('BGP Topology'),
)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2024-07-20
# File : bgp_topology/server_side_calls/bgp_topology.py
from collections.abc import Iterator, Mapping, Sequence
from pydantic import BaseModel
from typing import Literal
from cmk.utils import debug
from cmk.server_side_calls.v1 import (
HostConfig,
Secret,
ActiveCheckCommand,
ActiveCheckConfig,
)
from cmk_addons.plugins.bgp_topology.constants import (
ARG_PARSER_ANCHOR,
ARG_PARSER_ANCHOR_AS,
ARG_PARSER_ANCHOR_ID,
ARG_PARSER_EMBLEM_AS,
ARG_PARSER_EMBLEM_ID,
ARG_PARSER_HOST,
ARG_PARSER_MAKE_DEFAULT,
ARG_PARSER_NONE,
ARG_PARSER_SITES_EXCLUDE,
ARG_PARSER_SITES_INCLUDE,
PARAM_BGP_EXT_ANCHOR_AS,
PARAM_BGP_EXT_ANCHOR_BOTH,
PARAM_BGP_EXT_ANCHOR_ID,
PARAM_BGP_EXT_ANCHOR_NONE,
PARAM_BGP_SITES_EXCLUDE,
PARAM_BGP_SITES_INCLUDE,
PARAM_EMBLEM_CUSTOM,
PARAM_EMBLEM_DEFAULT,
PARAM_EMBLEM_NO_EMBLEM,
RULE_SET_NAME_BGP_TOPOLOGY,
)
EMBLEM = (
tuple[Literal["default_emblem"], bool] |
tuple[Literal['custom_emblem'], str] |
tuple[Literal['no_emblem'], bool] |
None
)
FILTER_SITES = (
tuple[Literal['exclude_sites'], Sequence[str]] |
tuple[Literal['include_sites'], Sequence[str]]
)
class BgpAnchorType:
anchor_both: str = PARAM_BGP_EXT_ANCHOR_BOTH
anchor_as: str = PARAM_BGP_EXT_ANCHOR_AS
anchor_id: str = PARAM_BGP_EXT_ANCHOR_ID
anchor_none: str = PARAM_BGP_EXT_ANCHOR_NONE
class Emblem:
default: str = PARAM_EMBLEM_DEFAULT
custom: str = PARAM_EMBLEM_CUSTOM
no_emblem: str = PARAM_EMBLEM_NO_EMBLEM
class Params(BaseModel):
bgp_as: EMBLEM = None
bgp_id: EMBLEM = None
bgp_ext_anchor: str | None = None
make_default: bool | None = None
bgp_filter_sites: FILTER_SITES | None = None
__params = {
'bgp_as': ('custom_emblem', 'icon_cloud'),
'bgp_id': ('no_emblem', True),
'bgp_ext_anchor': 'bgp_ext_anchor_both',
'make_default': True
}
__parsed = Params(
bgp_as=('custom_emblem', 'icon_cloud'),
bgp_id=('no_emblem', True),
bgp_ext_anchor='bgp_ext_anchor_both',
make_default=True
)
def _commands_bgp_topology_parser(params: Mapping[str, object]) -> Params:
if debug.enabled():
print(params)
return Params.model_validate(params)
def commands_bgp_topology_arguments(
params: Params, host_config: HostConfig
) -> Iterator[ActiveCheckCommand]:
if debug.enabled():
pass
# print(host_config)
args: list[str | Secret] = []
args += [ARG_PARSER_HOST, host_config.name.replace(' ', '_')]
if params.make_default is True:
args.append(ARG_PARSER_MAKE_DEFAULT)
if params.bgp_ext_anchor is not None:
match params.bgp_ext_anchor:
case BgpAnchorType.anchor_both:
pass # this is the default
case BgpAnchorType.anchor_as:
args += [ARG_PARSER_ANCHOR, ARG_PARSER_ANCHOR_AS]
case BgpAnchorType.anchor_id:
args += [ARG_PARSER_ANCHOR, ARG_PARSER_ANCHOR_ID]
case BgpAnchorType.anchor_none:
args += [ARG_PARSER_ANCHOR, ARG_PARSER_NONE]
case _:
pass
if params.bgp_id:
key, value = params.bgp_id
match key:
case Emblem.default:
pass # this is the default
case Emblem.custom:
args += [ARG_PARSER_EMBLEM_ID, value]
case Emblem.no_emblem:
args += [ARG_PARSER_EMBLEM_ID, ARG_PARSER_NONE]
case _:
pass
if params.bgp_as:
key, value = params.bgp_as
match key:
case Emblem.default:
pass # this is the default
case Emblem.custom:
args += [ARG_PARSER_EMBLEM_AS, value]
case Emblem.no_emblem:
args += [ARG_PARSER_EMBLEM_AS, ARG_PARSER_NONE]
case _:
pass
if params.bgp_filter_sites:
class FilterMode:
exclude_sites = PARAM_BGP_SITES_EXCLUDE
include_sites = PARAM_BGP_SITES_INCLUDE
mode, site_list = params.bgp_filter_sites
match mode:
case FilterMode.exclude_sites:
args.append(ARG_PARSER_SITES_EXCLUDE)
args += site_list
case FilterMode.include_sites:
args.append(ARG_PARSER_SITES_INCLUDE)
args += site_list
case _:
pass
if debug.enabled():
print(args)
yield ActiveCheckCommand(
service_description="BGP Topology",
command_arguments=args
)
active_check_bgp_topology = ActiveCheckConfig(
name=RULE_SET_NAME_BGP_TOPOLOGY,
parameter_parser=_commands_bgp_topology_parser,
commands_function=commands_bgp_topology_arguments,
)
{'author': 'Th.L. (thl-cmk[at]outlook[dot]com)',
'description': 'Active check to create the BGP peer topology\n',
'download_url': 'https://thl-cmk.hopto.org',
'files': {'cmk_addons_plugins': ['bgp_topology/constants.py',
'bgp_topology/lib/bgp_topology.py',
'bgp_topology/lib/utils.py',
'bgp_topology/libexec/check_bgp_topology',
'bgp_topology/rulesets/bgp_topology.py',
'bgp_topology/server_side_calls/bgp_topology.py',
'bgp_topology/graphing/bgp_topology.py']},
'name': 'bgp_topology',
'title': 'BGP peer topology',
'version': '0.0.1-20240722',
'version.min_required': '2.3.0b1',
'version.packaged': 'cmk-mkp-tool 0.2.0',
'version.usable_until': '2.4.0b1'}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment