Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit aeaa2381 authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

added support for IPv6, changed hostlabel to nvdct/l3v4_topology, rewritten for check API 2.0

parent b4da77b8
No related branches found
No related tags found
No related merge requests found
[PACKAGE]: ../../raw/master/mkp/inv_ipv4_addresses-0.0.4-20240407.mkp "inv_ipv4_addresses-0.0.4-20240407.mkp"
[PACKAGE]: ../../raw/master/mkp/inv_ip_address-0.0.5-20241209.mkp "inv_ip_address-0.0.5-20241209.mkp"
# Inventory of IP addresses
The plugin adds the IP-addresses information to the inventory for devices monitored via SNMP. The plugin supports IPv4 and IPv6.
......
File added
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-12-27
# File : inv_ipv4_addresses.py
#
# inventory of IPv4 address information
#
# 2024-04-07: fixed missing/wrong netmask (ThX bitwiz@forum.checkmk.com)
# improved validation if SNMP input data
# drop this host ip address (0.0.0.0)
from dataclasses import dataclass
from typing import List
from ipaddress import AddressValueError, IPv4Address, IPv4Network, NetmaskValueError
from cmk.base.plugins.agent_based.agent_based_api.v1 import (
OIDEnd,
)
from cmk.base.plugins.agent_based.agent_based_api.v1 import (
HostLabel,
SNMPTree,
TableRow,
exists,
register,
)
from cmk.base.plugins.agent_based.agent_based_api.v1.type_defs import (
HostLabelGenerator,
InventoryResult,
StringTable,
)
@dataclass(frozen=True)
class Ipv4Info:
address: str
broadcast: str
cidr: int
if_index: int | None
if_name: str
max_re_assemble: int | None
netmask: str
network: str
def parse_inv_ipv4_addresses(string_table: List[StringTable]) -> List[Ipv4Info] | None:
try:
ipv4_info, if_info = string_table
except ValueError:
return
try:
interface_by_index = {if_index: if_name for if_index, if_name in if_info}
except ValueError:
interface_by_index = {}
ipv4_infos = []
for entry in ipv4_info:
try:
ipv4_address, if_index, ipv4_netmask, ipv4_bcast, max_re_assemble = entry
except ValueError:
continue
try:
ipv4_address = IPv4Address(ipv4_address)
except AddressValueError:
continue
if ipv4_address.is_loopback: # Drop localhost
continue
if ipv4_address.exploded == '0.0.0.0': # drop this host address
continue
try:
ipv4 = IPv4Network(address=f'{ipv4_address.exploded}/{ipv4_netmask}', strict=False)
except (AddressValueError, NetmaskValueError):
continue
ipv4_infos.append(
Ipv4Info(
address=str(ipv4_address.exploded),
broadcast=str(ipv4.broadcast_address),
cidr=int(ipv4.prefixlen),
if_index=int(if_index) if if_index.isdigit() else None,
if_name=str(interface_by_index.get(if_index, if_index)),
max_re_assemble=int(max_re_assemble) if max_re_assemble.isdigit() else None,
netmask=str(ipv4.netmask),
network=str(ipv4.network_address),
)
)
return ipv4_infos
def host_label_inv_ipv4_addresses(section: List[Ipv4Info]) -> HostLabelGenerator:
_non_host_ips = 0
for ipv4 in section:
if ipv4.cidr != 32 and not ipv4.address.startswith('127.'):
_non_host_ips += 1
if _non_host_ips > 1:
yield HostLabel(name="nvdct/routing_capable", value="yes")
def inventory_ipv4_addresses(section: List[Ipv4Info]) -> InventoryResult:
path = ['networking', 'addresses']
for ipv4_info in section:
key_columns = {
'address': ipv4_info.address,
'device': ipv4_info.if_name,
}
inventory_columns = {
'broadcast': ipv4_info.broadcast,
'cidr': ipv4_info.cidr,
# 'if_index': ipv4_info.if_index,
# 'max_re_assemble': ipv4_info.max_re_assemble,
'netmask': ipv4_info.netmask,
'network': ipv4_info.network,
'type': 'ipv4',
}
yield TableRow(
path=path,
key_columns=key_columns,
inventory_columns=inventory_columns
)
register.snmp_section(
name='inv_ipv4_addresses',
parse_function=parse_inv_ipv4_addresses,
host_label_function=host_label_inv_ipv4_addresses,
fetch=[
SNMPTree(
base='.1.3.6.1.2.1.4.20.1', # IP-MIB::ipAddrEntry
oids=[
'1', # ipAdEntAddr
'2', # ipAdEntIfIndex
'3', # ipAdEntNetMask
'4', # ipAdEntBcastAddr
'5', # ipAdEntReasmMaxSize
]
),
SNMPTree(
base='.1.3.6.1.2.1.31.1.1.1', #
oids=[
OIDEnd(), # ifIndex
'1', # ifName
]),
],
detect=exists('.1.3.6.1.2.1.4.20.1.1.*'), #
)
register.inventory_plugin(
name='inv_ipv4_addresses',
inventory_function=inventory_ipv4_addresses,
)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-12-27
# File : inv_ip_addresses.py
#
# inventory of IPv4 address information
#
# 2024-04-07: fixed missing/wrong netmask (ThX bitwiz@forum.checkmk.com)
# improved validation of SNMP input data
# drop this host ip address (0.0.0.0)
# 2024-12-02: incompatible: changed host label to nvdct/l3v4_host:yes and nvdct/l3v4_routing:yes
# 2024-12-03: added IP-MIB::ipAddressTable for IPv6 support
# incompatible: renamed to inv_ip_address -> remove inv_ipv4_address.mkp before updating
# 2024-12-05 changed to use ip_interface
# 2024-12-06: incompatible: changed hostlabel to nvdct/l3v4_topology:host and nvdct/l3v4_topology:router
# 2024-12-09: rewritten for CMK checkAPI 2.0
from collections.abc import Mapping, MutableSequence, Sequence
from ipaddress import AddressValueError, NetmaskValueError, ip_interface
from re import match as re_match
from typing import List
from cmk.agent_based.v2 import (
HostLabel,
HostLabelGenerator,
InventoryPlugin,
InventoryResult,
OIDBytes,
OIDEnd,
SNMPSection,
SNMPTree,
StringByteTable,
TableRow,
exists,
)
__ip_info_34_ios = [
[
'1.4.10.10.10.230', # OID end -> type: ipv4, length: 4, ipv4 address
[], # ip address -> empty
'3', # interface index
'.1.3.6.1.2.1.4.32.1.5.3.1.4.10.10.10.228.30' # prefix -> last number (30)
],
[
'2.16.42.0.28.160.16.0.1.53.0.0.0.0.0.0.0.2', # OID end -> type: ipv6, length: 16, ipv6 address
[],
'3',
'.1.3.6.1.2.1.4.32.1.5.3.2.16.42.0.28.160.16.0.1.53.0.0.0.0.0.0.0.0.64'
],
[
'4.20.254.128.0.0.0.0.0.0.114.219.152.255.254.159.41.2.18.0.0.8',
# OID end -> type: ipv6z, length: 20, ipv6 address with interface identifier (18.0.0.8)
[],
'3',
'.0.0'
],
]
__ip_info_34_ibm = [
[
'1.15.48.49.48.46.49.52.48.46.49.54.48.46.48.49.55',
# OID end -> type: ipv4, length: 15, ipv4 address ('010.140.160.017')
[],
'805306370',
'.0.0'
],
[
'2.39.48.48.48.48.58.48.48.48.48.58.48.48.48.48.58.48.48.48.48.58.48.48.48.48.58.48.48.48.48.58.48.48.48.48.58.48.48.48.49',
# OID end -> type: ipv6, length: 39, ipv6 address ('0000:0000:0000:0000:0000:0000:0000:0001')
[],
'805306371',
'.0.0'
],
]
__ip_info_34_firepower = [
[
'1.10.1.1.2', # OID end -> type: ipv4, , ipv4 address ('10.1.1.2')
[10, 1, 1, 2], # ip address in dec bytes
'18',
'.1.3.6.1.2.1.4.32.1.5.18.1.10.1.1.0.24'
],
[
'2.253.0.0.0.0.0.0.1.0.0.0.0.0.0.0.1',
# OID end -> type: ipv6, ipv6 address ('253.0.0.0.0.0.0.1.0.0.0.0.0.0.0.1')
[253, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1], # ip address in dec bytes
'4',
'.1.3.6.1.2.1.4.32.1.5.4.2.253.0.0.0.0.0.0.1.0.0.0.0.0.0.0.0.64'
],
]
__ip_info_34_fortinet = [
[
'1.10.118.132.1.76', # OID end -> type: ipv4, ipv4 address (10.118.132.1), interface index (76)
[],
'76',
'.0.0.0' # prefix -> missing
],
[
'2.10762.22982.8208.4113.0.0.0.282.40',
# OID end -> type: ipv6, ipv6 address (10762.22982.8208.4113.0.0.0.282), interface index (76)
[],
'40',
'.0.0.0' # prefix -> missing
],
]
Section = Sequence[Mapping[str, ip_interface]] | None
def parse_inv_ip_addresses(string_table: List[StringByteTable]) -> Section:
try:
ip_info_20, ip_info_34, if_info = string_table
except ValueError:
return
try:
interface_by_index = {if_index: if_name for if_index, if_name in if_info}
except ValueError:
interface_by_index = {}
ip_infos: MutableSequence[Mapping[str, ip_interface]] = []
for entry in ip_info_34:
try:
oid_end, dec_ip_address, if_index, ip_prefix = entry
except ValueError:
continue
if (prefix := ip_prefix.split('.')[-1]) == '0': # drop entries without prefix (0) -> fortinet
continue
if not (raw_ip := re_match(r'(\d+)\.(\d+)\.([\d|\.]+)', oid_end)):
continue
raw_type, raw_length, raw_address = raw_ip.groups()
if dec_ip_address:
raw_address = '.'.join(str(x) for x in dec_ip_address)
raw_length = str(len(dec_ip_address))
match raw_type:
case '1': # IPv4 address
if raw_length == '15':
raw_address = ''.join([chr(int(x)) for x in raw_address.split('.')])
raw_address = '.'.join([str(int(x)) for x in raw_address.split('.')])
case '2': # IPv6 address
match raw_length:
case '16':
raw_address = [f'{int(x):02x}' for x in raw_address.split('.')]
raw_address = ':'.join(
[''.join([raw_address[i], raw_address[i + 1]]) for i in range(0, len(raw_address), 2)]
)
case '39':
raw_address = ''.join([chr(int(x)) for x in raw_address.split('.')])
case _:
continue
try:
interface = ip_interface(f'{raw_address}/{prefix}')
except (AddressValueError, NetmaskValueError):
continue
if interface.ip.is_loopback: # Drop localhost
continue
if interface.ip.exploded == '0.0.0.0': # drop this host address
continue
ip_infos.append({(str(interface_by_index.get(if_index, if_index))): interface})
for entry in ip_info_20:
try:
raw_address, if_index, raw_netmask = entry
except ValueError:
continue
try:
interface = ip_interface(f'{raw_address}/{raw_netmask}')
except (AddressValueError, NetmaskValueError):
continue
if interface.ip.is_loopback: # Drop localhost
continue
if interface.ip.exploded == '0.0.0.0': # drop this host address
continue
ip_infos.append({str(interface_by_index.get(if_index, if_index)): interface})
return ip_infos
def host_label_inv_ip_addresses(section: Section) -> HostLabelGenerator:
"""
Host label function
Labels:
nvdct/l3v4_topology:
This label is set to "host" for all devices with one IPv4 address except form 127.0.0.0/8 and to
"router" for all devices with more than one IPv4 address except form 127.0.0.0/8
"""
non_host_ips = 0
for entry in section:
ip_data = entry.values()
if ip_data.version == 4 and not ip_data.ip.is_loopback:
non_host_ips += 1
if non_host_ips == 1:
yield HostLabel(name="nvdct/l3v4_topology", value="host")
if non_host_ips > 1:
yield HostLabel(name="nvdct/l3v4_topology", value="router")
return
def inventory_ip_addresses(section: Section) -> InventoryResult:
address_type = {
4: 'ipv4',
6: 'ipv6',
}
for entry in section:
for if_name, ip_data in entry.items():
yield TableRow(
path=['networking', 'addresses'],
key_columns={
'address': str(ip_data.ip.compressed),
'device': if_name,
},
inventory_columns={
'broadcast': str(ip_data.network.broadcast_address),
'cidr': ip_data.network.prefixlen,
'netmask': str(ip_data.network.netmask),
'network': str(ip_data.network.network_address),
'type': address_type.get(ip_data.version).lower(),
}
)
snmp_section_inv_ip_address = SNMPSection(
name='inv_ip_addresses',
parse_function=parse_inv_ip_addresses,
host_label_function=host_label_inv_ip_addresses,
fetch=[
SNMPTree(
base='.1.3.6.1.2.1.4.20.1', # IP-MIB::ipAddrEntry
oids=[
'1', # ipAdEntAddr
'2', # ipAdEntIfIndex
'3', # ipAdEntNetMask
]
),
SNMPTree(
base='.1.3.6.1.2.1.4.34.1', # IP-MIB::ipAddrEntry
oids=[
OIDEnd(), # type.length.ip-address
OIDBytes('2'), # ipAddressAddr
'3', # ipAddressIfIndex
'5', # ipAddressPrefix
]
),
SNMPTree(
base='.1.3.6.1.2.1.31.1.1.1', #
oids=[
OIDEnd(), # ifIndex
'1', # ifName
]),
],
detect=exists('.1.3.6.1.2.1.4.20.1.1.*'), #
)
inventory_plugin_inv_ip_address = InventoryPlugin(
name='inv_ip_addresses',
inventory_function=inventory_ip_addresses,
)
{'author': 'Th.L. (thl-cmk[at]outlook[dot]com)',
'description': 'Adds the IP addresses of a device monitored via SNMP to the '
'inventory\n'
'\n'
'Adds host labels:\n'
' - nvdct/l3v4_host: This label is set to "yes" for all '
'devices with one IPv4 address except form 127.0.0.0/8\n'
' - nvdct/l3v4_routing: This label is set to "yes" for all '
'devices with more than one IPv4 address except form '
'127.0.0.0/8\n',
'download_url': 'https://thl-cmk.hopto.org',
'files': {'cmk_addons_plugins': ['inv_ip_address/agent_based/inv_ip_addresses.py'],
'web': ['plugins/views/inv_ip_addresses.py']},
'name': 'inv_ip_address',
'title': 'Inventory of IP addresses',
'version': '0.0.5-20241209',
'version.min_required': '2.3.0b1',
'version.packaged': 'cmk-mkp-tool 0.2.0',
'version.usable_until': '2.4.0b1'}
{'author': 'Th.L. (thl-cmk[at]outlook[dot]com)',
'description': 'Adds the IPv4 addresses of a device monitored via SNMP to the '
'inventory\n',
'download_url': 'https://thl-cmk.hopto.org',
'files': {'agent_based': ['inv_ipv4_addresses.py'],
'web': ['plugins/views/inv_ipv4_addresses.py']},
'name': 'inv_ipv4_addresses',
'title': 'Inventory of IPv4 addresses',
'version': '0.0.4-20240407',
'version.min_required': '2.2.0b1',
'version.packaged': 'cmk-mkp-tool 0.2.0',
'version.usable_until': '2.4.0b1'}
......@@ -19,16 +19,16 @@ inventory_displayhints.update({
'address',
'device',
'type',
'cidr',
'netmask',
'network',
'netmask',
'cidr',
'broadcast',
],
'view': 'invipaddresses_of_host',
},
'.networking.addresses:*.address': {'title': _l('Address')},
'.networking.addresses:*.broadcast': {'title': _l('Broadcast')},
'.networking.addresses:*.cidr': {'title': _l('CIDR'), }, # 'filter': FilterInvtableIDRange},
'.networking.addresses:*.cidr': {'title': _l('Prefix'), }, # 'filter': FilterInvtableIDRange},
'.networking.addresses:*.device': {'title': _l('Device')},
'.networking.addresses:*.netmask': {'title': _l('Netmask')},
'.networking.addresses:*.network': {'title': _l('Network')},
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment