Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit 53ce7ff5 authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

update project

parent 400e7333
No related branches found
No related tags found
No related merge requests found
[PACKAGE]: ../../raw/master/inv_lldp_cache-0.9.0-20231013.mkp "inv_lldp_cache-0.9.0-20231013.mkp"
# LLDP inventory plugin
Adds the LLDP information from network devices to the inventory
......
......@@ -25,17 +25,20 @@
# 2023-01-19: fix using wrong local interface id, switched from IF-MIB::ifName to LLDP-MIB::lldpLocPortId
# 2023-02-16: replaced TypedDic (Neighbour) with Dict, removed Dataclass --> wasn't working in CMK 2.1
# 2023-02-17: moved wato/metrics from ~/local/share/.. to ~/local/lib/... for CMK2.1
# 2023-10-13: refactoring: render ipv4/ipv6/mac address
# fixed handling sub types for chassis and ports
import re
from typing import List, Dict
from typing import List, Dict, Optional
from ipaddress import ip_address
from cmk.base.plugins.agent_based.agent_based_api.v1.type_defs import (
StringTable,
InventoryResult,
StringByteTable,
)
from cmk.base.plugins.agent_based.agent_based_api.v1 import (
OIDEnd,
OIDBytes,
)
from cmk.base.plugins.agent_based.agent_based_api.v1 import (
......@@ -62,13 +65,14 @@ _interface_displayhints = {
}
def _get_short_if_name(ifname: str) -> str:
def _get_short_if_name(ifname: str) -> Optional[str]:
"""
returns short interface name from long interface name
ifname: is the long interface name
:type ifname: str
"""
if not ifname:
return ifname
for ifname_prefix in _interface_displayhints.keys():
if ifname.lower().startswith(ifname_prefix.lower()):
ifname_short = _interface_displayhints[ifname_prefix]
......@@ -76,34 +80,34 @@ def _get_short_if_name(ifname: str) -> str:
return ifname
def parse_inv_lldp_cache(string_table: List[StringTable]) -> List[Dict[str, str]]:
lldp_chassisidsubtype = {
def parse_inv_lldp_cache(string_table: List[StringByteTable]) -> List[Dict[str, str]]:
lldp_chassis_id_sub_type = {
0: 'n/a',
1: 'chassiscomponent',
2: 'interfacealias',
3: 'portcomponent',
4: 'macaddress',
5: 'networkaddress',
6: 'interfacename',
1: 'chassis component',
2: 'interface alias',
3: 'port component',
4: 'mac address',
5: 'network address',
6: 'interface name',
7: 'local',
}
lldp_portidsubtype = {
lldp_port_id_sub_type = {
0: 'n/a',
1: 'interfacealias',
2: 'portcomponent',
3: 'macaddress',
4: 'networkaddress',
5: 'interfacename',
6: 'agentcircuitid',
1: 'interface alias',
2: 'port component',
3: 'mac address',
4: 'network address',
5: 'interface name',
6: 'agent circuit id',
7: 'local',
}
lldp_manaddrifsubtype = {
lldp_man_addr_if_sub_type = {
0: 'n/a',
1: 'unknown',
2: 'ifindex',
3: 'systemportnumber',
2: 'interface index',
3: 'system port number',
}
lldp_capabilities = {
......@@ -118,51 +122,36 @@ def parse_inv_lldp_cache(string_table: List[StringTable]) -> List[Dict[str, str]
128: 'Station only',
}
def render_mac_address(bytestring):
if len(bytestring) == 17:
return bytestring
else:
return ':'.join(['%02s' % hex(ord(m))[2:] for m in bytestring]).replace(' ', '0').upper()
def render_mac_address(bytes_: List[int]):
if len(bytes_) == 6:
return ':'.join(f'{hex(m).upper()[2:]:0>2}' for m in bytes_)
def render_networkaddress(bytestring):
if ord(bytestring[0]) == 1: # ipv4 address
return render_ipv4_address(bytestring[1:])
elif ord(bytestring[0]) == 2:
return render_ipv6_address(bytestring[1:])
else:
return bytestring
def render_ipv4_address(bytestring):
return '.'.join(['%s' % ord(m) for m in bytestring])
def shorten_ipv6_address(address):
address = address.split(':')
span = 2
address = [''.join(address[i:i + span]) for i in range(0, len(address), span)]
for m in range(0, len(address)):
address[m] = re.sub(r'^0{1,3}', r'', address[m])
address = ':'.join(address)
zeros = ':0:0:0:0:0:0:'
while not zeros == '':
if zeros in address:
address = re.sub(r'%s' % zeros, r'::', address)
zeros = ''
else:
zeros = zeros[:-2]
return address
def render_ipv4_address(bytes_: List[int]):
return '.'.join(str(m) for m in bytes_)
def render_ipv6_address(bytes_: List[int]):
def _hex(_byte: int) -> str:
return f'{hex(_byte)[2:]:0>2}'
def render_ipv6_address(bytestring):
address = ":".join(["%02s" % hex(ord(m))[2:] for m in bytestring]).replace(' ', '0').upper()
address = shorten_ipv6_address(address)
address = ':'.join([''.join([_hex(bytes_[i]), _hex(bytes_[i + 1])]) for i in range(0, len(bytes_), 2)])
address = str(ip_address(address))
return address
def render_capabilities(bytestring):
if len(bytestring) == 0:
def render_networkaddress(bytes_: List[int]):
if bytes_[0] == 1 and len(bytes_) == 5: # ipv4 address
return render_ipv4_address(bytes_[1:])
elif bytes_[0] == 2 and len(bytes_) == 17:
return render_ipv6_address(bytes_[1:])
else:
return''.join(chr(m) for m in bytes_)
def render_capabilities(bytes_):
if len(bytes_) == 0:
return ''
capabilities = []
bytestring = int(ord(bytestring[0]))
bytes_ = int(ord(bytes_[0]))
for x in lldp_capabilities:
tempcap = lldp_capabilities.get(bytestring & x)
tempcap = lldp_capabilities.get(bytes_ & x)
if tempcap != '':
capabilities.append(tempcap)
......@@ -173,45 +162,53 @@ def parse_inv_lldp_cache(string_table: List[StringTable]) -> List[Dict[str, str]
neighbours = []
for oid_end, lldpchassisidsubtype, lldpchassisid, lldpportidsubtype, lldpportid, lldpportdescription, \
lldpsystemname, lldpsystemdescription, ldpcapabilitiesmapsupported, lldpcachecapabilities in lldp_info:
interfaces = {}
for if_index, if_sub_type, if_name in if_info:
interfaces[if_index] = {'if_sub_type': if_sub_type, 'if_name': if_name}
for oid_end, chassis_id_sub_type, chassis_id, port_id_sub_type, port_id, port_description, \
system_name, system_description, capabilities_map_supported, cache_capabilities in lldp_info:
try:
local_ifindex = oid_end.split('.')[1]
local_if_index = oid_end.split('.')[1]
except IndexError:
local_ifindex = oid_end.split('.')[0]
local_port = 'N/A'
for ifIndex, ifName in if_info:
if local_ifindex == ifIndex:
local_port = ifName
local_if_index = oid_end.split('.')[0]
if lldpchassisidsubtype == '4': # mac address
lldpchassisid = render_mac_address(lldpchassisid)
elif lldpchassisidsubtype == '5': # network address
lldpchassisid = render_networkaddress(lldpchassisid)
if lldpportidsubtype == '3': # macaddress
lldpportid = render_mac_address(lldpportid)
try:
interface = interfaces[local_if_index]
except KeyError:
continue
if interface['if_sub_type'] == '3' and len(interface['if_name']): # mac address
local_port = render_mac_address(interface['if_name'])
elif interface['if_sub_type'] in ['5', '7']: #
local_port = render_networkaddress(interface['if_name'])
else:
local_port = ''.join(chr(m) for m in interface['if_name'])
if len(lldpportid) == 6:
# if lldpportid contains 'illegal' signs try to treat as mac address
# remove all chars from string, except allowedchars
allowedchars = re.compile('[^a-zA-Z0-9_=\-\+\.\\\/]')
cleanport_id = allowedchars.sub('', lldpportid).strip()
if chassis_id_sub_type == '4' and len(chassis_id) == 6: # mac address
chassis_id = render_mac_address(chassis_id)
elif chassis_id_sub_type == '5': # network address
chassis_id = render_networkaddress(chassis_id)
else:
chassis_id = ''.join(chr(m) for m in chassis_id)
if (cleanport_id != lldpportid) or (cleanport_id == ''):
lldpportid = render_mac_address(lldpportid)
if port_id_sub_type == '3' and len(port_id) == 6: # mac address
port_id = render_mac_address(port_id)
elif port_id_sub_type in ['5', '7']: # network address, local
port_id = render_networkaddress(port_id)
else:
port_id = ''.join(chr(m) for m in port_id)
neighbour: Dict = {
'local_port_num': local_port,
'chassis_id': lldpchassisid,
'port_id': lldpportid,
'port_description': lldpportdescription,
'system_name': lldpsystemname,
'system_description': lldpsystemdescription,
'capabilities_map_supported': render_capabilities(ldpcapabilitiesmapsupported),
'cache_capabilities': render_capabilities(lldpcachecapabilities),
'chassis_id': chassis_id,
'port_id': port_id,
'port_description': port_description,
'system_name': system_name if system_name else chassis_id,
'system_description': system_description,
'capabilities_map_supported': render_capabilities(capabilities_map_supported),
'cache_capabilities': render_capabilities(cache_capabilities),
}
neighbours.append(neighbour)
return neighbours
......@@ -226,7 +223,7 @@ def inventory_lldp_cache(params, section: List[Dict[str, str]]) -> InventoryResu
path = ['networking', 'lldp_cache']
for neighbour in section:
if remove_domain:
if remove_domain and neighbour['system_name']:
if not domain_name == '':
neighbour['system_name'] = neighbour['system_name'].replace(domain_name, '')
else:
......@@ -264,9 +261,9 @@ register.snmp_section(
oids=[
OIDEnd(), # ifIndex
'4', # lldpChassisIdSubtype
'5', # lldpChassisId
OIDBytes('5'), # lldpChassisId
'6', # lldpPortIdSubtype
'7', # lldpPortId
OIDBytes('7'), # lldpPortId
'8', # lldpPortDescription
'9', # lldpSystemName
'10', # lldpSystemDescription
......@@ -278,7 +275,8 @@ register.snmp_section(
base='.1.0.8802.1.1.2.1.3.7.1', # lldpRemEntry
oids=[
OIDEnd(), # interface index
'3', # lldpLocPortId
'2', # lldpLocPortIdSubtype
OIDBytes('3'), # lldpLocPortId
]
),
],
......
File added
File deleted
......@@ -9,14 +9,15 @@
' - 2019-03-04: added support for cmk1.5.x\n'
' - 2020-05-13: added support for CMK1.6x\n'
' - 2021-03-17: rewritten for CMK 2.0\n'
' - 2023-02-16: changed for CMK 2.1\n',
' - 2023-02-16: changed for CMK 2.1\n'
' - 2023-06-14: changed for CMK 2.2\n',
'download_url': 'https://thl-cmk.hopto.org',
'files': {'agent_based': ['inv_lldp_cache.py'],
'gui': ['views/inv_lldp_cache.py',
'wato/check_parameters/inv_lldp_cache.py']},
'name': 'inv_lldp_cache',
'title': 'inventory for LLDP cache',
'version': '0.8.0-20230614',
'version': '0.9.0-20231013',
'version.min_required': '2.2.0b1',
'version.packaged': '2.2.0p2',
'version.packaged': '2.2.0p11',
'version.usable_until': None}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment