Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit 554840df authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

update project

parent 23a9eb23
No related branches found
No related tags found
No related merge requests found
......@@ -36,7 +36,7 @@ from cmk.base.plugins.agent_based.agent_based_api.v1.type_defs import (
InventoryResult,
)
from cmk.base.plugins.agent_based.utils.ciscoapi import (
from cmk.base.plugins.agent_based.utils.inv_cisco_support import (
set_loglevel,
expand_path,
get_base_path,
......
......@@ -19,12 +19,8 @@
# cleanup create_sn2info_record
# 2021-07-24: rewritten for CMK 2.0
# 2021-07-25: some cleanup
# 2021-07-30: moved parse function to utils
#
# ToDo: parse function and SNMP section can be shared between inv_cisco_eox and inv_cisco_contract
# pro: only one section/parse function
# con: with only one pare function these plugins can not disabled separately
# solution: separate SNMP section, common parse function
# ToDo: error handling for json.load -> in eox/contract pass, in psirt/bug -> exit
# ToDo: exit codes ??
#
import os
......@@ -32,7 +28,6 @@ import json
import time
from typing import Dict
from dataclasses import dataclass
from cmk.base.plugins.agent_based.agent_based_api.v1 import (
register,
......@@ -43,30 +38,27 @@ from cmk.base.plugins.agent_based.agent_based_api.v1 import (
TableRow,
)
from cmk.base.plugins.agent_based.agent_based_api.v1.type_defs import (
StringTable,
InventoryResult,
)
from cmk.base.plugins.agent_based.utils.ciscoapi import (
from cmk.base.plugins.agent_based.utils.inv_cisco_support import (
expand_path,
check_bad_serial,
pid_on_black_list,
set_pid_black_list,
get_base_path,
parse_inv_cisco_contract_eox,
SnmpContractEntry,
)
@dataclass
class SnmpContractEntry:
phydescr: str
pid: str
def _create_sn2info_record(sn2infofile, optional_columns):
sn2info = {}
if os.path.isfile(sn2infofile):
with open(sn2infofile) as f:
sn2inforecord = json.load(f)
try:
sn2inforecord = json.load(f)
except ValueError as e:
return {'serial_number': f'json load error {e}'}
modifytime = os.path.getmtime(sn2infofile)
sn2info.update({'Last_checked': time.strftime('%Y-%m-%d', time.localtime(modifytime))})
......@@ -95,31 +87,6 @@ def _create_sn2info_record(sn2infofile, optional_columns):
return sn2info
def parse_inv_cisco_contract(string_table: StringTable) -> Dict[str, SnmpContractEntry]:
serials: Dict[str, SnmpContractEntry] = {}
for phydescr, physerialnum, phymodelname in string_table:
if not check_bad_serial(physerialnum):
if not pid_on_black_list(phymodelname):
phymodelname = phymodelname.strip()
physerialnum = physerialnum.strip()
if phymodelname == '':
phymodelname = 'UNSPECIFIED'
pid = phymodelname.split()[0].upper()
serial = physerialnum.split()[0].upper()
if serial not in serials.keys():
serials.update({serial: SnmpContractEntry(
phydescr=phydescr,
pid=pid
)})
else:
temp = serials[serial]
temp.phydescr = ', '.join(list(set(temp.phydescr.split(', ') + [phydescr])))
temp.pid = ', '.join(list(set(temp.pid.split(', )') + [pid])))
serials[serial] = temp
return serials
def inventory_cisco_contract(params, section: Dict[str, SnmpContractEntry]) -> InventoryResult:
optionalcolumns = [
'contract_site_state_province',
......@@ -178,7 +145,7 @@ def inventory_cisco_contract(params, section: Dict[str, SnmpContractEntry]) -> I
register.snmp_section(
name='inv_cisco_contract',
parse_function=parse_inv_cisco_contract,
parse_function=parse_inv_cisco_contract_eox,
fetch=SNMPTree(
base='.1.3.6.1.2.1.47.1.1.1.1', # ENTITY-MIB::entPhysicalEntry
oids=[
......
......@@ -19,13 +19,13 @@
# moved node tree from hardware.system.support to hardware.system
# 2021-07-23: rewrite for CMK 2.0
# 2021-07-26: cleanup
# 2021-07-30: moved parse function to utils
import os
import json
import time
from typing import Dict
from dataclasses import dataclass
from cmk.base.plugins.agent_based.agent_based_api.v1 import (
register,
......@@ -36,34 +36,30 @@ from cmk.base.plugins.agent_based.agent_based_api.v1 import (
TableRow,
)
from cmk.base.plugins.agent_based.agent_based_api.v1.type_defs import (
StringTable,
InventoryResult,
)
# from cmk.base.plugins.agent_based.utils import ciscoapi as utils
from cmk.base.plugins.agent_based.utils.ciscoapi import (
from cmk.base.plugins.agent_based.utils.inv_cisco_support import (
expand_path,
check_bad_serial,
pid_on_black_list,
sn_on_black_list,
pid_on_bad_list,
set_pid_bad_list,
set_sn_black_list,
get_base_path,
parse_inv_cisco_contract_eox,
SnmpContractEntry,
)
@dataclass
class SnmpContractEntry:
phydescr: str
pid: str
def _create_eox_record(eoxfile, optional_columns):
eox = {}
if os.path.isfile(eoxfile):
with open(eoxfile) as f:
eoxrecord = json.load(f)
try:
eoxrecord = json.load(f)
except ValueError as e:
return {}
modifytime = os.path.getmtime(eoxfile)
......@@ -104,31 +100,6 @@ def _create_eox_record(eoxfile, optional_columns):
return eox
def parse_inv_cisco_eox(string_table: StringTable) -> Dict[str, SnmpContractEntry]:
serials: Dict[str, SnmpContractEntry] = {}
for phydescr, physerialnum, phymodelname in string_table:
if not check_bad_serial(physerialnum):
if not pid_on_black_list(phymodelname):
phymodelname = phymodelname.strip()
physerialnum = physerialnum.strip()
if phymodelname == '':
phymodelname = 'UNSPECIFIED'
pid = phymodelname.split()[0].upper()
serial = physerialnum.split()[0].upper()
if serial not in serials.keys():
serials.update({serial: SnmpContractEntry(
phydescr=phydescr,
pid=pid
)})
else:
temp = serials[serial]
temp.phydescr = ', '.join(list(set(temp.phydescr.split(', ') + [phydescr])))
temp.pid = ', '.join(list(set(temp.pid.split(', )') + [pid])))
serials[serial] = temp
return serials
def inventory_cisco_eox(params, section: Dict[str, SnmpContractEntry]) -> InventoryResult:
always_use_serial = False
optionalcolumns = ['EndOfSecurityVulSupportDate',
......@@ -268,7 +239,7 @@ def inventory_cisco_eox(params, section: Dict[str, SnmpContractEntry]) -> Invent
register.snmp_section(
name='inv_cisco_eox',
parse_function=parse_inv_cisco_eox,
parse_function=parse_inv_cisco_contract_eox,
fetch=SNMPTree(
base='.1.3.6.1.2.1.47.1.1.1.1', # ENTITY-MIB::entPhysicalEntry
oids=[
......
......@@ -44,7 +44,7 @@ from cmk.base.plugins.agent_based.agent_based_api.v1.type_defs import (
InventoryResult,
)
from cmk.base.plugins.agent_based.utils.ciscoapi import (
from cmk.base.plugins.agent_based.utils.inv_cisco_support import (
expand_path,
get_base_path,
)
......@@ -61,7 +61,11 @@ def _create_psirt_record(filepath, filename, not_updated, dont_show_older_then,
advisories = {}
if os.path.isfile(psirtfile):
with open(psirtfile) as f:
psirtrecord = json.load(f)
try:
psirtrecord = json.load(f)
except ValueError as e:
exit()
advisories = psirtrecord.get('advisories')
remove_advisories = []
for advisory in advisories:
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2017-03-20
#
# include file, will be used with snmp_cisco_eox and snmp_cisco_contract
#
# 2017-05-29: fixed empty pid handling
# added serial number cleanup
# 2021-07-23: rewrite for CMK 2.0
# 2021-07-30: renamed form ciscoapi.py to inv_cisco_support.py
import os
import logging
import re
import json
from typing import List, Dict
from dataclasses import dataclass
from cmk.base.plugins.agent_based.agent_based_api.v1.type_defs import (
StringTable,
)
@dataclass
class SnmpContractEntry:
phydescr: str
pid: str
#
# global variables
#
# list of PIDs to drop
g_PID_black_list = ['BUILT-IN', 'MICRON', 'C400-MTFDD']
# list of PIDs to try by serial number
g_PID_bad_list = ['UNSPECIFIED', 'FABRIC', 'ASA', 'C2611XM-2FE', 'FTLX8570D3BCL', 'FTLF8519P2BCL', 'FTLX8571D3BCL',
'FTRJ-8519-7D', 'PLRXPL-SC-S43'] #
# list of S/Ns to drop
g_SN_black_list = []
def parse_inv_cisco_contract_eox(string_table: StringTable) -> Dict[str, SnmpContractEntry]:
serials: Dict[str, SnmpContractEntry] = {}
for phydescr, physerialnum, phymodelname in string_table:
if not check_bad_serial(physerialnum):
if not pid_on_black_list(phymodelname):
phymodelname = phymodelname.strip()
physerialnum = physerialnum.strip()
if phymodelname == '':
phymodelname = 'UNSPECIFIED'
pid = phymodelname.split()[0].upper()
serial = physerialnum.split()[0].upper()
if serial not in serials.keys():
serials.update({serial: SnmpContractEntry(
phydescr=phydescr,
pid=pid
)})
else:
temp = serials[serial]
temp.phydescr = ', '.join(list(set(temp.phydescr.split(', ') + [phydescr])))
temp.pid = ', '.join(list(set(temp.pid.split(', )') + [pid])))
serials[serial] = temp
return serials
def get_base_path() -> str:
conf_file = os.path.expanduser('~/etc/ciscoapi/ciscoapi.json')
base_path = '~/var/ciscoapi'
# check for conf_file and read parameters
if os.path.isfile(conf_file):
with open(conf_file) as f:
try:
config = json.load(f)
base_path = config['global'].get('base_dir', base_path)
except ValueError as e:
logging.warning(f'inv_cisco_contract:status:JSON load error: {e}')
return base_path
def set_pid_black_list(pid_black_list: List[str]):
global g_PID_black_list
if pid_black_list:
g_PID_black_list = list(set(g_PID_black_list + pid_black_list))
def set_pid_bad_list(pid_bad_list: List[str]):
global g_PID_bad_list
if pid_bad_list:
g_PID_bad_list = list(set(g_PID_bad_list + pid_bad_list))
def set_sn_black_list(sn_black_list: List[str]):
global g_SN_black_list
if sn_black_list:
g_SN_black_list = list(set(g_SN_black_list + sn_black_list))
def check_dir_and_create(directory: str) -> bool:
"""
check if dir exists, if not try to create it.
Args:
directory:
Returns:
True: if dir exists or creation was ok
False: if dir not exists and creation was not ok
"""
directory = os.path.dirname(directory)
if not os.path.exists(directory):
try:
os.makedirs(directory)
except:
return False
return True
def expand_path(path: str) -> str:
"""
expand user dir and add '/' if necessary and create directory if it not exists
"""
homedir = os.path.expanduser('~')
if path.startswith('~'):
path = homedir + path[1:]
if not path.endswith('/'):
path += '/'
if not check_dir_and_create(path):
return ''
return path
def sn_on_black_list(serial: str) -> bool:
"""
Checks if serial on black list
Args:
serial: the serial number to check
Returns:
True: if serial number on black list
False: if serial number is not on black list
"""
global g_SN_black_list
if serial.upper() in g_SN_black_list:
return True
return False
def pid_on_black_list(pid: str) -> bool:
"""
returns True if pid on black list
"""
global g_PID_black_list
for drop_PID in g_PID_black_list:
if pid.startswith(drop_PID.upper()):
return True
# if PID not on Black list return false
return False
def pid_on_bad_list(pid: str) -> bool:
"""
returns True if PID on Bad list
"""
global g_PID_bad_list
# remove all chars from string, except allowedchars
allowedchars = re.compile('[^A-Z0-9_=\/\-\+\.\\\]')
cleanpid = allowedchars.sub('', pid).strip().upper()
# if PID contains illegal signs or if pid empty try by serial number
if (cleanpid != pid) or (cleanpid == ''):
return True
# list of bad PIDs :-( we need to try get EoX info via serial number for this PIDs
# needs to be configurable via a file if too big for wato ;-(
for bad_PID in g_PID_bad_list:
if pid.startswith(bad_PID):
return True
return False
def check_bad_serial(serial: str) -> bool:
"""
Checks a serial number for illegal signs (anything except A-Z0-9_=-+.\)
Args:
serial: serial number to check
Returns:
False: if serial number is ok
True: if serial number contains bad signs
"""
logging.info('Check_bad_serial:serial: %s' % serial)
serial = serial.replace(' ', '').upper()
# remove all chars from string, except allowedchars
allowedchars = re.compile('[^A-Z0-9_=\-\+\.\\\]')
cleanserial = allowedchars.sub('', serial).strip()
logging.info('Check_bad_serial:cleanserial: %s' % cleanserial)
# if serial contains illegal signs or empty return true
if (cleanserial != serial) or (cleanserial == ''):
logging.info('Check_bad_serial:bad:serial is bad')
return True
logging.info('Check_bad_serial:serial is god')
return False
def set_loglevel():
# set default logglevel
logging.basicConfig(level=logging.WARNING)
# has no effect as long previous command is active (is by design)
logging.basicConfig(level=logging.INFO)
......@@ -249,122 +249,6 @@ def main():
else:
reqoptions = ''
# pids_requested = {
# # 'ISR4431/K9': '03.16.05.S,03.16.04b.S', # 03.16.05.S,03.16.04b.S
# # 'ISR-AP1100AC-E': '8.5.110.0',
# #
# # 'WS-X4013+TS': '12.2(25)SG4', # --> not in database
# # 'WS-C3750G-24TS-1U': '12.2(55)SE8', # WS-C3750G-24TS-E1U --> wrong PID from inventory??
# #
# # 'C1861W-UC-2BRI-K9': '15.1(4)M12a', # pid not in database (only 1861 and 1861E not W...)
# # 'C6800IA-48TD': '15.0(2)EX7',
# # 'C6880-X-LE-16P10G': '15.1(2)SY4a',
# # 'C6880-X-LE-SUP': '15.1(2)SY4a',
# # 'CISCO2801': '15.1(4)M12a',
# # 'CISCO891-K9': '15.3(3)M3,15.3(3)M2',
# # 'WS-C2960S-F24TS-L': '15.0(2)SE10a,15.0(2)SE6',
# # 'WS-C2960S-F48TS-L': '15.0(2)SE10a,15.0(2)SE11,15.0(2)SE6',
# # 'CISCO888-K9': '15.1(4)M4,15.1(4)M5,15.3(3)M4',
# # 'WS-C2960G-24TC-L': '12.2(55)SE1,12.2(44)SE6',
# # 'CISCO881-K9': '15.2(4)M6a,15.4(3)M6a',
# # 'CISCO871-K9': '12.4(24)T8,12.4(15)T12,15.1(4)M8',
# # 'WS-C2960PD-8TT-L': '15.0(2)SE6,12.2(55)SE12,15.0(2)SE10a',
# #
# # 'N3K-C3064PQ-10GX': '7.0(3)I4(4)',
# # 'N5K-C5548UP': '7.0(8)N1(1)',
# # 'N5K-C5596UP': '7.3(2)N1(1)',
# # 'N5K-C56128P': '7.3(0)N1(1)',
# # 'N5K-C5672UP': '7.0(8)N1(1)',
# #
# # 'CISCO3845-MB': '12.4(15)T13',
# # 'WS-C2960-24TC-L': '12.2(44)SE6',
# # 'WS-C3750-24PS-S': '12.2(35)SE5',
# # 'WS-C3750G-12S-E': '12.2(55)SE8',
# # 'WS-C3750G-24PS-S': '12.2(55)SE1',
# #
# # 'N9K-C93180YC-EX': '13.2(2o)',
# # 'N9K-C9336PQ': '13.2(2o)',
# #
# # 'APIC-SERVER-M2': '3.2(2o)',
# # 'C891F-K9': '15.3(3)M4',
# # 'CISCO1941/K9': '15.0(1)M7',
# # 'CISCO876W-G-E-K9': '15.1(4)M10',
# # 'CISCO881-SEC-K9': '15.2(4)M6a',
# # 'CISCO892-K9': '15.1(4)M5',
# # 'WS-C2960X-24TD-L': '15.2(2)E3',
# # 'WS-C2960X-24TS-LL': '15.2(2)E5',
# # 'WS-C2960X-48FPD-L': '15.0(2)EX5',
# # 'WS-C2960X-48FPS-L': '15.0(2)EX5',
# # 'WS-C2960X-48LPD-L': '15.2(2)E3',
# # 'WS-C2960X-48TS-LL': '15.2(6)E',
# # 'WS-C3560CG-8TC-S': '15.2(2)E8',
# # 'WS-C3750G-48TS-S': '15.0(2)SE6',
# # 'WS-SUP720-3B': '15.1(2)SY2',
# #
# # 'WS-C3850-12S': '03.03.04SE', # 3.3(4)SE
# # 'WS-C3650-24TS-S': '03.07.02E', # 3.7(2)E
# # 'WS-C3650-48TQ': '03.03.05SE', # 3.3(5)SE
# #
# # 'C9500-40X': '16.8.1a',
# # 'C1117-4PMLTEEAWE': '16.09.01',
# # 'ISR4351/K9': '16.09.01',
# # 'ASR1001-X': '16.05.01b',
# # 'ISR4451-X/K9': '16.05.01b',
# # 'C9300-48T': '16.06.03',
# #
# # 'ASA5505': '8.4(4)1',
# # 'ASA5506W': '9.9(2)1',
# # 'ASA5508': '9.6(1)5,9.6(1)10',
# # 'ASA5510': '9.1(6)1,8.4(7)30,9.1(7)15',
# 'ASA5512': '9.6(1),9.2(2)4,9.6(3)8',
# # 'ASA5515': '9.8(1)5,9.9(2)14',
# # 'ASA5520': '9.1(6)11',
# # 'ASA5525': '9.8(1)',
# # 'ASA5550': '9.1(5)',
# # 'ASA5555': '9.9(1)2,9.7(1)4',
# # 'PIX-515E': '8.0(4)32', # pix is not in database
# #
# # 'AIR-CT5520-K9': '8.0(152.0),8.2(166.0),8.2(100.0)',
# # 'AIR-AP1852E-E-K9': '8.2.100.0',
# # 'AIR-CT2504-K9': '8.5.120.0',
# }
#
# try to get product series and get bugs by product series --> does not realy work :-(
#
# product_series_mdf = ciscoapi.get_product_mdf_information_by_pid(pids_requested.keys(), access_token)
# pid_product_series = {}
# for entry in product_series_mdf:
# pid_product_series.update({entry.get('product_id'):entry.get('product_series')})
#
# product_series_release = {}
# for pid in pid_product_series.keys():
# if pid_product_series.get(pid) != '':
# if product_series_release.get(pid_product_series.get(pid), '') == '':
# product_series_release.update({pid_product_series.get(pid): pids_requested.get(pid)})
# else:
# release = product_series_release.get(pid_product_series.get(pid)) + ',' + pids_requested.get(pid)
# product_series_release.update({pid_product_series.get(pid): release})
#
# for product_series in product_series_release.keys():
# release = product_series_release.get(product_series)
# release = release.split(',')
# release_clean = []
# for entry in release:
# if entry not in release_clean:
# release_clean.append(entry)
# product_series_release.update({product_series: ','.join(release_clean)})
# print 'get bugs by product series and release'
# bugs = ciscoapi.get_bug_by_productseries_and_affected_release(product_series_release, access_token, reqoptions)
#
# end product series
#
# print 'get bugs by PID and release'
for pid in pids_requested.keys():
if lifetime > int(time.time()):
access_token, lifetime = get_new_token()
......
......@@ -12,39 +12,66 @@
#
#
import logging
import ntpath
import os
import json
import time
import ciscosupport
import ciscoapi
import sys
from typing import List
from dataclasses import dataclass
def psirt_remove_id_file(psirt_path, psirt_id):
# set logg modul name <file>:<module>.<function>
@dataclass
class Paths:
found: str
not_found: str
request: str
@dataclass
class Refresh:
found: int
not_found: int
g_logger = None
def _psirt_remove_id_file(psirt_path: str, psirt_id: str):
# set logg module name <file>:<module>.<function>
logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name)
# delete psirt file
try:
logger.debug('delete psirt id file : %s' % psirt_path + psirt_id)
logger.debug(f'delete psirt id file : {psirt_path + psirt_id}')
os.remove(psirt_path + psirt_id)
except OSError:
pass
def psirt_dump_record(psirt_record, psirt_id, psirt_path, psirt_path_request):
def _psirt_dump_record(psirt_record, psirt_id: str, psirt_path: str, psirt_path_request: str):
# set logg modul name <file>:<module>.<function>
logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name)
with open(psirt_path + psirt_id, 'w') as f:
json.dump(psirt_record, f)
# delete request file
psirt_remove_id_file(psirt_path_request, psirt_id)
_psirt_remove_id_file(psirt_path_request, psirt_id)
return
def check_psirt_record(psirt_record, psirt_id, psirt_path_found, psirt_path_request):
def _check_psirt_record(psirt_record, psirt_id, psirt_path_found, psirt_path_request):
"""
:param psirt_record:
:param psirt_id:
:param psirt_path_found:
:param psirt_path_request:
:returns:
"""
# set logg modul name <file>:<module>.<function>
logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name)
......@@ -61,53 +88,64 @@ def check_psirt_record(psirt_record, psirt_id, psirt_path_found, psirt_path_requ
if advisory.get(key, None) in [['NA'], 'NA']:
advisory.pop(key, None)
psirt_dump_record(psirt_record, psirt_id, psirt_path_found, psirt_path_request)
_psirt_dump_record(psirt_record, psirt_id, psirt_path_found, psirt_path_request)
return
def _get_psirt_id_list(product_family: str, paths: Paths, refresh: Refresh) -> List[str]:
"""
@param product_family:
@param paths: Path object with path to founnd/not found/requested PSIRT records
@param refresh: Refresh object with number of days before a PSIRT record needs to be refreshed for found/not found
@return: list of PIDs
"""
# create list of ID's to request PSIRT status for
psirt_id_list = ciscosupport.get_ids_from_dir(paths.request + product_family)
g_logger.debug(f'psirt requests : {psirt_id_list}')
# remove already found ID's from list
psirt_id_list = ciscosupport.remove_ids_from_list(psirt_id_list, paths.found + product_family)
g_logger.debug(f'psirt requests : {psirt_id_list}')
# remove not found ID's from list
psirt_id_list = ciscosupport.remove_ids_from_list(psirt_id_list, paths.not_found + product_family)
g_logger.debug(f'psirt requests : {psirt_id_list}')
# refresh psirt after 1 day by default
psirt_id_list = ciscosupport.refresh_ids_from_dir(paths.not_found + product_family, refresh.not_found,
psirt_id_list, True)
g_logger.debug(f'psirt requests : {psirt_id_list}')
psirt_id_list = ciscosupport.refresh_ids_from_dir(paths.found + product_family, refresh.found,
psirt_id_list, False)
g_logger.debug(f'psirt requests : {psirt_id_list}')
return psirt_id_list
def _update_psirt_id(psirt_records: list, family_name: str, paths: Paths):
for psirt_record in psirt_records:
if family_name in ['IOS', 'IOS-XE']:
psirt_id = psirt_record.get('version')
else:
psirt_id = psirt_record.get('family')
if psirt_record.get('advisories') != 'notfound':
_check_psirt_record(psirt_record, psirt_id, paths.found + family_name + '/',
ntpath.sep + family_name + '/')
else:
_psirt_dump_record(psirt_record, psirt_id, paths.not_found + family_name + '/',
paths.request + family_name + '/')
# remove psirt_found file (happens when product family is removed form bug ID)
_psirt_remove_id_file(paths.found + family_name + '/', psirt_id)
return
def main():
def get_psirt_id_list(product_family):
# create list of ID's to request PSIRT status for
psirt_id_list = ciscosupport.get_ids_from_dir(path_request + product_family)
logger.debug('psirt requests : %s' % psirt_id_list)
# remove already found ID's from list
psirt_id_list = ciscosupport.remove_ids_from_list(psirt_id_list, path_found + product_family)
logger.debug('psirt requests : %s' % psirt_id_list)
# remove not found ID's from list
psirt_id_list = ciscosupport.remove_ids_from_list(psirt_id_list, path_not_found + product_family)
logger.debug('psirt requests : %s' % psirt_id_list)
# refresh psirt after 1 day by default
psirt_id_list = ciscosupport.refresh_ids_from_dir(path_not_found + product_family, psirt_refresh_notfound,
psirt_id_list, True)
logger.debug('psirt requests : %s' % psirt_id_list)
psirt_id_list = ciscosupport.refresh_ids_from_dir(path_found + product_family, psirt_refresh_found,
psirt_id_list, False)
logger.debug('psirt requests : %s' % psirt_id_list)
return psirt_id_list
def update_psirt_id(psirt_records, family_name):
for psirt_record in psirt_records:
if family_name in ['IOS', 'IOS-XE']:
psirt_id = psirt_record.get('version')
else:
psirt_id = psirt_record.get('family')
if psirt_record.get('advisories') != 'notfound':
check_psirt_record(psirt_record, psirt_id, path_found + family_name + '/',
path_request + family_name + '/')
else:
psirt_dump_record(psirt_record, psirt_id, path_not_found + family_name + '/',
path_request + family_name + '/')
# remove psirt_found file (happens when product family is removed form bug ID)
psirt_remove_id_file(path_found + family_name + '/', psirt_id)
return
global g_logger
ciscosupport.set_logging('debug')
# set logg modul name <file>:<module>.<function>
logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name)
g_logger = logging.getLogger(__file__ + ':' + __name__ + '.' + sys._getframe().f_code.co_name)
conf_file = '~/etc/ciscoapi/ciscoapi.json'
conf_file = os.path.expanduser(conf_file)
......@@ -136,30 +174,34 @@ def main():
logging.warning(f'snmp_cisco_eox:status:JSON load error: {e}')
exit()
wait_after_start = config['global'].get('base_path', base_path)
base_path = config['global'].get('base_path', base_path)
wait_after_start = config['global'].get('wait_after_start', wait_after_start)
max_wait_time = config['global'].get('max_wait_time', max_wait_time)
loglevel = config['global'].get('loglevel', loglevel)
psirt_refresh_found = config['psirt'].get('psirt_refresh_found', psirt_refresh_found)
psirt_refresh_notfound = config['psirt'].get('psirt_refresh_notfound', psirt_refresh_notfound)
refresh = Refresh(
found=config['psirt'].get('psirt_refresh_found', psirt_refresh_found),
not_found=config['psirt'].get('psirt_refresh_notfound', psirt_refresh_notfound)
)
else:
logger.critical('Config file not found (%s).' % conf_file)
g_logger.critical(f'Config file not found ({conf_file}).')
return False
ciscosupport.set_logging(loglevel)
base_path = ciscosupport.expand_path(base_path)
psirt_dir = base_path + 'psirt'
path_found = psirt_dir + '/found/'
path_not_found = psirt_dir + '/not_found/'
path_request = psirt_dir + '/request/'
paths = Paths(
found=psirt_dir + '/found/',
not_found= psirt_dir + '/not_found/',
request=psirt_dir + '/request/'
)
psirt_ios = get_psirt_id_list('IOS')
psirt_ios_xe = get_psirt_id_list('IOS-XE')
psirt_family = get_psirt_id_list('family')
psirt_ios = _get_psirt_id_list('IOS', paths, refresh)
psirt_ios_xe = _get_psirt_id_list('IOS-XE', paths, refresh)
psirt_family = _get_psirt_id_list('family', paths, refresh)
if (psirt_ios == []) and psirt_ios_xe == [] and psirt_family == []:
logger.debug('all list are empty. Do nothing.')
g_logger.debug('all list are empty. Do nothing.')
return
# wait random time after startup
......@@ -178,20 +220,20 @@ def main():
lifetime = starttime + lifetime - 30
if access_token == '':
logger.critical('failed to get access_token')
g_logger.critical('failed to get access_token')
return
if psirt_family != []:
psirt_records = ciscoapi.get_psirt_by_product_family(psirt_family, access_token)
update_psirt_id(psirt_records, 'family')
_update_psirt_id(psirt_records, 'family', paths)
if psirt_ios_xe != []:
psirt_records = ciscoapi.get_psirt_by_iosxe_version(psirt_ios_xe, access_token)
update_psirt_id(psirt_records, 'IOS-XE')
_update_psirt_id(psirt_records, 'IOS-XE', paths)
if psirt_ios != []:
psirt_records = ciscoapi.get_psirt_by_ios_version(psirt_ios, access_token)
update_psirt_id(psirt_records, 'IOS')
_update_psirt_id(psirt_records, 'IOS', paths)
main()
No preview for this file type
......@@ -17,9 +17,9 @@
'download_url': 'https://thl-cmk.hopto.org',
'files': {'agent_based': ['inv_cisco_eox.py',
'inv_cisco_contract.py',
'utils/ciscoapi.py',
'inv_cisco_bug.py',
'inv_cisco_psirt.py'],
'inv_cisco_psirt.py',
'utils/inv_cisco_support.py'],
'bin': ['ciscoapi/ciscoapi.py',
'ciscoapi/ciscosupport.py',
'ciscoapi/cisco-eox.py',
......@@ -36,7 +36,7 @@
'num_files': 17,
'title': 'Inventory for Cisco Bug, EoX, contract status, PSIRT advisories and '
'suggested software',
'version': '2021-07-25.v0.1a',
'version': '2021-07-20.v0.1b',
'version.min_required': '2.0.0',
'version.packaged': '2021.07.14',
'version.usable_until': None}
\ No newline at end of file
......@@ -127,7 +127,8 @@ def inv_paint_psirt_bugid(bugids):
search_bugid_url = []
for bugid in bugids:
bugid = bugid.strip(' ')
search_bugid_url.append(f'<a class="href_blue" target="_blank" href="https://bst.cloudapps.cisco.com/bugsearch/bug/{bugid}">{bugid}</a>')
search_bugid_url.append(f'<a class="href_blue" target="_blank" '
f'href="https://bst.cloudapps.cisco.com/bugsearch/bug/{bugid}">{bugid}</a>')
search_bugid_url = HTML(', '.join(search_bugid_url))
return '', search_bugid_url
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment