diff --git a/agent_based/inv_cisco_eox.py b/agent_based/inv_cisco_eox.py index 58bcccfc668b294a9680e366021170c76256e685..23ce74f8e3b3f3f18be69397205e0c3c5eee25dc 100644 --- a/agent_based/inv_cisco_eox.py +++ b/agent_based/inv_cisco_eox.py @@ -58,7 +58,7 @@ def _create_eox_record(eoxfile, optional_columns): with open(eoxfile) as f: try: eoxrecord = json.load(f) - except ValueError as e: + except ValueError: return {} modifytime = os.path.getmtime(eoxfile) diff --git a/agent_based/inv_cisco_psirt.py b/agent_based/inv_cisco_psirt.py index 634831546fbc5e3a000f320a11d6e8402f343e09..f72a1080d8804769cfde46bf1e427ecb60250f01 100644 --- a/agent_based/inv_cisco_psirt.py +++ b/agent_based/inv_cisco_psirt.py @@ -63,8 +63,8 @@ def _create_psirt_record(filepath, filename, not_updated, dont_show_older_then, with open(psirtfile) as f: try: psirtrecord = json.load(f) - except ValueError as e: - exit() + except ValueError: + exit() advisories = psirtrecord.get('advisories') remove_advisories = [] @@ -132,31 +132,31 @@ def _get_profuct_family(sysdescription: str) -> str: def _get_os_version(cw_version, sysdescription, pids): - version = None + version = None - if str(cw_version).find('CW_VERSION$') == 0: - version = cw_version.split('$')[1] # .upper() - else: - sysdescription = sysdescription.split(',') - for entry in sysdescription: - # if entry.strip().lower().startswith('version'): - if 'version' in entry.lower(): - version = entry[entry.lower().find('version') + 7:].strip() - if version.startswith(':'): # AsyncOS - version = version[1:].strip() - version = version.split(' ')[0].strip() - version = version # .upper() - if not version: - for pid in pids: - physoftwarerev, phymodelname, physicalclass = pid - if physicalclass == '3': # chassies - version = physoftwarerev - # remove leading '0' in IOSXE version numbers - if version and version[0] == '0': - version = version.replace('.0', '.') - version = version.lstrip('0') - - return version + if str(cw_version).find('CW_VERSION$') == 0: + version = cw_version.split('$')[1] # .upper() + else: + sysdescription = sysdescription.split(',') + for entry in sysdescription: + # if entry.strip().lower().startswith('version'): + if 'version' in entry.lower(): + version = entry[entry.lower().find('version') + 7:].strip() + if version.startswith(':'): # AsyncOS + version = version[1:].strip() + version = version.split(' ')[0].strip() + version = version # .upper() + if not version: + for pid in pids: + physoftwarerev, phymodelname, physicalclass = pid + if physicalclass == '3': # chassies + version = physoftwarerev + # remove leading '0' in IOSXE version numbers + if version and version[0] == '0': + version = version.replace('.0', '.') + version = version.lstrip('0') + + return version def parse_inv_cisco_psirt(string_table: List[StringTable]) -> SnmpPsirt: diff --git a/agent_based/utils/inv_cisco_support.py b/agent_based/utils/inv_cisco_support.py index 958bf73a04211b8eea4d8bdd90b22db770dc63e2..90dc6a7be0fef94e0f97ae60419dc19d91beefee 100644 --- a/agent_based/utils/inv_cisco_support.py +++ b/agent_based/utils/inv_cisco_support.py @@ -31,9 +31,7 @@ class SnmpContractEntry: phydescr: str pid: str -# -# global variables -# + # list of PIDs to drop g_PID_black_list = ['BUILT-IN', 'MICRON', 'C400-MTFDD'] # list of PIDs to try by serial number diff --git a/bin/ciscoapi/cisco-bug.py b/bin/ciscoapi/cisco-bug.py index 9ddbb7fa6fbefb9598632f45e6d87b70d189dde5..e766beff13ad609cadcb9314ba2dda84e092fbe3 100755 --- a/bin/ciscoapi/cisco-bug.py +++ b/bin/ciscoapi/cisco-bug.py @@ -10,88 +10,55 @@ # # 2021-07-24: rewrite for python3.8 # -import logging -import ciscosupport -import ciscoapi import os import json -import time -import sys + +from cisco_live_cycle_utils import ( + configure_logger, + log_message, + expand_path, + get_ids_from_dir, + get_subdirs_from_dir, + remove_empty_sub_dirs, + sleep_random, + move_dir, +) +from ciscoapi import ( + AccessToken, + Settings, + get_bug_by_pid_and_release, +) def main(): + settings = Settings() + access_token = AccessToken(settings.client_id, settings.client_secret) + configure_logger(log_level=settings.log_level) - def get_new_token(): - response = ciscoapi.get_access_token() - access_token = response.get('access_token', '') - lifetime = int(time.time()) - int(response.get('lifetime', 3600)) - 30 - return access_token, lifetime - - ciscosupport.set_logging('debug') - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - base_path = '~/var/ciscoapi' - conf_file = '~/etc/ciscoapi/ciscoapi.json' - conf_file = os.path.expanduser(conf_file) - - access_token = '' - lifetime = 0 - config = {} - - refresh_found = 2 - refresh_notfound = 1 - - wait_after_start = True - max_wait_time = 15 - - loglevel = 'warning' - - if os.path.isfile(conf_file): - with open(conf_file) as f: - try: - config = json.load(f) - except ValueError as e: - logging.warning(f'snmp_cisco_eox:status:JSON load error: {e}') - exit() - # global options - base_path = config['global'].get('base_path', base_path) - wait_after_start = config['global'].get('wait_after_start', wait_after_start) - max_wait_time = config['global'].get('max_wait_time', max_wait_time) - loglevel = config['global'].get('loglevel', loglevel) - # bug api specific options - refresh_found = config['bug'].get('refresh_found', refresh_found) - refresh_notfound = config['bug'].get('refresh_notfound', refresh_notfound) - else: - logger.critical(f'Config file not found ({conf_file}).') - exit() - - # loglevel = 'debug' - ciscosupport.set_logging(loglevel) - - bug_path = base_path + '/bug' - path_found = ciscosupport.expand_path(bug_path + '/found') - path_not_found = ciscosupport.expand_path(bug_path + '/not_found') - path_request = ciscosupport.expand_path(bug_path + '/request') - path_missing = ciscosupport.expand_path(bug_path + '/missing') + bug_path = settings.base_path + '/bug' + path_found = expand_path(bug_path + '/found') + path_not_found = expand_path(bug_path + '/not_found') + path_request = expand_path(bug_path + '/request') + path_missing = expand_path(bug_path + '/missing') pids_requested = {} pids_refresh = {} - pids = [] # get list of bug reports to refresh - pids = ciscosupport.get_subdirs_from_dir(path_found) + pids = get_subdirs_from_dir(path_found) for pid in pids: - pids_refresh[pid.replace('_', '/')] = ','.join(ciscosupport.get_ids_from_dir(path_found+pid, refresh_time=refresh_found)) + pids_refresh[pid.replace('_', '/')] = ','.join(get_ids_from_dir(path_found+pid, + refresh_time=settings.bug_refresh_found)) # move not_found bug reports older then 'refresh_notfound' days to request (for refresh) - ciscosupport.move_dir(path_not_found, path_request, refresh_time=refresh_notfound) + move_dir(path_not_found, path_request, refresh_time=settings.bug_refresh_not_found) # get list of PIDs requests - pids = ciscosupport.get_subdirs_from_dir(path_request) - logger.debug('bug requests (PIDs): %s' % pids) + pids = get_subdirs_from_dir(path_request) + log_message(f'bug requests (PIDs): {pids}') for pid in pids: - pids_requested[pid.replace('_', '/')] = ','.join(ciscosupport.get_ids_from_dir(path_request+pid)) - logger.debug('bug requests (PIDs and releases): %s' % pids_requested) + pids_requested[pid.replace('_', '/')] = ','.join(get_ids_from_dir(path_request+pid)) + log_message(f'bug requests (PIDs and releases): {pids_requested}') if len(pids_refresh.keys()) > 0 or len(pids_requested.keys()) > 0: @@ -137,29 +104,26 @@ def main(): reqoptions = '' # wait random time after startup (load spread) - if wait_after_start: - ciscosupport.sleep_random(max_wait_time) - - # request access_token if no token available or if is expired - if access_token == '': - access_token, lifetime = get_new_token() - - if access_token == '': - logger.critical('failed to get access_token') - return + if settings.wait_after_start: + sleep_random(settings.max_wait_time) # first refresh bug reports for pid in pids_refresh.keys(): - if lifetime > int(time.time()): - access_token, lifetime = get_new_token() + # get bug records for time frame - bug_records = ciscoapi.get_bug_by_pid_and_release(pid, pids_refresh.get(pid), access_token, reqoptions) + bug_records = get_bug_by_pid_and_release( + pid, + pids_refresh.get(pid), + access_token, + reqoptions, + settings=settings + ) for entry in bug_records: pid = entry.get('pid') software_releases = entry.get('software_releases') status_code = int(entry.get('status_code', 200)) - logger.debug('bug return:PID: %s, Status: %s, Version: %s' % (pid, status_code, software_releases)) - path = ciscosupport.expand_path(path_found + pid.replace('/', '_')) + log_message(f'bug return:PID: {pid}, Status: {status_code}, Version: {software_releases}') + path = expand_path(path_found + pid.replace('/', '_')) # check if there where was an error, if so go to next pid if status_code == 200: bugs = entry.get('bugs', None) @@ -168,8 +132,7 @@ def main(): bug.pop('description') # remove description for release in software_releases.keys(): # create one bug list per software release bug_release = software_releases.get(release) - logger.debug('bug found:PID: %s, Releases: %s' % (pid, release)) - missing = entry.get('missing', {}) + log_message(f'bug found:PID: {pid}, Releases: {release}') new_bugs = [] for bug in bugs: if bug_release in bug.get('known_affected_releases'): @@ -180,7 +143,8 @@ def main(): try: bug_record = json.load(f) except ValueError as e: - logging.warning(f'{pid}:{release}:snmp_cisco_bug:bug_found:JSON load error: {e}') + log_message(f'{pid}:{release}:snmp_cisco_bug:bug_found:JSON load error: {e}', + level='WARNING') found_bugs = bug_record.get('bugs') for found_bug in found_bugs: @@ -193,16 +157,9 @@ def main(): bug_record['bugs'] = found_bugs # replace bug list with new list bug_record['total_records'] = len(found_bugs) # change number of records - # bug_record['missing'] = missing # replace missing state with new missing state - with open(path + release, 'w') as f: json.dump(bug_record, f) # write updated bug report - # if len(missing.keys()) != 0: - # path = ciscosupport.expand_path(path_missing + pid.replace('/', '_')) - # with open(path + release, 'w') as f: - # json.dump(missing, f) # write update missing state - else: for release in software_releases.keys(): if os.path.exists(path + release): @@ -242,7 +199,7 @@ def main(): # modified_date_earliest (earliest first) # reqoptions.append('sort_by=severity') - #reqoptions.append('page_index=2') + # reqoptions.append('page_index=2') if len(reqoptions) > 0: reqoptions = '?' + '&'.join(reqoptions) @@ -250,14 +207,18 @@ def main(): reqoptions = '' for pid in pids_requested.keys(): - if lifetime > int(time.time()): - access_token, lifetime = get_new_token() - bug_records = ciscoapi.get_bug_by_pid_and_release(pid, pids_requested.get(pid), access_token, reqoptions) + bug_records = get_bug_by_pid_and_release( + pid, + pids_requested.get(pid), + access_token, + reqoptions, + settings=settings + ) for entry in bug_records: pid = entry.get('pid') software_releases = entry.get('software_releases') status_code = int(entry.get('status_code', 200)) - logger.debug('bug return:PID: %s, Status: %s, Version: %s' % (pid, status_code, software_releases)) + log_message(f'bug return:PID: {pid}, Status: {status_code}, Version: {software_releases}') # check if there where was an error, if so go to next pid if status_code == 200: bugs = entry.get('bugs', None) @@ -267,49 +228,48 @@ def main(): for release in software_releases.keys(): # create one bug list per software release bug_release = software_releases.get(release) if bugs: - logger.debug('bug found:PID: %s, Releases: %s' % (pid, release)) + log_message(f'bug found:PID: %s{pid}, Releases: {release}') missing = entry.get('missing', {}) # split bugs by release - release_bugs = {'pid': pid, - 'software_release': release, - 'bugs': [], - 'missing': missing} + release_bugs = { + 'pid': pid, + 'software_release': release, + 'bugs': [], + 'missing': missing + } for bug in bugs: if bug_release in bug.get('known_affected_releases'): release_bugs['bugs'].append(bug) - # changed from entry.get('total_records') --> total records for all requested software releases release_bugs['total_records'] = len(release_bugs['bugs']) - path = ciscosupport.expand_path(path_found + pid.replace('/', '_')) + path = expand_path(path_found + pid.replace('/', '_')) with open(path + release, 'w') as f: json.dump(release_bugs, f) if len(missing.keys()) != 0: - path = ciscosupport.expand_path(path_missing + pid.replace('/', '_')) + path = expand_path(path_missing + pid.replace('/', '_')) with open(path + release, 'w') as f: json.dump(missing, f) else: - logger.debug('bug not found:PID: %s, Version: %s' % (pid, release)) - path = ciscosupport.expand_path(path_not_found + pid.replace('/', '_')) - logger.debug('not found: %s' % entry) + log_message(f'bug not found:PID: {pid}, Version: {release}') + path = expand_path(path_not_found + pid.replace('/', '_')) + log_message(f'not found: {entry}') with open(path + release, 'w') as f: json.dump(entry, f) pass # remove request file try: - logger.debug('bug delete request:PID: %s, Version: %s' % (pid, release)) + log_message(f'bug delete request:PID: {pid}, Version: {release}') os.remove(path_request + pid.replace('/', '_') + '/' + release) except OSError: pass # clean up (remove empty directories) - ciscosupport.remove_empty_sub_dirs(path_request) - ciscosupport.remove_empty_sub_dirs(path_found) - ciscosupport.remove_empty_sub_dirs(path_not_found) - ciscosupport.remove_empty_sub_dirs(path_missing) - -main() - + remove_empty_sub_dirs(path_request) + remove_empty_sub_dirs(path_found) + remove_empty_sub_dirs(path_not_found) + remove_empty_sub_dirs(path_missing) +main() diff --git a/bin/ciscoapi/cisco-eox.py b/bin/ciscoapi/cisco-eox.py index 85ed7769317d9038fee8e9bac8dabd97a29edcec..630ca3c13a1071204f3ceba4da7807c420797a22 100755 --- a/bin/ciscoapi/cisco-eox.py +++ b/bin/ciscoapi/cisco-eox.py @@ -11,20 +11,27 @@ # # 2021-07-23: rewrite for python 3.8 # -import logging -import os import json -import time -import ciscosupport -import ciscoapi -import sys +from cisco_live_cycle_utils import ( + configure_logger, + log_message, + expand_path, + get_ids_from_dir, + remove_ids_from_list, + refresh_ids_from_dir, + remove_ids_from_dir, + sleep_random, +) +from ciscoapi import ( + AccessToken, + Settings, + get_eox_by_pid, + get_eox_by_serials, +) # split pids in known and unknown eox state def split_pids(eoxr): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - eox_known = [] eox_unkown = [] @@ -39,30 +46,27 @@ def split_pids(eoxr): for PID in EOXRecord: if PID.get('EOLProductID') != '': eox_known.append(PID) - logger.debug('EOLProductID : %s' % PID.get('EOLProductID')) + log_message(message=f'EOLProductID : {PID.get("EOLProductID")}') else: eox_unkown.append(PID) - logger.debug('EOXInputValue : %s' % PID.get('EOXInputValue')) + log_message(f'EOXInputValue : {PID.get("EOXInputValue")}') return {'eox_known': eox_known, 'eox_unknown': eox_unkown} # split serials, expects a list of EoX Records from get EoX by serial def split_serials(eoxr): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - serials = [] for PID in eoxr: EOLProductID = PID.get('EOLProductID') - logger.debug('serial split Eox: found PID: %s' % EOLProductID) + log_message(f'serial split Eox: found PID: {EOLProductID}') EOXInputValue = PID.get('EOXInputValue').split(',') for serial in EOXInputValue: - logger.debug('found Serial: %s' % serial) + log_message(f'found Serial: {serial}') eox_serial = PID.copy() eox_serial.update({'EOXInputValue': serial}) serials.append(eox_serial) - logger.debug('Serial EoX: %s' % eox_serial) + log_message(f'Serial EoX: {eox_serial}') return serials @@ -71,9 +75,6 @@ def split_serials(eoxr): # expects a list of EoX Records from Cisco EoX API 5.0, # returns a list of saved PIDs def save_eox(eox, path): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - saved = [] for PID in eox: @@ -82,147 +83,96 @@ def save_eox(eox, path): if EOLProductID == '': EOLProductID = PID.get('EOXInputValue') - with open(path + (EOLProductID.replace('/','_')), 'w') as f: + with open(path + (EOLProductID.replace('/', '_')), 'w') as f: json.dump(PID, f) saved.append(EOLProductID) return saved -# save EoX records to file by serial number, -# expects a list of EoX Records from Cisco EoX API 5.0, -# returns a list of saved Serials def save_serials(eox, path): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) + """ + Saves EoX records to file by serial number. + Args: + eox: List of EoX Records from Cisco EoX API 5.0 + path: file path where to save the EoX records + + Returns: List of serial numbers of saved EoX records + + """ saved = [] - for SERIAL in eox: - EOXInputValue = SERIAL.get('EOXInputValue') + for serial in eox: + EOXInputValue = serial.get('EOXInputValue') if EOXInputValue != '': with open(path + EOXInputValue, 'w') as f: - json.dump(SERIAL, f) + json.dump(serial, f) saved.append(EOXInputValue) return saved def main(): + settings = Settings() + access_token = AccessToken(settings.client_id, settings.client_secret) + configure_logger(log_level=settings.log_level) - ciscosupport.set_logging('debug') - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - base_path = '~/var/ciscoapi' - conf_file = '~/etc/ciscoapi/ciscoapi.json' - conf_file = os.path.expanduser(conf_file) - - access_token = '' - lifetime = 0 - pids = [] - serials = [] - config = {} - - # refresh time in days - eox_refresh_known = 31 - eox_refresh_unknown = 7 - - wait_after_start = True - max_wait_time = 15 - - loglevel = 'warning' - - if os.path.isfile(conf_file): - with open(conf_file) as f: - try: - config = json.load(f) - except ValueError as e: - logging.warning(f'snmp_cisco_eox:status:JSON load error: {e}') - exit() - - base_path = config['global'].get('base_path', base_path) - wait_after_start = config['global'].get('wait_after_start', wait_after_start) - max_wait_time = config['global'].get('max_wait_time', max_wait_time) - loglevel = config['global'].get('loglevel', loglevel) - - eox_refresh_known = config['eox'].get('eox_refresh_known', eox_refresh_known) - eox_refresh_unknown = config['eox'].get('eox_refresh_unknown', eox_refresh_unknown) - else: - logger.critical('Config file not found (%s).' % conf_file) - return False - - ciscosupport.set_logging(loglevel) - - eox_path = base_path + '/EoX' + eox_path = settings.base_path + '/EoX' path_found = eox_path + '/found' path_not_found = eox_path + '/not_found' path_request = eox_path + '/request' - path_request_pid = ciscosupport.expand_path(path_request + '/pid') - path_found_pid = ciscosupport.expand_path(path_found + '/pid') - path_not_found_pid = ciscosupport.expand_path(path_not_found + '/pid') + path_request_pid = expand_path(path_request + '/pid') + path_found_pid = expand_path(path_found + '/pid') + path_not_found_pid = expand_path(path_not_found + '/pid') - path_request_ser = ciscosupport.expand_path(path_request + '/ser') - path_found_ser = ciscosupport.expand_path(path_found + '/ser') - path_not_found_ser = ciscosupport.expand_path(path_not_found + '/ser') + path_request_ser = expand_path(path_request + '/ser') + path_found_ser = expand_path(path_found + '/ser') + path_not_found_ser = expand_path(path_not_found + '/ser') # create list of PIDs to request EoX status for - pids = ciscosupport.get_ids_from_dir(path_request_pid) - logger.debug('pid requests : %s' % pids) + pids = get_ids_from_dir(path_request_pid) + log_message(f'pid requests : {pids}') # remove already known PIDs from list - pids = ciscosupport.remove_ids_from_list(pids, path_found_pid) - logger.debug('pid requests : %s' % pids) + pids = remove_ids_from_list(pids, path_found_pid) + log_message(f'pid requests : {pids}') # remove PIDs already requested with unknown EoX status from list - pids = ciscosupport.remove_ids_from_list(pids, path_not_found_pid) - logger.debug('pid requests : %s' % pids) + pids = remove_ids_from_list(pids, path_not_found_pid) + log_message(f'pid requests : {pids}') # refresh PIDs after 30 days by default - pids = ciscosupport.refresh_ids_from_dir(path_not_found_pid, eox_refresh_unknown, pids, True) - logger.debug('pid requests : %s' % pids) - pids = ciscosupport.refresh_ids_from_dir(path_found_pid, eox_refresh_known, pids, False) - logger.debug('pid requests : %s' % pids) + pids = refresh_ids_from_dir(path_not_found_pid, settings.eox_refresh_unknown, pids, True) + log_message(f'pid requests : {pids}') + pids = refresh_ids_from_dir(path_found_pid, settings.eox_refresh_known, pids, False) + log_message(f'pid requests : {pids}') # create list of serial numbers to request EoX status for - serials = ciscosupport.get_ids_from_dir(path_request_ser) - logger.debug('ser requests : %s' % serials) + serials = get_ids_from_dir(path_request_ser) + log_message(f'ser requests : {serials}') # remove already known serials from list - serials = ciscosupport.remove_ids_from_list(serials, path_found_ser) - logger.debug('ser requests : %s' % serials) + serials = remove_ids_from_list(serials, path_found_ser) + log_message(f'ser requests : {serials}') # remove serials already requested with unknown EoX status from list - serials = ciscosupport.remove_ids_from_list(serials, path_not_found_ser) - logger.debug('ser requests : %s' % serials) + serials = remove_ids_from_list(serials, path_not_found_ser) + log_message(f'ser requests : {serials}') # refresh serials after 30 days by default - serials = ciscosupport.refresh_ids_from_dir(path_not_found_ser, eox_refresh_unknown, serials, True) - logger.debug('ser requests : %s' % serials) - serials = ciscosupport.refresh_ids_from_dir(path_found_ser, eox_refresh_known, serials, False) - logger.debug('ser requests : %s' % serials) + serials = refresh_ids_from_dir(path_not_found_ser, settings.eox_refresh_unknown, serials, True) + log_message(f'ser requests : {serials}') + serials = refresh_ids_from_dir(path_found_ser, settings.eox_refresh_known, serials, False) + log_message(f'ser requests : {serials}') if pids == [] and serials == []: - logger.debug('all list are empty. Do nothing.') + log_message('all list are empty. Do nothing.') return # wait random time after startup (load spread) - if wait_after_start: - ciscosupport.sleep_random(max_wait_time) - - # get time since epoch in seconds - starttime = int(time.time()) - - # request access_token if no token available or if is expired - if (access_token == '') or lifetime < starttime: - response = ciscoapi.get_access_token() - access_token = response.get('access_token', '') - lifetime = int(response.get('lifetime', 3600)) - - lifetime = starttime + lifetime - 30 - - if access_token == '': - logger.critical('failed to get access_token') - return + if settings.wait_after_start: + sleep_random(settings.max_wait_time) - if pids != []: - eox = ciscoapi.get_eox_by_pid(pids, access_token) + if pids is not []: + eox = get_eox_by_pid(pids=pids, access_token=access_token, settings=settings) # split eox records in a list of known and unknown pid records eox = split_pids(eox) @@ -230,18 +180,18 @@ def main(): # save known pid reports pids = save_eox(eox.get('eox_known'), path_found_pid) # delete requests for known pids - ciscosupport.remove_ids_from_dir(pids, path_request_pid) + remove_ids_from_dir(pids, path_request_pid) # save unknown pid reports pids = save_eox(eox.get('eox_unknown'), path_not_found_pid) # delete requests for unknown pids - ciscosupport.remove_ids_from_dir(pids, path_request_pid) + remove_ids_from_dir(pids, path_request_pid) # delete pids from known were the status changed to unknown - ciscosupport.remove_ids_from_dir(pids, path_found_pid) + remove_ids_from_dir(pids, path_found_pid) - if serials != []: - eox = ciscoapi.get_eox_by_serials(serials, access_token) - logger.debug('eox by ser: %s' % eox) + if serials is not []: + eox = get_eox_by_serials(serials=serials, access_token=access_token, settings=settings) + log_message(f'eox by ser: {eox}') # split eox records in a list of known and unknown pid records eox = split_pids(eox) @@ -250,18 +200,18 @@ def main(): serials = split_serials(eox.get('eox_known')) # save EoX records for serials with known EoX state serials = save_serials(serials, path_found_ser) - logger.debug('EoX Serials: known: %s' % serials) + log_message(f'EoX Serials: known: {serials}') # delete requests for known serials - ciscosupport.remove_ids_from_dir(serials, path_request_ser) + remove_ids_from_dir(serials, path_request_ser) # split EoX records for unknown PIDs in one entry per serial serials = split_serials(eox.get('eox_unknown')) # save EoX records for serials with known EoX state serials = save_serials(serials, path_not_found_ser) # delete requests for unknown serials - ciscosupport.remove_ids_from_dir(serials, path_request_ser) + remove_ids_from_dir(serials, path_request_ser) # delete serials from known were the status changed to unknown - ciscosupport.remove_ids_from_dir(serials, path_found_ser) + remove_ids_from_dir(serials, path_found_ser) main() diff --git a/bin/ciscoapi/cisco-psirt.py b/bin/ciscoapi/cisco-psirt.py index cc1caa6b7dd74aba4661a6fe76a4fef63de48155..3b0db9b4e42e83a7c925bf3f190cf242a7f444c7 100755 --- a/bin/ciscoapi/cisco-psirt.py +++ b/bin/ciscoapi/cisco-psirt.py @@ -7,21 +7,33 @@ # # https://developer.cisco.com/docs/support-apis/ # -# 2018-06-06: fixed handling if state changes form found to notfound (delete old psirt found file) +# 2018-06-06: fixed handling if state changes form found to not_found (delete old psirt found file) # 2021-07-24: rewritten for python 3.8 # # -import logging import ntpath import os import json -import time -import ciscosupport -import ciscoapi -import sys from typing import List from dataclasses import dataclass +from cisco_live_cycle_utils import ( + configure_logger, + log_message, + get_ids_from_dir, + remove_ids_from_list, + refresh_ids_from_dir, + sleep_random, + expand_path, +) +from ciscoapi import ( + AccessToken, + Settings, + get_psirt_by_product_family, + get_psirt_by_iosxe_version, + get_psirt_by_ios_version, +) + @dataclass class Paths: @@ -40,21 +52,15 @@ g_logger = None def _psirt_remove_id_file(psirt_path: str, psirt_id: str): - # set logg module name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - # delete psirt file try: - logger.debug(f'delete psirt id file : {psirt_path + psirt_id}') + log_message(f'delete psirt id file : {psirt_path + psirt_id}') os.remove(psirt_path + psirt_id) except OSError: pass def _psirt_dump_record(psirt_record, psirt_id: str, psirt_path: str, psirt_path_request: str): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - with open(psirt_path + psirt_id, 'w') as f: json.dump(psirt_record, f) # delete request file @@ -72,8 +78,6 @@ def _check_psirt_record(psirt_record, psirt_id, psirt_path_found, psirt_path_req :param psirt_path_request: :returns: """ - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) for advisory in psirt_record.get('advisories'): # remove unwanted information from advisories @@ -102,22 +106,20 @@ def _get_psirt_id_list(product_family: str, paths: Paths, refresh: Refresh) -> L @return: list of PIDs """ # create list of ID's to request PSIRT status for - psirt_id_list = ciscosupport.get_ids_from_dir(paths.request + product_family) - g_logger.debug(f'psirt requests : {psirt_id_list}') + psirt_id_list = get_ids_from_dir(paths.request + product_family) + log_message(f'psirt requests : {psirt_id_list}') # remove already found ID's from list - psirt_id_list = ciscosupport.remove_ids_from_list(psirt_id_list, paths.found + product_family) - g_logger.debug(f'psirt requests : {psirt_id_list}') + psirt_id_list = remove_ids_from_list(psirt_id_list, paths.found + product_family) + log_message(f'psirt requests : {psirt_id_list}') # remove not found ID's from list - psirt_id_list = ciscosupport.remove_ids_from_list(psirt_id_list, paths.not_found + product_family) - g_logger.debug(f'psirt requests : {psirt_id_list}') + psirt_id_list = remove_ids_from_list(psirt_id_list, paths.not_found + product_family) + log_message(f'psirt requests : {psirt_id_list}') # refresh psirt after 1 day by default - psirt_id_list = ciscosupport.refresh_ids_from_dir(paths.not_found + product_family, refresh.not_found, - psirt_id_list, True) - g_logger.debug(f'psirt requests : {psirt_id_list}') - psirt_id_list = ciscosupport.refresh_ids_from_dir(paths.found + product_family, refresh.found, - psirt_id_list, False) - g_logger.debug(f'psirt requests : {psirt_id_list}') + psirt_id_list = refresh_ids_from_dir(paths.not_found + product_family, refresh.not_found, psirt_id_list, True) + log_message(f'psirt requests : {psirt_id_list}') + psirt_id_list = refresh_ids_from_dir(paths.found + product_family, refresh.found, psirt_id_list, False) + log_message(f'psirt requests : {psirt_id_list}') return psirt_id_list @@ -141,58 +143,19 @@ def _update_psirt_id(psirt_records: list, family_name: str, paths: Paths): def main(): - global g_logger - - ciscosupport.set_logging('debug') - # set logg modul name <file>:<module>.<function> - g_logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - - conf_file = '~/etc/ciscoapi/ciscoapi.json' - conf_file = os.path.expanduser(conf_file) - - # default paths - base_path = '~/var/ciscoapi' - - access_token = '' - lifetime = 0 - config = {} - - # refresh time in days - psirt_refresh_found = 1 - psirt_refresh_notfound = 1 - - wait_after_start = True - max_wait_time = 15 - loglevel = 'warning' - # loglevel = 'debug' - - if os.path.isfile(conf_file): - with open(conf_file) as f: - try: - config = json.load(f) - except ValueError as e: - logging.warning(f'snmp_cisco_eox:status:JSON load error: {e}') - exit() - - base_path = config['global'].get('base_path', base_path) - wait_after_start = config['global'].get('wait_after_start', wait_after_start) - max_wait_time = config['global'].get('max_wait_time', max_wait_time) - loglevel = config['global'].get('loglevel', loglevel) - refresh = Refresh( - found=config['psirt'].get('psirt_refresh_found', psirt_refresh_found), - not_found=config['psirt'].get('psirt_refresh_notfound', psirt_refresh_notfound) - ) - else: - g_logger.critical(f'Config file not found ({conf_file}).') - return False - - ciscosupport.set_logging(loglevel) - - base_path = ciscosupport.expand_path(base_path) - psirt_dir = base_path + 'psirt' + settings = Settings() + access_token = AccessToken(settings.client_id, settings.client_secret) + configure_logger(log_level=settings.log_level) + + refresh = Refresh( + found=settings.psirt_refresh_found, + not_found=settings.psirt_refresh_not_found + ) + + psirt_dir = expand_path(settings.base_path + '/psirt') paths = Paths( found=psirt_dir + '/found/', - not_found= psirt_dir + '/not_found/', + not_found=psirt_dir + '/not_found/', request=psirt_dir + '/request/' ) @@ -201,38 +164,23 @@ def main(): psirt_family = _get_psirt_id_list('family', paths, refresh) if (psirt_ios == []) and psirt_ios_xe == [] and psirt_family == []: - g_logger.debug('all list are empty. Do nothing.') + log_message('all list are empty. Do nothing.') return # wait random time after startup - if wait_after_start: - ciscosupport.sleep_random(max_wait_time) - - # get time since epoch in seconds - starttime = int(time.time()) - - # request access_token if no token available or if is expired - if (access_token == '') or lifetime < starttime: - response = ciscoapi.get_access_token() - access_token = response.get('access_token', '') - lifetime = int(response.get('lifetime', 3600)) - - lifetime = starttime + lifetime - 30 - - if access_token == '': - g_logger.critical('failed to get access_token') - return + if settings.wait_after_start: + sleep_random(settings.max_wait_time) - if psirt_family != []: - psirt_records = ciscoapi.get_psirt_by_product_family(psirt_family, access_token) + if psirt_family is not []: + psirt_records = get_psirt_by_product_family(psirt_family, access_token, settings=settings) _update_psirt_id(psirt_records, 'family', paths) - if psirt_ios_xe != []: - psirt_records = ciscoapi.get_psirt_by_iosxe_version(psirt_ios_xe, access_token) + if psirt_ios_xe is not []: + psirt_records = get_psirt_by_iosxe_version(psirt_ios_xe, access_token, settings=settings) _update_psirt_id(psirt_records, 'IOS-XE', paths) - if psirt_ios != []: - psirt_records = ciscoapi.get_psirt_by_ios_version(psirt_ios, access_token) + if psirt_ios is not []: + psirt_records = get_psirt_by_ios_version(psirt_ios, access_token, settings=settings) _update_psirt_id(psirt_records, 'IOS', paths) diff --git a/bin/ciscoapi/cisco-sn2info.py b/bin/ciscoapi/cisco-sn2info.py index 3a9b2e613b8ea54788ae638016adbe3f66a76b02..1a45b2bde42bb646cc867cf44e2ad418db17b81c 100755 --- a/bin/ciscoapi/cisco-sn2info.py +++ b/bin/ciscoapi/cisco-sn2info.py @@ -13,23 +13,30 @@ # # 2021-07-23: rewrite for python 3.8 # -import logging -import os import json -import time -import ciscosupport -import ciscoapi -import sys - - -def sn2info_split_covered(sn2inforecords): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) +from cisco_live_cycle_utils import ( + configure_logger, + log_message, + expand_path, + get_ids_from_dir, + remove_ids_from_list, + refresh_ids_from_dir, + remove_ids_from_dir, + sleep_random, +) +from ciscoapi import ( + AccessToken, + Settings, + get_coverage_summary_by_serials, +) + + +def sn2info_split_covered(sn2info_records): sn2info_covered = [] sn2info_notcovered = [] - for response in sn2inforecords: + for response in sn2info_records: sn2inforecord = response.get('serial_numbers') # PaginationResponseRecord = response.get('PaginationResponseRecord') # PageIndex = PaginationResponseRecord.get('PageIndex') @@ -40,17 +47,14 @@ def sn2info_split_covered(sn2inforecords): for serial in sn2inforecord: if serial.get('is_covered') == 'YES': sn2info_covered.append(serial) - logger.debug('SN2INFO covered : %s' % serial.get('sr_no')) + log_message(f'SN2INFO covered : {serial.get("sr_no")}') else: sn2info_notcovered.append(serial) - logger.debug('SN2INFO not covered : %s' % serial.get('sr_no')) + log_message(f'SN2INFO not covered : {serial.get("sr_no")}') return {'sn2info_covered': sn2info_covered, 'sn2info_notcovered': sn2info_notcovered} def sn2info_save_serials(sn2infos, path): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - saved = [] for sn2info in sn2infos: @@ -64,106 +68,53 @@ def sn2info_save_serials(sn2infos, path): def main(): + settings = Settings() + access_token = AccessToken(settings.client_id, settings.client_secret) + configure_logger(log_level=settings.log_level) - ciscosupport.set_logging('debug') - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - - conf_file = '~/etc/ciscoapi/ciscoapi.json' - conf_file = os.path.expanduser(conf_file) - - # default paths - base_path = '~/var/ciscoapi' - - access_token = '' - lifetime = 0 - sn2info = [] - config = {} - - # refresh time in days - sn2info_refresh_covered = 31 - sn2info_refresh_notcovered = 7 - - wait_after_start = True - max_wait_time = 15 - loglevel = 'warning' - - if os.path.isfile(conf_file): - with open(conf_file) as f: - try: - config = json.load(f) - except ValueError as e: - logging.warning(f'snmp_cisco_eox:status:JSON load error: {e}') - exit() - - wait_after_start = config['global'].get('base_path', base_path) - sn2info_refresh_covered = config['sn2info'].get('sn2info_refresh_covered', sn2info_refresh_covered) - sn2info_refresh_notcovered = config['sn2info'].get('sn2info_refresh_notcovered', sn2info_refresh_notcovered) - - wait_after_start = config['global'].get('wait_after_start', wait_after_start) - max_wait_time = config['global'].get('max_wait_time', max_wait_time) - loglevel = config['global'].get('loglevel', loglevel) - else: - logger.critical(f'Config file not found ({conf_file}).') - exit() - - ciscosupport.set_logging(loglevel) - - sn2info_dir = base_path + '/sn2info' - path_found = ciscosupport.expand_path(sn2info_dir + '/found/') - path_not_found = ciscosupport.expand_path(sn2info_dir + '/not_found/') - path_request = ciscosupport.expand_path(sn2info_dir + '/request/') + sn2info_dir = settings.base_path + '/sn2info' + path_found = expand_path(sn2info_dir + '/found/') + path_not_found = expand_path(sn2info_dir + '/not_found/') + path_request = expand_path(sn2info_dir + '/request/') # create list of serial numbers to request SN2INFO status for - sn2info = ciscosupport.get_ids_from_dir(path_request) - logger.debug('sn2info requests : %s' % sn2info) + sn2info = get_ids_from_dir(path_request) + log_message(f'sn2info requests : {sn2info}') # remove covered serials from list - sn2info = ciscosupport.remove_ids_from_list(sn2info, path_found) - logger.debug('sn2info requests : %s' % sn2info) + sn2info = remove_ids_from_list(sn2info, path_found) + log_message(f'sn2info requests : {sn2info}') # remove not covered serials from list - sn2info = ciscosupport.remove_ids_from_list(sn2info, path_not_found) - logger.debug('sn2info requests : %s' % sn2info) + sn2info = remove_ids_from_list(sn2info, path_not_found) + log_message(f'sn2info requests : {sn2info}') # refresh sn2info serials after 31 days by default - sn2info = ciscosupport.refresh_ids_from_dir(path_not_found, sn2info_refresh_notcovered, sn2info, True) - logger.debug('sn2info requests : %s' % sn2info) - sn2info = ciscosupport.refresh_ids_from_dir(path_found, sn2info_refresh_covered, sn2info, False) - logger.debug('sn2info requests : %s' % sn2info) + sn2info = refresh_ids_from_dir(path_not_found, settings.sn2info_refresh_not_covered, sn2info, True) + log_message(f'sn2info requests : {sn2info}') + sn2info = refresh_ids_from_dir(path_found, settings.sn2info_refresh_covered, sn2info, False) + log_message(f'sn2info requests : {sn2info}') - if sn2info == [] : - logger.debug('all list are empty. Do nothing.') + if sn2info is []: + log_message('all list are empty. Do nothing.') return # wait random time after startup - if wait_after_start: - ciscosupport.sleep_random(max_wait_time) - - # get time since epoch in seconds - starttime = int(time.time()) - - # request access_token if no token available or if is expired - if (access_token == '') or lifetime < starttime: - response = ciscoapi.get_access_token() - access_token = response.get('access_token', '') - lifetime = int(response.get('lifetime', 3600)) - - lifetime = starttime + lifetime - 30 - - if access_token == '': - logger.critical('failed to get access_token') - return - - if sn2info != []: - sn2inforecords = ciscoapi.get_coverage_summary_by_serials(sn2info, access_token) - logger.debug('sn2info response: %s' % sn2inforecords) - sn2info = sn2info_split_covered(sn2inforecords) + if settings.wait_after_start: + sleep_random(settings.max_wait_time) + + if sn2info is not []: + sn2info_records = get_coverage_summary_by_serials( + serials=sn2info, + access_token=access_token, + settings=settings + ) + log_message(f'sn2info response: {sn2info_records}') + sn2info = sn2info_split_covered(sn2info_records) serials = sn2info_save_serials(sn2info.get('sn2info_covered'), path_found) - ciscosupport.remove_ids_from_dir(serials, path_request) + remove_ids_from_dir(serials, path_request) serials = sn2info_save_serials(sn2info.get('sn2info_notcovered'), path_not_found) - ciscosupport.remove_ids_from_dir(serials, path_request) + remove_ids_from_dir(serials, path_request) # delete serials from covered were the status changed to uncovered - ciscosupport.remove_ids_from_dir(serials, path_found) + remove_ids_from_dir(serials, path_found) main() - diff --git a/bin/ciscoapi/cisco_live_cycle_utils.py b/bin/ciscoapi/cisco_live_cycle_utils.py new file mode 100755 index 0000000000000000000000000000000000000000..a32ddf948dd64cfb5545826b58265f65e5132aa8 --- /dev/null +++ b/bin/ciscoapi/cisco_live_cycle_utils.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python +# -*- encoding: utf-8; py-indent-offset: 4 -*- + +# +# 15.04.2017 : Th.L. : Support for Cisco API +# +# https://developer.cisco.com/docs/support-apis/ +# + +import logging +import os +import time +import random +import sys +# import json +# import copy +# from ciscoapi import ( +# Settings, +# get_product_mdf_information_by_pid, +# ) + + +def configure_logger(_path: str = '', _log_to_console: bool = True, log_level: str = 'INFO'): + log_formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(name)s :: %(module)s ::%(message)s') + log = logging.getLogger('root') + + numeric_level = getattr(logging, log_level.upper(), None) + if isinstance(numeric_level, int): + logging.getLogger().setLevel(numeric_level) + else: + logging.getLogger().setLevel(logging.WARNING) + + # log_file = path + # # create a new file > 5 mb size + # log_handler_file = logging.handlers.RotatingFileHandler( + # log_file, + # mode='a', + # maxBytes=5 * 1024 * 1024, + # backupCount=10, + # # encoding=None, + # # delay=0 + # ) + + # log_handler_file.setFormatter(log_formatter) + # log_handler_file.setLevel(logging.INFO) + # log.addHandler(log_handler_file) + + log_handler_console = logging.StreamHandler(sys.stdout) + log_handler_console.setFormatter(log_formatter) + log_handler_console.setLevel(logging.INFO) + log.addHandler(log_handler_console) + + +def log_message(message: str, level: str = 'DEBUG'): + log = logging.getLogger() + if level.upper() == 'CRITICAL': + log.critical(message) + elif level.upper() == 'ERROR': + log.error(message) + elif level.upper() == 'WARNING': + log.warning(message) + elif level.upper() == 'INFO': + log.info(message) + elif level.upper() == 'DEBUG': + log.debug(message) + else: + log.warning(f'unknown log_level: {level}') + + +def sleep_random(max_minutes): + sleep_time = random.randint(1, 60 * max_minutes) + log_message(message=f'{sleep_time} seconds', level='INFO') + time.sleep(sleep_time) + return + + +# read list of files from dir (eq. (P)IDs or SERIALs) (don't change to uppercase) +def get_ids_from_dir(directory, refresh_time: int = 0): + refresh_time = refresh_time * 86400 + start_time = int(time.time()) + ids = [] + for (dir_path, dir_names, file_names) in os.walk(directory): + for entry in file_names: + modify_time = int(os.path.getmtime(dir_path + '/' + entry)) + if (start_time - modify_time) > refresh_time: + ids.append(str(entry).replace('_', '/')) + # do not read subdirs + break + # insert cleanup here (filter unwanted names, chars, etc...) + return ids + + +# read list of sub directories from directory (PIDs) (don't anything) +def get_subdirs_from_dir(base_dir): + sub_dirs = [] + for (dir_path, sub_dirs, filenames) in os.walk(base_dir): + break + # insert cleanup here (filter unwanted names, chars, etc...) + return sub_dirs + + +# read list of IOS/IOSXE Versions from directory (don't change to uppercase) +def get_version_from_dir(directory): + versions = [] + for (dir_path, dir_names, file_names) in os.walk(directory): + for entry in file_names: + versions.append(str(entry)) + # do not read subdirs + break + # insert cleanup here (filter unwanted names, chars, etc...) + return versions + + +# delete (P)IDs or SERIALs files from directory (requests) +def remove_ids_from_dir(ids, directory): + for entry in ids: + try: + os.remove(directory + entry.replace('/', '_')) + except OSError: + pass + + +# remove (P)IDs or SERIALs from list of (P)ID or serials +def remove_ids_from_list(ids, directory): + known_ids = [] + for (dir_path, dir_names, file_names) in os.walk(directory): + known_ids.extend(file_names) + # do not read subdirs + break + + for known_id in known_ids: + known_id = known_id.replace('_', '/') + for entry in ids: + if known_id == entry: + ids.remove(entry) + return ids + + +# returns al list of ids to refresh, +# expects a directory with ids to check, the time interval, a list of IDs to add +# if remove True it will delete the ID files from refresh_dir +def refresh_ids_from_dir(refresh_dir, refresh_time, ids, remove): + refresh_dir = expand_path(refresh_dir) + # get seconds from # of days (days * 24 * 60 * 60 --> days * 86400) + refresh_time = int(refresh_time) * 86400 + start_time = int(time.time()) + refresh_ids = get_ids_from_dir(refresh_dir) + if refresh_ids is not []: + for entry in refresh_ids: + modify_time = int(os.path.getmtime(refresh_dir + entry.replace('/', '_'))) + if (start_time - modify_time) > refresh_time: + ids.append(entry) + if remove: + try: + os.remove(refresh_dir + entry.replace('/', '_')) + except OSError: + pass + return ids + + +# check if dir exists, if not try to create it. +# return True if dir exists or creation was ok. +# return False if dir not exists and creation was not ok +def check_dir_and_create(directory): + directory = os.path.dirname(directory) + if not os.path.exists(directory): + try: + os.makedirs(directory) + except: + return False + return True + + +# expand homedir and add '/' if necessary and create directory if it not exists +def expand_path(path): + homedir = os.path.expanduser('~') + + if path.startswith('~'): + path = homedir + path[1:] + + if not path.endswith('/'): + path += '/' + + if not check_dir_and_create(path): + return '' + + return path + +# needs to be moved out of cisco_live_cycle_utils +# get cisco product series by pid +# def get_cisco_product_series_by_pid(pids, access_token, settings: Settings): +# +# conf_file = '~/etc/ciscoapi/ciscoapi.json' +# conf_file = os.path.expanduser(conf_file) +# +# productseriesfile = '~/var/ciscoapi/productinfo' +# product_series = {} +# +# if os.path.isfile(conf_file): +# with open(conf_file) as f: +# try: +# config = json.load(f) +# productseriesfile = config['productinfo'].get('productseriesfile', productseriesfile) +# except ValueError as e: +# log_message(f'snmp_cisco_eox:status:JSON load error: {e}', level='WARNING') +# +# productseriesfile = expand_path(productseriesfile) + 'productseries.json' +# +# if os.path.isfile(productseriesfile): +# with open(productseriesfile) as f: +# product_series = json.load(f) +# +# requestpids = copy.deepcopy(pids) +# if product_series != {}: +# keys = product_series.keys() +# for pid in requestpids: +# if pid in keys: +# requestpids.remove(pid) +# +# if requestpids is not []: +# product_infos = get_product_mdf_information_by_pid(pids, access_token, settings=settings) +# +# for entry in product_infos: +# product_series.update({entry.get('product_id'): entry.get('product_series')}) +# +# with open(productseriesfile, 'w') as f: +# json.dump(product_series, f) +# +# return_product_series = {} +# for pid in pids: +# return_product_series.update({pid: product_series.get(pid, 'not found')}) +# +# return return_product_series + + +# remove empty directories +def remove_empty_sub_dirs(base_dir): + subdirs = get_subdirs_from_dir(base_dir) + for subdir in subdirs: + try: + os.rmdir(base_dir + subdir) + except OSError as e: + log_message(f'can not delete: {base_dir}, Error:{e}') + pass + + +# move contents of source_dir to destination_dir +# only one level deep, lave source_dir +def move_dir(source_dir, destination_dir, **kwargs): + refresh_time = int(kwargs.get('refresh_time', 0)) * 86400 + starttime = int(time.time()) + + sub_dirs = get_subdirs_from_dir(source_dir) + for sub_dir in sub_dirs: + files = get_ids_from_dir(source_dir + sub_dir) + if len(files) > 0: + source_path = expand_path(source_dir + sub_dir) + destination_path = expand_path(destination_dir + sub_dir) + for file in files: + source_file = source_path + file + destination_file = destination_path + file + modify_time = int(os.path.getmtime(source_file)) + if (starttime - modify_time) > refresh_time: + try: + os.rename(source_file, destination_file) # rename (move) contents of not_found to request + except OSError as e: + log_message(message=f'error:{e}, source: {source_file}, destionation: {destination_file}', + level='ERROR') + + remove_empty_sub_dirs(source_dir) diff --git a/bin/ciscoapi/ciscoapi.py b/bin/ciscoapi/ciscoapi.py index 4232b96164e59ac28914bc43702cd1e4d9447323..133c77e68172a6444c4c811094630e87fa25f7f4 100755 --- a/bin/ciscoapi/ciscoapi.py +++ b/bin/ciscoapi/ciscoapi.py @@ -11,157 +11,208 @@ # 2018-09-25: performance improvement "psirt_response.encoding = 'UTF-8'", drops json.loads # from about 4 min to some seconds # 2021-07-23: rewritten for python 3.8 +# 2023-06-09: changed for new rest api endpoint (apix.cisco.com) +# refactoring get_token and settings +# some cleanup +# +# supportapis[dash]help[at]cisco[dot]com # import requests import json import time -import logging -from os.path import isfile -from os.path import expanduser -import urllib -import sys - -# global variables -g_client_id = '' -g_client_secret = '' -g_proxies = {} -g_use_system_proxies = True -g_useauthproxy = False -g_client_fqdn = '' -g_root_cert = True -g_authproxyurl = 'https://cmk.bech-noc.de/api/cauthproxy.py' - - -def read_config(): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - - # define global variables - global g_client_id - global g_client_secret - global g_proxies - global g_use_system_proxies - global g_useauthproxy - global g_client_fqdn - global g_root_cert - global g_authproxyurl - - # read parameters from config file - conf_file = '~/etc/ciscoapi/ciscoapi.json' - conf_file = expanduser(conf_file) - - if isfile(conf_file): +from os.path import ( + expanduser, +) +from typing import Dict, List +from cisco_live_cycle_utils import ( + log_message, +) + + +class Settings: + def __init__(self): + conf_file = '~/etc/ciscoapi/ciscoapi.json' + conf_file = expanduser(conf_file) + with open(conf_file) as f: try: - config = json.load(f) + self.__settings = json.load(f) except ValueError as e: - logging.warning(f'snmp_cisco_eox:status:JSON load error: {e}') + log_message(f'snmp_cisco_eox:status:JSON load error: {e}', level='WARNING') + exit() + except FileNotFoundError as e: + log_message(f'Config file not found {e}.', level='CRITICAL') exit() - g_client_id = config['cisco_api']['client_id'] - g_client_secret = config['cisco_api']['client_secret'] - if config['global'].get('http_proxy'): - g_proxies.update({'http': config['global'].get('http_proxy')}) - if config['global'].get('https_proxy'): - g_proxies.update({'https': config['global'].get('https_proxy')}) - g_use_system_proxies = (config['global'].get('use_system_proxies', False)) - # parameters for authproxy - g_useauthproxy = config['cisco_api']['useauthproxy'] - if g_useauthproxy: - g_client_fqdn = config['cisco_api'].get('client_fqdn') - g_root_cert = config['cisco_api'].get('root_cert') - g_authproxyurl = config['cisco_api'].get('authproxyurl') - else: - logger.critical('Config file not found (%s).' % conf_file) - return False - - #if g_proxies == {}: - # if g_use_system_proxies: - # g_proxies = urllib.getproxies() - - return True - - -# Request Cisco API access token, token is valid for 1 hour (by default) -def get_access_token(): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - - # define global variables - global g_client_id - global g_client_secret - global g_proxies - global g_useauthproxy - global g_client_fqdn - global g_root_cert - global g_authproxyurl - - # define local variables - access_token = '' - lifetime = 0 - grant_type = 'client_credentials' - authheaders = {'content-Type': 'application/x-www-form-urlencoded', 'accept': 'application/json'} - authurl = 'https://cloudsso.cisco.com/as/token.oauth2' - param_missing = False - verify = True - - if read_config(): - if g_client_id == '': - logging.critical('Option client_id missing.') - param_missing = True - if g_client_secret == '': - logging.critical('Option client_secret missing.') - param_missing = True - # parameters for authproxy - if g_useauthproxy: - # if not g_root_cert: - # g_root_cert = expanduser(g_root_cert) - # if not isfile(g_root_cert): - # logging.critical('Root Cert not found: %s' % g_root_cert) - # param_missing = True - if g_authproxyurl == '': - logging.critical('Option authproxyurl missing') - param_missing = True - - else: - return {'access_token': access_token, 'lifetime': lifetime} - - if param_missing: - return {'access_token': access_token, 'lifetime': lifetime} - - authreqdata = {'client_id': g_client_id, 'client_secret': g_client_secret, 'grant_type': grant_type} - - # Disable invalid certificate warnings. - # requests.packages.urllib3.disable_warnings() - - # modify request if we use an authentication proxy - if g_useauthproxy: - if g_client_fqdn != '': - authreqdata.update({'client_fqdn': g_client_fqdn}) - verify = g_root_cert - authurl = g_authproxyurl - - logger.debug('auth request: authurl=%s headers=%s data=%s proxies=%s verify=%s' % (authurl, authheaders, authreqdata, g_proxies, verify)) - response = requests.post(authurl, headers=authheaders, data=authreqdata, proxies=g_proxies, verify=verify) - - logger.debug('response.text: %s' % response.text) - if response.ok: - authresponse = json.loads(response.text) - access_token = authresponse.get("access_token") - # access_token = access_token.decode('ascii') - lifetime = int(authresponse.get("expires_in")) + self.__base_path = '~/var/ciscoapi' + self.__auth_proxy_url = 'https://cmk.bech-noc.de/api/cauthproxy.py' + self.__proxies = {} + self.__wait_after_start = True + self.__max_wait_time = 15 + self.__log_level = 'warning' + self.__eox_refresh_known = 31 + self.__eox_refresh_unknown = 7 + self.__sn2info_refresh_covered = 31 + self.__sn2info_refresh_not_covered = 7 + self.__bug_refresh_found = 2 + self.__bug_refresh_not_found = 1 + self.__psirt_refresh_found = 1 + self.__psirt_refresh_not_found = 1 + self.__suggestion_refresh_found = 31 + self.__suggestion_refresh_not_found = 7 + + if self.__settings['global'].get('http_proxy'): + self.__proxies .update({'http': self.__settings['global'].get('http_proxy')}) + if self.__settings['global'].get('https_proxy'): + self.__proxies .update({'https': self.__settings['global'].get('https_proxy')}) + + @property + def client_id(self) -> str: + return self.__settings['cisco_api']['client_id'] + + @property + def client_secret(self) -> str: + return self.__settings['cisco_api']['client_secret'] + + @property + def proxies(self) -> Dict[str, str]: + return self.__proxies + + @property + def use_system_proxies(self) -> bool: + return self.__settings['global'].get('use_system_proxies', False) + + @property + def use_auth_proxy(self) -> bool: + return self.__settings['cisco_api'].get('use_auth_proxy', False) + + @property + def client_fqdn(self) -> str: + return self.__settings['cisco_api'].get('client_fqdn') + + @property + def root_cert(self) -> bool: + return self.__settings['cisco_api'].get('root_cert', False) + + @property + def auth_proxy_url(self) -> str: + return self.__settings['cisco_api'].get('auth_proxy_url') + + @property + def base_path(self) -> str: + return self.__settings['global'].get('base_path', self.__base_path) + + @property + def wait_after_start(self) -> bool: + return self.__settings['global'].get('wait_after_start', self.__wait_after_start) + + @property + def max_wait_time(self) -> int: + return self.__settings['global'].get('max_wait_time', self.__max_wait_time) + + @property + def log_level(self) -> str: + return self.__settings['global'].get('log_level', self.__log_level) + + @property + def eox_refresh_known(self) -> int: + return self.__settings['eox'].get('refresh_known', self.__eox_refresh_known) + + @property + def eox_refresh_unknown(self) -> int: + return self.__settings['eox'].get('refresh_known', self.__eox_refresh_unknown) + + @property + def sn2info_refresh_covered(self) -> int: + return self.__settings['sn2info'].get('refresh_covered', self.__sn2info_refresh_covered) + + @property + def sn2info_refresh_not_covered(self) -> int: + return self.__settings['sn2info'].get('refresh_not_covered', self.__sn2info_refresh_not_covered) + + @property + def bug_refresh_found(self) -> int: + return self.__settings['bug'].get('refresh_found', self.__bug_refresh_found) + + @property + def bug_refresh_not_found(self) -> int: + return self.__settings['bug'].get('refresh_found', self.__bug_refresh_not_found) + + @property + def psirt_refresh_found(self) -> int: + return self.__settings['psirt'].get('refresh_found', self.__psirt_refresh_found) + + @property + def psirt_refresh_not_found(self) -> int: + return self.__settings['psirt'].get('refresh_not_found', self.__psirt_refresh_not_found) + + @property + def suggestion_refresh_found(self) -> int: + return self.__settings['suggestion'].get('refresh_found', self.__suggestion_refresh_found) + + @property + def suggestion_refresh_not_found(self) -> int: + return self.__settings['suggestion'].get('refresh_not_found', self.__suggestion_refresh_not_found) + + +class AccessToken: + def __init__( + self, + client_id: str, + client_secret: str, + ): + self.__client_id = client_id + self.__client_secret = client_secret + self.__proxies = {} + self.__use_auth_proxy = None + self.__client_fqdn = '' + self.__root_cert = True + self.__auth_proxy_url = 'https://cmk.bech-noc.de/api/cauthproxy.py' + self.__access_token = '' + self.__lifetime = 0 + self.__time = 0 + self.__auth_headers = { + 'content-Type': 'application/x-www-form-urlencoded', + 'accept': 'application/json' + } + self.__grant_type = 'client_credentials' + self.__auth_url = 'https://id.cisco.com/oauth2/default/v1/token' + self.__verify = True + self.__auth_req_data = { + 'client_id': self.__client_id, + 'client_secret': self.__client_secret, + 'grant_type': self.__grant_type + } + + @property + def token(self) -> str: + + if self.__access_token and time.time() < self.__lifetime: + return self.__access_token + else: + self.__time = time.time() + response = requests.post( + self.__auth_url, + headers=self.__auth_headers, + data=self.__auth_req_data, + proxies=self.__proxies, + verify=self.__verify) - return {'access_token': access_token, 'lifetime': lifetime} + if response.ok: + auth_response = json.loads(response.text) + self.__lifetime = self.__time + int(auth_response.get("expires_in")) - 60 + self.__access_token = auth_response.get("access_token") + return self.__access_token # generic cisco api request for all get info by serialnumber -def get_info_by_serials(serials, access_token, requrl, max_serials): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - - # define global variables - global g_proxies - +def get_info_by_serials( + serials: List[str], + access_token: AccessToken, + req_url: str, + max_serials: int, + settings: Settings +): # locale variablen max_serial_length = 40 max_req_per_second = 5 @@ -171,9 +222,6 @@ def get_info_by_serials(serials, access_token, requrl, max_serials): count = 1 info = [] - # read config, we need only the proxies - read_config() - # split list of Serials in chunks of max 75 serials, each max 40 bytes length for serial in serials: if len(serial) <= max_serial_length: @@ -185,32 +233,26 @@ def get_info_by_serials(serials, access_token, requrl, max_serials): count = 1 optimisedserials.append(serialsstr[:-1]) - headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token} + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} count = 0 for serials in optimisedserials: # Disable invalid certificate warnings. # requests.packages.urllib3.disable_warnings() - apiresponse = requests.get(requrl + serials, headers=headers, proxies=g_proxies) + response = requests.get(req_url + serials, headers=headers, proxies=settings.proxies) count += 1 # only 5 request per second are allowed if count == max_req_per_second: time.sleep(wait_time) count = 0 - if apiresponse.ok: - apiresponse.encoding = 'UTF-8' - info.append(json.loads(apiresponse.text)) + if response.ok: + response.encoding = 'UTF-8' + info.append(json.loads(response.text)) return info -def get_eox_by_pid(pids, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - - # define global variables - global g_proxies - +def get_eox_by_pid(pids: List[str], access_token: AccessToken, settings: Settings): # local variables max_pid_length = 240 max_pids = 20 @@ -221,9 +263,6 @@ def get_eox_by_pid(pids, access_token): count = 1 eoxr = [] - # read config, we need only the proxies - read_config() - # split list of PIDs in chunks of max 240 bytes length for pid in pids: if (len(pidstr) + len(pid)) < max_pid_length: @@ -234,15 +273,14 @@ def get_eox_by_pid(pids, access_token): pidstr = '' count = 1 optimisedpids.append(pidstr[:-1]) - - headers = {'accept': 'application/json', 'Authorization': f'Bearer {access_token}'} - requrl = 'https://api.cisco.com/supporttools/eox/rest/5/EOXByProductID/1/' + headers = {'accept': 'application/json', 'Authorization': f'Bearer {access_token.token}'} + req_url = 'https://apix.cisco.com/supporttools/eox/rest/5/EOXByProductID/1/' count = 0 for productids in optimisedpids: # Disable invalid certificate warnings. # requests.packages.urllib3.disable_warnings() - eoxresponse = requests.get(requrl + productids, headers=headers, proxies=g_proxies) + eoxresponse = requests.get(req_url + productids, headers=headers, proxies=settings.proxies) count += 1 # only 5 request per second are allowed if count == max_req_per_second: @@ -253,181 +291,108 @@ def get_eox_by_pid(pids, access_token): return eoxr -def get_eox_by_serials(serials, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) +def get_eox_by_serials(serials: List[str], access_token: AccessToken, settings: Settings): max_serials = 20 - requrl = 'https://api.cisco.com/supporttools/eox/rest/5/EOXBySerialNumber/1/' - info = get_info_by_serials(serials, access_token, requrl, max_serials) - return info - - -def get_coverage_status_by_serials(serials, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - max_serials = 75 - requrl = 'https://api.cisco.com/sn2info/v2/coverage/status/serial_numbers/' - info = get_info_by_serials(serials, access_token, requrl, max_serials) + req_url = 'https://apix.cisco.com/supporttools/eox/rest/5/EOXBySerialNumber/1/' + info = get_info_by_serials(serials, access_token, req_url, max_serials, settings) return info -def get_coverage_summary_by_serials(serials, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) +def get_coverage_summary_by_serials(serials: List[str], access_token: AccessToken, settings: Settings): max_serials = 75 - requrl = 'https://api.cisco.com/sn2info/v2/coverage/summary/serial_numbers/' - info = get_info_by_serials(serials, access_token, requrl, max_serials) + req_url = 'https://apix.cisco.com/sn2info/v2/coverage/summary/serial_numbers/' + info = get_info_by_serials(serials, access_token, req_url, max_serials, settings) return info -def get_orderable_pid_by_serials(serials, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - max_serials = 75 - requrl = 'https://api.cisco.com/sn2info/v2/identifiers/orderable/serial_numbers/' - info = get_info_by_serials(serials, access_token, requrl, max_serials) - return info - - -def get_owner_coverage_status_by_serials(serials, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - max_serials = 75 - requrl = 'https://api.cisco.com/sn2info/v2/coverage/owner_status/serial_numbers/' - info = get_info_by_serials(serials, access_token, requrl, max_serials) - return info - - -def get_psirt_by_ios_version(psirtios, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - +def get_psirt_by_ios_version(psirtios: List[str], access_token: AccessToken, settings: Settings): info = [] - # read config, we need only the proxies - read_config() - - headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token} - requrl = 'https://api.cisco.com/security/advisories/ios?version=' + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/security/advisories/ios?version=' # requests.packages.urllib3.disable_warnings() - if list(psirtios) != []: + if list(psirtios) is not []: for ios_version in psirtios: - logger.debug('request ios_version: %s, time: %s' % (ios_version, time.asctime(time.localtime(time.time())))) - psirt_response = requests.get(requrl + ios_version, headers=headers, proxies=g_proxies) + log_message('request ios_version: %s, time: %s' % (ios_version, time.asctime(time.localtime(time.time())))) + psirt_response = requests.get(req_url + ios_version, headers=headers, proxies=settings.proxies) if psirt_response.ok: - logger.debug('ok ios_version: %s, time: %s, len: %s' % (ios_version, time.asctime(time.localtime(time.time())), len(str(psirt_response)))) - psirt_response.encoding = 'UTF-8' # makes json.loads() mutch more faster (from 4 min. down to 1 sec for about 2MB) + log_message( + f'ok. ios_version: {ios_version}, ' + f'time: {time.asctime(time.localtime(time.time()))}, ' + f'len: {len(str(psirt_response))}' + ) + # makes json.loads() mutch more faster (from 4 min. down to 1 sec for about 2MB) + psirt_response.encoding = 'UTF-8' response = (json.loads(psirt_response.text)) - logger.debug('response loaded: ios_version: %s, time: %s, len: %s' % (ios_version, time.asctime(time.localtime(time.time())), len(str(response)))) + log_message( + f'response loaded: ios_version: {ios_version}, ' + f'time: {time.asctime(time.localtime(time.time()))}, ' + f'len: {len(str(response))}' + ) info.append({'version': ios_version, 'advisories': response.get('advisories', 'notfound')}) - logger.debug('ciscoapi:psirt-ios-found: %s' % info) + log_message('ciscoapi:psirt-ios-found: %s' % info) else: - logger.debug('notfound ios_version: %s, time: %s, len: %s' % (ios_version, time.asctime(time.localtime(time.time())), len(str(psirt_response)))) + log_message( + f'notfound. ios_version: {ios_version}, ' + f'time: {time.asctime(time.localtime(time.time()))}, ' + f'len: {len(str(psirt_response))}' + ) info.append({'version': ios_version, 'advisories': 'notfound'}) - logger.debug('ciscoapi:psirt-ios-notfound: %s' % info) + log_message('ciscoapi:psirt-ios-notfound: %s' % info) return info -def get_psirt_by_iosxe_version(psirtios, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - +def get_psirt_by_iosxe_version(psirtios: List[str], access_token: AccessToken, settings: Settings): info = [] - # read config, we need only the proxies - read_config() - - headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token} - requrl = 'https://api.cisco.com/security/advisories/iosxe?version=' + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/security/advisories/iosxe?version=' # requests.packages.urllib3.disable_warnings() - if list(psirtios) != []: + if list(psirtios) is not []: for ios_version in psirtios: - psirt_response = requests.get(requrl + ios_version, headers=headers, proxies=g_proxies) + psirt_response = requests.get(req_url + ios_version, headers=headers, proxies=settings.proxies) if psirt_response.ok: psirt_response.encoding = 'UTF-8' response = (json.loads(psirt_response.text)) info.append({'version': ios_version, 'advisories': response.get('advisories', 'notfound')}) - logger.debug('ciscoapi:psirt-iosxe-found: %s' % info) + log_message(f'ciscoapi:psirt-iosxe-found: {info}') else: info.append({'version': ios_version, 'advisories': 'notfound'}) - logger.debug('ciscoapi:psirt-iosxe-notfound: %s' % info) + log_message(f'ciscoapi:psirt-iosxe-notfound: {info}') return info -def get_psirt_by_product_family(families, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - +def get_psirt_by_product_family(families: List[str], access_token: AccessToken, settings: Settings): info = [] - # read config, we need only the proxies - read_config() - headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token} - requrl = 'https://api.cisco.com/security/advisories/cvrf/product?product=' + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/security/advisories/cvrf/product?product=' # requests.packages.urllib3.disable_warnings() - if list(families) != []: - + if list(families) is not []: for family in families: - psirt_response = requests.get(requrl + family, headers=headers, proxies=g_proxies) + psirt_response = requests.get(req_url + family, headers=headers, proxies=settings.proxies) if psirt_response.ok: psirt_response.encoding = 'UTF-8' response = (json.loads(psirt_response.text)) info.append({'family': family, 'advisories': response.get('advisories', 'notfound')}) - logger.debug('ciscoapi:psirt-family-found: %s' % info) + log_message('ciscoapi:psirt-family-found: %s' % info) else: info.append({'family': family, 'advisories': 'notfound'}) - logger.debug('ciscoapi:psirt-family-notfound: %s' % info) + log_message('ciscoapi:psirt-family-notfound: %s' % info) return info -def get_suggested_image_and_release_by_pid(pids, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - - info = [] - - # read config, we need only the proxies - read_config() - - headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token} - # requrl = 'https://api.cisco.com/software/suggestion/v1.0/suggestions/productid/' - requrl = 'https://api.cisco.com/software/suggestion/v2/suggestions/software/productIds/' - - # requests.packages.urllib3.disable_warnings() - if list(pids) != []: - for pid in pids: - suggestion_response = requests.get(requrl + pid, headers=headers, proxies=g_proxies) - if suggestion_response.ok: - suggestion_response.encoding = 'UTF-8' - response = (json.loads(suggestion_response.text)) - logger.debug('cisco-api:get_suggested_image_and_release_by_pid:response: %s' % response) - info.append({'pid': pid, 'suggetion': response.get('advisories', 'notfound')}) - logger.debug('ciscoapi:info-found: %s' % info) - else: - info.append({'pid': pid, 'advisories': 'notfound'}) - logger.debug('ciscoapi:info-notfound: %s' % info) - return info - - -def get_suggested_release_by_pid(pids, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - +def get_suggested_release_by_pid(pids: List[str], access_token: AccessToken, settings: Settings): max_pid_length = 240 max_pids = 10 - max_req_per_second = 5 - wait_time = 5 optimisedpids = [] pidstr = '' count = 1 info = [] - # read config, we need only the proxies - read_config() - # split list of PIDs in chunks of max 240 bytes length for pid in pids: if (len(pidstr) + len(pid)) < max_pid_length: @@ -439,32 +404,24 @@ def get_suggested_release_by_pid(pids, access_token): count = 1 optimisedpids.append(pidstr[:-1]) - headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token} - # requrl = 'https://api.cisco.com/software/suggestion/v1.0/suggestions/releases/' - requrl = 'https://api.cisco.com/software/suggestion/v2/suggestions/releases/productIds/' + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/software/suggestion/v2/suggestions/releases/productIds/' # requests.packages.urllib3.disable_warnings() - if optimisedpids != []: + if optimisedpids is not []: for productids in optimisedpids: - suggestion_response = requests.get(requrl + productids, headers=headers, proxies=g_proxies) - logger.debug('cisco-api:get_sugessted_release_by_pid:response:response.text %s' % suggestion_response.text) + suggestion_response = requests.get(req_url + productids, headers=headers, proxies=settings.proxies) + log_message('cisco-api:get_sugessted_release_by_pid:response:response.text %s' % suggestion_response.text) if suggestion_response.ok: suggestion_response.encoding = 'UTF-8' response = (json.loads(suggestion_response.text)) - logger.debug('cisco-api:get_suggested_release_by_pid:response: %s' % response) + log_message('cisco-api:get_suggested_release_by_pid:response: %s' % response) info.append(response.get('productList', 'notfound')) - logger.debug('ciscoapi:info-found: %s' % info) + log_message('ciscoapi:info-found: %s' % info) return info -def get_sugessted_compatible_release_by_pid(pids, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - - return - - # get_clean_sn_for_bug_api('ASA5510', '9.1(7)15,8.4(7)30,9.1(6)1') # return {'9.1(7)15': '9.1(7.15)', '9.1(6)1': '9.1(6.1)', '8.4(7)30': '8.4(7.30)'} def get_clean_sn_for_bug_api(pid, software_releases): @@ -525,22 +482,21 @@ def get_clean_sn_for_bug_api(pid, software_releases): return clean_sns -def get_bug_by_pid_and_release(pid, release, access_token, reqoptions): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - +def get_bug_by_pid_and_release(pid, release, access_token: AccessToken, reqoptions, settings: Settings): info = [] - # read config, we need only the proxies - read_config() - headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token} - requrl = 'https://api.cisco.com/bug/v2.0/bugs/products/product_id/' + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/bug/v2.0/bugs/products/product_id/' if release != '': software_releases = get_clean_sn_for_bug_api(pid, release) clean_sn = ','.join(software_releases.values()) missing = {} - bug_response = requests.get(requrl + '%s/software_releases/%s%s' % (pid, clean_sn, reqoptions), headers=headers, proxies=g_proxies) + bug_response = requests.get( + url=req_url + f'{pid}/software_releases/{clean_sn}{reqoptions}', + headers=headers, + proxies=settings.proxies + ) if bug_response.ok: bug_response.encoding = 'UTF-8' response = (json.loads(bug_response.text)) @@ -548,7 +504,8 @@ def get_bug_by_pid_and_release(pid, release, access_token, reqoptions): pagination_record = response.get('pagination_response_record') last_page = int(pagination_record.get('last_index')) total_records = int(pagination_record.get('total_records')) - logger.info('PID: %s, Version: %s, Total records: %s, Pages: %s' % (pid, clean_sn, total_records, last_page)) + log_message(message=f'PID: {pid}, Version: {clean_sn}, Total records: {total_records}, Pages: {last_page}', + level='INFO') if last_page > 1: if reqoptions != '': reqoptions = reqoptions + '&page_index=' @@ -557,7 +514,8 @@ def get_bug_by_pid_and_release(pid, release, access_token, reqoptions): for page in range(2, last_page + 1): # time.sleep(2) page_options = reqoptions + '%s' % page - bug_response = requests.get(requrl + '%s/software_releases/%s%s' % (pid, clean_sn, page_options), headers=headers, proxies=g_proxies) + bug_response = requests.get(req_url + f'{pid}/software_releases/{clean_sn}{page_options}', + headers=headers, proxies=settings.proxies) if bug_response.ok: response = (json.loads(bug_response.text)) bug_list += response.get('bugs') @@ -567,7 +525,9 @@ def get_bug_by_pid_and_release(pid, release, access_token, reqoptions): reason = bug_response.reason url = bug_response.url text = bug_response.text - logging.warning('ciscoapi error: %s, %s, Page: %s, LastPage: %s, URL: \'%s\'. Text: \'%s\'' % (status_code, reason, page, last_page, url, text)) + log_message(message=f'ciscoapi error: {status_code}, {reason}, Page: {page}, LastPage: ' + f'{last_page}, URL: \'{url}\'. Text: \'{text}\'', + level='WARNING') missing.update({page: {'status_code': status_code, 'reason': reason, 'url': url, @@ -577,7 +537,7 @@ def get_bug_by_pid_and_release(pid, release, access_token, reqoptions): 'bugs': bug_list, 'total_records': total_records, 'missing': missing}) - logger.debug('ciscoapi:bug-found: %s' % info) + log_message('ciscoapi:bug-found: %s' % info) else: bug_response.encoding = 'UTF-8' status_code = bug_response.status_code @@ -585,7 +545,7 @@ def get_bug_by_pid_and_release(pid, release, access_token, reqoptions): url = bug_response.url text = bug_response.text page = 'ALL' - logging.warning('ciscoapi error: %s, %s, URL: \'%s\'. Text: \'%s\'' % (status_code, reason, url, text)) + log_message(f'ciscoapi error: {status_code}, {reason}, URL: \'{url}\'. Text: \'{text}\'', level='WARNING') missing.update({page: {'status_code': status_code, 'reason': reason, 'url': url, @@ -598,54 +558,14 @@ def get_bug_by_pid_and_release(pid, release, access_token, reqoptions): return info -def get_bug_by_productseries_and_affected_release(ps_release, access_token, reqoptions): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - - info = [] - - # read config, we need only the proxies - read_config() - headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token} - requrl = 'https://api.cisco.com/bug/v2.0/bugs/product_series/' - - if len(ps_release.keys()) > 0: - for product_series in ps_release.keys(): - affected_releases = ps_release.get(product_series).split(',') - for affected_release in affected_releases: - bug_response = requests.get(requrl + '%s/affected_releases/%s%s' % (product_series, affected_release, reqoptions), headers=headers, proxies=g_proxies) - if bug_response.ok: - bug_response.encoding = 'UTF-8' - response = (json.loads(bug_response.text)) - last_page = int(response.get('pagination_response_record').get('last_index')) - total_records = int(response.get('pagination_response_record').get('total_records')) - logger.debug('Product series: %s, Version: %s, Total records: %s, Pages: %s' % (product_series, affected_release, total_records, last_page)) - - info.append({'produckt_series': product_series, 'affected_releases': affected_releases, 'bugs': response}) - logger.debug('ciscoapi:bug-found: %s' % info) - else: - info.append({'produckt_series': product_series, 'affected_releases': affected_releases, 'bugs': 'notfound'}) - logger.debug('ciscoapi:bug-not-found: %s' % info) - - return info - - -def get_product_mdf_information_by_pid(pids, access_token): - # set logg modul name <file>:<module>.<function> - logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name) - +def get_product_mdf_information_by_pid(pids, access_token: AccessToken, settings: Settings): max_pid_length = 40 max_pids = 5 - max_req_per_second = 2 - wait_time = 5 optimisedpids = [] pidstr = '' count = 1 info = [] - # read config, we need only the proxies - read_config() - # split list of PIDs in chunks of max 240 bytes length for pid in pids: if (len(pidstr) + len(pid)) < max_pid_length: @@ -657,21 +577,23 @@ def get_product_mdf_information_by_pid(pids, access_token): count = 1 optimisedpids.append(pidstr[:-1]) - headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token} - requrl = 'https://api.cisco.com/product/v1/information/product_ids_mdf/' -# requrl = 'https://api.cisco.com/product/v1/information/product_ids/' + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/product/v1/information/product_ids_mdf/' +# req_url = 'https://apix.cisco.com/product/v1/information/product_ids/' # requests.packages.urllib3.disable_warnings() - if optimisedpids != []: + if optimisedpids is not []: for productids in optimisedpids: - productinfo_response = requests.get(requrl + productids, headers=headers, proxies=g_proxies) - logger.debug('cisco-api:get_product_mdf_information_by_pid:response:response.text %s' % productinfo_response.text) + productinfo_response = requests.get(req_url + productids, headers=headers, proxies=settings.proxies) + log_message( + f'cisco-api:get_product_mdf_information_by_pid:response:response.text {productinfo_response.text}' + ) if productinfo_response.ok: productinfo_response.encoding = 'UTF-8' response = (json.loads(productinfo_response.text)) - logger.debug('cisco-api:get_product_mdf_information_by_pid:response: %s' % response) + log_message('cisco-api:get_product_mdf_information_by_pid:response: %s' % response) # info.append(response) info.extend(response.get('product_list', 'notfound')) - logger.debug('ciscoapi:info-found: %s' % info) + log_message('ciscoapi:info-found: %s' % info) - return info \ No newline at end of file + return info diff --git a/gui/views/inv_cisco_livecycle.py b/gui/views/inv_cisco_livecycle.py new file mode 100644 index 0000000000000000000000000000000000000000..acca1e3d10cae476a1e4dc2ac10c6bc638b27e3d --- /dev/null +++ b/gui/views/inv_cisco_livecycle.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# License: GNU General Public License v2 +# +# Author: thl-cmk[at]outlook[dot]com +# URL : https://thl-cmk.hopto.org +# Date : 2017-08-14 +# +# CheckMK views for Cisco support APIs (EoX, SN2Info, PSIRT, SUGGESTION) +# +# 2021-07-23: rewrite for CMK 2.0 +# suggestion removed --> table to complicated :-( +# +# 2021-07-25: removed inv_cisco_suggestion +# rework painter section + +import time +from cmk.gui.plugins.views.inventory import ( + declare_invtable_view, + decorate_inv_paint, + PaintResult, +) +from cmk.gui.plugins.visuals.inventory import ( + FilterInvtableText, +) +from cmk.gui.i18n import _ +from cmk.gui.plugins.views.utils import ( + inventory_displayhints, +) +from cmk.gui.htmllib import HTML + +# +# to get the status colors and the clickable IDs for EOL, PSIRT and Bug you need to copy the file +# ~/lib/check_mk/gui/plugins/views/inventory.py to ~/local/lib/check_mk/gui/plugins/views. +# Then change the value of ENABLE_PAINTERS to True and restart the apache service (omd restart apache) + +ENABLE_PAINTERS = True + +# ################################################################################# +# +# Painter functions START +# + + +@decorate_inv_paint() +def inv_paint_date_status(date_string) -> PaintResult: + + warn_days = -90 + crit_days = -30 + + # check if date_sting not None, if so return no CSS Class and None + if date_string is None: + return '', '' + + try: + days = int((time.time() - time.mktime(time.strptime(date_string, '%Y-%m-%d'))) / 86400) + except ValueError: + return '', date_string + + if days > crit_days: + css_class = 'date_crit' + elif days > warn_days: + css_class = 'date_warn' + else: + css_class = 'date_default' + + return css_class, '%s' % date_string + + +@decorate_inv_paint() +def inv_paint_last_checked_status(date_string) -> PaintResult: + warn_days = 32 + crit_days = 40 + if date_string is None: + return '', '' + try: + days = int((time.time() - time.mktime(time.strptime(date_string, '%Y-%m-%d'))) / 86400) + except ValueError: + return '', date_string + if days <= warn_days: + css_class = '' + elif days >= crit_days: + css_class = 'date_crit' + else: + css_class = 'date_warn' + return css_class, ' %s' % date_string + + +@decorate_inv_paint() +def inv_paint_psirt_advisoryid(advisoryId) -> PaintResult: + psirt_url = HTML( + f'<a class="href_blue" target="_blank" ' + f'href="https://tools.cisco.com/security/center/content/CiscoSecurityAdvisory/{advisoryId}">{advisoryId}</a>') + return '', psirt_url + + +@decorate_inv_paint() +def inv_paint_eox_eolid(eolid) -> PaintResult: + if eolid is not None: + search_eolid_url = HTML( + f'<a class="href_blue" target="_blank" ' + f'href="https://search.cisco.com/search?query={eolid}">{eolid}</a>') + else: + search_eolid_url = '' + return '', search_eolid_url + + +@decorate_inv_paint() +def inv_paint_bug_bugid(bugid) -> PaintResult: + if bugid is not None: + search_bugid_url = HTML( + f'<a class="href_blue" target="_blank" ' + f'href="https://bst.cloudapps.cisco.com/bugsearch/bug/{bugid}">{bugid}</a>') + else: + search_bugid_url = '' + return '', search_bugid_url + + +@decorate_inv_paint() +def inv_paint_psirt_bugid(bugids) -> PaintResult: + search_bugid_url = '' + bugids = bugids.split(',') + if bugids: + search_bugid_url = [] + for bugid in bugids: + bugid = bugid.strip(' ') + search_bugid_url.append(f'<a class="href_blue" target="_blank" ' + f'href="https://bst.cloudapps.cisco.com/bugsearch/bug/{bugid}">{bugid}</a>') + search_bugid_url = HTML(', '.join(search_bugid_url)) + return '', search_bugid_url + +# +# Painter functions END +# +# ################################################################################# + + +# EoX display hints +inventory_displayhints.update({ + '.hardware.support': {'title': _('Cisco Live Cycle')}, + '.hardware.support.cisco_eox:': { + 'title': _('EoX'), + 'keyorder': + [ + 'pid', 'serial_number', 'ProductIDDescription', 'Last_checked', 'ProductBulletinNumber', + 'EOXExternalAnnouncementDate', 'EndOfSaleDate', 'LastDateOfSupport', 'EndOfSvcAttachDate', + 'UpdatedTimeStamp', + ], + 'view': 'invciscoeox_of_host', + }, + '.hardware.support.cisco_eox:*.pid': {'title': _('PID (EoX)'), }, + '.hardware.support.cisco_eox:*.serial_number': {'title': _('Serial number'), }, + '.hardware.support.cisco_eox:*.ProductIDDescription': {'title': _('Description'), }, + '.hardware.support.cisco_eox:*.LinkToProductBulletinURL': {'title': _('EOL bulletin URL'), }, + '.hardware.support.cisco_eox:*.UpdatedTimeStamp': {'title': _('EOL bulletin last update'), }, + '.hardware.support.cisco_eox:*.MigrationProductId': {'title': _('Migration PID'), }, + '.hardware.support.cisco_eox:*.MigrationInformation': {'title': _('Migration information'), }, + '.hardware.support.cisco_eox:*.MigrationProductInfoURL': {'title': _('Migration PID URL'), }, + '.hardware.support.cisco_eox:*.MigrationProductName': {'title': _('Migration product name'), }, +}) + +# SN2Info (contract) display hints +inventory_displayhints.update({ + '.hardware.support.cisco_contract:': { + 'title': _('Contract'), + 'keyorder': [ + 'pid', 'serial_number', 'ProductIDDescription', 'Last_checked', 'is_covered', 'service_contract_number', + 'covered_product_line_end_date', + ], + 'view': 'invciscocontract_of_host', + }, + '.hardware.support.cisco_contract:*.pid': {'title': _('PID (contract)'), }, + '.hardware.support.cisco_contract:*.serial_number': {'title': _('Serial number'), }, + '.hardware.support.cisco_contract:*.ProductIDDescription': {'title': _('Description'), }, + '.hardware.support.cisco_contract:*.is_covered': {'title': _('is covered'), }, + '.hardware.support.cisco_contract:*.contract_site_customer_name': {'title': _('Customer name'), }, + '.hardware.support.cisco_contract:*.contract_site_address1': {'title': _('Address'), }, + '.hardware.support.cisco_contract:*.contract_site_city': {'title': _('City'), }, + '.hardware.support.cisco_contract:*.contract_site_state_province': {'title': _('State/Province'), }, + '.hardware.support.cisco_contract:*.contract_site_country': {'title': _('Country'), }, + '.hardware.support.cisco_contract:*.service_line_descr': {'title': _('Service description'), }, + '.hardware.support.cisco_contract:*.service_contract_number': {'title': _('Contract number'), }, + '.hardware.support.cisco_contract:*.parent_sr_no': {'title': _('Parent S/N'), }, + '.hardware.support.cisco_contract:*.warranty_type': {'title': _('Warranty type'), }, + '.hardware.support.cisco_contract:*.warranty_type_description': {'title': _('Warranty Description'), }, + '.hardware.support.cisco_contract:*.warranty_end_date': {'title': _('Warranty end date'), }, +}) + +# BUG display hints +inventory_displayhints.update({ + '.software.support.cisco_bug.Total_records': {'title': _('Records total'), }, + '.software.support.cisco_bug.duplicate_records': {'title': _('Records duplicate'), }, + '.software.support.cisco_bug.missing_records': {'title': _('Records missing'), }, + '.software.support.cisco_bug.PID': {'title': _('PID'), }, + '.software.support.cisco_bug.os_version': {'title': _('OS version'), }, + '.software.support.cisco_bug.bugs:': { + 'title': _('Cisco BUG IDs'), + 'keyorder': [ + 'bug_id', 'severity', 'status', 'last_modified_date', 'headline', 'support_case_count', 'behavior_changed', + ], + 'view': 'invciscobugs_of_host', + }, + '.software.support.cisco_bug.bugs:*.status': {'title': _('Status'), }, + '.software.support.cisco_bug.bugs:*.product': {'title': _('Product'), }, + '.software.support.cisco_bug.bugs:*.description': {'title': _('Description'), }, + '.software.support.cisco_bug.bugs:*.headline': {'title': _('Headline'), }, + '.software.support.cisco_bug.bugs:*.support_case_count': {'title': _('Support case count'), }, + '.software.support.cisco_bug.bugs:*.last_modified_date': {'title': _('Last modified date'), }, + '.software.support.cisco_bug.bugs:*.behavior_changed': {'title': _('Behavior changed'), }, + '.software.support.cisco_bug.bugs:*.base_pid': {'title': _('Base PID'), }, + '.software.support.cisco_bug.bugs:*.known_fixed_releases': {'title': _('Known fixed releases'), }, + '.software.support.cisco_bug.bugs:*.id': {'title': _('ID'), }, + '.software.support.cisco_bug.bugs:*.known_affected_releases': {'title': _('known affected releases'), }, + '.software.support.cisco_bug.bugs:*.severity': {'title': _('Severity'), }, +}) + +# PSIRT display hints +inventory_displayhints.update({ + '.software.support.cisco_psirt.dont_show_older_than': {'title': _('Don\'t show advisories not updated since'), }, + '.software.support.cisco_psirt.dont_show_not_updated_since': { + 'title': _('Don\'t show advisories not updated for X days'), }, + '.software.support.cisco_psirt.removed_advisories': {'title': _('Advisories removed'), }, + '.software.support.cisco_psirt.advisories:': { + 'title': _('Cisco PSIRT advisories'), + 'keyorder': [ + 'advisoryId', 'sir', 'cvssBaseScore', 'advisoryTitle', + ], + 'view': 'invciscopsirt_of_host', + }, + '.software.support.cisco_psirt.advisories:*.advisoryTitle': {'title': _('Advisory Title'), }, + '.software.support.cisco_psirt.advisories:*.cvssBaseScore': {'title': _('CVSS base Score'), }, + '.software.support.cisco_psirt.advisories:*.firstFixed': {'title': _('First fixed in'), }, + '.software.support.cisco_psirt.advisories:*.firstPublished': {'title': _('First Published'), }, + '.software.support.cisco_psirt.advisories:*.installed_version': {'title': _('OS version/family'), }, + '.software.support.cisco_psirt.advisories:*.lastUpdated': {'title': _('Last Updated'), }, + '.software.support.cisco_psirt.advisories:*.publicationUrl': {'title': _('Public URL'), }, + '.software.support.cisco_psirt.advisories:*.sir': {'title': _('Severity'), }, + '.software.support.cisco_psirt.advisories:*.summary': {'title': _('Summary'), }, + '.software.support.cisco_psirt.advisories:*.cwe': {'title': _('CWE'), }, + '.software.support.cisco_psirt.advisories:*.cves': {'title': _('CVEs'), }, + '.software.support.cisco_psirt.advisories:*.productNames': {'title': _('Product names'), }, + '.software.support.cisco_psirt.advisories:*.ipsSignatures': {'title': _('IPS signatures')}, + '.software.support.cisco_psirt.advisories:*.cvrfUrl': {'title': _('CVRF URL')}, + '.software.support.cisco_psirt.advisories:*.ovalUrl': {'title': _('OVAL URL')}, + '.software.support.cisco_psirt.os_version': {'title': _('OS version')}, + '.software.support.cisco_psirt.not_updated_for_x_days': {'title': _('don\'t show advisories not updated for X days')}, + '.software.support.cisco_psirt.dont_show_older_then': {'title': _('don\'t show advisories not updated after')}, +}) + + +if ENABLE_PAINTERS: + inventory_displayhints.update({ + # EoX + '.hardware.support.cisco_eox:*.EOXExternalAnnouncementDate': {'title': _('EOL Announcement'), 'paint': 'date_status'}, + '.hardware.support.cisco_eox:*.EndOfSvcAttachDate': {'title': _('End of service attachment'), 'paint': 'date_status'}, + '.hardware.support.cisco_eox:*.EndOfSecurityVulSupportDate': {'title': _('End of service vulnerability support'), 'paint': 'date_status'}, + '.hardware.support.cisco_eox:*.EndOfSWMaintenanceReleases': {'title': _('End of software maintenace releases'), 'paint': 'date_status'}, + '.hardware.support.cisco_eox:*.EndOfRoutineFailureAnalysisDate': {'title': _('End of routine failure analysis'), 'paint': 'date_status'}, + '.hardware.support.cisco_eox:*.EndOfSaleDate': {'title': _('End of sale'), 'paint': 'date_status'}, + '.hardware.support.cisco_eox:*.LastDateOfSupport': {'title': _('End of support'), 'paint': 'date_status'}, + '.hardware.support.cisco_eox:*.ProductBulletinNumber': {'title': _('EOL bulletin ID'), 'filter': FilterInvtableText,'paint': 'eox_eolid'}, + '.hardware.support.cisco_eox:*.Last_checked': {'title': _('Last checked'), 'paint': 'last_checked_status'}, + + # SN2Info + '.hardware.support.cisco_contract:*.Last_checked': {'title': _('Last checked'), 'paint': 'last_checked_status'}, + '.hardware.support.cisco_contract:*.covered_product_line_end_date': {'title': _('Contract end date'), 'paint': 'date_status'}, + + # Bug + '.software.support.cisco_bug.bugs:*.bug_id': {'title': _('Bug ID'), 'paint': 'bug_bugid'}, + + # Psirt + '.software.support.cisco_psirt.advisories:*.advisoryId': {'title': _('Advisory ID'), 'paint': 'psirt_advisoryid'}, + '.software.support.cisco_psirt.advisories:*.bugIDs': {'title': _('Bug IDs'), 'paint': 'psirt_bugid'}, + '.software.support.cisco_psirt.Last_checked': {'title': _('Last checked'), 'paint': 'last_checked_status'}, + }) +else: + inventory_displayhints.update(({ + # EoX + '.hardware.support.cisco_eox:*.EOXExternalAnnouncementDate': {'title': _('EOL Announcement')}, + '.hardware.support.cisco_eox:*.EndOfSvcAttachDate': {'title': _('End of service attachment')}, + '.hardware.support.cisco_eox:*.EndOfSecurityVulSupportDate': {'title': _('End of service vulnerability support')}, + '.hardware.support.cisco_eox:*.EndOfSWMaintenanceReleases': {'title': _('End of software maintenace releases')}, + '.hardware.support.cisco_eox:*.EndOfRoutineFailureAnalysisDate': {'title': _('End of routine failure analysis')}, + '.hardware.support.cisco_eox:*.EndOfSaleDate': {'title': _('End of sale')}, + '.hardware.support.cisco_eox:*.LastDateOfSupport': {'title': _('End of support')}, + '.hardware.support.cisco_eox:*.ProductBulletinNumber': {'title': _('EOL bulletin ID')}, + '.hardware.support.cisco_eox:*.Last_checked': {'title': _('Last checked'),}, + + # SN2Info + '.hardware.support.cisco_contract:*.Last_checked': {'title': _('Last checked')}, + '.hardware.support.cisco_contract:*.covered_product_line_end_date': {'title': _('Contract end date')}, + + # Bug + '.software.support.cisco_bug.bugs:*.bug_id': {'title': _('Bug ID')}, + + # Psirt + '.software.support.cisco_psirt.advisories:*.advisoryId': {'title': _('Advisory ID')}, + '.software.support.cisco_psirt.advisories:*.bugIDs': {'title': _('Bug IDs')}, + '.software.support.cisco_psirt.Last_checked': {'title': _('Last checked')}, + })) + + +declare_invtable_view('invciscoeox', '.hardware.support.cisco_eox:', _('Cisco EoX status'), _('Cisco EoX status')) +declare_invtable_view('invciscocontract', '.hardware.support.cisco_contract:', _('Cisco contract status'), _('Cisco contract status')) +declare_invtable_view('invciscopsirt', '.software.support.cisco_psirt.advisories:', _('Cisco PSIRT advisories'), _('Cisco PSIRT advisories')) +declare_invtable_view('invciscobugs', '.software.support.cisco_bug.bugs:', _('Cisco BUG IDs'), _('Cisco Bug IDs')) diff --git a/gui/wato/inv_cisco_bug.py b/gui/wato/inv_cisco_bug.py new file mode 100644 index 0000000000000000000000000000000000000000..c8c6b4dbe72480c0c78fca08e72681236585d3a0 --- /dev/null +++ b/gui/wato/inv_cisco_bug.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# License: GNU General Public License v2 + +# Author: thl-cmk[at]outlook[dot]com +# URL : https://thl-cmk.hopto.org +# Date : +# +# 2021-07-23: rewritten for CMK 2.0 +# + +from cmk.gui.i18n import _ +from cmk.gui.plugins.wato.utils import ( + HostRulespec, + rulespec_registry, +) +from cmk.gui.valuespec import ( + Dictionary, + ListChoice, +) +from cmk.gui.plugins.wato.inventory import ( + RulespecGroupInventory, +) + +_removecolumns_inv_cisco_bug = [ + # ('status', 'Status'), + ('product', 'Product'), + ('description', 'Description'), + # ('headline', 'Headline'), + # ('support_case_count', 'Support case count'), + # ('last_modified_date', 'Last modified date'), + # ('behavior_changed', 'Behavior changed'), + # ('bug_id', 'Bug ID'), + ('base_pid', 'Base PID'), + ('known_fixed_releases', 'Known fixed releases'), + ('id', 'ID'), + ('known_affected_releases', 'known affected releases'), + # ('severity', 'Severity'), +] + + +def _valuespec_inv_cisco_bug(): + return Dictionary( + title=_('Cisco bugs'), + elements=[ + ('removecolumns', + ListChoice( + title=_('remove columns'), + help=_('remove information from report'), + choices=_removecolumns_inv_cisco_bug, + default_value=[ + 'base_pid', + 'description', + 'id', + 'known_affected_releases', + 'product', + ], + )), + ] + ) + + +rulespec_registry.register( + HostRulespec( + group=RulespecGroupInventory, + match_type='dict', + name='inv_parameters:inv_cisco_bug', + valuespec=_valuespec_inv_cisco_bug, + )) diff --git a/gui/wato/inv_cisco_contract.py b/gui/wato/inv_cisco_contract.py new file mode 100644 index 0000000000000000000000000000000000000000..55dc4b32464c255c7e638e8f481056fce09944b7 --- /dev/null +++ b/gui/wato/inv_cisco_contract.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# License: GNU General Public License v2 + +# Author: thl-cmk[at]outlook[dot]com +# URL : https://thl-cmk.hopto.org +# Date : +# +# 2021-07-23: rewritten for CMK 2.0 +# + +from cmk.gui.i18n import _ +from cmk.gui.plugins.wato.utils import ( + HostRulespec, + rulespec_registry, +) +from cmk.gui.valuespec import ( + Dictionary, + ListChoice, + ListOfStrings, +) +from cmk.gui.plugins.wato.inventory import ( + RulespecGroupInventory, +) + +_removecolumns_inv_cisco_contract = [ + ('contract_site_address1', 'Address'), + ('contract_site_city', 'City'), + ('contract_site_country', 'Country'), + ('contract_site_customer_name', 'Customer name'), + ('parent_sr_no', 'Parent S/N'), + ('service_line_descr', 'Service description'), + ('contract_site_state_province', 'State/Province'), + ('warranty_type_description', 'Warranty Description'), + ('warranty_end_date', 'Warranty end date'), + ('warranty_type', 'Warranty type'), +] + + +def _valuespec_inv_cisco_contract(): + return Dictionary( + title=_('Cisco contract staus'), + elements=[ + ('removecolumns', + ListChoice( + title=_('remove columns'), + help=_('remove information from report'), + choices=_removecolumns_inv_cisco_contract, + default_value=[ + 'contract_site_state_province', + 'warranty_type_description', + 'warranty_end_date', + 'warranty_type', + ], + )), + ('PID_black_list', + ListOfStrings( + title=_('drop Product IDs beginning with'), + orientation='horizontal', + help=_('there will be no request for the following PID(s)'), + )), + ], + ) + + +rulespec_registry.register( + HostRulespec( + group=RulespecGroupInventory, + match_type='dict', + name='inv_parameters:inv_cisco_contract', + valuespec=_valuespec_inv_cisco_contract, + )) diff --git a/gui/wato/inv_cisco_eox.py b/gui/wato/inv_cisco_eox.py new file mode 100644 index 0000000000000000000000000000000000000000..1475fbb83ade60a296dcbede3dcec24761f286c4 --- /dev/null +++ b/gui/wato/inv_cisco_eox.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# License: GNU General Public License v2 + +# Author: thl-cmk[at]outlook[dot]com +# URL : https://thl-cmk.hopto.org +# Date : +# +# 2021-07-23: rewritten for CMK 2.0 +# + +from cmk.gui.i18n import _ +from cmk.gui.plugins.wato.utils import ( + HostRulespec, + rulespec_registry, +) +from cmk.gui.valuespec import ( + Dictionary, + FixedValue, + ListOfStrings, + ListChoice, +) +from cmk.gui.plugins.wato.inventory import ( + RulespecGroupInventory, +) + +_removecolumns_inv_cisco_eox = [ + # ('ProductIDDescription', 'PID Description'), + ('LinkToProductBulletinURL', 'EOL bulletin URL'), + ('EndOfSecurityVulSupportDate', 'End of service vulnerability support'), + ('EndOfSWMaintenanceReleases', 'End of software maintenance releases'), + ('EndOfRoutineFailureAnalysisDate', 'End of routine failure analysis'), + ('MigrationProductId', 'Migration PID'), + ('MigrationInformation', 'Migration information'), + ('MigrationProductInfoURL', 'Migration PID URL'), + ('MigrationProductName', 'Migration product name'), +] + + +def _valuespec_inv_cisco_eox(): + return Dictionary( + title=_('Cisco EoX staus'), + elements=[ + + ('always_use_serial', + FixedValue( + True, + title=_('always use serial number'), + help=_('if true, CMK will request Cisco EoX information via serial number (default is use PID)'), + )), + ('removecolumns', + ListChoice( + title=_('remove columns'), + help=_('remove information from EoX report'), + choices=_removecolumns_inv_cisco_eox, + default_value=[ + 'EndOfSecurityVulSupportDate', + 'EndOfSWMaintenanceReleases', + 'EndOfRoutineFailureAnalysisDate', + 'MigrationInformation', + 'MigrationProductInfoURL', + 'MigrationProductName', + ], + )), + ('PID_black_list', + ListOfStrings( + title=_('drop Product IDs beginning with'), + orientation='horizontal', + help=_('there will be no request for the following PID'), + )), + ('PID_bad_list', + ListOfStrings( + title=_('request EoX information for the following Product IDs via serial number'), + orientation='horizontal', + help=_('the EoX request for the following PID will by via serial number'), + )), + ('SN_black_list', + ListOfStrings( + title=_('drop entrys with the following serial number(s)'), + orientation='horizontal', + help=_('there will be noe EoX request for the following serial numbers'), + )), + ], + ) + + +rulespec_registry.register( + HostRulespec( + group=RulespecGroupInventory, + match_type='dict', + name='inv_parameters:inv_cisco_eox', + valuespec=_valuespec_inv_cisco_eox, + )) diff --git a/gui/wato/inv_cisco_psirt.py b/gui/wato/inv_cisco_psirt.py new file mode 100644 index 0000000000000000000000000000000000000000..f213718291dff300979c030f1f383f77fd5cf8b8 --- /dev/null +++ b/gui/wato/inv_cisco_psirt.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# License: GNU General Public License v2 + +# Author: thl-cmk[at]outlook[dot]com +# URL : https://thl-cmk.hopto.org +# Date : +# +# 2021-07-23: rewritten for CMK 2.0 +# + +from cmk.gui.i18n import _ +from cmk.gui.plugins.wato.utils import ( + HostRulespec, + rulespec_registry, +) +from cmk.gui.valuespec import ( + Dictionary, + FixedValue, + TextAscii, + ListChoice, + Integer, + Alternative, +) +from cmk.gui.plugins.wato.inventory import ( + RulespecGroupInventory, +) + +_removecolumns_inv_cisco_psirt = [ + ('bugIDs', 'Cisco Bug IDs'), + ('firstFixed', 'First fixed in'), + ('firstPublished', 'First Published'), + ('installed_version', 'installed version'), + ('lastUpdated', 'Last Updated'), + ('publicationUrl', 'Public URL'), + ('summary', 'Summary'), + ('cwe', 'CWE'), + ('cves', 'CVEs'), + ('productNames', 'Product names'), + ('ipsSignatures', 'IPS signatures'), + ('iosRelease', 'IOS Releases'), + ('cvrfUrl', 'CVRF URL'), + ('ovalUrl', 'OVAL URL'), +] + + +def _valuespec_inv_cisco_psirt(): + return Dictionary( + title=_('Cisco PSIRT advisories'), + elements=[ + ('removecolumns', + ListChoice( + title=_('remove columns'), + help=_('remove information from report'), + choices=_removecolumns_inv_cisco_psirt, + default_value=[ + 'publicationUrl', + 'summary', + 'cwe', + 'cves', + 'productNames', + 'iosRelease', + 'ipsSignatures', + 'cvrfUrl', + 'ovalUrl', + ], + )), + ('psirt_type', + Alternative( + title=_('Cisco PSIRT advisory type'), + style='dropdown', + # default_value=True, + elements=[ + FixedValue( + 'IOS', + title=_('add Cisco PSIRT IOS security advisories'), + # tototext=_('Cisco PSIRT IOS security advisories info enabled'), + help=_('CMK will request Cisco IOS PSIRT security advisories'), + ), + FixedValue( + 'IOS-XE', + title=_('add Cisco PSIRT IOS-XE security advisories'), + # tototext=_('Cisco PSIRT IOS-XE security advisories info enabled'), + help=_('CMK will request Cisco IOS-XE PSIRT security advisories'), + ), + Dictionary( + title=_('add Cisco PSIRT advisories by base product id'), + elements=[ + ('product_family', + TextAscii( + title=_('product family'), + help=_('Product family for example: NEXUS, WSA, ESA or ASA'), + allow_empty=False, + size=24, + )), + ('not_updated', + Integer( + title=_('don\'t show advisories not updated for the last X days'), + default_value=730, + # allow_empty=False, + ) + ), + ('dont_show_older_then', + TextAscii( + title=_('don\'t show advisories from before YYYY-MM-DD'), + default_value='2000-01-01', + allow_empty=False, + )), + ]) + ] + )), + ]) + + +rulespec_registry.register( + HostRulespec( + group=RulespecGroupInventory, + match_type='dict', + name='inv_parameters:inv_cisco_psirt', + valuespec=_valuespec_inv_cisco_psirt, + )) diff --git a/inv_cisco_support-0.2.0-20230609.mkp b/inv_cisco_support-0.2.0-20230609.mkp new file mode 100644 index 0000000000000000000000000000000000000000..67db7b6f6e6bbea07cd0aa1337e0b9243ae32d33 Binary files /dev/null and b/inv_cisco_support-0.2.0-20230609.mkp differ diff --git a/inv_cisco_support.mkp b/inv_cisco_support.mkp index 865d7a381c8f887616c722a162d3556e9267b80b..67db7b6f6e6bbea07cd0aa1337e0b9243ae32d33 100644 Binary files a/inv_cisco_support.mkp and b/inv_cisco_support.mkp differ diff --git a/packages/inv_cisco_support b/packages/inv_cisco_support index 967067e01fc6dc3e3e5d31c9683a887aebfd29fe..47f89b525a945bfc6991689bbb605463e68f6073 100644 --- a/packages/inv_cisco_support +++ b/packages/inv_cisco_support @@ -13,7 +13,10 @@ 'v.0.0.11: fixes for CMK 1.5.x\n' '\n' 'v0.1: rewrite for CMK 2.0\n' - ' - suggestion and api_status removed\n', + ' - suggestion and api_status removed\n' + '\n' + 'v0.2.0: moved to CMK 2.1\n' + ' - changed to new rest API (apix.cisco.com)\n', 'download_url': 'https://thl-cmk.hopto.org', 'files': {'agent_based': ['inv_cisco_eox.py', 'inv_cisco_contract.py', @@ -21,22 +24,21 @@ 'inv_cisco_psirt.py', 'utils/inv_cisco_support.py'], 'bin': ['ciscoapi/ciscoapi.py', - 'ciscoapi/ciscosupport.py', 'ciscoapi/cisco-eox.py', 'ciscoapi/cisco-sn2info.py', 'ciscoapi/cisco-psirt.py', - 'ciscoapi/cisco-bug.py'], - 'web': ['plugins/views/inv_cisco_support.py', - 'plugins/wato/inv_cisco_bug.py', - 'plugins/wato/inv_cisco_eox.py', - 'plugins/wato/inv_cisco_contract.py', - 'plugins/wato/inv_cisco_psirt.py', - 'htdocs/css/inv_cisco_support.css']}, + 'ciscoapi/cisco-bug.py', + 'ciscoapi/cisco_live_cycle_utils.py'], + 'gui': ['views/inv_cisco_livecycle.py', + 'wato/inv_cisco_bug.py', + 'wato/inv_cisco_contract.py', + 'wato/inv_cisco_eox.py', + 'wato/inv_cisco_psirt.py'], + 'web': ['htdocs/css/inv_cisco_support.css']}, 'name': 'inv_cisco_support', - 'num_files': 17, 'title': 'Inventory for Cisco Bug, EoX, contract status, PSIRT advisories and ' 'suggested software', - 'version': '20221130.v0.1c', - 'version.min_required': '2.0.0', - 'version.packaged': '2021.09.20', - 'version.usable_until': None} \ No newline at end of file + 'version': '0.2.0-20230609', + 'version.min_required': '2.1.0b1', + 'version.packaged': '2.1.0p21', + 'version.usable_until': '2.2.0b1'} \ No newline at end of file