diff --git a/doc/sample_bug.png b/img/sample_bug.png similarity index 100% rename from doc/sample_bug.png rename to img/sample_bug.png diff --git a/doc/sample_contract.png b/img/sample_contract.png similarity index 100% rename from doc/sample_contract.png rename to img/sample_contract.png diff --git a/doc/sample_eox.png b/img/sample_eox.png similarity index 100% rename from doc/sample_eox.png rename to img/sample_eox.png diff --git a/doc/sample_psirt.png b/img/sample_psirt.png similarity index 100% rename from doc/sample_psirt.png rename to img/sample_psirt.png diff --git a/mkp/inv_cisco_support-0.3.0-20231025.mkp b/mkp/inv_cisco_support-0.3.0-20231025.mkp index 8b6b4a2aaef9fa571f1ba56e0765b696a2329f77..5f2bc5d6ebc0a74a8f9d442d8dc4a166b6f0c8a2 100644 Binary files a/mkp/inv_cisco_support-0.3.0-20231025.mkp and b/mkp/inv_cisco_support-0.3.0-20231025.mkp differ diff --git a/agent_based/inv_cisco_bug.py b/source/agent_based/inv_cisco_bug.py similarity index 100% rename from agent_based/inv_cisco_bug.py rename to source/agent_based/inv_cisco_bug.py diff --git a/agent_based/inv_cisco_contract.py b/source/agent_based/inv_cisco_contract.py similarity index 100% rename from agent_based/inv_cisco_contract.py rename to source/agent_based/inv_cisco_contract.py diff --git a/agent_based/inv_cisco_eox.py b/source/agent_based/inv_cisco_eox.py similarity index 100% rename from agent_based/inv_cisco_eox.py rename to source/agent_based/inv_cisco_eox.py diff --git a/agent_based/inv_cisco_psirt.py b/source/agent_based/inv_cisco_psirt.py similarity index 100% rename from agent_based/inv_cisco_psirt.py rename to source/agent_based/inv_cisco_psirt.py diff --git a/agent_based/utils/inv_cisco_support.py b/source/agent_based/utils/inv_cisco_support.py similarity index 100% rename from agent_based/utils/inv_cisco_support.py rename to source/agent_based/utils/inv_cisco_support.py diff --git a/source/bin/ciscoapi/cisco-bug.py b/source/bin/ciscoapi/cisco-bug.py new file mode 100755 index 0000000000000000000000000000000000000000..bca0cb378c213b5b0a81de5e1e74751a3cab9a62 --- /dev/null +++ b/source/bin/ciscoapi/cisco-bug.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Author: thl-cmk[at]outlook[dot]com +# URL : https://thl-cmk.hopto.org +# Date : 2018-01-25 +# +# add on for cisco support api. calls cisco bug api 2.0. +# for more information see: https://developer.cisco.com/docs/support-apis/#bug +# +# 2021-07-24: rewrite for python3.8 +# +import os +import json + +from cisco_live_cycle_utils import ( + configure_logger, + log_message, + expand_path, + get_ids_from_dir, + get_subdirs_from_dir, + remove_empty_sub_dirs, + sleep_random, + move_dir, +) +from ciscoapi import ( + AccessToken, + Settings, + get_bug_by_pid_and_release, +) + + +def main(): + settings = Settings() + access_token = AccessToken(settings.client_id, settings.client_secret, settings.proxies) + configure_logger(log_level=settings.log_level) + + bug_path = settings.base_path + '/bug' + path_found = expand_path(bug_path + '/found') + path_not_found = expand_path(bug_path + '/not_found') + path_request = expand_path(bug_path + '/request') + path_missing = expand_path(bug_path + '/missing') + + pids_requested = {} + pids_refresh = {} + + # get list of bug reports to refresh + pids = get_subdirs_from_dir(path_found) + for pid in pids: + pids_refresh[pid.replace('_', '/')] = ','.join(get_ids_from_dir(path_found+pid, + refresh_time=settings.bug_refresh_found)) + + # move not_found bug reports older then 'refresh_notfound' days to request (for refresh) + move_dir(path_not_found, path_request, refresh_time=settings.bug_refresh_not_found) + + # get list of PIDs requests + pids = get_subdirs_from_dir(path_request) + log_message(f'bug requests (PIDs): {pids}') + for pid in pids: + pids_requested[pid.replace('_', '/')] = ','.join(get_ids_from_dir(path_request+pid)) + log_message(f'bug requests (PIDs and releases): {pids_requested}') + + if len(pids_refresh.keys()) > 0 or len(pids_requested.keys()) > 0: + + reqoptions = [] + + # modified_date + # 1 = Last Week + # 2 = Last 30 Days(default) + # 3 = Last 6 Months + # 4 = Last Year + # 5 = All + reqoptions.append('modified_date=1') +# reqoptions.append('modified_date=3') # for testing + + # severity (default is all) + # 1 = Severity 1 + # 2 = Severity 2 + # 3 = Severity 3 + # 4 = Severity 4 + # 5 = Severity 5 + # 6 = Severity 6 + # reqoptions.append('severity=2') + + # status (default is all) + # O = Open + # F = Fixed + # T = Terminated + # reqoptions.append('status=O') + + # sort_by + # status + # modified_date (recent first, default) + # severity + # support_case_count + # modified_date_earliest (earliest first) + # reqoptions.append('sort_by=severity') + + # reqoptions.append('page_index=2') + + if len(reqoptions) > 0: + reqoptions = '?' + '&'.join(reqoptions) + else: + reqoptions = '' + + # wait random time after startup (load spread) + if settings.wait_after_start: + sleep_random(settings.max_wait_time) + + # first refresh bug reports + for pid in pids_refresh.keys(): + + # get bug records for time frame + bug_records = get_bug_by_pid_and_release( + pid, + pids_refresh.get(pid), + access_token, + reqoptions, + settings=settings + ) + for entry in bug_records: + pid = entry.get('pid') + software_releases = entry.get('software_releases') + status_code = int(entry.get('status_code', 200)) + log_message(f'bug return:PID: {pid}, Status: {status_code}, Version: {software_releases}') + path = expand_path(path_found + pid.replace('/', '_')) + # check if there were was an error, if so go to next pid + if status_code == 200: + bugs = entry.get('bugs', None) + if bugs: # if new/changed bugs found + for bug in bugs: + bug.pop('description') # remove description + for release in software_releases.keys(): # create one bug list per software release + bug_release = software_releases.get(release) + log_message(f'bug found:PID: {pid}, Releases: {release}') + new_bugs = [] + for bug in bugs: + if bug_release in bug.get('known_affected_releases'): + new_bugs.append(bug) + + # open found file + with open(path + release) as f: + try: + bug_record = json.load(f) + except ValueError as e: + log_message(f'{pid}:{release}:snmp_cisco_bug:bug_found:JSON load error: {e}', + level='WARNING') + + found_bugs = bug_record.get('bugs') + for found_bug in found_bugs: + for new_bug in new_bugs: + if found_bug.get('bug_id') == new_bug.get('bug_id'): + found_bug.update(new_bug) # update old bug with new data (changed bugs) + new_bugs.remove(new_bug) + for bug in new_bugs: + found_bugs.append(bug) # add new bugs + bug_record['bugs'] = found_bugs # replace bug list with new list + bug_record['total_records'] = len(found_bugs) # change number of records + + with open(path + release, 'w') as f: + json.dump(bug_record, f) # write updated bug report + + else: + for release in software_releases.keys(): + if os.path.exists(path + release): + os.utime(path + release, None) + + reqoptions = [] + + # modified_date + # 1 = Last Week + # 2 = Last 30 Days(default) + # 3 = Last 6 Months + # 4 = Last Year + # 5 = All + reqoptions.append('modified_date=5') +# reqoptions.append('modified_date=2') # for testing + + # severity (default is all) + # 1 = Severity 1 + # 2 = Severity 2 + # 3 = Severity 3 + # 4 = Severity 4 + # 5 = Severity 5 + # 6 = Severity 6 + # reqoptions.append('severity=2') + + # status (default is all) + # O = Open + # F = Fixed + # T = Terminated + # reqoptions.append('status=O') + + # sort_by + # status + # modified_date (recent first, default) + # severity + # support_case_count + # modified_date_earliest (earliest first) + # reqoptions.append('sort_by=severity') + + # reqoptions.append('page_index=2') + + if len(reqoptions) > 0: + reqoptions = '?' + '&'.join(reqoptions) + else: + reqoptions = '' + + for pid in pids_requested.keys(): + bug_records = get_bug_by_pid_and_release( + pid, + pids_requested.get(pid), + access_token, + reqoptions, + settings=settings + ) + for entry in bug_records: + pid = entry.get('pid') + software_releases = entry.get('software_releases') + status_code = int(entry.get('status_code', 200)) + log_message(f'bug return:PID: {pid}, Status: {status_code}, Version: {software_releases}') + # check if there where was an error, if so go to next pid + if status_code == 200: + bugs = entry.get('bugs', None) + if bugs: + for bug in bugs: + bug.pop('description') + for release in software_releases.keys(): # create one bug list per software release + bug_release = software_releases.get(release) + if bugs: + log_message(f'bug found:PID: %s{pid}, Releases: {release}') + missing = entry.get('missing', {}) + # split bugs by release + release_bugs = { + 'pid': pid, + 'software_release': release, + 'bugs': [], + 'missing': missing + } + for bug in bugs: + if bug_release in bug.get('known_affected_releases'): + release_bugs['bugs'].append(bug) + release_bugs['total_records'] = len(release_bugs['bugs']) + + path = expand_path(path_found + pid.replace('/', '_')) + with open(path + release, 'w') as f: + json.dump(release_bugs, f) + + if len(missing.keys()) != 0: + path = expand_path(path_missing + pid.replace('/', '_')) + with open(path + release, 'w') as f: + json.dump(missing, f) + else: + log_message(f'bug not found:PID: {pid}, Version: {release}') + path = expand_path(path_not_found + pid.replace('/', '_')) + log_message(f'not found: {entry}') + with open(path + release, 'w') as f: + json.dump(entry, f) + pass + + # remove request file + try: + log_message(f'bug delete request:PID: {pid}, Version: {release}') + os.remove(path_request + pid.replace('/', '_') + '/' + release) + except OSError: + pass + + # clean up (remove empty directories) + remove_empty_sub_dirs(path_request) + remove_empty_sub_dirs(path_found) + remove_empty_sub_dirs(path_not_found) + remove_empty_sub_dirs(path_missing) + + +main() diff --git a/source/bin/ciscoapi/cisco-eox.py b/source/bin/ciscoapi/cisco-eox.py new file mode 100755 index 0000000000000000000000000000000000000000..cfcc556b5a203dc9bf011e006f080ad34526a30c --- /dev/null +++ b/source/bin/ciscoapi/cisco-eox.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# License: GNU General Public License v2 +# +# Author: thl-cmk[at]outlook[dot]com +# URL : https://thl-cmk.hopto.org +# Date : 2017-05-15 +# +# https://developer.cisco.com/docs/support-apis/ +# +# 2021-07-23: rewrite for python 3.8 +# +import json +from cisco_live_cycle_utils import ( + configure_logger, + log_message, + expand_path, + get_ids_from_dir, + remove_ids_from_list, + refresh_ids_from_dir, + remove_ids_from_dir, + sleep_random, +) +from ciscoapi import ( + AccessToken, + Settings, + get_eox_by_pid, + get_eox_by_serials, +) + + +# split pids in known and unknown eox state +def split_pids(eoxr): + eox_known = [] + eox_unkown = [] + + for response in eoxr: + EOXRecord = response.get('EOXRecord') + # PaginationResponseRecord = response.get('PaginationResponseRecord') + # PageIndex = PaginationResponseRecord.get('PageIndex') + # LastIndex = PaginationResponseRecord.get('LastIndex') + # PageRecords = PaginationResponseRecord.get('PageRecords') + # TotalRecords = PaginationResponseRecord.get('TotalRecords') + if EOXRecord is not None: + for PID in EOXRecord: + if PID.get('EOLProductID') != '': + eox_known.append(PID) + log_message(message=f'EOLProductID : {PID.get("EOLProductID")}') + else: + eox_unkown.append(PID) + log_message(f'EOXInputValue : {PID.get("EOXInputValue")}') + return {'eox_known': eox_known, 'eox_unknown': eox_unkown} + + +# split serials, expects a list of EoX Records from get EoX by serial +def split_serials(eoxr): + serials = [] + + for PID in eoxr: + EOLProductID = PID.get('EOLProductID') + log_message(f'serial split Eox: found PID: {EOLProductID}') + EOXInputValue = PID.get('EOXInputValue').split(',') + for serial in EOXInputValue: + log_message(f'found Serial: {serial}') + eox_serial = PID.copy() + eox_serial.update({'EOXInputValue': serial}) + serials.append(eox_serial) + log_message(f'Serial EoX: {eox_serial}') + + return serials + + +# save EoX records to file by PID, +# expects a list of EoX Records from Cisco EoX API 5.0, +# returns a list of saved PIDs +def save_eox(eox, path): + saved = [] + + for PID in eox: + EOLProductID = PID.get('EOLProductID') + # EOLProductID is empty for EoX Records with unknown EoX state (error or not announced) + if EOLProductID == '': + EOLProductID = PID.get('EOXInputValue') + + if EOLProductID: + with open(path + (EOLProductID.replace('/', '_')), 'w') as f: + json.dump(PID, f) + saved.append(EOLProductID) + + return saved + + +def save_serials(eox, path): + """ + Saves EoX records to file by serial number. + Args: + eox: List of EoX Records from Cisco EoX API 5.0 + path: file path where to save the EoX records + + Returns: List of serial numbers of saved EoX records + + """ + + saved = [] + + for serial in eox: + EOXInputValue = serial.get('EOXInputValue') + if EOXInputValue: + with open(path + EOXInputValue, 'w') as f: + json.dump(serial, f) + saved.append(EOXInputValue) + + return saved + + +def main(): + settings = Settings() + access_token = AccessToken(settings.client_id, settings.client_secret, settings.proxies) + configure_logger(log_level=settings.log_level) + + eox_path = settings.base_path + '/EoX' + path_found = eox_path + '/found' + path_not_found = eox_path + '/not_found' + path_request = eox_path + '/request' + + path_request_pid = expand_path(path_request + '/pid') + path_found_pid = expand_path(path_found + '/pid') + path_not_found_pid = expand_path(path_not_found + '/pid') + + path_request_ser = expand_path(path_request + '/ser') + path_found_ser = expand_path(path_found + '/ser') + path_not_found_ser = expand_path(path_not_found + '/ser') + + # create list of PIDs to request EoX status for + pids = get_ids_from_dir(path_request_pid) + log_message(f'pid requests : {pids}') + # remove already known PIDs from list + pids = remove_ids_from_list(pids, path_found_pid) + log_message(f'pid requests : {pids}') + # remove PIDs already requested with unknown EoX status from list + pids = remove_ids_from_list(pids, path_not_found_pid) + log_message(f'pid requests : {pids}') + + # refresh PIDs after 30 days by default + pids = refresh_ids_from_dir(path_not_found_pid, settings.eox_refresh_unknown, pids, True) + log_message(f'pid requests : {pids}') + pids = refresh_ids_from_dir(path_found_pid, settings.eox_refresh_known, pids, False) + log_message(f'pid requests : {pids}') + + # create list of serial numbers to request EoX status for + serials = get_ids_from_dir(path_request_ser) + log_message(f'ser requests : {serials}') + # remove already known serials from list + serials = remove_ids_from_list(serials, path_found_ser) + log_message(f'ser requests : {serials}') + # remove serials already requested with unknown EoX status from list + serials = remove_ids_from_list(serials, path_not_found_ser) + log_message(f'ser requests : {serials}') + + # refresh serials after 30 days by default + serials = refresh_ids_from_dir(path_not_found_ser, settings.eox_refresh_unknown, serials, True) + log_message(f'ser requests : {serials}') + serials = refresh_ids_from_dir(path_found_ser, settings.eox_refresh_known, serials, False) + log_message(f'ser requests : {serials}') + + if pids == [] and serials == []: + log_message('all list are empty. Do nothing.') + return + + # wait random time after startup (load spread) + if settings.wait_after_start: + sleep_random(settings.max_wait_time) + + if pids is not []: + eox = get_eox_by_pid(pids=pids, access_token=access_token, settings=settings) + + # split eox records in a list of known and unknown pid records + eox = split_pids(eox) + + # save known pid reports + pids = save_eox(eox.get('eox_known'), path_found_pid) + # delete requests for known pids + remove_ids_from_dir(pids, path_request_pid) + + # save unknown pid reports + pids = save_eox(eox.get('eox_unknown'), path_not_found_pid) + # delete requests for unknown pids + remove_ids_from_dir(pids, path_request_pid) + # delete pids from known were the status changed to unknown + remove_ids_from_dir(pids, path_found_pid) + + if serials is not []: + eox = get_eox_by_serials(serials=serials, access_token=access_token, settings=settings) + log_message(f'eox by ser: {eox}') + + # split eox records in a list of known and unknown pid records + eox = split_pids(eox) + + # split EoX records for known PIDs in one entry per serial + serials = split_serials(eox.get('eox_known')) + # save EoX records for serials with known EoX state + serials = save_serials(serials, path_found_ser) + log_message(f'EoX Serials: known: {serials}') + # delete requests for known serials + remove_ids_from_dir(serials, path_request_ser) + + # split EoX records for unknown PIDs in one entry per serial + serials = split_serials(eox.get('eox_unknown')) + # save EoX records for serials with known EoX state + serials = save_serials(serials, path_not_found_ser) + # delete requests for unknown serials + remove_ids_from_dir(serials, path_request_ser) + # delete serials from known were the status changed to unknown + remove_ids_from_dir(serials, path_found_ser) + + +main() diff --git a/source/bin/ciscoapi/cisco-psirt.py b/source/bin/ciscoapi/cisco-psirt.py new file mode 100755 index 0000000000000000000000000000000000000000..876d50c4888b1a2f1cd8548fcc6edc107313e134 --- /dev/null +++ b/source/bin/ciscoapi/cisco-psirt.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Author: thl-cmk[at]outlook[dot]com +# URL : https://thl-cmk.hopto.org +# Date : 2017-07-10 +# +# https://developer.cisco.com/docs/support-apis/ +# +# 2018-06-06: fixed handling if state changes form found to not_found (delete old psirt found file) +# 2021-07-24: rewritten for python 3.8 +# +# +import ntpath +import os +import json +from typing import List +from dataclasses import dataclass + +from cisco_live_cycle_utils import ( + configure_logger, + log_message, + get_ids_from_dir, + remove_ids_from_list, + refresh_ids_from_dir, + sleep_random, + expand_path, +) +from ciscoapi import ( + AccessToken, + Settings, + get_psirt_by_product_family, + get_psirt_by_iosxe_version, + get_psirt_by_ios_version, +) + + +@dataclass +class Paths: + found: str + not_found: str + request: str + + +@dataclass +class Refresh: + found: int + not_found: int + + +g_logger = None + + +def _psirt_remove_id_file(psirt_path: str, psirt_id: str): + # delete psirt file + try: + log_message(f'delete psirt id file : {psirt_path + psirt_id}') + os.remove(psirt_path + psirt_id) + except OSError: + pass + + +def _psirt_dump_record(psirt_record, psirt_id: str, psirt_path: str, psirt_path_request: str): + with open(psirt_path + psirt_id, 'w') as f: + json.dump(psirt_record, f) + # delete request file + _psirt_remove_id_file(psirt_path_request, psirt_id) + + return + + +def _check_psirt_record(psirt_record, psirt_id, psirt_path_found, psirt_path_request): + """ + + :param psirt_record: + :param psirt_id: + :param psirt_path_found: + :param psirt_path_request: + :returns: + """ + + for advisory in psirt_record.get('advisories'): + # remove unwanted information from advisories + advisory.pop('productNames', None) + advisory.pop('ipsSignatures', None) + advisory.pop('iosRelease', None) + advisory.pop('cvrfUrl', None) + advisory.pop('ovalUrl', None) + advisory.pop('summary', None) + temp_advisory = advisory.copy() + for key in temp_advisory.keys(): + if advisory.get(key, None) in [['NA'], 'NA']: + advisory.pop(key, None) + + _psirt_dump_record(psirt_record, psirt_id, psirt_path_found, psirt_path_request) + + return + + +def _get_psirt_id_list(product_family: str, paths: Paths, refresh: Refresh) -> List[str]: + """ + + @param product_family: + @param paths: Path object with path to founnd/not found/requested PSIRT records + @param refresh: Refresh object with number of days before a PSIRT record needs to be refreshed for found/not found + @return: list of PIDs + """ + # create list of ID's to request PSIRT status for + psirt_id_list = get_ids_from_dir(paths.request + product_family) + log_message(f'psirt requests : {psirt_id_list}') + # remove already found ID's from list + psirt_id_list = remove_ids_from_list(psirt_id_list, paths.found + product_family) + log_message(f'psirt requests : {psirt_id_list}') + # remove not found ID's from list + psirt_id_list = remove_ids_from_list(psirt_id_list, paths.not_found + product_family) + log_message(f'psirt requests : {psirt_id_list}') + + # refresh psirt after 1 day by default + psirt_id_list = refresh_ids_from_dir(paths.not_found + product_family, refresh.not_found, psirt_id_list, True) + log_message(f'psirt requests : {psirt_id_list}') + psirt_id_list = refresh_ids_from_dir(paths.found + product_family, refresh.found, psirt_id_list, False) + log_message(f'psirt requests : {psirt_id_list}') + + return psirt_id_list + + +def _update_psirt_id(psirt_records: list, family_name: str, paths: Paths): + for psirt_record in psirt_records: + if family_name in ['IOS', 'IOS-XE']: + psirt_id = psirt_record.get('version') + else: + psirt_id = psirt_record.get('family') + + if psirt_record.get('advisories') != 'notfound': + _check_psirt_record(psirt_record, psirt_id, paths.found + family_name + '/', + ntpath.sep + family_name + '/') + else: + _psirt_dump_record(psirt_record, psirt_id, paths.not_found + family_name + '/', + paths.request + family_name + '/') + # remove psirt_found file (happens when product family is removed form bug ID) + _psirt_remove_id_file(paths.found + family_name + '/', psirt_id) + return + + +def main(): + settings = Settings() + access_token = AccessToken(settings.client_id, settings.client_secret, settings.proxies) + configure_logger(log_level=settings.log_level) + + refresh = Refresh( + found=settings.psirt_refresh_found, + not_found=settings.psirt_refresh_not_found + ) + + psirt_dir = expand_path(settings.base_path + '/psirt') + paths = Paths( + found=psirt_dir + '/found/', + not_found=psirt_dir + '/not_found/', + request=psirt_dir + '/request/' + ) + + psirt_ios = _get_psirt_id_list('IOS', paths, refresh) + psirt_ios_xe = _get_psirt_id_list('IOS-XE', paths, refresh) + psirt_family = _get_psirt_id_list('family', paths, refresh) + + if (psirt_ios == []) and psirt_ios_xe == [] and psirt_family == []: + log_message('all list are empty. Do nothing.') + return + + # wait random time after startup + if settings.wait_after_start: + sleep_random(settings.max_wait_time) + + if psirt_family is not []: + psirt_records = get_psirt_by_product_family(psirt_family, access_token, settings=settings) + _update_psirt_id(psirt_records, 'family', paths) + + if psirt_ios_xe is not []: + psirt_records = get_psirt_by_iosxe_version(psirt_ios_xe, access_token, settings=settings) + _update_psirt_id(psirt_records, 'IOS-XE', paths) + + if psirt_ios is not []: + psirt_records = get_psirt_by_ios_version(psirt_ios, access_token, settings=settings) + _update_psirt_id(psirt_records, 'IOS', paths) + + +main() diff --git a/source/bin/ciscoapi/cisco-sn2info.py b/source/bin/ciscoapi/cisco-sn2info.py new file mode 100755 index 0000000000000000000000000000000000000000..6521bd5b615f4207ee6c6d52707b9d5315c5d245 --- /dev/null +++ b/source/bin/ciscoapi/cisco-sn2info.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# License: GNU General Public License v2 +# +# Author: thl-cmk[at]outlook[dot]com +# URL : https://thl-cmk.hopto.org +# Date : 2017-04-15: +# +# Cisco SN2INFO API Framework +# +# https://developer.cisco.com/docs/support-apis/#serial-number-to-information +# +# 2021-07-23: rewrite for python 3.8 +# +import json + +from cisco_live_cycle_utils import ( + configure_logger, + log_message, + expand_path, + get_ids_from_dir, + remove_ids_from_list, + refresh_ids_from_dir, + remove_ids_from_dir, + sleep_random, +) +from ciscoapi import ( + AccessToken, + Settings, + get_coverage_summary_by_serials, +) + + +def sn2info_split_covered(sn2info_records): + sn2info_covered = [] + sn2info_notcovered = [] + + for response in sn2info_records: + sn2inforecord = response.get('serial_numbers') +# PaginationResponseRecord = response.get('PaginationResponseRecord') +# PageIndex = PaginationResponseRecord.get('PageIndex') +# LastIndex = PaginationResponseRecord.get('LastIndex') +# PageRecords = PaginationResponseRecord.get('PageRecords') +# TotalRecords = PaginationResponseRecord.get('TotalRecords') + if sn2inforecord is not None: + for serial in sn2inforecord: + if serial.get('is_covered') == 'YES': + sn2info_covered.append(serial) + log_message(f'SN2INFO covered : {serial.get("sr_no")}') + else: + sn2info_notcovered.append(serial) + log_message(f'SN2INFO not covered : {serial.get("sr_no")}') + return {'sn2info_covered': sn2info_covered, 'sn2info_notcovered': sn2info_notcovered} + + +def sn2info_save_serials(sn2infos, path): + saved = [] + + for sn2info in sn2infos: + serial = str(sn2info.get('sr_no')) + if serial: + with open(path + serial, 'w') as f: + json.dump(sn2info, f) + saved.append(serial) + + return saved + + +def main(): + settings = Settings() + access_token = AccessToken(settings.client_id, settings.client_secret, settings.proxies) + configure_logger(log_level=settings.log_level) + + sn2info_dir = settings.base_path + '/sn2info' + path_found = expand_path(sn2info_dir + '/found/') + path_not_found = expand_path(sn2info_dir + '/not_found/') + path_request = expand_path(sn2info_dir + '/request/') + + # create list of serial numbers to request SN2INFO status for + sn2info = get_ids_from_dir(path_request) + log_message(f'sn2info requests : {sn2info}') + # remove covered serials from list + sn2info = remove_ids_from_list(sn2info, path_found) + log_message(f'sn2info requests : {sn2info}') + # remove not covered serials from list + sn2info = remove_ids_from_list(sn2info, path_not_found) + log_message(f'sn2info requests : {sn2info}') + + # refresh sn2info serials after 31 days by default + sn2info = refresh_ids_from_dir(path_not_found, settings.sn2info_refresh_not_covered, sn2info, True) + log_message(f'sn2info requests : {sn2info}') + sn2info = refresh_ids_from_dir(path_found, settings.sn2info_refresh_covered, sn2info, False) + log_message(f'sn2info requests : {sn2info}') + + if sn2info is []: + log_message('all list are empty. Do nothing.') + return + + # wait random time after startup + if settings.wait_after_start: + sleep_random(settings.max_wait_time) + + if sn2info is not []: + sn2info_records = get_coverage_summary_by_serials( + serials=sn2info, + access_token=access_token, + settings=settings + ) + log_message(f'sn2info response: {sn2info_records}') + sn2info = sn2info_split_covered(sn2info_records) + serials = sn2info_save_serials(sn2info.get('sn2info_covered'), path_found) + remove_ids_from_dir(serials, path_request) + serials = sn2info_save_serials(sn2info.get('sn2info_notcovered'), path_not_found) + remove_ids_from_dir(serials, path_request) + # delete serials from covered were the status changed to uncovered + remove_ids_from_dir(serials, path_found) + + +main() diff --git a/source/bin/ciscoapi/cisco_live_cycle_utils.py b/source/bin/ciscoapi/cisco_live_cycle_utils.py new file mode 100755 index 0000000000000000000000000000000000000000..17dc7732638e64c13218f014c8553c6b7eaa4f93 --- /dev/null +++ b/source/bin/ciscoapi/cisco_live_cycle_utils.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python +# -*- encoding: utf-8; py-indent-offset: 4 -*- + +# +# 15.04.2017 : Th.L. : Support for Cisco API +# +# https://developer.cisco.com/docs/support-apis/ +# + +import logging +import os +import time +import random +import sys + + +def configure_logger(_path: str = '', _log_to_console: bool = True, log_level: str = 'INFO'): + log_formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(name)s :: %(module)s ::%(message)s') + log = logging.getLogger('root') + + numeric_level = getattr(logging, log_level.upper(), None) + if isinstance(numeric_level, int): + logging.getLogger().setLevel(numeric_level) + else: + logging.getLogger().setLevel(logging.WARNING) + + log_handler_console = logging.StreamHandler(sys.stdout) + log_handler_console.setFormatter(log_formatter) + log_handler_console.setLevel(logging.INFO) + log.addHandler(log_handler_console) + + +def log_message(message: str, level: str = 'DEBUG'): + log = logging.getLogger() + if level.upper() == 'CRITICAL': + log.critical(message) + elif level.upper() == 'ERROR': + log.error(message) + elif level.upper() == 'WARNING': + log.warning(message) + elif level.upper() == 'INFO': + log.info(message) + elif level.upper() == 'DEBUG': + log.debug(message) + else: + log.warning(f'unknown log_level: {level}') + + +def sleep_random(max_minutes): + sleep_time = random.randint(1, 60 * max_minutes) + log_message(message=f'{sleep_time} seconds', level='INFO') + time.sleep(sleep_time) + return + + +# read list of files from dir (eq. (P)IDs or SERIALs) (don't change to uppercase) +def get_ids_from_dir(directory, refresh_time: int = 0): + refresh_time = refresh_time * 86400 + start_time = int(time.time()) + ids = [] + for (dir_path, dir_names, file_names) in os.walk(directory): + for entry in file_names: + modify_time = int(os.path.getmtime(dir_path + '/' + entry)) + if (start_time - modify_time) > refresh_time: + ids.append(str(entry).replace('_', '/')) + # do not read subdirs + break + # insert cleanup here (filter unwanted names, chars, etc...) + return ids + + +# read list of subdirectories from directory (PIDs) (don't anything) +def get_subdirs_from_dir(base_dir): + sub_dirs = [] + for (dir_path, sub_dirs, filenames) in os.walk(base_dir): + break + # insert cleanup here (filter unwanted names, chars, etc...) + return sub_dirs + + +# read list of IOS/IOSXE Versions from directory (don't change to uppercase) +def get_version_from_dir(directory): + versions = [] + for (dir_path, dir_names, file_names) in os.walk(directory): + for entry in file_names: + versions.append(str(entry)) + # do not read subdirs + break + # insert cleanup here (filter unwanted names, chars, etc...) + return versions + + +# delete (P)IDs or SERIALs files from directory (requests) +def remove_ids_from_dir(ids, directory): + for entry in ids: + try: + os.remove(directory + entry.replace('/', '_')) + except OSError: + pass + + +# remove (P)IDs or SERIALs from list of (P)ID or serials +def remove_ids_from_list(ids, directory): + known_ids = [] + for (dir_path, dir_names, file_names) in os.walk(directory): + known_ids.extend(file_names) + # do not read subdirs + break + + for known_id in known_ids: + known_id = known_id.replace('_', '/') + for entry in ids: + if known_id == entry: + ids.remove(entry) + return ids + + +# returns al list of ids to refresh, +# expects a directory with ids to check, the time interval, a list of IDs to add +# if remove True it will delete the ID files from refresh_dir +def refresh_ids_from_dir(refresh_dir, refresh_time, ids, remove): + refresh_dir = expand_path(refresh_dir) + # get seconds from # of days (days * 24 * 60 * 60 --> days * 86400) + refresh_time = int(refresh_time) * 86400 + start_time = int(time.time()) + refresh_ids = get_ids_from_dir(refresh_dir) + if refresh_ids is not []: + for entry in refresh_ids: + modify_time = int(os.path.getmtime(refresh_dir + entry.replace('/', '_'))) + if (start_time - modify_time) > refresh_time: + ids.append(entry) + if remove: + try: + os.remove(refresh_dir + entry.replace('/', '_')) + except OSError: + pass + return ids + + +# check if dir exists, if not try to create it. +# return True if dir exists or creation was ok. +# return False if dir not exists and creation was not ok +def check_dir_and_create(directory): + directory = os.path.dirname(directory) + if not os.path.exists(directory): + try: + os.makedirs(directory) + except: + return False + return True + + +# expand homedir and add '/' if necessary and create directory if it not exists +def expand_path(path): + homedir = os.path.expanduser('~') + + if path.startswith('~'): + path = homedir + path[1:] + + if not path.endswith('/'): + path += '/' + + if not check_dir_and_create(path): + return '' + + return path + +# remove empty directories +def remove_empty_sub_dirs(base_dir): + subdirs = get_subdirs_from_dir(base_dir) + for subdir in subdirs: + try: + os.rmdir(base_dir + subdir) + except OSError as e: + log_message(f'can not delete: {base_dir}, Error:{e}') + pass + + +# move contents of source_dir to destination_dir +# only one level deep, lave source_dir +def move_dir(source_dir, destination_dir, **kwargs): + refresh_time = int(kwargs.get('refresh_time', 0)) * 86400 + starttime = int(time.time()) + + sub_dirs = get_subdirs_from_dir(source_dir) + for sub_dir in sub_dirs: + files = get_ids_from_dir(source_dir + sub_dir) + if len(files) > 0: + source_path = expand_path(source_dir + sub_dir) + destination_path = expand_path(destination_dir + sub_dir) + for file in files: + source_file = source_path + file + destination_file = destination_path + file + modify_time = int(os.path.getmtime(source_file)) + if (starttime - modify_time) > refresh_time: + try: + os.rename(source_file, destination_file) # rename (move) contents of not_found to request + except OSError as e: + log_message(message=f'error:{e}, source: {source_file}, destionation: {destination_file}', + level='ERROR') + + remove_empty_sub_dirs(source_dir) diff --git a/source/bin/ciscoapi/ciscoapi.py b/source/bin/ciscoapi/ciscoapi.py new file mode 100755 index 0000000000000000000000000000000000000000..2ec1e74d2c78baa19731e6895c5bc0865dfc9a3d --- /dev/null +++ b/source/bin/ciscoapi/ciscoapi.py @@ -0,0 +1,521 @@ +#!/usr/bin/env python +# -*- encoding: utf-8; py-indent-offset: 4 -*- + +# https://developer.cisco.com/docs/support-apis/ +# +# 2017-03-30: Cisco EoX API Framework +# added support for Cisco SN2INFO API (get contract status) +# 2017-07-09: added support for Cisco Psirt API (IOS and IOSXE) +# 2017-07-19: added support for Cisco Software Suggestion API +# 2018-01-25: adding support for Cisco bug api 2.0 +# 2018-09-25: performance improvement "psirt_response.encoding = 'UTF-8'", drops json.loads +# from about 4 min to some seconds +# 2021-07-23: rewritten for python 3.8 +# 2023-06-09: changed for new rest api endpoint (apix.cisco.com) +# refactoring get_token and settings +# some cleanup +# +# supportapis[dash]help[at]cisco[dot]com +# +import requests +import json +import time +from os.path import ( + expanduser, +) +from typing import Dict, List +from cisco_live_cycle_utils import ( + log_message, +) + + +class Settings: + def __init__(self): + conf_file = '~/etc/ciscoapi/ciscoapi.json' + conf_file = expanduser(conf_file) + + with open(conf_file) as f: + try: + self.__settings = json.load(f) + except ValueError as e: + log_message(f'ciscoapi:settings:JSON load error: {e}', level='WARNING') + exit() + except FileNotFoundError as e: + log_message(f'Config file not found {e}.', level='CRITICAL') + exit() + + self.__base_path = '~/var/ciscoapi' + self.__auth_proxy_url = 'https://cmk.bech-noc.de/api/cauthproxy.py' + self.__proxies = {} + self.__wait_after_start = True + self.__max_wait_time = 15 + self.__log_level = 'warning' + self.__eox_refresh_known = 31 + self.__eox_refresh_unknown = 7 + self.__sn2info_refresh_covered = 31 + self.__sn2info_refresh_not_covered = 7 + self.__bug_refresh_found = 2 + self.__bug_refresh_not_found = 1 + self.__psirt_refresh_found = 1 + self.__psirt_refresh_not_found = 1 + self.__suggestion_refresh_found = 31 + self.__suggestion_refresh_not_found = 7 + + if self.__settings['global'].get('http_proxy'): + self.__proxies .update({'http': self.__settings['global'].get('http_proxy')}) + if self.__settings['global'].get('https_proxy'): + self.__proxies .update({'https': self.__settings['global'].get('https_proxy')}) + + @property + def client_id(self) -> str: + return self.__settings['cisco_api']['client_id'] + + @property + def client_secret(self) -> str: + return self.__settings['cisco_api']['client_secret'] + + @property + def proxies(self) -> Dict[str, str]: + return self.__proxies + + @property + def use_system_proxies(self) -> bool: + return self.__settings['global'].get('use_system_proxies', False) + + @property + def use_auth_proxy(self) -> bool: + return self.__settings['cisco_api'].get('use_auth_proxy', False) + + @property + def client_fqdn(self) -> str: + return self.__settings['cisco_api'].get('client_fqdn') + + @property + def root_cert(self) -> bool: + return self.__settings['cisco_api'].get('root_cert', False) + + @property + def auth_proxy_url(self) -> str: + return self.__settings['cisco_api'].get('auth_proxy_url') + + @property + def base_path(self) -> str: + return self.__settings['global'].get('base_path', self.__base_path) + + @property + def wait_after_start(self) -> bool: + return self.__settings['global'].get('wait_after_start', self.__wait_after_start) + + @property + def max_wait_time(self) -> int: + return self.__settings['global'].get('max_wait_time', self.__max_wait_time) + + @property + def log_level(self) -> str: + return self.__settings['global'].get('log_level', self.__log_level) + + @property + def eox_refresh_known(self) -> int: + return self.__settings['eox'].get('refresh_known', self.__eox_refresh_known) + + @property + def eox_refresh_unknown(self) -> int: + return self.__settings['eox'].get('refresh_known', self.__eox_refresh_unknown) + + @property + def sn2info_refresh_covered(self) -> int: + return self.__settings['sn2info'].get('refresh_covered', self.__sn2info_refresh_covered) + + @property + def sn2info_refresh_not_covered(self) -> int: + return self.__settings['sn2info'].get('refresh_not_covered', self.__sn2info_refresh_not_covered) + + @property + def bug_refresh_found(self) -> int: + return self.__settings['bug'].get('refresh_found', self.__bug_refresh_found) + + @property + def bug_refresh_not_found(self) -> int: + return self.__settings['bug'].get('refresh_found', self.__bug_refresh_not_found) + + @property + def psirt_refresh_found(self) -> int: + return self.__settings['psirt'].get('refresh_found', self.__psirt_refresh_found) + + @property + def psirt_refresh_not_found(self) -> int: + return self.__settings['psirt'].get('refresh_not_found', self.__psirt_refresh_not_found) + + @property + def suggestion_refresh_found(self) -> int: + return self.__settings['suggestion'].get('refresh_found', self.__suggestion_refresh_found) + + @property + def suggestion_refresh_not_found(self) -> int: + return self.__settings['suggestion'].get('refresh_not_found', self.__suggestion_refresh_not_found) + + +class AccessToken: + def __init__( + self, + client_id: str, + client_secret: str, + proxies: Dict, + ): + self.__client_id = client_id + self.__client_secret = client_secret + self.__proxies = proxies + self.__use_auth_proxy = None + self.__client_fqdn = '' + self.__root_cert = True + self.__auth_proxy_url = 'https://cmk.bech-noc.de/api/cauthproxy.py' + self.__access_token = '' + self.__lifetime = 0 + self.__time = 0 + self.__auth_headers = { + 'content-Type': 'application/x-www-form-urlencoded', + 'accept': 'application/json' + } + self.__grant_type = 'client_credentials' + self.__auth_url = 'https://id.cisco.com/oauth2/default/v1/token' + self.__verify = True + self.__auth_req_data = { + 'client_id': self.__client_id, + 'client_secret': self.__client_secret, + 'grant_type': self.__grant_type + } + + @property + def token(self) -> str: + if self.__access_token and time.time() < self.__lifetime: + return self.__access_token + else: + self.__time = time.time() + response = requests.post( + self.__auth_url, + headers=self.__auth_headers, + data=self.__auth_req_data, + proxies=self.__proxies, + verify=self.__verify) + + if response.ok: + auth_response = json.loads(response.text) + self.__lifetime = self.__time + int(auth_response.get("expires_in")) - 60 + self.__access_token = auth_response.get("access_token") + return self.__access_token + + +# generic cisco api request for all get info by serialnumber +def get_info_by_serials( + serials: List[str], + access_token: AccessToken, + req_url: str, + max_serials: int, + settings: Settings +): + # locale variablen + max_serial_length = 40 + max_req_per_second = 5 + wait_time = 5 + optimisedserials = [] + serialsstr = '' + count = 1 + info = [] + + # split list of Serials in chunks of max 75 serials, each max 40 bytes length + for serial in serials: + if len(serial) <= max_serial_length: + serialsstr += serial + "," + count += 1 + if count == max_serials: + optimisedserials.append(serialsstr[:-1]) + serialsstr = '' + count = 1 + optimisedserials.append(serialsstr[:-1]) + + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + + count = 0 + for serials in optimisedserials: + # Disable invalid certificate warnings. + # requests.packages.urllib3.disable_warnings() + response = requests.get(req_url + serials, headers=headers, proxies=settings.proxies) + count += 1 + # only 5 request per second are allowed + if count == max_req_per_second: + time.sleep(wait_time) + count = 0 + if response.ok: + response.encoding = 'UTF-8' + info.append(json.loads(response.text)) + + return info + + +def get_eox_by_pid(pids: List[str], access_token: AccessToken, settings: Settings): + # local variables + max_pid_length = 240 + max_pids = 20 + max_req_per_second = 5 + wait_time = 5 + optimisedpids = [] + pidstr = '' + count = 1 + eoxr = [] + + # split list of PIDs in chunks of max 240 bytes length + for pid in pids: + if (len(pidstr) + len(pid)) < max_pid_length: + pidstr += pid + "," + count += 1 + if (count == max_pids) or ((len(pidstr) + len(pid)) >= max_pid_length): + optimisedpids.append(pidstr[:-1]) + pidstr = '' + count = 1 + optimisedpids.append(pidstr[:-1]) + headers = {'accept': 'application/json', 'Authorization': f'Bearer {access_token.token}'} + req_url = 'https://apix.cisco.com/supporttools/eox/rest/5/EOXByProductID/1/' + + count = 0 + for productids in optimisedpids: + # Disable invalid certificate warnings. + # requests.packages.urllib3.disable_warnings() + eoxresponse = requests.get(req_url + productids, headers=headers, proxies=settings.proxies) + count += 1 + # only 5 request per second are allowed + if count == max_req_per_second: + time.sleep(wait_time) + count = 0 + if eoxresponse.ok: + eoxr.append(json.loads(eoxresponse.text)) + return eoxr + + +def get_eox_by_serials(serials: List[str], access_token: AccessToken, settings: Settings): + max_serials = 20 + req_url = 'https://apix.cisco.com/supporttools/eox/rest/5/EOXBySerialNumber/1/' + info = get_info_by_serials(serials, access_token, req_url, max_serials, settings) + return info + + +def get_coverage_summary_by_serials(serials: List[str], access_token: AccessToken, settings: Settings): + max_serials = 75 + req_url = 'https://apix.cisco.com/sn2info/v2/coverage/summary/serial_numbers/' + info = get_info_by_serials(serials, access_token, req_url, max_serials, settings) + return info + + +def get_psirt_by_ios_version(psirtios: List[str], access_token: AccessToken, settings: Settings): + info = [] + + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/security/advisories/ios?version=' + + # requests.packages.urllib3.disable_warnings() + if list(psirtios) is not []: + for ios_version in psirtios: + log_message('request ios_version: %s, time: %s' % (ios_version, time.asctime(time.localtime(time.time())))) + psirt_response = requests.get(req_url + ios_version, headers=headers, proxies=settings.proxies) + if psirt_response.ok: + log_message( + f'ok. ios_version: {ios_version}, ' + f'time: {time.asctime(time.localtime(time.time()))}, ' + f'len: {len(str(psirt_response))}' + ) + # makes json.loads() mutch more faster (from 4 min. down to 1 sec for about 2MB) + psirt_response.encoding = 'UTF-8' + response = (json.loads(psirt_response.text)) + log_message( + f'response loaded: ios_version: {ios_version}, ' + f'time: {time.asctime(time.localtime(time.time()))}, ' + f'len: {len(str(response))}' + ) + info.append({'version': ios_version, 'advisories': response.get('advisories', 'notfound')}) + log_message('ciscoapi:psirt-ios-found: %s' % info) + else: + log_message( + f'notfound. ios_version: {ios_version}, ' + f'time: {time.asctime(time.localtime(time.time()))}, ' + f'len: {len(str(psirt_response))}' + ) + info.append({'version': ios_version, 'advisories': 'notfound'}) + log_message('ciscoapi:psirt-ios-notfound: %s' % info) + return info + + +def get_psirt_by_iosxe_version(psirtios: List[str], access_token: AccessToken, settings: Settings): + info = [] + + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/security/advisories/iosxe?version=' + + # requests.packages.urllib3.disable_warnings() + if list(psirtios) is not []: + for ios_version in psirtios: + psirt_response = requests.get(req_url + ios_version, headers=headers, proxies=settings.proxies) + if psirt_response.ok: + psirt_response.encoding = 'UTF-8' + response = (json.loads(psirt_response.text)) + info.append({'version': ios_version, 'advisories': response.get('advisories', 'notfound')}) + log_message(f'ciscoapi:psirt-iosxe-found: {info}') + else: + info.append({'version': ios_version, 'advisories': 'notfound'}) + log_message(f'ciscoapi:psirt-iosxe-notfound: {info}') + return info + + +def get_psirt_by_product_family(families: List[str], access_token: AccessToken, settings: Settings): + info = [] + + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/security/advisories/cvrf/product?product=' + + # requests.packages.urllib3.disable_warnings() + if list(families) is not []: + for family in families: + psirt_response = requests.get(req_url + family, headers=headers, proxies=settings.proxies) + if psirt_response.ok: + psirt_response.encoding = 'UTF-8' + response = (json.loads(psirt_response.text)) + info.append({'family': family, 'advisories': response.get('advisories', 'notfound')}) + log_message('ciscoapi:psirt-family-found: %s' % info) + else: + info.append({'family': family, 'advisories': 'notfound'}) + log_message('ciscoapi:psirt-family-notfound: %s' % info) + return info + + +# get_clean_sn_for_bug_api('ASA5510', '9.1(7)15,8.4(7)30,9.1(6)1') +# return {'9.1(7)15': '9.1(7.15)', '9.1(6)1': '9.1(6.1)', '8.4(7)30': '8.4(7.30)'} +def get_clean_sn_for_bug_api(pid, software_releases): + if software_releases == '' or pid == '': + return {} + software_releases = software_releases.split(',') + clean_sns = {} + + if pid.startswith('ASA'): + # change ASA version from 9.1(2)10 to 9.1(2.10) + for software_release in software_releases: + clean_sn = software_release + if clean_sn[-1] != ')': + clean_sn = clean_sn.split(')') + if len(clean_sn) == 2: + clean_sn = '%s.%s)' % (clean_sn[0], clean_sn[1]) + clean_sns.update({software_release: clean_sn}) + + elif pid.startswith('AIR'): + # change WLC version from 8.5.120.0 to 8.5(120.0) + for software_release in software_releases: + clean_sn = software_release + if clean_sn[-1] != ')': + if len(clean_sn.split('.')) == 4: + clean_sn = clean_sn.split('.') + clean_sn = '%s.%s(%s.%s)' % (clean_sn[0], clean_sn[1], clean_sn[2], clean_sn[3]) + clean_sns.update({software_release: clean_sn}) + + else: + for software_release in software_releases: + clean_sn = software_release + if clean_sn.startswith('16.0'): + # remove leading zeros 16.09.01 to 16.9.1 + clean_sn = clean_sn.split('.') + for x in range(0, len(clean_sn)): + clean_sn[x] = clean_sn[x].lstrip('0') + if clean_sn[x] == '': + clean_sn = '0' + clean_sn = '.'.join(clean_sn) + # remove trailing A-Za-z + while clean_sn[-1].upper() in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ': + clean_sn = clean_sn[:-1] + elif clean_sn.startswith('03.0'): + # convert version from 03.03.04SE to 3.3(4)SE + clean_sn = clean_sn.split('.') + for x in range(0, len(clean_sn)): + clean_sn[x] = clean_sn[x].lstrip('0') + if clean_sn[x] == '': + clean_sn = '0' + digits = '' + for x in clean_sn[2]: + if x.isdigit(): + digits += x + clean_sn[2] = clean_sn[2].replace(digits, '') + clean_sn = '%s.%s(%s)%s' % (clean_sn[0], clean_sn[1], digits, clean_sn[2]) + clean_sns.update({software_release: clean_sn}) + + return clean_sns + + +def get_bug_by_pid_and_release(pid, release, access_token: AccessToken, reqoptions, settings: Settings): + info = [] + + headers = {'accept': 'application/json', 'Authorization': 'Bearer ' + access_token.token} + req_url = 'https://apix.cisco.com/bug/v2.0/bugs/products/product_id/' + + if release != '': + software_releases = get_clean_sn_for_bug_api(pid, release) + clean_sn = ','.join(software_releases.values()) + missing = {} + bug_response = requests.get( + url=req_url + f'{pid}/software_releases/{clean_sn}{reqoptions}', + headers=headers, + proxies=settings.proxies + ) + if bug_response.ok: + bug_response.encoding = 'UTF-8' + response = (json.loads(bug_response.text)) + bug_list = response.get('bugs') + pagination_record = response.get('pagination_response_record') + last_page = int(pagination_record.get('last_index')) + total_records = int(pagination_record.get('total_records')) + log_message(message=f'PID: {pid}, Version: {clean_sn}, Total records: {total_records}, Pages: {last_page}', + level='INFO') + if last_page > 1: + if reqoptions != '': + reqoptions = reqoptions + '&page_index=' + else: + reqoptions = '?page_index=' + for page in range(2, last_page + 1): + # time.sleep(2) + page_options = reqoptions + '%s' % page + bug_response = requests.get(req_url + f'{pid}/software_releases/{clean_sn}{page_options}', + headers=headers, proxies=settings.proxies) + if bug_response.ok: + response = (json.loads(bug_response.text)) + bug_list += response.get('bugs') + else: + bug_response.encoding = 'UTF-8' + status_code = bug_response.status_code + reason = bug_response.reason + url = bug_response.url + text = bug_response.text + log_message(message=f'ciscoapi error: {status_code}, {reason}, Page: {page}, LastPage: ' + f'{last_page}, URL: \'{url}\'. Text: \'{text}\'', + level='WARNING') + missing.update({page: {'status_code': status_code, + 'reason': reason, + 'url': url, + 'text': text}}) + info.append({'pid': pid, + 'software_releases': software_releases, + 'bugs': bug_list, + 'total_records': total_records, + 'missing': missing}) + log_message('ciscoapi:bug-found: %s' % info) + else: + bug_response.encoding = 'UTF-8' + status_code = bug_response.status_code + reason = bug_response.reason + url = bug_response.url + text = bug_response.text + page = 'ALL' + log_message(f'ciscoapi error: {status_code}, {reason}, URL: \'{url}\'. Text: \'{text}\'', level='WARNING') + missing.update({page: {'status_code': status_code, + 'reason': reason, + 'url': url, + 'text': text}}) + info.append({'pid': pid, + 'software_releases': software_releases, + 'status_code': status_code, + 'reason': reason, + 'missing': missing}) + return info diff --git a/gui/views/inv_cisco_livecycle.py b/source/gui/views/inv_cisco_livecycle.py similarity index 100% rename from gui/views/inv_cisco_livecycle.py rename to source/gui/views/inv_cisco_livecycle.py diff --git a/gui/wato/inv_cisco_bug.py b/source/gui/wato/inv_cisco_bug.py similarity index 100% rename from gui/wato/inv_cisco_bug.py rename to source/gui/wato/inv_cisco_bug.py diff --git a/gui/wato/inv_cisco_contract.py b/source/gui/wato/inv_cisco_contract.py similarity index 100% rename from gui/wato/inv_cisco_contract.py rename to source/gui/wato/inv_cisco_contract.py diff --git a/gui/wato/inv_cisco_eox.py b/source/gui/wato/inv_cisco_eox.py similarity index 100% rename from gui/wato/inv_cisco_eox.py rename to source/gui/wato/inv_cisco_eox.py diff --git a/gui/wato/inv_cisco_psirt.py b/source/gui/wato/inv_cisco_psirt.py similarity index 100% rename from gui/wato/inv_cisco_psirt.py rename to source/gui/wato/inv_cisco_psirt.py diff --git a/packages/inv_cisco_support b/source/packages/inv_cisco_support similarity index 98% rename from packages/inv_cisco_support rename to source/packages/inv_cisco_support index 34bca191bb192083a35e3973dd104ffa74b3e45b..71a53238092cc376c06195789be1c38dace5b958 100644 --- a/packages/inv_cisco_support +++ b/source/packages/inv_cisco_support @@ -40,5 +40,5 @@ 'suggested software', 'version': '0.3.0-20231025', 'version.min_required': '2.2.0b1', - 'version.packaged': '2.2.0p11', + 'version.packaged': '2.2.0p24', 'version.usable_until': '2.3.0b1'} diff --git a/web/htdocs/css/inv_cisco_support.css b/source/web/htdocs/css/inv_cisco_support.css similarity index 100% rename from web/htdocs/css/inv_cisco_support.css rename to source/web/htdocs/css/inv_cisco_support.css