Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit 21efdbae authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

update project

parent bc001c2f
No related branches found
No related tags found
No related merge requests found
No preview for this file type
...@@ -30,8 +30,6 @@ from typing import Optional, Sequence ...@@ -30,8 +30,6 @@ from typing import Optional, Sequence
from cmk.utils.paths import tmp_dir from cmk.utils.paths import tmp_dir
# import datetime as datetime
def parse_arguments(argv: Sequence[str]) -> argparse.Namespace: def parse_arguments(argv: Sequence[str]) -> argparse.Namespace:
''''Parse arguments needed to construct an URL and for connection conditions''' ''''Parse arguments needed to construct an URL and for connection conditions'''
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
...@@ -44,18 +42,11 @@ def parse_arguments(argv: Sequence[str]) -> argparse.Namespace: ...@@ -44,18 +42,11 @@ def parse_arguments(argv: Sequence[str]) -> argparse.Namespace:
return parser.parse_args(argv) return parser.parse_args(argv)
def print_check_header(): def connect_ssllabs_api(ssl_host_address: str, host_cache: str, args: argparse.Namespace, ):
print('<<<ssllabs_grade:sep(0)>>>')
def connect_ssllabs_api(ssl_host_address: str, host_cache: str,
host_state_cache: str, grade_cache: str, args: argparse.Namespace, ):
server = 'api.ssllabs.com' server = 'api.ssllabs.com'
uri = 'api/v3/analyze' uri = 'api/v3/analyze'
maxAge = args.maxage # default 167 (1 week minus 1 hour) maxAge = args.maxage # default 167 (1 week minus 1 hour)
publish = args.publish # default off publish = args.publish # default off
all = 'done'
startNew = 'off'
fromCache = 'on' fromCache = 'on'
now = time.time() now = time.time()
...@@ -66,7 +57,6 @@ def connect_ssllabs_api(ssl_host_address: str, host_cache: str, ...@@ -66,7 +57,6 @@ def connect_ssllabs_api(ssl_host_address: str, host_cache: str,
proxies = {'https': args.proxy.split('/')[-1]} # remove 'https://' from proxy string proxies = {'https': args.proxy.split('/')[-1]} # remove 'https://' from proxy string
try: try:
response = requests.get( response = requests.get(
url=url, url=url,
timeout=args.timeout, timeout=args.timeout,
...@@ -77,49 +67,32 @@ def connect_ssllabs_api(ssl_host_address: str, host_cache: str, ...@@ -77,49 +67,32 @@ def connect_ssllabs_api(ssl_host_address: str, host_cache: str,
except Exception as err: except Exception as err:
sys.stdout.write(f'{ssl_host_address} Connection error: {err} on {url}') sys.stdout.write(f'{ssl_host_address} Connection error: {err} on {url}')
print(f'{ssl_host_address};{err};{now};2') # print(f'{ssl_host_address};{err};{now};2')
return return
print(response.text)
try: try:
starttime = jsonData.get('startTime')
statusmessage = jsonData.get('statusMessage')
if jsonData['status'] == 'READY': if jsonData['status'] == 'READY':
grade = jsonData['endpoints'][0]['grade']
testtime = jsonData['testTime']
print(f'{ssl_host_address};{grade};{testtime};0;{grade}')
# if test finish and json data ok --> write data in cache file # if test finish and json data ok --> write data in cache file
with open(host_cache, 'w') as outfile: with open(host_cache, 'w') as outfile:
json.dump(jsonData, outfile) json.dump(jsonData, outfile)
with open(host_state_cache, 'w') as f:
f.write(grade)
elif jsonData['status'] == 'ERROR':
print(f'{ssl_host_address};{statusmessage};{starttime};2;{grade_cache}')
elif jsonData['status'] == 'IN_PROGRESS':
statusdetails = jsonData['endpoints'][0]['statusDetails']
print(f'{ssl_host_address};Testing, step: {statusdetails};{starttime};1;{grade_cache}')
elif jsonData['status'] == 'DNS':
print(f'{ssl_host_address};{statusmessage};{starttime};1;{grade_cache}')
else:
print(f'{ssl_host_address};status unknown;{now};3;{grade_cache}')
except (ValueError, KeyError, TypeError): except (ValueError, KeyError, TypeError):
print(f'{ssl_host_address};request JSON format error;{now};2;{grade_cache}') print(f'{ssl_host_address};request JSON format error;{now};2') # ;{grade_cache}
def read_cache(ssl_host_address, host_cache, grade_cache): def read_cache(host_cache: str):
now = time.time()
# read cache file # read cache file
with open(host_cache) as json_file: with open(host_cache) as json_file:
# check if cache file contains valid json data # check if cache file contains valid json data
try: try:
jsonData = json.load(json_file) jsonData = json.load(json_file)
if jsonData['status'] == 'READY': if jsonData['status'] == 'READY':
grade = jsonData['endpoints'][0]['grade'] print(json.dumps(jsonData))
testtime = jsonData['testTime']
print(f'{ssl_host_address};{grade};{testtime};0;{grade}')
except (ValueError, KeyError, TypeError): except (ValueError, KeyError, TypeError):
print(f'{ssl_host_address};cache JSON format error;{now};2;{grade_cache}') # print(f'{ssl_host_address};cache JSON format error;{now};2;{grade_cache}')
return
def main(argv: Optional[Sequence[str]] = None) -> None: def main(argv: Optional[Sequence[str]] = None) -> None:
...@@ -135,50 +108,36 @@ def main(argv: Optional[Sequence[str]] = None) -> None: ...@@ -135,50 +108,36 @@ def main(argv: Optional[Sequence[str]] = None) -> None:
sys.stdout.write('<<<check_mk>>>\n') sys.stdout.write('<<<check_mk>>>\n')
sys.stdout.write('Version: %s\n' % VERSION) sys.stdout.write('Version: %s\n' % VERSION)
sys.stdout.write('AgentOS: linux\n') sys.stdout.write('AgentOS: linux\n')
# sys.stdout.write('<<<systemtime>>>\n')
# sys.stdout.write(time.strftime('%s\n'))
# create cache directory, if not exists # create cache directory, if not exists
if not os.path.exists(cache_dir): if not os.path.exists(cache_dir):
os.makedirs(cache_dir) os.makedirs(cache_dir)
print_check_header() print('<<<ssllabs_grade:sep(0)>>>')
for ssl_host_address in ssl_hosts: for ssl_host_address in ssl_hosts:
# changed cache file form host_address to ssl_host_address # changed cache file form host_address to ssl_host_address
host_cache = '%s/%s' % (cache_dir, ssl_host_address) host_cache = '%s/%s' % (cache_dir, ssl_host_address)
host_state_cache = '%s/%s_state' % (cache_dir, ssl_host_address)
# state cache form cache file
if os.path.exists(host_state_cache):
with open(host_state_cache) as cache_file:
grade_cache = cache_file.read()
else:
grade_cache = 'A'
# check if cache file exists and is not older as cache_age # check if cache file exists and is not older as cache_age
if not os.path.exists(host_cache): if not os.path.exists(host_cache):
connect_ssllabs_api( connect_ssllabs_api(
ssl_host_address=ssl_host_address, ssl_host_address=ssl_host_address,
host_cache=host_cache, host_cache=host_cache,
host_state_cache=host_state_cache,
grade_cache=grade_cache,
args=args) args=args)
else: else:
json_cache_file_age = os.path.getmtime(host_cache) json_cache_file_age = os.path.getmtime(host_cache)
cache_age_sec = now - json_cache_file_age cache_age_sec = now - json_cache_file_age
if cache_age_sec < cache_age: if cache_age_sec < cache_age:
read_cache( read_cache(
ssl_host_address=ssl_host_address,
host_cache=host_cache, host_cache=host_cache,
grade_cache=grade_cache) )
else: else:
connect_ssllabs_api( connect_ssllabs_api(
ssl_host_address=ssl_host_address, ssl_host_address=ssl_host_address,
host_cache=host_cache, host_cache=host_cache,
host_state_cache=host_state_cache, args=args
grade_cache=grade_cache, )
args=args)
if __name__ == '__main__': if __name__ == '__main__':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment