Collection of CheckMK checks (see https://checkmk.com/). All checks and plugins are provided as is. Absolutely no warranty. Send any comments to thl-cmk[at]outlook[dot]com

Skip to content
Snippets Groups Projects
Commit 03f368ff authored by thl-cmk's avatar thl-cmk :flag_na:
Browse files

split open-pnp.py

2023-02-26: reorganized open-pnp.py in to open_pnp_classes.py and open_pnp_utils.py
parent 916f885a
No related branches found
No related tags found
No related merge requests found
...@@ -120,16 +120,20 @@ to use the PnP server you need to configure the server by modifying the followin ...@@ -120,16 +120,20 @@ to use the PnP server you need to configure the server by modifying the followin
#### Global settings [**open-pnp.toml**](/pnp/open-pnp.toml) #### Global settings [**open-pnp.toml**](/pnp/open-pnp.toml)
``` ```
# [settings]
# bind_pnp_server = "0.0.0.0" # bind_pnp_server = "0.0.0.0"
# bind_pnp_server = "::"
# port = 8080 # port = 8080
# time_format = "%Y-%m-%dT%H:%M:%S" # time_format = "%y-%m-%dt%h:%m:%s"
# status_refresh = 60 # status_refresh = 10
# debug = false # debug = false
# log_to_console = false # log_to_file = true
# log_file = "log/pnp_debug.log" # log_file = "log/pnp_debug.log"
# default_cfg_file = "default.cfg"
# image_data = "images.toml" # image_data = "images.toml"
# image_base_url = "http://192.168.10.133:8080/images" # image_url = "http://192.168.10.133:8080/images"
# config_base_url = "http://192.168.10.133:8080/configs" # config_url = "http://192.168.10.133:8080/configs"
# default_cfg = "DEFAULT.cfg"
``` ```
- **bind_pnp_server**: the IP-address of your open-pnp server box. (Use `"::"` for IPv6) - **bind_pnp_server**: the IP-address of your open-pnp server box. (Use `"::"` for IPv6)
...@@ -142,6 +146,7 @@ to use the PnP server you need to configure the server by modifying the followin ...@@ -142,6 +146,7 @@ to use the PnP server you need to configure the server by modifying the followin
- **image_data**: the file containing the data of your IOS/IOS-XE images - **image_data**: the file containing the data of your IOS/IOS-XE images
- **image_base_url**: the base URL for your images - **image_base_url**: the base URL for your images
- **config_base_url**: the base URL for your configuration files - **config_base_url**: the base URL for your configuration files
- **default_cfg**: default config to use if no device specific config is found.
**Note**: you need to uncomment (remove `# `) the lines if you change the values. **Note**: you need to uncomment (remove `# `) the lines if you change the values.
...@@ -170,50 +175,46 @@ models = ["C1000-8T-2G-L", "C1000-24P-4G-L", "C1000-24T-4G-L", "C1000-24T-4X-L", ...@@ -170,50 +175,46 @@ models = ["C1000-8T-2G-L", "C1000-24P-4G-L", "C1000-24T-4G-L", "C1000-24T-4X-L",
### Command Line Options ### Command Line Options
With the Command Line Options you can override the default values, and the values from the _open-pnp.toml_ config file. With the Command Line Options you can override the default values, and the values from the _open-pnp.toml_ config file.
With the option --config_file CONFIG_FILE you can specify a costume config file to use instead of _open.pnp.toml_. With the option --config_file CONFIG_FILE you can specify a costume config file to use instead of _open-pnp.toml_.
``` ```
$ ./open-pnp.py -h $ ./open-pnp.py -h
usage: open-pnp.py [-h] [--bind_pnp_server BIND_PNP_SERVER] [--port PORT] usage: open-pnp.py [-h] [-b BIND_PNP_SERVER] [-p PORT] [-r STATUS_REFRESH]
[--time_format TIME_FORMAT] [-v] [--config_file CONFIG_FILE] [--config_url CONFIG_URL]
[--status_refresh STATUS_REFRESH] [--debug] [--image_data IMAGE_DATA] [--image_url IMAGE_URL] [--debug]
[--log_to_console] [--log_file LOG_FILE] [--default_cfg DEFAULT_CFG] [--log_file LOG_FILE]
[--image_data IMAGE_DATA] [--image_url IMAGE_URL] [--log_to_console] [--time_format TIME_FORMAT]
[--config_url CONFIG_URL] [--config_file CONFIG_FILE]
This is a basic implementation of the Cisco PnP protocol. It is intended to This is a basic implementation of the Cisco PnP protocol. It is intended to
roll out image updates and configurations for Cisco IOS/IOS-XE devices on roll out image updates and configurations for Cisco IOS/IOS-XE devices on day0.
day0.
optional arguments: 20230223.v1.0.1 | Written by: thl-cmk, for more information see: https://thl-cmk.hopto.org
options:
-h, --help show this help message and exit -h, --help show this help message and exit
--bind_pnp_server BIND_PNP_SERVER, -b BIND_PNP_SERVER -b BIND_PNP_SERVER, --bind_pnp_server BIND_PNP_SERVER
Bind PnP server to IP-address. (default: 0.0.0.0) Bind PnP server to IP-address. (default: 0.0.0.0)
--port PORT, -p PORT TCP port to listen on. (default: 8080) -p PORT, --port PORT TCP port to listen on. (default: 8080)
--time_format TIME_FORMAT -r STATUS_REFRESH, --status_refresh STATUS_REFRESH
Format string to render time. (default: Time in seconds to refresh PnP server status page. (default: 60)
%Y-%m-%dT%H:%M:%S) -v, --version Print open-pnp-server version and exit
--status_refresh STATUS_REFRESH, -r STATUS_REFRESH --config_file CONFIG_FILE
Time in seconds to refresh PnP server status page. Path/name of open PnP server config file. (default: open-pnp.toml)
(default: 60) --config_url CONFIG_URL
--debug Enable Debug output send to "log_file". Download URL for config files. I.e. http://192.168.10.133:8080/configs
--log_to_console Enable debug output send to stdout (requires --debug).
--log_file LOG_FILE Path/name of the logfile. (default: log/pnp_debug.log,
requires --debug)
--image_data IMAGE_DATA --image_data IMAGE_DATA
File containing the image description. (default: File containing the image description. (default: images.toml)
images.toml)
--image_url IMAGE_URL --image_url IMAGE_URL
Download URL for image files. I.e. Download URL for image files. I.e. http://192.168.10.133:8080/images
http://192.168.10.133:8080/images --debug Enable Debug output send to "log_file".
--config_url CONFIG_URL --default_cfg DEFAULT_CFG
Download URL for config files. I.e. default config to use if no device specific config is found. (default: DEFAULT.cfg)
http://192.168.10.133:8080/configs --log_file LOG_FILE Path/name of the logfile. (default: log/pnp_debug.log, requires --debug)
--config_file CONFIG_FILE --log_to_console Enable debug output send to stdout (requires --debug).
Path/name of open PnP server config file. (default: --time_format TIME_FORMAT
open-pnp.toml) Format string to render time. (default: %Y-%m-%dT%H:%M:%S)
Written by: thl-cmk, for more information see: https://thl-cmk.hopto.org Usage: python open-pnp.py --config_url http://192.168.10.133:8080/configs --image_url http://192.168.10.133:8080/images
``` ```
......
["cat9k_iosxe.17.06.01.SPA.bin"]
version = "17.6.1"
md5 = "fdb9c92bae37f9130d0ee6761afe2919"
size = 1
models = ["C9500-24Q", "C9300-24P"]
["asr1000-universalk9.17.05.01a.SPA.bin"]
version = "17.5.1a"
md5 = "0e4b1fc1448f8ee289634a41f75dc215"
size = 1
models = ["ASR1001-HX"]
["c1000-universalk9-mz.152-7.E7.bin"] ["c1000-universalk9-mz.152-7.E7.bin"]
version = "15.2(7)E7" version = "15.2(7)E7"
md5 = "1e6f508499c36434f7035b83a4018390" md5 = "1e6f508499c36434f7035b83a4018390"
...@@ -40,12 +28,6 @@ md5 = "1c4c0597d355a0926c3d7198c1167cae" ...@@ -40,12 +28,6 @@ md5 = "1c4c0597d355a0926c3d7198c1167cae"
size = 22967296 size = 22967296
models = ["WS-C3560CX-12PD-S"] models = ["WS-C3560CX-12PD-S"]
["ir1101-universalk9.17.06.01.SPA.bin"]
version = "17.6.1"
md5 = "2654cd5734a0722cfd3f521c25071552"
size = 638837341
models = []
["ir1101-universalk9.17.06.01a.SPA.bin"] ["ir1101-universalk9.17.06.01a.SPA.bin"]
version = "17.6.1a" version = "17.6.1a"
md5 = "492f47e99136de9e0982c755ac1e1972" md5 = "492f47e99136de9e0982c755ac1e1972"
......
...@@ -33,406 +33,50 @@ ...@@ -33,406 +33,50 @@
# 2023-02-22: removed regex -> was not working with PID: ISR4451-X/K9 # 2023-02-22: removed regex -> was not working with PID: ISR4451-X/K9
# added PNP_SERVER_VERSION # added PNP_SERVER_VERSION
# 2023-02-23: added cli option -v/--version, --default_cfg # 2023-02-23: added cli option -v/--version, --default_cfg
# 2023-02-26: reorganized open-pnp.py in to open_pnp_classes.py and open_pnp_utils.py
#
# pip install flask xmltodict requests ifaddr tomli # pip install flask xmltodict requests ifaddr tomli
# #
# ToDo: # ToDo:
# add remove inactive job on IOS-XE devices if no space for image update # add remove inactive job on IOS-XE devices if no space for image update
# system libs # system libs
import logging
from logging.handlers import RotatingFileHandler
# from re import compile as re_compile # from re import compile as re_compile
from time import strftime from time import strftime
from typing import Optional, List, Dict, Any from typing import Optional, Dict, Any
from requests import head from requests import head
from sys import stdout from logging import getLogger
from argparse import (
Namespace as arg_Namespace,
ArgumentParser,
RawTextHelpFormatter,
)
# additional libs # additional libs
from flask import Flask, request, send_from_directory, render_template, Response, redirect, cli from flask import (
Flask,
Response,
send_from_directory,
render_template,
redirect,
request,
cli
)
from xmltodict import parse as xml_parse from xmltodict import parse as xml_parse
from ifaddr import get_adapters
from tomli import load as toml_load
from tomli import TOMLDecodeError
PNP_SERVER_VERSION = '20230223.v1.0.1'
class Settings:
def __init__(
self,
cli_args: Dict[str, any],
version: bool = False,
cfg_file: Optional[str] = 'open-pnp.toml',
image_data: Optional[str] = 'images.toml',
bind_pnp_server: Optional[str] = '0.0.0.0',
port: Optional[int] = 8080,
time_format: Optional[str] = '%Y-%m-%dT%H:%M:%S',
status_refresh: Optional[int] = 60,
debug: Optional[bool] = False,
log_to_console: Optional[bool] = False,
log_file: Optional[str] = 'log/pnp_debug.log',
image_url: Optional[str] = '',
config_url: Optional[str] = '',
default_cfg: Optional[str] = 'DEFAULT.cfg',
):
self.__settings = {
'cfg_file': cfg_file,
'version': version,
'image_data': image_data,
'bind_pnp_server': bind_pnp_server,
'port': port,
'time_format': time_format,
'status_refresh': status_refresh,
'debug': debug,
'log_to_console': log_to_console,
'log_file': log_file,
'image_url': image_url,
'config_url': config_url,
'default_cfg_file': default_cfg,
}
self.__args = {}
self.__set_cli_args(cli_args)
def __set_cli_args(self, cli_args: Dict[str, any]):
self.__args = ({k: v for k, v in cli_args.items() if v})
self.__settings.update(self.__args)
def update(self, cfg_file: str):
try:
with open(cfg_file, 'rb') as f:
self.__settings.update(toml_load(f))
except FileNotFoundError as e:
print(f'ERROR: Data file {cfg_file} not found! ({e})')
exit(1)
except TOMLDecodeError as e:
print(f'ERROR: Data file {cfg_file} is not in valid toml format! ({e})')
exit(2)
self.__settings.update(self.__args)
@property
def cfg_file(self) -> str:
return self.__settings['cfg_file']
@property
def version(self) -> bool:
return self.__settings['version']
@property
def image_data(self) -> str:
return self.__settings['image_data']
@property
def bind_pnp_server(self) -> str:
return self.__settings['bind_pnp_server']
@property
def port(self) -> int:
return self.__settings['port']
@property
def time_format(self) -> str:
return self.__settings['time_format']
@property
def status_refresh(self) -> int:
return self.__settings['status_refresh']
@property
def debug(self) -> bool:
return self.__settings['debug']
@property
def log_to_console(self) -> bool:
return self.__settings['log_to_console']
@property
def log_file(self) -> str:
return self.__settings['log_file']
@property
def image_url(self) -> str:
return self.__settings['image_url']
@property
def config_url(self) -> str:
return self.__settings['config_url']
@property
def default_cfg(self) -> str:
return self.__settings['default_cfg']
class SoftwareImage:
def __init__(self, image: str, version: str, md5: str, size: int,):
self.image: str = image
self.version: str = version
self.md5: str = md5
self.size: int = size
class ErrorCodes:
__readable = {
0: 'No error',
100: 'unknown platform',
101: 'no free space for update',
102: 'unknown image',
103: 'config file not found',
104: 'image file not found',
1412: 'Invalid input detected (config)',
1413: 'Invalid input detected',
1609: 'Error while retrieving device filesystem info',
1816: 'Error verifying checksum for Image',
1829: 'Image copy was unsuccessful',
1803: 'Source file not found',
}
def __init__(self):
self.ERROR_NO_ERROR = 0
self.ERROR_NO_PLATFORM = 100
self.ERROR_NO_FREE_SPACE = 101
self.ERROR_NO_IMAGE = 102
self.ERROR_NO_CFG_FILE = 103
self.ERROR_NO_IMAGE_FILE = 104
self.PNP_ERROR_INVALID_CONFIG = 1412
self.PNP_ERROR_INVALID_INPUT = 1413
self.PNP_ERROR_NO_FILESYSTEM_INFO = 1609
self.PNP_ERROR_BAD_CHECKSUM = 1816
self.PNP_ERROR_IMAGE_COPY_UNSUCCESSFUL = 1829
self.PNP_ERROR_FILE_NOT_FOUND = 1803
def readable(self, error_code: int):
return self.__readable.get(error_code, f'unknown: {error_code}')
class PnpFlow:
__readable = {
0: 'None',
1: 'new device',
2: 'info',
10: 'update required',
11: 'update started',
12: 'no update required/done',
13: 'update done -> reloading',
21: 'config start',
22: 'config down -> reloading',
23: 'reload for config update',
99: 'finished',
}
def __init__(self):
self.NONE = 0
self.NEW = 1
self.INFO = 2
self.UPDATE_NEEDED = 10
self.UPDATE_START = 11
self.UPDATE_DOWN = 12
self.UPDATE_RELOAD = 13
self.CONFIG_START = 21
self.CONFIG_DOWN = 22
self.CONFIG_RELOAD = 23
self.FINISHED = 99
def readable(self, state: int):
return self.__readable.get(state, 'unknown')
class Device:
def __init__(self, udi: str, platform: str, hw_rev: str, serial: str, first_seen: str, last_contact: str,
src_address: str, current_job: str):
self.udi: str = udi
self.platform: str = platform
self.hw_rev: str = hw_rev
self.serial: str = serial
self.ip_address: str = src_address
self.current_job: str = current_job
self.first_seen: str = first_seen
self.last_contact: str = last_contact
self.version: str = ''
self.image: str = ''
self.destination_name: str = ''
self.destination_free: Optional[int] = None
self.status: str = ''
self.__pnp_flow: int = PNPFLOW.NEW
self.pnp_flow_readable: str = PNPFLOW.readable(PNPFLOW.NEW)
self.target_image: Optional[SoftwareImage] = None
self.backoff: bool = False
self.__refresh_data: bool = False
self.refresh_button: str = ''
self.__error_code: int = 0
self.error_code_readable: str = ERROR.readable(ERROR.ERROR_NO_ERROR)
self.error_count: int = 0
self.error_message = ''
self.__hard_error: bool = False
self.__status_class: str = ''
@property
def pnp_flow(self) -> int:
return self.__pnp_flow
@pnp_flow.setter
def pnp_flow(self, pnp_flow: int):
self.__pnp_flow = pnp_flow
self.pnp_flow_readable = PNPFLOW.readable(pnp_flow)
if pnp_flow == PNPFLOW.FINISHED:
self.__status_class = 'finished'
@property
def refresh_data(self) -> bool:
return self.__refresh_data
@refresh_data.setter
def refresh_data(self, new_state: bool):
self.__refresh_data = new_state
if new_state:
self.refresh_button = 'disabled='
else:
self.refresh_button = 'enabled='
@property
def error_code(self) -> int:
return self.__error_code
@error_code.setter
def error_code(self, error_code: int):
self.__error_code = error_code
self.error_code_readable = ERROR.readable(error_code)
self.__status_class = 'warning'
@property
def hard_error(self) -> bool:
return self.__hard_error
@hard_error.setter
def hard_error(self, hard_error: bool):
self.__hard_error = hard_error
self.__status_class = 'error'
@property
def status_class(self) -> str:
return self.__status_class
@status_class.setter
def status_class(self, status_class: str):
self.__status_class = status_class
class Images:
def __init__(self, images_file):
self.__images = {}
self.load_image_data(images_file)
def load_image_data(self, images_file):
try:
with open(images_file, 'rb') as f:
self.__images = toml_load(f)
except FileNotFoundError as e:
print(f'ERROR: Data file {images_file} not found! ({e})')
exit(1)
except TOMLDecodeError as e:
print(f'ERROR: Data file {images_file} is not in valid toml format! ({e})')
exit(2)
@property
def images(self) -> Dict[str, any]:
return self.__images
def configure_logger(path):
log_formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(name)s :: %(module)s ::%(message)s')
log = logging.getLogger('root')
log.setLevel(logging.INFO)
log_file = path
# create a new file > 5 mb size
log_handler_file = RotatingFileHandler(
log_file,
mode='a',
maxBytes=5 * 1024 * 1024,
backupCount=10,
# encoding=None,
# delay=0
)
log_handler_file.setFormatter(log_formatter)
log_handler_file.setLevel(logging.INFO)
log.addHandler(log_handler_file)
if SETTINGS.log_to_console:
log_handler_console = logging.StreamHandler(stdout)
log_handler_console.setFormatter(log_formatter)
log_handler_console.setLevel(logging.INFO)
log.addHandler(log_handler_console)
from open_pnp_classes import (
Device,
SoftwareImage,
ErrorCodes,
PnpFlow,
Settings,
Images
)
def log_info(message): from open_pnp_utils import (
if SETTINGS.debug: configure_logger,
log = logging.getLogger('root') log_info,
log.info(message) parse_arguments,
get_local_ip_addresses,
)
def log_critical(message): PNP_SERVER_VERSION = '20230223.v1.0.1'
if SETTINGS.debug:
log = logging.getLogger('root')
log.critical(message)
def parse_arguments() -> arg_Namespace:
parser = ArgumentParser(
prog='open-pnp.py',
description='This is a basic implementation of the Cisco PnP protocol. It is intended to'
'\nroll out image updates and configurations for Cisco IOS/IOS-XE devices on day0.'
'\n'
f'\n{PNP_SERVER_VERSION} | Written by: thl-cmk, for more information see: https://thl-cmk.hopto.org',
formatter_class=RawTextHelpFormatter,
epilog='Usage: python open-pnp.py --config_url http://192.168.10.133:8080/configs '
'--image_url http://192.168.10.133:8080/images',
)
parser.add_argument('-b', '--bind_pnp_server', type=str,
help='Bind PnP server to IP-address. (default: 0.0.0.0)')
parser.add_argument('-p', '--port', type=int,
help='TCP port to listen on. (default: 8080)')
parser.add_argument('-r', '--status_refresh', type=int,
help='Time in seconds to refresh PnP server status page. (default: 60)')
parser.add_argument('-v', '--version', default=False, action='store_const', const=True,
help='Print open-pnp-server version and exit')
parser.add_argument('--config_file', type=str,
help='Path/name of open PnP server config file. (default: open-pnp.toml)')
parser.add_argument('--config_url', type=str,
help='Download URL for config files. I.e. http://192.168.10.133:8080/configs')
parser.add_argument('--image_data', type=str,
help='File containing the image description. (default: images.toml)')
parser.add_argument('--image_url', type=str,
help='Download URL for image files. I.e. http://192.168.10.133:8080/images')
parser.add_argument('--debug', default=False, action='store_const', const=True,
help='Enable Debug output send to "log_file".')
parser.add_argument('--default_cfg', type=str,
help='default config to use if no device specific config is found. (default: DEFAULT.cfg)')
parser.add_argument('--log_file', type=str,
help='Path/name of the logfile. (default: log/pnp_debug.log, requires --debug) ')
parser.add_argument('--log_to_console', default=False, action='store_const', const=True,
help='Enable debug output send to stdout (requires --debug).')
parser.add_argument('--time_format', type=str,
help='Format string to render time. (default: %%Y-%%m-%%dT%%H:%%M:%%S)')
return parser.parse_args()
def pnp_device_info(udi: str, correlator: str, info_type: str) -> str: def pnp_device_info(udi: str, correlator: str, info_type: str) -> str:
...@@ -447,7 +91,7 @@ def pnp_device_info(udi: str, correlator: str, info_type: str) -> str: ...@@ -447,7 +91,7 @@ def pnp_device_info(udi: str, correlator: str, info_type: str) -> str:
'info_type': info_type 'info_type': info_type
} }
_template = render_template('device_info.xml', **jinja_context) _template = render_template('device_info.xml', **jinja_context)
log_info(_template) log_info(_template, SETTINGS.debug)
return _template return _template
...@@ -466,7 +110,7 @@ def pnp_backoff(udi: str, correlator: str, minutes: Optional[int] = 1) -> str: ...@@ -466,7 +110,7 @@ def pnp_backoff(udi: str, correlator: str, minutes: Optional[int] = 1) -> str:
'hours': hours, 'hours': hours,
} }
_template = render_template('backoff.xml', **jinja_context) _template = render_template('backoff.xml', **jinja_context)
log_info(_template) log_info(_template, SETTINGS.debug)
return _template return _template
...@@ -481,7 +125,7 @@ def pnp_backoff_terminate(udi: str, correlator: str) -> str: ...@@ -481,7 +125,7 @@ def pnp_backoff_terminate(udi: str, correlator: str) -> str:
'correlator': correlator, 'correlator': correlator,
} }
_template = render_template('backoff_terminate.xml', **jinja_context) _template = render_template('backoff_terminate.xml', **jinja_context)
log_info(_template) log_info(_template, SETTINGS.debug)
return _template return _template
...@@ -503,7 +147,7 @@ def pnp_install_image(udi: str, correlator: str) -> Optional[str]: ...@@ -503,7 +147,7 @@ def pnp_install_image(udi: str, correlator: str) -> Optional[str]:
'delay': 0, # reload in seconds 'delay': 0, # reload in seconds
} }
_template = render_template('image_install.xml', **jinja_context) _template = render_template('image_install.xml', **jinja_context)
log_info(_template) log_info(_template, SETTINGS.debug)
return _template return _template
else: else:
device.error_code = ERROR.ERROR_NO_IMAGE_FILE device.error_code = ERROR.ERROR_NO_IMAGE_FILE
...@@ -528,12 +172,33 @@ def pnp_config_upgrade(udi: str, correlator: str) -> Optional[str]: ...@@ -528,12 +172,33 @@ def pnp_config_upgrade(udi: str, correlator: str) -> Optional[str]:
'udi': udi, 'udi': udi,
'correlator': correlator, 'correlator': correlator,
'base_url': SETTINGS.config_url, 'base_url': SETTINGS.config_url,
'serial_number': cfg_file, 'reload_delay': 30, # reload in seconds
'delay': 30, # reload in seconds
'cfg_file': cfg_file, 'cfg_file': cfg_file,
} }
_template = render_template('config_upgrade.xml', **jinja_context) _template = render_template('config_upgrade.xml', **jinja_context)
log_info(_template) log_info(_template, SETTINGS.debug)
return _template
def pnp_remove_inactive(udi: str, correlator: str) -> Optional[str]:
device = devices[udi]
cfg_file = 'REMOVE_INACTIVE.cfg'
response = head(f'{SETTINGS.config_url}/{cfg_file}')
if response.status_code != 200: # cfg not found
device.error_code = ERROR.ERROR_NO_CFG_FILE
device.hard_error = True
return
device.current_job = 'urn:cisco:pnp:config-upgrade'
device.pnp_flow = PNPFLOW.CLEANUP_START
jinja_context = {
'udi': udi,
'correlator': correlator,
'base_url': SETTINGS.config_url,
'cfg_file': cfg_file,
}
_template = render_template('config_upgrade.xml', **jinja_context)
log_info(_template, SETTINGS.debug)
return _template return _template
...@@ -543,7 +208,7 @@ def pnp_bye(udi: str, correlator: str) -> str: ...@@ -543,7 +208,7 @@ def pnp_bye(udi: str, correlator: str) -> str:
'correlator': correlator, 'correlator': correlator,
} }
_template = render_template('bye.xml', **jinja_context) _template = render_template('bye.xml', **jinja_context)
log_info(_template) log_info(_template, SETTINGS.debug)
return _template return _template
...@@ -614,26 +279,6 @@ def check_update(udi: str): ...@@ -614,26 +279,6 @@ def check_update(udi: str):
device.error_code = ERROR.ERROR_NO_FREE_SPACE device.error_code = ERROR.ERROR_NO_FREE_SPACE
device.hard_error = True device.hard_error = True
def get_local_ip_addresses() -> List[str]:
_addresses = []
adapters = get_adapters()
for adapter in adapters:
ip_addr = ''
for ip in adapter.ips:
drop_ip = False
if ip.is_IPv4:
ip_addr = ip.ip
elif ip.is_IPv6:
ip_addr = ip.ip[0]
for entry in ['127.', 'fe80::', '::1']:
if str(ip_addr).lower().startswith(entry):
drop_ip = True
if not drop_ip:
_addresses.append(ip_addr)
return _addresses
# flask # flask
app = Flask(__name__, template_folder='./templates') app = Flask(__name__, template_folder='./templates')
...@@ -699,7 +344,7 @@ def pnp_hello(): ...@@ -699,7 +344,7 @@ def pnp_hello():
def pnp_work_request(): def pnp_work_request():
src_add = request.environ.get('HTTP_X_REAL_IP', request.remote_addr) src_add = request.environ.get('HTTP_X_REAL_IP', request.remote_addr)
data = xml_parse(request.data) data = xml_parse(request.data)
log_info(f'REQUEST: {data}') log_info(f'REQUEST: {data}', SETTINGS.debug)
correlator = data['pnp']['info']['@correlator'] correlator = data['pnp']['info']['@correlator']
udi = data['pnp']['@udi'] udi = data['pnp']['@udi']
if udi in devices.keys(): if udi in devices.keys():
...@@ -710,7 +355,7 @@ def pnp_work_request(): ...@@ -710,7 +355,7 @@ def pnp_work_request():
return Response(pnp_backoff(udi, correlator, 10), mimetype='text/xml') return Response(pnp_backoff(udi, correlator, 10), mimetype='text/xml')
pass pass
if device.backoff: if device.backoff:
log_info('BACKOFF') log_info('BACKOFF', SETTINGS.debug)
# backoff more and more on errors, max error_count = 11 -> 5 * 11 = 55 # backoff more and more on errors, max error_count = 11 -> 5 * 11 = 55
# error_count == 12 -> like hard_error # error_count == 12 -> like hard_error
minutes = device.error_count + 1 minutes = device.error_count + 1
...@@ -718,26 +363,27 @@ def pnp_work_request(): ...@@ -718,26 +363,27 @@ def pnp_work_request():
device.hard_error = True device.hard_error = True
return Response(pnp_backoff(udi, correlator, minutes), mimetype='text/xml') return Response(pnp_backoff(udi, correlator, minutes), mimetype='text/xml')
if device.pnp_flow == PNPFLOW.NEW: if device.pnp_flow == PNPFLOW.NEW:
log_info('PNPFLOW.NEW') log_info('PNPFLOW.NEW', SETTINGS.debug)
device.pnp_flow = PNPFLOW.INFO device.pnp_flow = PNPFLOW.INFO
return Response(pnp_device_info(udi, correlator, 'all'), mimetype='text/xml') return Response(pnp_device_info(udi, correlator, 'all'), mimetype='text/xml')
if device.pnp_flow == PNPFLOW.UPDATE_NEEDED: if device.pnp_flow == PNPFLOW.UPDATE_NEEDED:
log_info('PNPFLOW.UPDATE_NEEDED') log_info('PNPFLOW.UPDATE_NEEDED', SETTINGS.debug)
device.pnp_flow = PNPFLOW.UPDATE_START device.pnp_flow = PNPFLOW.UPDATE_START
return Response(pnp_install_image(udi, correlator), mimetype='text/xml') return Response(pnp_install_image(udi, correlator), mimetype='text/xml')
if device.pnp_flow == PNPFLOW.UPDATE_RELOAD: if device.pnp_flow == PNPFLOW.UPDATE_RELOAD:
log_info('PNPFLOW.UPDATE_RELOAD') log_info('PNPFLOW.UPDATE_RELOAD', SETTINGS.debug)
return Response(pnp_device_info(udi, correlator, 'all'), mimetype='text/xml') return Response(pnp_device_info(udi, correlator, 'all'), mimetype='text/xml')
if device.pnp_flow == PNPFLOW.UPDATE_DOWN: if device.pnp_flow == PNPFLOW.UPDATE_DOWN:
log_info('PNPFLOW.UPDATE_DOWN') log_info('PNPFLOW.UPDATE_DOWN', SETTINGS.debug)
return Response(pnp_config_upgrade(udi, correlator), mimetype='text/xml') return Response(pnp_config_upgrade(udi, correlator), mimetype='text/xml')
if device.pnp_flow == PNPFLOW.CONFIG_DOWN: # will never reach this point, as pnp is removed bei EEM :-) if device.pnp_flow == PNPFLOW.CONFIG_DOWN: # will never reach this point, as pnp is removed bei EEM :-)
log_info('PNPFLOW.CONFIG_DOWN') log_info('PNPFLOW.CONFIG_DOWN', SETTINGS.debug)
return Response(pnp_backoff_terminate(udi, correlator), mimetype='text/xml') return Response(pnp_backoff_terminate(udi, correlator), mimetype='text/xml')
log_info(f'Other PNP_FLOW: {PNPFLOW.readable(device.pnp_flow)}') log_info(
f'Other PNP_FLOW: {PNPFLOW.readable(device.pnp_flow)}', SETTINGS.debug)
return Response('', 200) return Response('', 200)
else: else:
log_info('REQUEST NEW DEVICE FOUND') log_info('REQUEST NEW DEVICE FOUND', SETTINGS.debug)
create_new_device(udi, src_add) create_new_device(udi, src_add)
# return Response(device_info(udi, correlator, 'all'), mimetype='text/xml') # return Response(device_info(udi, correlator, 'all'), mimetype='text/xml')
devices[udi].pnp_flow = PNPFLOW.NEW devices[udi].pnp_flow = PNPFLOW.NEW
...@@ -748,7 +394,7 @@ def pnp_work_request(): ...@@ -748,7 +394,7 @@ def pnp_work_request():
@app.route('/pnp/WORK-RESPONSE', methods=['POST']) @app.route('/pnp/WORK-RESPONSE', methods=['POST'])
def pnp_work_response(): def pnp_work_response():
data = xml_parse(request.data) data = xml_parse(request.data)
log_info(f'RESPONSE: {data}') log_info(f'RESPONSE: {data}', SETTINGS.debug)
src_add = request.environ.get('HTTP_X_REAL_IP', request.remote_addr) src_add = request.environ.get('HTTP_X_REAL_IP', request.remote_addr)
udi = data['pnp']['@udi'] udi = data['pnp']['@udi']
job_type = data['pnp']['response']['@xmlns'] job_type = data['pnp']['response']['@xmlns']
...@@ -764,10 +410,10 @@ def pnp_work_response(): ...@@ -764,10 +410,10 @@ def pnp_work_response():
else: else:
correlator = data['pnp']['response']['@correlator'] correlator = data['pnp']['response']['@correlator']
job_status = int(data['pnp']['response']['@success']) job_status = int(data['pnp']['response']['@success'])
log_info(f'Correlator: {correlator}') log_info(f'Correlator: {correlator}', SETTINGS.debug)
log_info(f'Job type: {job_type}') log_info(f'Job type: {job_type}', SETTINGS.debug)
log_info(f'PnP flow: {device.pnp_flow_readable}') log_info(f'PnP flow: {device.pnp_flow_readable}', SETTINGS.debug)
log_info(f'Job status: {job_status}') log_info(f'Job status: {job_status}', SETTINGS.debug)
if job_status == 1: # success if job_status == 1: # success
if job_type not in ['urn:cisco:pnp:backoff']: if job_type not in ['urn:cisco:pnp:backoff']:
device.backoff = True device.backoff = True
...@@ -788,7 +434,7 @@ def pnp_work_response(): ...@@ -788,7 +434,7 @@ def pnp_work_response():
# device.pnp_flow = PNPFLOW.INFO # device.pnp_flow = PNPFLOW.INFO
pass pass
_response = pnp_bye(udi, correlator) _response = pnp_bye(udi, correlator)
log_info(_response) log_info(_response, SETTINGS.debug)
return Response(_response, mimetype='text/xml') return Response(_response, mimetype='text/xml')
elif job_status == 0: elif job_status == 0:
error_code = int(data['pnp']['response']['errorInfo']['errorCode'].split(' ')[-1]) error_code = int(data['pnp']['response']['errorInfo']['errorCode'].split(' ')[-1])
...@@ -799,7 +445,7 @@ def pnp_work_response(): ...@@ -799,7 +445,7 @@ def pnp_work_response():
device.hard_error = True device.hard_error = True
return Response(pnp_bye(udi, correlator), mimetype='text/xml') return Response(pnp_bye(udi, correlator), mimetype='text/xml')
device.current_job = 'none' device.current_job = 'none'
log_info('Empty Response') log_info('Empty Response', SETTINGS.debug)
return Response('') return Response('')
...@@ -807,7 +453,7 @@ if __name__ == '__main__': ...@@ -807,7 +453,7 @@ if __name__ == '__main__':
ERROR = ErrorCodes() ERROR = ErrorCodes()
PNPFLOW = PnpFlow() PNPFLOW = PnpFlow()
SETTINGS = Settings(vars(parse_arguments())) SETTINGS = Settings(vars(parse_arguments(PNP_SERVER_VERSION)))
SETTINGS.update(SETTINGS.cfg_file) SETTINGS.update(SETTINGS.cfg_file)
if SETTINGS.version: if SETTINGS.version:
...@@ -822,7 +468,7 @@ if __name__ == '__main__': ...@@ -822,7 +468,7 @@ if __name__ == '__main__':
app.debug = True app.debug = True
else: else:
# disable FLASK console output # disable FLASK console output
logging.getLogger("werkzeug").disabled = True getLogger("werkzeug").disabled = True
cli.show_server_banner = lambda *args: None cli.show_server_banner = lambda *args: None
if SETTINGS.image_url == '': if SETTINGS.image_url == '':
...@@ -833,8 +479,8 @@ if __name__ == '__main__': ...@@ -833,8 +479,8 @@ if __name__ == '__main__':
exit(1) exit(1)
if SETTINGS.debug: if SETTINGS.debug:
configure_logger(SETTINGS.log_file) configure_logger(SETTINGS.log_file, SETTINGS.log_to_console)
log_info('STARTED LOGGER') log_info('STARTED LOGGER', SETTINGS.debug)
print() print()
print(f'Running open PnP server. Stop with ctrl+c') print(f'Running open PnP server. Stop with ctrl+c')
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-02-26
# File : open_pnp_classes.py
from typing import Dict, Optional, Any
from tomli import load as toml_load
from tomli import TOMLDecodeError
class Settings:
def __init__(
self,
cli_args: Dict[str, Any],
version: bool = False,
cfg_file: Optional[str] = 'open-pnp.toml',
image_data: Optional[str] = 'images.toml',
bind_pnp_server: Optional[str] = '0.0.0.0',
port: Optional[int] = 8080,
time_format: Optional[str] = '%Y-%m-%dT%H:%M:%S',
status_refresh: Optional[int] = 60,
debug: Optional[bool] = False,
log_to_console: Optional[bool] = False,
log_file: Optional[str] = 'log/pnp_debug.log',
image_url: Optional[str] = '',
config_url: Optional[str] = '',
default_cfg: Optional[str] = 'DEFAULT.cfg',
):
self.__settings = {
'cfg_file': cfg_file,
'version': version,
'image_data': image_data,
'bind_pnp_server': bind_pnp_server,
'port': port,
'time_format': time_format,
'status_refresh': status_refresh,
'debug': debug,
'log_to_console': log_to_console,
'log_file': log_file,
'image_url': image_url,
'config_url': config_url,
'default_cfg_file': default_cfg,
}
self.__args = {}
self.__set_cli_args(cli_args)
def __set_cli_args(self, cli_args: Dict[str, Any]):
self.__args = ({k: v for k, v in cli_args.items() if v})
self.__settings.update(self.__args)
def update(self, cfg_file: str):
try:
with open(cfg_file, 'rb') as f:
self.__settings.update(toml_load(f))
except FileNotFoundError as e:
print(f'ERROR: Data file {cfg_file} not found! ({e})')
exit(1)
except TOMLDecodeError as e:
print(
f'ERROR: Data file {cfg_file} is not in valid toml format! ({e})')
exit(2)
self.__settings.update(self.__args)
@property
def cfg_file(self) -> str:
return self.__settings['cfg_file']
@property
def version(self) -> bool:
return self.__settings['version']
@property
def image_data(self) -> str:
return self.__settings['image_data']
@property
def bind_pnp_server(self) -> str:
return self.__settings['bind_pnp_server']
@property
def port(self) -> int:
return self.__settings['port']
@property
def time_format(self) -> str:
return self.__settings['time_format']
@property
def status_refresh(self) -> int:
return self.__settings['status_refresh']
@property
def debug(self) -> bool:
return self.__settings['debug']
@property
def log_to_console(self) -> bool:
return self.__settings['log_to_console']
@property
def log_file(self) -> str:
return self.__settings['log_file']
@property
def image_url(self) -> str:
return self.__settings['image_url']
@property
def config_url(self) -> str:
return self.__settings['config_url']
@property
def default_cfg(self) -> str:
return self.__settings['default_cfg']
class SoftwareImage:
def __init__(self, image: str, version: str, md5: str, size: int,):
self.image: str = image
self.version: str = version
self.md5: str = md5
self.size: int = size
class ErrorCodes:
__readable = {
0: 'No error',
100: 'unknown platform',
101: 'no free space for update',
102: 'unknown image',
103: 'config file not found',
104: 'image file not found',
1412: 'Invalid input detected (config)',
1413: 'Invalid input detected',
1609: 'Error while retrieving device filesystem info',
1816: 'Error verifying checksum for Image',
1829: 'Image copy was unsuccessful',
1803: 'Source file not found',
}
def __init__(self):
self.ERROR_NO_ERROR = 0
self.ERROR_NO_PLATFORM = 100
self.ERROR_NO_FREE_SPACE = 101
self.ERROR_NO_IMAGE = 102
self.ERROR_NO_CFG_FILE = 103
self.ERROR_NO_IMAGE_FILE = 104
self.PNP_ERROR_INVALID_CONFIG = 1412
self.PNP_ERROR_INVALID_INPUT = 1413
self.PNP_ERROR_NO_FILESYSTEM_INFO = 1609
self.PNP_ERROR_BAD_CHECKSUM = 1816
self.PNP_ERROR_IMAGE_COPY_UNSUCCESSFUL = 1829
self.PNP_ERROR_FILE_NOT_FOUND = 1803
def readable(self, error_code: int):
return self.__readable.get(error_code, f'unknown: {error_code}')
class PnpFlow:
__readable = {
0: 'None',
1: 'new device',
2: 'info',
10: 'update required',
11: 'update started',
12: 'no update required/done',
13: 'update done -> reloading',
21: 'config start',
22: 'config down -> reloading',
23: 'reload for config update',
30: 'cleanup required',
31: 'cleanup started',
32: 'no cleanup required/done',
99: 'finished',
}
def __init__(self):
self.NONE = 0
self.NEW = 1
self.INFO = 2
self.UPDATE_NEEDED = 10
self.UPDATE_START = 11
self.UPDATE_DOWN = 12
self.UPDATE_RELOAD = 13
self.CONFIG_START = 21
self.CONFIG_DOWN = 22
self.CONFIG_RELOAD = 23
self.CLEANUP_NEEDED = 30
self.CLEANUP_START = 31
self.CLEANUP_DOWN = 32
self.FINISHED = 99
def readable(self, state: int):
return self.__readable.get(state, 'unknown')
class Device:
def __init__(self, udi: str, platform: str, hw_rev: str, serial: str, first_seen: str, last_contact: str,
src_address: str, current_job: str):
self.udi: str = udi
self.platform: str = platform
self.hw_rev: str = hw_rev
self.serial: str = serial
self.ip_address: str = src_address
self.current_job: str = current_job
self.first_seen: str = first_seen
self.last_contact: str = last_contact
self.version: str = ''
self.image: str = ''
self.destination_name: str = ''
self.destination_free: Optional[int] = None
self.status: str = ''
self.__pnp_flow: int = PNPFLOW.NEW
self.pnp_flow_readable: str = PNPFLOW.readable(PNPFLOW.NEW)
self.target_image: Optional[SoftwareImage] = None
self.backoff: bool = False
self.__refresh_data: bool = False
self.refresh_button: str = ''
self.__error_code: int = 0
self.error_code_readable: str = ERROR.readable(ERROR.ERROR_NO_ERROR)
self.error_count: int = 0
self.error_message = ''
self.__hard_error: bool = False
self.__status_class: str = ''
@property
def pnp_flow(self) -> int:
return self.__pnp_flow
@pnp_flow.setter
def pnp_flow(self, pnp_flow: int):
self.__pnp_flow = pnp_flow
self.pnp_flow_readable = PNPFLOW.readable(pnp_flow)
if pnp_flow == PNPFLOW.FINISHED:
self.__status_class = 'finished'
@property
def refresh_data(self) -> bool:
return self.__refresh_data
@refresh_data.setter
def refresh_data(self, new_state: bool):
self.__refresh_data = new_state
if new_state:
self.refresh_button = 'disabled='
else:
self.refresh_button = 'enabled='
@property
def error_code(self) -> int:
return self.__error_code
@error_code.setter
def error_code(self, error_code: int):
self.__error_code = error_code
self.error_code_readable = ERROR.readable(error_code)
self.__status_class = 'warning'
@property
def hard_error(self) -> bool:
return self.__hard_error
@hard_error.setter
def hard_error(self, hard_error: bool):
self.__hard_error = hard_error
self.__status_class = 'error'
@property
def status_class(self) -> str:
return self.__status_class
@status_class.setter
def status_class(self, status_class: str):
self.__status_class = status_class
class Images:
def __init__(self, images_file):
self.__images = {}
self.load_image_data(images_file)
def load_image_data(self, images_file):
try:
with open(images_file, 'rb') as f:
self.__images = toml_load(f)
except FileNotFoundError as e:
print(f'ERROR: Data file {images_file} not found! ({e})')
exit(1)
except TOMLDecodeError as e:
print(
f'ERROR: Data file {images_file} is not in valid toml format! ({e})')
exit(2)
@property
def images(self) -> Dict[str, Any]:
return self.__images
ERROR = ErrorCodes()
PNPFLOW = PnpFlow()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: GNU General Public License v2
#
# Author: thl-cmk[at]outlook[dot]com
# URL : https://thl-cmk.hopto.org
# Date : 2023-02-26
# File : open_pnp_utils.py
from typing import List
import logging
from logging.handlers import RotatingFileHandler
from sys import stdout
from argparse import (
Namespace as arg_Namespace,
ArgumentParser,
RawTextHelpFormatter,
)
from ifaddr import get_adapters
def configure_logger(path: str, log_to_console: bool):
log_formatter = logging.Formatter(
'%(asctime)s :: %(levelname)s :: %(name)s :: %(module)s ::%(message)s')
log = logging.getLogger('root')
log.setLevel(logging.INFO)
log_file = path
# create a new file > 5 mb size
log_handler_file = RotatingFileHandler(
log_file,
mode='a',
maxBytes=5 * 1024 * 1024,
backupCount=10,
# encoding=None,
# delay=0
)
log_handler_file.setFormatter(log_formatter)
log_handler_file.setLevel(logging.INFO)
log.addHandler(log_handler_file)
if log_to_console:
log_handler_console = logging.StreamHandler(stdout)
log_handler_console.setFormatter(log_formatter)
log_handler_console.setLevel(logging.INFO)
log.addHandler(log_handler_console)
def log_info(message: str, debug: bool):
if debug:
log = logging.getLogger('root')
log.info(message)
def log_critical(message: str, debug: bool):
if debug:
log = logging.getLogger('root')
log.critical(message)
def parse_arguments(PNP_SERVER_VERSION: str) -> arg_Namespace:
parser = ArgumentParser(
prog='open-pnp.py',
description='This is a basic implementation of the Cisco PnP protocol. It is intended to'
'\nroll out image updates and configurations for Cisco IOS/IOS-XE devices on day0.'
'\n'
f'\n{PNP_SERVER_VERSION} | Written by: thl-cmk, for more information see: https://thl-cmk.hopto.org',
formatter_class=RawTextHelpFormatter,
epilog='Usage: python open-pnp.py --config_url http://192.168.10.133:8080/configs '
'--image_url http://192.168.10.133:8080/images',
)
parser.add_argument('-b', '--bind_pnp_server', type=str,
help='Bind PnP server to IP-address. (default: 0.0.0.0)')
parser.add_argument('-p', '--port', type=int,
help='TCP port to listen on. (default: 8080)')
parser.add_argument('-r', '--status_refresh', type=int,
help='Time in seconds to refresh PnP server status page. (default: 60)')
parser.add_argument('-v', '--version', default=False, action='store_const', const=True,
help='Print open-pnp-server version and exit')
parser.add_argument('--config_file', type=str,
help='Path/name of open PnP server config file. (default: open-pnp.toml)')
parser.add_argument('--config_url', type=str,
help='Download URL for config files. I.e. http://192.168.10.133:8080/configs')
parser.add_argument('--image_data', type=str,
help='File containing the image description. (default: images.toml)')
parser.add_argument('--image_url', type=str,
help='Download URL for image files. I.e. http://192.168.10.133:8080/images')
parser.add_argument('--debug', default=False, action='store_const', const=True,
help='Enable Debug output send to "log_file".')
parser.add_argument('--default_cfg', type=str,
help='default config to use if no device specific config is found. (default: DEFAULT.cfg)')
parser.add_argument('--log_file', type=str,
help='Path/name of the logfile. (default: log/pnp_debug.log, requires --debug) ')
parser.add_argument('--log_to_console', default=False, action='store_const', const=True,
help='Enable debug output send to stdout (requires --debug).')
parser.add_argument('--time_format', type=str,
help='Format string to render time. (default: %%Y-%%m-%%dT%%H:%%M:%%S)')
return parser.parse_args()
def get_local_ip_addresses() -> List[str]:
_addresses = []
adapters = get_adapters()
for adapter in adapters:
ip_addr = ''
for ip in adapter.ips:
drop_ip = False
if ip.is_IPv4:
ip_addr = ip.ip
elif ip.is_IPv6:
ip_addr = ip.ip[0]
for entry in ['127.', 'fe80::', '::1']:
if str(ip_addr).lower().startswith(entry):
drop_ip = True
if not drop_ip:
_addresses.append(ip_addr)
return _addresses
...@@ -10,12 +10,15 @@ ...@@ -10,12 +10,15 @@
<!-- <applyto>startup</applyto> --> <!-- don't now where to place this --> <!-- <applyto>startup</applyto> --> <!-- don't now where to place this -->
<!-- <abortonsyntaxfault/> --> <!-- don't now where to place this --> <!-- <abortonsyntaxfault/> --> <!-- don't now where to place this -->
</config> </config>
{% if reload_delay is defined %}
<reload> <reload>
<reason>pnp device config</reason> <reason>pnp device config</reason>
<delayIn>{{ delay }}</delayIn> <delayIn>{{ reload_delay }}</delayIn>
<user>pnp-device-config</user> <user>pnp-device-config</user>
<saveConfig>true</saveConfig> <saveConfig>true</saveConfig>
</reload> </reload>
<!-- <noReload/>--> {% else %}
<noReload/>
{% endif %}
</request> </request>
</pnp> </pnp>
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment