Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit 94e764e3 authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

update project

parent 72cb56b0
No related branches found
No related tags found
No related merge requests found
[PACKAGE]: ../../raw/master/packagee-0.1.2-20230706.mkp "package-0.1.2-20230706.mkp"
[PACKAGE]: ../../raw/master/vzlogger-0.0.1-230806.mkp "vzlogger-0.0.1-230806.mkp"
# Title
A short description about the plugin
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-08-05
# File : vzlogger.py
#
# vzlogger
#
#
# https://wiki.volkszaehler.org/software/controller/vzlogger
#
#
# 2023-08-05: initial release
import json
import time
from dataclasses import dataclass
from typing import Dict, Mapping, Tuple, Optional, Any
from cmk.base.plugins.agent_based.agent_based_api.v1.type_defs import (
DiscoveryResult,
CheckResult,
StringTable,
)
from cmk.base.plugins.agent_based.agent_based_api.v1 import (
register,
Service,
Result,
State,
check_levels_predictive,
check_levels,
)
@dataclass
class VzLoggerChannel:
uuid: str
interval: int
last: int
protocol: str
value: float
value_last: int
_vzlogger_channel_types = {
'meter_reading': 'Meter reading',
'voltage': 'Voltage',
'electrical_power': 'Electrical Power',
}
def _render_voltage(value: float) -> str:
return f'{value:.2f} V'
def _render_electrical_power(value: float) -> str:
return f'{value:.2f} W'
def _render_meter_reading(value: float) -> str:
return f'{value:.0f} kWh'
def _yield_channel_details(channel: VzLoggerChannel) -> CheckResult:
yield Result(state=State.OK, notice=f'UUID: {channel.uuid}')
yield Result(state=State.OK, notice=f'Interval: {channel.interval}')
yield Result(state=State.OK, notice=f'Last: {time.ctime(channel.last)}')
yield Result(state=State.OK, notice=f'Protocol: {channel.protocol}')
yield Result(state=State.OK, notice=f'Value: {channel.value}')
yield Result(state=State.OK, notice=f'Value last: {time.ctime(channel.value_last)}')
def _yield_channel_value(
value: float,
label: str,
metric: str,
render_func: Any,
levels_upper: Optional[Tuple[int, int]] = None,
level_lower: Optional[Tuple[int, int]] = None,
) -> CheckResult:
metric = metric.lower().replace(' ', '_').replace('-', '_')
yield from check_levels_predictive(
value=value,
label=label,
metric_name=f'{metric}',
render_func=render_func,
levels=levels_upper,
boundaries=(0, None),
) if isinstance(levels_upper, dict) else check_levels(
value=value,
# label=label,
metric_name=f'{metric}',
render_func=render_func,
levels_upper=levels_upper,
levels_lower=level_lower,
# boundaries=(0, None),
)
def parse_vzlogger(string_table: StringTable) -> Dict[str, VzLoggerChannel]:
section = {}
data = json.loads(string_table[0][0])
channels = data['data']
for channel in channels:
value_last, value = channel['tuples'][0]
section[channel['uuid']] = VzLoggerChannel(
uuid=channel['uuid'],
interval=channel['interval'],
last=channel['last'],
protocol=channel['protocol'],
value=value,
value_last=value_last,
)
return section
def discovery_vzlogger(params: Mapping[str, any], section: Dict[str, VzLoggerChannel]) -> DiscoveryResult:
# add vzlogger channels with info from discovery rule vzlogger
for uuid, channel_type, item in params['channels']:
if uuid in section.keys():
yield Service(
item=f'{_vzlogger_channel_types.get(channel_type, uuid)} {item}',
parameters={
'channel_type': channel_type,
'uuid': uuid,
}
)
section.pop(uuid)
# add unknown vzlogger channels
for uuid in section.keys():
yield Service(
item=f'vzlogger {uuid}',
parameters={
'channel_type': 'unknown',
'uuid': uuid,
}
)
def check_vzlogger(item: str, params: Mapping[str, any], section: Dict[str, VzLoggerChannel]) -> CheckResult:
try:
channel = section[params['uuid']]
except KeyError:
yield Result(state=State.UNKNOWN, summary=f'Data not found in section')
return
if params['channel_type'] == 'voltage':
yield from _yield_channel_value(
value=channel.value,
label=item[8:],
metric=item,
render_func=_render_voltage,
levels_upper=params.get('levels_upper'),
level_lower=params.get('levels_lower'),
)
elif params['channel_type'] == 'electrical_power':
yield from _yield_channel_value(
value=channel.value,
label=item[17:],
metric=item,
render_func=_render_electrical_power,
levels_upper=params.get('levels_upper'),
level_lower=params.get('levels_lower'),
)
elif params['channel_type'] == 'meter_reading':
yield from _yield_channel_value(
value=(channel.value/1000),
label=item[14:],
metric=item,
render_func=_render_meter_reading,
levels_upper=params.get('levels_upper'),
level_lower=params.get('levels_lower'),
)
elif params['channel_type'] not in _vzlogger_channel_types.keys():
yield Result(
state=State.UNKNOWN,
summary=f'Unknown channel type (see details)',
details=f'Please configure a channel type and item name in discovery rules -> vzlogger.'
)
yield from _yield_channel_details(channel)
else:
yield from _yield_channel_details(channel)
yield Result(state=State.UNKNOWN, summary=f'not yet implemented')
register.agent_section(
name='vzlogger',
parse_function=parse_vzlogger,
)
register.check_plugin(
name='vzlogger',
service_name='%s',
discovery_function=discovery_vzlogger,
discovery_default_parameters={},
discovery_ruleset_name='discovery_vzlogger',
check_function=check_vzlogger,
check_default_parameters={},
check_ruleset_name='vzlogger',
)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-08-05
# File : vzlogger.py
#
# vzlogger
#
# 2023-08-05: initial relese
#
from cmk.special_agents.agent_vzlogger import main
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-08-05
# File : agent_vzlogger.py
#
# vzlogger
#
# 2023-08-05: initial release
from typing import Mapping, Any
def agent_vzlogger_arguments(params: Mapping[str, Any], hostname: str, ipaddress: str):
args = []
if params.get('port'):
args += ['--port', params['port']]
if params.get('timeout'):
args += ['--timeout', params['timeout']]
if params.get('testing'):
args += ['--testing']
if params.get('use_ip_address'):
args += [ipaddress]
else:
args += [hostname]
return args
special_agent_info['vzlogger'] = agent_vzlogger_arguments
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-08-05
# File : vzlogger.py
#
# https://wiki.volkszaehler.org/software/controller/vzlogger
#
# 2023-08-05: initial release
from cmk.gui.i18n import _
from cmk.gui.valuespec import (
Dictionary,
ValueSpec,
Integer,
FixedValue,
)
from cmk.gui.plugins.wato.utils import (
HostRulespec,
rulespec_registry,
)
from cmk.gui.plugins.wato.special_agents.common import RulespecGroupDatasourceProgramsApps
def _valuespec_special_agent_vzlogger() -> ValueSpec:
return Dictionary(
title=_("vzlogger"),
elements=[
('port', Integer(title=_('Port'), default_value=8081, minvalue=1, maxvalue=65565)),
('timeout', Integer(title=_('Connection timeout'), default_value=5, minvalue=1, maxvalue=59)),
('use_ip_address', FixedValue(True, title=_('Use IP-Address instead of hostname'), totext='')),
('testing', FixedValue(True, title=_('Use test data only'), totext='')),
],
)
rulespec_registry.register(
HostRulespec(
group=RulespecGroupDatasourceProgramsApps,
name="special_agents:vzlogger",
valuespec=_valuespec_special_agent_vzlogger,
)
)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-08-05
# File : vzlogger.py
#
# vzlogger
#
#
# https://wiki.volkszaehler.org/software/controller/vzlogger
#
# 2023-08-05: initial release
from cmk.gui.i18n import _
from cmk.gui.valuespec import (
Dictionary,
ListOf,
TextUnicode,
Tuple,
DropdownChoice,
Integer,
TextAscii,
)
from cmk.gui.plugins.wato.utils import (
HostRulespec,
rulespec_registry,
RulespecGroupCheckParametersDiscovery,
CheckParameterRulespecWithItem,
RulespecGroupCheckParametersApplications,
Levels,
)
def _parameter_valuespec_vzlogger():
return Dictionary(
title=_('vzlogger'),
elements=[
('levels_upper',
Levels(
title=_('Levels (upper)'),
help=_('This sets the upper limits for the vzlogger channel.'),
unit=_(''),
)),
('levels_lower',
Tuple(
title=_('Levels lower'),
help=_('This sets the lower limits for the vzlogger channel. '
'This will only be used if "Levels (upper)" is not using Predictive Levels.'
),
elements=[
Integer(title=_('Warning blow'), minvalue=0, unit=_('')),
Integer(title=_('Critical below'), minvalue=0, unit=_('')),
]))
],
optional_keys=False,
)
rulespec_registry.register(
CheckParameterRulespecWithItem(
check_group_name="vzlogger",
group=RulespecGroupCheckParametersApplications,
parameter_valuespec=_parameter_valuespec_vzlogger,
item_spec=lambda: TextAscii(title=_('vzlogger item')),
))
def _parameter_valuespec_discovery_vzlogger():
return Dictionary(
title=_('vzlogger'),
elements=[
('channels',
ListOf(
Tuple(
orientation='horizontal',
elements=[
TextUnicode(
title=_('UUID'),
help=_(
'Value must (or better should) match the pattern '
'^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$'
'.'
),
allow_empty=False,
size=36,
),
DropdownChoice(
title=_('Channel type'),
help=_('Defines what type of counter the channel delivers.'),
choices=[
('meter_reading', 'Meter reading'),
('voltage', 'Voltage (V)'),
('electrical_power', 'Electrical Power (W)'),
],
sorted=True,
),
TextUnicode(
title=_('Service name (item)'),
help=_('Name of the service in CMK. Must be unique within the channel type.'),
allow_empty=False,
size=30,
),
]),
add_label=_('Add channel'),
movable=False,
title=_('Channels'),
)),
],
)
rulespec_registry.register(
HostRulespec(
group=RulespecGroupCheckParametersDiscovery,
match_type='dict',
name='discovery_vzlogger',
valuespec=_parameter_valuespec_discovery_vzlogger,
))
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-08-05
# File : vzlogger.py
#
# vzlogger
#
#
# https://wiki.volkszaehler.org/software/controller/vzlogger
#
# 2023-08-05: initial release
# from __future__ import annotations
import requests
import json
import argparse
from os import environ
# import logging
from collections.abc import Sequence
from cmk.special_agents.utils.agent_common import (
special_agent_main,
)
from cmk.special_agents.utils.argument_parsing import Args, create_default_argument_parser
def parse_arguments(argv: Sequence[str] | None) -> argparse.Namespace:
parser = create_default_argument_parser(description=__doc__)
parser.add_argument('hostname', type=str)
parser.add_argument('--port', type=int, default=8081)
parser.add_argument('--timeout', type=int, default=5)
parser.add_argument('--testing', const=True, required=False, action='store_const')
# parser.add_argument('--proxy', type=str)
return parser.parse_args(argv)
def agent_vzlogger_main(args: Args) -> int: # , retrurn=None
_vars = vars(args)
_base_dir = environ['OMD_ROOT']
vzlogger_file = f'{_base_dir}/vzlogger.json'
if _vars['testing']:
try:
with open(vzlogger_file, 'r') as file:
data = json.load(file)
except (FileNotFoundError, json.decoder.JSONDecodeError) as e:
print('Error retrieving data:', e)
exit()
else:
try:
response = requests.get(f'http://{_vars["hostname"]}:{_vars["port"]}/', timeout=_vars['timeout'])
response.raise_for_status()
except (requests.exceptions.RequestException, requests.exceptions.HTTPError) as e:
print('Error retrieving data:', e)
exit()
else:
data = response.json()
print('<<<vzlogger:sep(0)>>>')
print(json.dumps(data))
return 0
def main() -> int:
return special_agent_main(parse_arguments, agent_vzlogger_main)
{'author': 'Th.L. (thl-cmk[at]outlook[dot]com)',
'description': 'Special agent for vzlogger (prototype)\n'
'\n'
' https://wiki.volkszaehler.org/software/controller/vzlogger\n',
'download_url': 'https://thl-cmk.hopto.org',
'files': {'agent_based': ['vzlogger.py'],
'agents': ['special/agent_vzlogger'],
'checks': ['agent_vzlogger'],
'gui': ['wato/check_parameters/agent_vzlogger.py',
'wato/check_parameters/vzlogger.py'],
'lib': ['python3/cmk/special_agents/agent_vzlogger.py']},
'name': 'vzlogger',
'title': 'vzlogger',
'version': '0.0.1-230806',
'version.min_required': '2.2.0b1',
'version.packaged': '2.2.0p7',
'version.usable_until': None}
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment