Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit 177ac397 authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

update project

parent bbbc218b
No related branches found
No related tags found
No related merge requests found
No preview for this file type
......@@ -15,6 +15,8 @@ def check_traceroute_arguments(params):
args.append('-I')
elif method == 'tcp':
args.append('-T')
elif method == 'udp':
args.append('-U')
# else: None -> default method
for router, state in params['routers']:
......
......@@ -12,10 +12,11 @@
# - max hops
# - queries per hop
# - source address
# - source interface (needs root permissions).
# - source interface (needs root permissions)
# - method UDP (default is UDP lite)
#
# Source address and source interface uses the "shell=True" option in in
# the "subprocess.Popen" command. This is highly insecure, so be careful.
# Source address and source interface uses the 'shell=True' option in in
# the 'subprocess.Popen' command. This is highly insecure, so be careful.
# This check does a traceroute to the specified target host
# (usually $HOSTADDRESS$ itself) and checks which route(s) are
......@@ -81,41 +82,41 @@ class ExecutionError(Exception):
def parse_exception(exc):
exc = str(exc)
if exc[0] == "{":
exc = "%d - %s" % list(ast.literal_eval(exc).values())[0]
if exc[0] == '{':
exc = '%d - %s' % list(ast.literal_eval(exc).values())[0]
return str(exc)
def output_check_result(s, perfdata):
if perfdata:
perfdata_output_entries = ["%s=%s" % (p[0], ";".join(map(str, p[1:]))) for p in perfdata]
s += " | %s" % " ".join(perfdata_output_entries)
sys.stdout.write("%s\n" % s)
perfdata_output_entries = ['%s=%s' % (p[0], ';'.join(map(str, p[1:]))) for p in perfdata]
s += ' | %s' % ' '.join(perfdata_output_entries)
sys.stdout.write('%s\n' % s)
def option_to_state(c):
return {"w": 1, "c": 2}[c.lower()]
return {'w': 1, 'c': 2}[c.lower()]
def _execute_traceroute(target, nodns, method, address_family, queries, max_ttl, port, source_addr, source_int):
cmd = ["traceroute"]
cmd = ['traceroute']
if nodns:
cmd.append("-n")
cmd.append('-n')
if method:
cmd.append(method)
if address_family:
cmd.append(address_family)
if port and method != "-I":
cmd.append(f"-p {port}")
if port and method != '-I':
cmd.append(f'-p {port}')
if queries:
cmd.append(f"-q {queries}")
cmd.append(f'-q {queries}')
if max_ttl:
cmd.append(f"-m {max_ttl}")
cmd.append(f'-m {max_ttl}')
if source_int:
cmd.append(f"-i {source_int}")
cmd.append(f'-i {source_int}')
if source_addr:
cmd.append(f"-s {source_addr}")
cmd.append(f'-s {source_addr}')
cmd.append(target)
......@@ -125,10 +126,10 @@ def _execute_traceroute(target, nodns, method, address_family, queries, max_ttl,
else:
shell = False
p = subprocess.Popen(args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf8", shell=shell)
p = subprocess.Popen(args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf8', shell=shell)
sto, ste = p.communicate()
if p.returncode:
raise ExecutionError("UNKNOWN - " + ste.replace("\n", " "))
raise ExecutionError('UNKNOWN - ' + ste.replace('\n', ' '))
return sto
......@@ -140,7 +141,7 @@ def check_traceroute(lines, routes):
parts = line.strip().split()
for part in parts:
try:
part = part.lstrip("(").rstrip(",").rstrip(")")
part = part.lstrip('(').rstrip(',').rstrip(')')
ipaddress.ip_interface(part)
routers.add(part)
except ValueError:
......@@ -153,24 +154,24 @@ def check_traceroute(lines, routes):
s = option_to_state(option)
if option.islower() and route in routers:
state = max(state, s)
bad_routers.append("%s(%s)" % (route, "!" * s))
bad_routers.append('%s(%s)' % (route, '!' * s))
elif option.isupper() and route not in routers:
state = max(state, s)
missing_routers.append("%s(%s)" % (route, "!" * s))
missing_routers.append('%s(%s)' % (route, '!' * s))
info_text = "%d hops, missing routers: %s, bad routers: %s" % (
info_text = '%d hops, missing routers: %s, bad routers: %s' % (
hops,
missing_routers and ", ".join(missing_routers) or "none",
bad_routers and ", ".join(bad_routers) or "none",
missing_routers and ', '.join(missing_routers) or 'none',
bad_routers and ', '.join(bad_routers) or 'none',
)
perfdata = [("hops", hops)]
perfdata = [('hops', hops)]
return state, info_text, perfdata
def validate_ip_version(address_arg, ip_version_arg):
# ipv6 address can have an appended interface index/name: 'fe80::%{interface}'
try:
ip_address_version = ipaddress.ip_interface(address_arg.split("%")[0]).ip.version
ip_address_version = ipaddress.ip_interface(address_arg.split('%')[0]).ip.version
except ValueError:
# May be a host or DNS name, don't execute the validation in this case.
# check_traceroute will perform the name resolution for us.
......@@ -178,14 +179,13 @@ def validate_ip_version(address_arg, ip_version_arg):
if not ip_address_version == ip_version_arg:
raise ProtocolVersionError(
'IP protocol version "%s" not the same as the IP address version "%s".'
% (ip_version_arg, ip_address_version)
f'IP protocol version "{ip_version_arg}" not the same as the IP address version "{ip_address_version}".'
)
def usage():
sys.stdout.write(
"""check_traceroute -{c|w|C|W} ROUTE [-{o|c|w|O|C|W} ROUTE...] TARGET
'''check_traceroute -{c|w|C|W} ROUTE [-{o|c|w|O|C|W} ROUTE...] TARGET
Check by which routes TARGET is being reached. Each possible route is being
prefixed with a state option:
......@@ -202,6 +202,7 @@ Other options:
-n disable reverse DNS lookups
-I Use ICMP ECHO for probes
-T Use TCP SYN for probes
-U Uses UDP without increasing the port per each probe
-4 Use IPv4
-6 Use IPv6
-p port Set the destination port to use
......@@ -210,7 +211,7 @@ Other options:
-i device Specify a network interface to operate with
-q nqueries Set the number of probes per each hop. Default is 3
"""
'''
)
......@@ -218,7 +219,7 @@ def main(args=None):
if args is None:
args = sys.argv[1:]
os.unsetenv("LANG")
os.unsetenv('LANG')
opt_verbose = 0
opt_debug = False
......@@ -231,11 +232,11 @@ def main(args=None):
opt_queries = None
opt_max_ttl = None
short_options = "hw:W:c:C:i:s:p:q:m:nTI46"
short_options = 'hw:W:c:C:i:s:p:q:m:nTIU46'
long_options = [
"verbose",
"help",
"debug",
'verbose',
'help',
'debug',
]
route_params = []
......@@ -245,48 +246,48 @@ def main(args=None):
if len(args) < 1:
usage()
raise MissingValueError("Please specify the target destination.")
raise MissingValueError('Please specify the target destination.')
target_address = args[0]
# first parse modifers
for o, a in opts:
if o in ["-v", "--verbose"]:
if o in ['-v', '--verbose']:
opt_verbose += 1
elif o in ["-d", "--debug"]:
elif o in ['-d', '--debug']:
opt_debug = True
elif o in ["-w", "-W", "-c", "-C"]:
elif o in ['-w', '-W', '-c', '-C']:
route_params.append((o[1], a))
elif o == "-n":
elif o == '-n':
opt_nodns = True
elif o in ["-T", "-I"]:
elif o in ['-T', '-I', '-U']:
opt_method = o
elif o in ["-4", "-6"]:
elif o in ['-4', '-6']:
if opt_address_family:
raise ProtocolVersionError("Cannot use both IPv4 and IPv6")
validate_ip_version(target_address, int(o.lstrip("-")))
raise ProtocolVersionError('Cannot use both IPv4 and IPv6')
validate_ip_version(target_address, int(o.lstrip('-')))
opt_address_family = o
elif o in ["-s"]:
elif o in ['-s']:
opt_source_addr = a
elif o in ["-i"]:
elif o in ['-i']:
opt_source_int = a
elif o in ["-p"]:
elif o in ['-p']:
opt_port = a
elif o in ["-q"]:
elif o in ['-q']:
opt_queries = a
elif o in ["-m"]:
elif o in ['-m']:
opt_max_ttl = a
# now handle action options
for o, a in opts:
if o in ["-h", "--help"]:
if o in ['-h', '--help']:
usage()
sys.exit(0)
sto = _execute_traceroute(target_address, opt_nodns, opt_method, opt_address_family, opt_queries,
opt_max_ttl, opt_port, opt_source_addr, opt_source_int)
status, output, perfdata = check_traceroute(sto.split("\n"), route_params)
info_text = output.strip() + "\n%s" % sto
status, output, perfdata = check_traceroute(sto.split('\n'), route_params)
info_text = output.strip() + '\n%s' % sto
return status, info_text, perfdata
except ExecutionError as e:
......@@ -301,10 +302,10 @@ def main(args=None):
except Exception as e:
if opt_debug:
raise
return 2, "Unhandled exception: %s" % parse_exception(e), None
return 2, 'Unhandled exception: %s' % parse_exception(e), None
if __name__ == "__main__":
if __name__ == '__main__':
exitcode, info, perf = main()
output_check_result(info, perf)
sys.exit(exitcode)
......@@ -115,7 +115,8 @@ def _valuespec_active_checks_traceroute():
DropdownChoice(
title=_('Method of probing'),
choices=[
(None, _('UDP (default behaviour of traceroute)')),
(None, _('UDP lite (default behaviour of traceroute)')),
('udp', _('UDP (without increasing the port for each probe)')),
('icmp', _('ICMP Echo Request')),
('tcp', _('TCP SYN, needs root permissions')),
],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment