Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit 18ef13ea authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

update project

parent e24649bc
No related branches found
No related tags found
No related merge requests found
Showing
with 1523 additions and 0 deletions
File moved
File moved
File moved
File moved
No preview for this file type
File moved
File moved
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2018-01-25
#
# add on for cisco support api. calls cisco bug api 2.0.
# for more information see: https://developer.cisco.com/docs/support-apis/#bug
#
# 2021-07-24: rewrite for python3.8
#
import os
import json
from cisco_live_cycle_utils import (
configure_logger,
log_message,
expand_path,
get_ids_from_dir,
get_subdirs_from_dir,
remove_empty_sub_dirs,
sleep_random,
move_dir,
)
from ciscoapi import (
AccessToken,
Settings,
get_bug_by_pid_and_release,
)
def main():
settings = Settings()
access_token = AccessToken(settings.client_id, settings.client_secret, settings.proxies)
configure_logger(log_level=settings.log_level)
bug_path = settings.base_path + '/bug'
path_found = expand_path(bug_path + '/found')
path_not_found = expand_path(bug_path + '/not_found')
path_request = expand_path(bug_path + '/request')
path_missing = expand_path(bug_path + '/missing')
pids_requested = {}
pids_refresh = {}
# get list of bug reports to refresh
pids = get_subdirs_from_dir(path_found)
for pid in pids:
pids_refresh[pid.replace('_', '/')] = ','.join(get_ids_from_dir(path_found+pid,
refresh_time=settings.bug_refresh_found))
# move not_found bug reports older then 'refresh_notfound' days to request (for refresh)
move_dir(path_not_found, path_request, refresh_time=settings.bug_refresh_not_found)
# get list of PIDs requests
pids = get_subdirs_from_dir(path_request)
log_message(f'bug requests (PIDs): {pids}')
for pid in pids:
pids_requested[pid.replace('_', '/')] = ','.join(get_ids_from_dir(path_request+pid))
log_message(f'bug requests (PIDs and releases): {pids_requested}')
if len(pids_refresh.keys()) > 0 or len(pids_requested.keys()) > 0:
reqoptions = []
# modified_date
# 1 = Last Week
# 2 = Last 30 Days(default)
# 3 = Last 6 Months
# 4 = Last Year
# 5 = All
reqoptions.append('modified_date=1')
# reqoptions.append('modified_date=3') # for testing
# severity (default is all)
# 1 = Severity 1
# 2 = Severity 2
# 3 = Severity 3
# 4 = Severity 4
# 5 = Severity 5
# 6 = Severity 6
# reqoptions.append('severity=2')
# status (default is all)
# O = Open
# F = Fixed
# T = Terminated
# reqoptions.append('status=O')
# sort_by
# status
# modified_date (recent first, default)
# severity
# support_case_count
# modified_date_earliest (earliest first)
# reqoptions.append('sort_by=severity')
# reqoptions.append('page_index=2')
if len(reqoptions) > 0:
reqoptions = '?' + '&'.join(reqoptions)
else:
reqoptions = ''
# wait random time after startup (load spread)
if settings.wait_after_start:
sleep_random(settings.max_wait_time)
# first refresh bug reports
for pid in pids_refresh.keys():
# get bug records for time frame
bug_records = get_bug_by_pid_and_release(
pid,
pids_refresh.get(pid),
access_token,
reqoptions,
settings=settings
)
for entry in bug_records:
pid = entry.get('pid')
software_releases = entry.get('software_releases')
status_code = int(entry.get('status_code', 200))
log_message(f'bug return:PID: {pid}, Status: {status_code}, Version: {software_releases}')
path = expand_path(path_found + pid.replace('/', '_'))
# check if there were was an error, if so go to next pid
if status_code == 200:
bugs = entry.get('bugs', None)
if bugs: # if new/changed bugs found
for bug in bugs:
bug.pop('description') # remove description
for release in software_releases.keys(): # create one bug list per software release
bug_release = software_releases.get(release)
log_message(f'bug found:PID: {pid}, Releases: {release}')
new_bugs = []
for bug in bugs:
if bug_release in bug.get('known_affected_releases'):
new_bugs.append(bug)
# open found file
with open(path + release) as f:
try:
bug_record = json.load(f)
except ValueError as e:
log_message(f'{pid}:{release}:snmp_cisco_bug:bug_found:JSON load error: {e}',
level='WARNING')
found_bugs = bug_record.get('bugs')
for found_bug in found_bugs:
for new_bug in new_bugs:
if found_bug.get('bug_id') == new_bug.get('bug_id'):
found_bug.update(new_bug) # update old bug with new data (changed bugs)
new_bugs.remove(new_bug)
for bug in new_bugs:
found_bugs.append(bug) # add new bugs
bug_record['bugs'] = found_bugs # replace bug list with new list
bug_record['total_records'] = len(found_bugs) # change number of records
with open(path + release, 'w') as f:
json.dump(bug_record, f) # write updated bug report
else:
for release in software_releases.keys():
if os.path.exists(path + release):
os.utime(path + release, None)
reqoptions = []
# modified_date
# 1 = Last Week
# 2 = Last 30 Days(default)
# 3 = Last 6 Months
# 4 = Last Year
# 5 = All
reqoptions.append('modified_date=5')
# reqoptions.append('modified_date=2') # for testing
# severity (default is all)
# 1 = Severity 1
# 2 = Severity 2
# 3 = Severity 3
# 4 = Severity 4
# 5 = Severity 5
# 6 = Severity 6
# reqoptions.append('severity=2')
# status (default is all)
# O = Open
# F = Fixed
# T = Terminated
# reqoptions.append('status=O')
# sort_by
# status
# modified_date (recent first, default)
# severity
# support_case_count
# modified_date_earliest (earliest first)
# reqoptions.append('sort_by=severity')
# reqoptions.append('page_index=2')
if len(reqoptions) > 0:
reqoptions = '?' + '&'.join(reqoptions)
else:
reqoptions = ''
for pid in pids_requested.keys():
bug_records = get_bug_by_pid_and_release(
pid,
pids_requested.get(pid),
access_token,
reqoptions,
settings=settings
)
for entry in bug_records:
pid = entry.get('pid')
software_releases = entry.get('software_releases')
status_code = int(entry.get('status_code', 200))
log_message(f'bug return:PID: {pid}, Status: {status_code}, Version: {software_releases}')
# check if there where was an error, if so go to next pid
if status_code == 200:
bugs = entry.get('bugs', None)
if bugs:
for bug in bugs:
bug.pop('description')
for release in software_releases.keys(): # create one bug list per software release
bug_release = software_releases.get(release)
if bugs:
log_message(f'bug found:PID: %s{pid}, Releases: {release}')
missing = entry.get('missing', {})
# split bugs by release
release_bugs = {
'pid': pid,
'software_release': release,
'bugs': [],
'missing': missing
}
for bug in bugs:
if bug_release in bug.get('known_affected_releases'):
release_bugs['bugs'].append(bug)
release_bugs['total_records'] = len(release_bugs['bugs'])
path = expand_path(path_found + pid.replace('/', '_'))
with open(path + release, 'w') as f:
json.dump(release_bugs, f)
if len(missing.keys()) != 0:
path = expand_path(path_missing + pid.replace('/', '_'))
with open(path + release, 'w') as f:
json.dump(missing, f)
else:
log_message(f'bug not found:PID: {pid}, Version: {release}')
path = expand_path(path_not_found + pid.replace('/', '_'))
log_message(f'not found: {entry}')
with open(path + release, 'w') as f:
json.dump(entry, f)
pass
# remove request file
try:
log_message(f'bug delete request:PID: {pid}, Version: {release}')
os.remove(path_request + pid.replace('/', '_') + '/' + release)
except OSError:
pass
# clean up (remove empty directories)
remove_empty_sub_dirs(path_request)
remove_empty_sub_dirs(path_found)
remove_empty_sub_dirs(path_not_found)
remove_empty_sub_dirs(path_missing)
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2017-05-15
#
# https://developer.cisco.com/docs/support-apis/
#
# 2021-07-23: rewrite for python 3.8
#
import json
from cisco_live_cycle_utils import (
configure_logger,
log_message,
expand_path,
get_ids_from_dir,
remove_ids_from_list,
refresh_ids_from_dir,
remove_ids_from_dir,
sleep_random,
)
from ciscoapi import (
AccessToken,
Settings,
get_eox_by_pid,
get_eox_by_serials,
)
# split pids in known and unknown eox state
def split_pids(eoxr):
eox_known = []
eox_unkown = []
for response in eoxr:
EOXRecord = response.get('EOXRecord')
# PaginationResponseRecord = response.get('PaginationResponseRecord')
# PageIndex = PaginationResponseRecord.get('PageIndex')
# LastIndex = PaginationResponseRecord.get('LastIndex')
# PageRecords = PaginationResponseRecord.get('PageRecords')
# TotalRecords = PaginationResponseRecord.get('TotalRecords')
if EOXRecord is not None:
for PID in EOXRecord:
if PID.get('EOLProductID') != '':
eox_known.append(PID)
log_message(message=f'EOLProductID : {PID.get("EOLProductID")}')
else:
eox_unkown.append(PID)
log_message(f'EOXInputValue : {PID.get("EOXInputValue")}')
return {'eox_known': eox_known, 'eox_unknown': eox_unkown}
# split serials, expects a list of EoX Records from get EoX by serial
def split_serials(eoxr):
serials = []
for PID in eoxr:
EOLProductID = PID.get('EOLProductID')
log_message(f'serial split Eox: found PID: {EOLProductID}')
EOXInputValue = PID.get('EOXInputValue').split(',')
for serial in EOXInputValue:
log_message(f'found Serial: {serial}')
eox_serial = PID.copy()
eox_serial.update({'EOXInputValue': serial})
serials.append(eox_serial)
log_message(f'Serial EoX: {eox_serial}')
return serials
# save EoX records to file by PID,
# expects a list of EoX Records from Cisco EoX API 5.0,
# returns a list of saved PIDs
def save_eox(eox, path):
saved = []
for PID in eox:
EOLProductID = PID.get('EOLProductID')
# EOLProductID is empty for EoX Records with unknown EoX state (error or not announced)
if EOLProductID == '':
EOLProductID = PID.get('EOXInputValue')
if EOLProductID:
with open(path + (EOLProductID.replace('/', '_')), 'w') as f:
json.dump(PID, f)
saved.append(EOLProductID)
return saved
def save_serials(eox, path):
"""
Saves EoX records to file by serial number.
Args:
eox: List of EoX Records from Cisco EoX API 5.0
path: file path where to save the EoX records
Returns: List of serial numbers of saved EoX records
"""
saved = []
for serial in eox:
EOXInputValue = serial.get('EOXInputValue')
if EOXInputValue:
with open(path + EOXInputValue, 'w') as f:
json.dump(serial, f)
saved.append(EOXInputValue)
return saved
def main():
settings = Settings()
access_token = AccessToken(settings.client_id, settings.client_secret, settings.proxies)
configure_logger(log_level=settings.log_level)
eox_path = settings.base_path + '/EoX'
path_found = eox_path + '/found'
path_not_found = eox_path + '/not_found'
path_request = eox_path + '/request'
path_request_pid = expand_path(path_request + '/pid')
path_found_pid = expand_path(path_found + '/pid')
path_not_found_pid = expand_path(path_not_found + '/pid')
path_request_ser = expand_path(path_request + '/ser')
path_found_ser = expand_path(path_found + '/ser')
path_not_found_ser = expand_path(path_not_found + '/ser')
# create list of PIDs to request EoX status for
pids = get_ids_from_dir(path_request_pid)
log_message(f'pid requests : {pids}')
# remove already known PIDs from list
pids = remove_ids_from_list(pids, path_found_pid)
log_message(f'pid requests : {pids}')
# remove PIDs already requested with unknown EoX status from list
pids = remove_ids_from_list(pids, path_not_found_pid)
log_message(f'pid requests : {pids}')
# refresh PIDs after 30 days by default
pids = refresh_ids_from_dir(path_not_found_pid, settings.eox_refresh_unknown, pids, True)
log_message(f'pid requests : {pids}')
pids = refresh_ids_from_dir(path_found_pid, settings.eox_refresh_known, pids, False)
log_message(f'pid requests : {pids}')
# create list of serial numbers to request EoX status for
serials = get_ids_from_dir(path_request_ser)
log_message(f'ser requests : {serials}')
# remove already known serials from list
serials = remove_ids_from_list(serials, path_found_ser)
log_message(f'ser requests : {serials}')
# remove serials already requested with unknown EoX status from list
serials = remove_ids_from_list(serials, path_not_found_ser)
log_message(f'ser requests : {serials}')
# refresh serials after 30 days by default
serials = refresh_ids_from_dir(path_not_found_ser, settings.eox_refresh_unknown, serials, True)
log_message(f'ser requests : {serials}')
serials = refresh_ids_from_dir(path_found_ser, settings.eox_refresh_known, serials, False)
log_message(f'ser requests : {serials}')
if pids == [] and serials == []:
log_message('all list are empty. Do nothing.')
return
# wait random time after startup (load spread)
if settings.wait_after_start:
sleep_random(settings.max_wait_time)
if pids is not []:
eox = get_eox_by_pid(pids=pids, access_token=access_token, settings=settings)
# split eox records in a list of known and unknown pid records
eox = split_pids(eox)
# save known pid reports
pids = save_eox(eox.get('eox_known'), path_found_pid)
# delete requests for known pids
remove_ids_from_dir(pids, path_request_pid)
# save unknown pid reports
pids = save_eox(eox.get('eox_unknown'), path_not_found_pid)
# delete requests for unknown pids
remove_ids_from_dir(pids, path_request_pid)
# delete pids from known were the status changed to unknown
remove_ids_from_dir(pids, path_found_pid)
if serials is not []:
eox = get_eox_by_serials(serials=serials, access_token=access_token, settings=settings)
log_message(f'eox by ser: {eox}')
# split eox records in a list of known and unknown pid records
eox = split_pids(eox)
# split EoX records for known PIDs in one entry per serial
serials = split_serials(eox.get('eox_known'))
# save EoX records for serials with known EoX state
serials = save_serials(serials, path_found_ser)
log_message(f'EoX Serials: known: {serials}')
# delete requests for known serials
remove_ids_from_dir(serials, path_request_ser)
# split EoX records for unknown PIDs in one entry per serial
serials = split_serials(eox.get('eox_unknown'))
# save EoX records for serials with known EoX state
serials = save_serials(serials, path_not_found_ser)
# delete requests for unknown serials
remove_ids_from_dir(serials, path_request_ser)
# delete serials from known were the status changed to unknown
remove_ids_from_dir(serials, path_found_ser)
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2017-07-10
#
# https://developer.cisco.com/docs/support-apis/
#
# 2018-06-06: fixed handling if state changes form found to not_found (delete old psirt found file)
# 2021-07-24: rewritten for python 3.8
#
#
import ntpath
import os
import json
from typing import List
from dataclasses import dataclass
from cisco_live_cycle_utils import (
configure_logger,
log_message,
get_ids_from_dir,
remove_ids_from_list,
refresh_ids_from_dir,
sleep_random,
expand_path,
)
from ciscoapi import (
AccessToken,
Settings,
get_psirt_by_product_family,
get_psirt_by_iosxe_version,
get_psirt_by_ios_version,
)
@dataclass
class Paths:
found: str
not_found: str
request: str
@dataclass
class Refresh:
found: int
not_found: int
g_logger = None
def _psirt_remove_id_file(psirt_path: str, psirt_id: str):
# delete psirt file
try:
log_message(f'delete psirt id file : {psirt_path + psirt_id}')
os.remove(psirt_path + psirt_id)
except OSError:
pass
def _psirt_dump_record(psirt_record, psirt_id: str, psirt_path: str, psirt_path_request: str):
with open(psirt_path + psirt_id, 'w') as f:
json.dump(psirt_record, f)
# delete request file
_psirt_remove_id_file(psirt_path_request, psirt_id)
return
def _check_psirt_record(psirt_record, psirt_id, psirt_path_found, psirt_path_request):
"""
:param psirt_record:
:param psirt_id:
:param psirt_path_found:
:param psirt_path_request:
:returns:
"""
for advisory in psirt_record.get('advisories'):
# remove unwanted information from advisories
advisory.pop('productNames', None)
advisory.pop('ipsSignatures', None)
advisory.pop('iosRelease', None)
advisory.pop('cvrfUrl', None)
advisory.pop('ovalUrl', None)
advisory.pop('summary', None)
temp_advisory = advisory.copy()
for key in temp_advisory.keys():
if advisory.get(key, None) in [['NA'], 'NA']:
advisory.pop(key, None)
_psirt_dump_record(psirt_record, psirt_id, psirt_path_found, psirt_path_request)
return
def _get_psirt_id_list(product_family: str, paths: Paths, refresh: Refresh) -> List[str]:
"""
@param product_family:
@param paths: Path object with path to founnd/not found/requested PSIRT records
@param refresh: Refresh object with number of days before a PSIRT record needs to be refreshed for found/not found
@return: list of PIDs
"""
# create list of ID's to request PSIRT status for
psirt_id_list = get_ids_from_dir(paths.request + product_family)
log_message(f'psirt requests : {psirt_id_list}')
# remove already found ID's from list
psirt_id_list = remove_ids_from_list(psirt_id_list, paths.found + product_family)
log_message(f'psirt requests : {psirt_id_list}')
# remove not found ID's from list
psirt_id_list = remove_ids_from_list(psirt_id_list, paths.not_found + product_family)
log_message(f'psirt requests : {psirt_id_list}')
# refresh psirt after 1 day by default
psirt_id_list = refresh_ids_from_dir(paths.not_found + product_family, refresh.not_found, psirt_id_list, True)
log_message(f'psirt requests : {psirt_id_list}')
psirt_id_list = refresh_ids_from_dir(paths.found + product_family, refresh.found, psirt_id_list, False)
log_message(f'psirt requests : {psirt_id_list}')
return psirt_id_list
def _update_psirt_id(psirt_records: list, family_name: str, paths: Paths):
for psirt_record in psirt_records:
if family_name in ['IOS', 'IOS-XE']:
psirt_id = psirt_record.get('version')
else:
psirt_id = psirt_record.get('family')
if psirt_record.get('advisories') != 'notfound':
_check_psirt_record(psirt_record, psirt_id, paths.found + family_name + '/',
ntpath.sep + family_name + '/')
else:
_psirt_dump_record(psirt_record, psirt_id, paths.not_found + family_name + '/',
paths.request + family_name + '/')
# remove psirt_found file (happens when product family is removed form bug ID)
_psirt_remove_id_file(paths.found + family_name + '/', psirt_id)
return
def main():
settings = Settings()
access_token = AccessToken(settings.client_id, settings.client_secret, settings.proxies)
configure_logger(log_level=settings.log_level)
refresh = Refresh(
found=settings.psirt_refresh_found,
not_found=settings.psirt_refresh_not_found
)
psirt_dir = expand_path(settings.base_path + '/psirt')
paths = Paths(
found=psirt_dir + '/found/',
not_found=psirt_dir + '/not_found/',
request=psirt_dir + '/request/'
)
psirt_ios = _get_psirt_id_list('IOS', paths, refresh)
psirt_ios_xe = _get_psirt_id_list('IOS-XE', paths, refresh)
psirt_family = _get_psirt_id_list('family', paths, refresh)
if (psirt_ios == []) and psirt_ios_xe == [] and psirt_family == []:
log_message('all list are empty. Do nothing.')
return
# wait random time after startup
if settings.wait_after_start:
sleep_random(settings.max_wait_time)
if psirt_family is not []:
psirt_records = get_psirt_by_product_family(psirt_family, access_token, settings=settings)
_update_psirt_id(psirt_records, 'family', paths)
if psirt_ios_xe is not []:
psirt_records = get_psirt_by_iosxe_version(psirt_ios_xe, access_token, settings=settings)
_update_psirt_id(psirt_records, 'IOS-XE', paths)
if psirt_ios is not []:
psirt_records = get_psirt_by_ios_version(psirt_ios, access_token, settings=settings)
_update_psirt_id(psirt_records, 'IOS', paths)
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2017-04-15:
#
# Cisco SN2INFO API Framework
#
# https://developer.cisco.com/docs/support-apis/#serial-number-to-information
#
# 2021-07-23: rewrite for python 3.8
#
import json
from cisco_live_cycle_utils import (
configure_logger,
log_message,
expand_path,
get_ids_from_dir,
remove_ids_from_list,
refresh_ids_from_dir,
remove_ids_from_dir,
sleep_random,
)
from ciscoapi import (
AccessToken,
Settings,
get_coverage_summary_by_serials,
)
def sn2info_split_covered(sn2info_records):
sn2info_covered = []
sn2info_notcovered = []
for response in sn2info_records:
sn2inforecord = response.get('serial_numbers')
# PaginationResponseRecord = response.get('PaginationResponseRecord')
# PageIndex = PaginationResponseRecord.get('PageIndex')
# LastIndex = PaginationResponseRecord.get('LastIndex')
# PageRecords = PaginationResponseRecord.get('PageRecords')
# TotalRecords = PaginationResponseRecord.get('TotalRecords')
if sn2inforecord is not None:
for serial in sn2inforecord:
if serial.get('is_covered') == 'YES':
sn2info_covered.append(serial)
log_message(f'SN2INFO covered : {serial.get("sr_no")}')
else:
sn2info_notcovered.append(serial)
log_message(f'SN2INFO not covered : {serial.get("sr_no")}')
return {'sn2info_covered': sn2info_covered, 'sn2info_notcovered': sn2info_notcovered}
def sn2info_save_serials(sn2infos, path):
saved = []
for sn2info in sn2infos:
serial = str(sn2info.get('sr_no'))
if serial:
with open(path + serial, 'w') as f:
json.dump(sn2info, f)
saved.append(serial)
return saved
def main():
settings = Settings()
access_token = AccessToken(settings.client_id, settings.client_secret, settings.proxies)
configure_logger(log_level=settings.log_level)
sn2info_dir = settings.base_path + '/sn2info'
path_found = expand_path(sn2info_dir + '/found/')
path_not_found = expand_path(sn2info_dir + '/not_found/')
path_request = expand_path(sn2info_dir + '/request/')
# create list of serial numbers to request SN2INFO status for
sn2info = get_ids_from_dir(path_request)
log_message(f'sn2info requests : {sn2info}')
# remove covered serials from list
sn2info = remove_ids_from_list(sn2info, path_found)
log_message(f'sn2info requests : {sn2info}')
# remove not covered serials from list
sn2info = remove_ids_from_list(sn2info, path_not_found)
log_message(f'sn2info requests : {sn2info}')
# refresh sn2info serials after 31 days by default
sn2info = refresh_ids_from_dir(path_not_found, settings.sn2info_refresh_not_covered, sn2info, True)
log_message(f'sn2info requests : {sn2info}')
sn2info = refresh_ids_from_dir(path_found, settings.sn2info_refresh_covered, sn2info, False)
log_message(f'sn2info requests : {sn2info}')
if sn2info is []:
log_message('all list are empty. Do nothing.')
return
# wait random time after startup
if settings.wait_after_start:
sleep_random(settings.max_wait_time)
if sn2info is not []:
sn2info_records = get_coverage_summary_by_serials(
serials=sn2info,
access_token=access_token,
settings=settings
)
log_message(f'sn2info response: {sn2info_records}')
sn2info = sn2info_split_covered(sn2info_records)
serials = sn2info_save_serials(sn2info.get('sn2info_covered'), path_found)
remove_ids_from_dir(serials, path_request)
serials = sn2info_save_serials(sn2info.get('sn2info_notcovered'), path_not_found)
remove_ids_from_dir(serials, path_request)
# delete serials from covered were the status changed to uncovered
remove_ids_from_dir(serials, path_found)
main()
#!/usr/bin/env python
# -*- encoding: utf-8; py-indent-offset: 4 -*-
#
# 15.04.2017 : Th.L. : Support for Cisco API
#
# https://developer.cisco.com/docs/support-apis/
#
import logging
import os
import time
import random
import sys
def configure_logger(_path: str = '', _log_to_console: bool = True, log_level: str = 'INFO'):
log_formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(name)s :: %(module)s ::%(message)s')
log = logging.getLogger('root')
numeric_level = getattr(logging, log_level.upper(), None)
if isinstance(numeric_level, int):
logging.getLogger().setLevel(numeric_level)
else:
logging.getLogger().setLevel(logging.WARNING)
log_handler_console = logging.StreamHandler(sys.stdout)
log_handler_console.setFormatter(log_formatter)
log_handler_console.setLevel(logging.INFO)
log.addHandler(log_handler_console)
def log_message(message: str, level: str = 'DEBUG'):
log = logging.getLogger()
if level.upper() == 'CRITICAL':
log.critical(message)
elif level.upper() == 'ERROR':
log.error(message)
elif level.upper() == 'WARNING':
log.warning(message)
elif level.upper() == 'INFO':
log.info(message)
elif level.upper() == 'DEBUG':
log.debug(message)
else:
log.warning(f'unknown log_level: {level}')
def sleep_random(max_minutes):
sleep_time = random.randint(1, 60 * max_minutes)
log_message(message=f'{sleep_time} seconds', level='INFO')
time.sleep(sleep_time)
return
# read list of files from dir (eq. (P)IDs or SERIALs) (don't change to uppercase)
def get_ids_from_dir(directory, refresh_time: int = 0):
refresh_time = refresh_time * 86400
start_time = int(time.time())
ids = []
for (dir_path, dir_names, file_names) in os.walk(directory):
for entry in file_names:
modify_time = int(os.path.getmtime(dir_path + '/' + entry))
if (start_time - modify_time) > refresh_time:
ids.append(str(entry).replace('_', '/'))
# do not read subdirs
break
# insert cleanup here (filter unwanted names, chars, etc...)
return ids
# read list of subdirectories from directory (PIDs) (don't anything)
def get_subdirs_from_dir(base_dir):
sub_dirs = []
for (dir_path, sub_dirs, filenames) in os.walk(base_dir):
break
# insert cleanup here (filter unwanted names, chars, etc...)
return sub_dirs
# read list of IOS/IOSXE Versions from directory (don't change to uppercase)
def get_version_from_dir(directory):
versions = []
for (dir_path, dir_names, file_names) in os.walk(directory):
for entry in file_names:
versions.append(str(entry))
# do not read subdirs
break
# insert cleanup here (filter unwanted names, chars, etc...)
return versions
# delete (P)IDs or SERIALs files from directory (requests)
def remove_ids_from_dir(ids, directory):
for entry in ids:
try:
os.remove(directory + entry.replace('/', '_'))
except OSError:
pass
# remove (P)IDs or SERIALs from list of (P)ID or serials
def remove_ids_from_list(ids, directory):
known_ids = []
for (dir_path, dir_names, file_names) in os.walk(directory):
known_ids.extend(file_names)
# do not read subdirs
break
for known_id in known_ids:
known_id = known_id.replace('_', '/')
for entry in ids:
if known_id == entry:
ids.remove(entry)
return ids
# returns al list of ids to refresh,
# expects a directory with ids to check, the time interval, a list of IDs to add
# if remove True it will delete the ID files from refresh_dir
def refresh_ids_from_dir(refresh_dir, refresh_time, ids, remove):
refresh_dir = expand_path(refresh_dir)
# get seconds from # of days (days * 24 * 60 * 60 --> days * 86400)
refresh_time = int(refresh_time) * 86400
start_time = int(time.time())
refresh_ids = get_ids_from_dir(refresh_dir)
if refresh_ids is not []:
for entry in refresh_ids:
modify_time = int(os.path.getmtime(refresh_dir + entry.replace('/', '_')))
if (start_time - modify_time) > refresh_time:
ids.append(entry)
if remove:
try:
os.remove(refresh_dir + entry.replace('/', '_'))
except OSError:
pass
return ids
# check if dir exists, if not try to create it.
# return True if dir exists or creation was ok.
# return False if dir not exists and creation was not ok
def check_dir_and_create(directory):
directory = os.path.dirname(directory)
if not os.path.exists(directory):
try:
os.makedirs(directory)
except:
return False
return True
# expand homedir and add '/' if necessary and create directory if it not exists
def expand_path(path):
homedir = os.path.expanduser('~')
if path.startswith('~'):
path = homedir + path[1:]
if not path.endswith('/'):
path += '/'
if not check_dir_and_create(path):
return ''
return path
# remove empty directories
def remove_empty_sub_dirs(base_dir):
subdirs = get_subdirs_from_dir(base_dir)
for subdir in subdirs:
try:
os.rmdir(base_dir + subdir)
except OSError as e:
log_message(f'can not delete: {base_dir}, Error:{e}')
pass
# move contents of source_dir to destination_dir
# only one level deep, lave source_dir
def move_dir(source_dir, destination_dir, **kwargs):
refresh_time = int(kwargs.get('refresh_time', 0)) * 86400
starttime = int(time.time())
sub_dirs = get_subdirs_from_dir(source_dir)
for sub_dir in sub_dirs:
files = get_ids_from_dir(source_dir + sub_dir)
if len(files) > 0:
source_path = expand_path(source_dir + sub_dir)
destination_path = expand_path(destination_dir + sub_dir)
for file in files:
source_file = source_path + file
destination_file = destination_path + file
modify_time = int(os.path.getmtime(source_file))
if (starttime - modify_time) > refresh_time:
try:
os.rename(source_file, destination_file) # rename (move) contents of not_found to request
except OSError as e:
log_message(message=f'error:{e}, source: {source_file}, destionation: {destination_file}',
level='ERROR')
remove_empty_sub_dirs(source_dir)
This diff is collapsed.
File moved
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment