Source code for panos.updater

#!/usr/bin/env python

# Copyright (c) 2014, Palo Alto Networks
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.


"""Device updater handles software versions and updates for devices"""

from pan.config import PanConfig

import panos.errors as err
from panos import PanOSVersion, getlogger, isstring

logger = getlogger(__name__)


class Updater(object):
    """Base class for the update subsystems that a PanDevice instantiates.

    Holds a reference to the owning device, a per-class logger, and the
    most recently parsed version table (``self.versions``).
    """

    def __init__(self, pandevice):
        # Logger is named after the concrete subclass, e.g.
        # "<module>.SoftwareUpdater"
        logger_name = "{0}.{1}".format(__name__, self.__class__.__name__)
        self._logger = getlogger(logger_name)
        self.pandevice = pandevice
        # Filled in by the subclasses; starts out empty
        self.versions = {}

    def _op(self, cmd):
        """Run an operational command on the device and return the response.

        The command is handed to the device's ``xapi.op`` with
        ``cmd_xml=True``.
        """
        xapi = self.pandevice.xapi
        return xapi.op(cmd, cmd_xml=True)
class SoftwareUpdater(Updater):
    """Updater for the PAN-OS software image of a device.

    ``self.versions`` maps a version string to the entry parsed from the
    device's ``request system software info/check`` response.
    """

    def info(self):
        """Fetch version list from live device.

        Synchronizes this current updater state with the live device.

        """
        self._logger.debug("Device %s software updater: info" % self.pandevice.id)
        response = self._op("request system software info")
        self.pandevice.version = self._parse_current_version(response)
        self.versions = self._parse_version_list(response)

    def check(self):
        """Trigger PAN-OS to get versions, then synchronize this object instance.

        First, PAN-OS will reach out to the upgrade servers to get the list of
        all versions that can be upgraded to.

        Then synchronizes this current updater state with the live device.

        """
        self._logger.debug(
            "Device %s software updater: check for new versions" % self.pandevice.id
        )
        response = self._op("request system software check")
        self.pandevice.version = self._parse_current_version(response)
        self.versions = self._parse_version_list(response)

    def download(self, version, sync_to_peer=True, sync=False):
        """PAN-OS downloads the requested version.

        Args:
            version (string): PAN-OS version (eg. "10.0.2")
            sync_to_peer (bool, optional): Send a copy to HA peer. Defaults to True.
            sync (bool, optional): Run job synchronously and return the result. Defaults to False.

        Raises:
            err.PanDeviceError: on unsuccessful download

        Returns:
            If sync, returns result of PAN-OS download job, otherwise True

        """
        self._logger.info(
            "Device %s downloading version: %s" % (self.pandevice.id, version)
        )
        response = self._op(
            'request system software download sync-to-peer "%s" version "%s"'
            % ("yes" if sync_to_peer else "no", version)
        )
        if not sync:
            return True
        result = self.pandevice.syncjob(response)
        if not result["success"]:
            raise err.PanDeviceError(
                "Device %s attempt to download version %s failed: %s"
                % (self.pandevice.id, version, result["messages"])
            )
        return result

    def install(self, version, load_config=None, sync=False):
        """Install the requested PAN-OS version.

        Does not download the software or perform the reboot required
        after installation.

        Args:
            version (string): PAN-OS version (eg. "10.0.2")
            load_config (string, optional): Configuration to use for booting new software. Defaults to None.
            sync (bool, optional): Run job synchronously and return the result. Defaults to False.

        Raises:
            err.PanDeviceError: on unsuccessful install

        Returns:
            If sync, returns result of PAN-OS install job, otherwise True

        """
        self._logger.info(
            "Device %s installing version: %s" % (self.pandevice.id, version)
        )
        # The load-config clause is only present when a config was requested
        load_config_clause = (
            " load-config " + load_config if load_config is not None else ""
        )
        response = self._op(
            'request system software install%s version "%s"'
            % (load_config_clause, version)
        )
        if not sync:
            return True
        result = self.pandevice.syncjob(response)
        if not result["success"]:
            raise err.PanDeviceError(
                "Device %s attempt to install version %s failed: %s"
                % (self.pandevice.id, version, result["messages"])
            )
        return result

    def _parse_version_list(self, response_element):
        """Return a dict of version string -> parsed entry from a response."""
        all_versions = {}
        for software_version in response_element.findall(".//versions/entry"):
            # This line doesn't work correctly in pan-python < 0.7.0.
            newversion = PanConfig(software_version).python("./")
            all_versions[newversion["version"]] = newversion
        return all_versions

    def _parse_current_version(self, response_element):
        """Return the version string of the entry marked ``current='yes'``."""
        current_entry = response_element.find(".//versions/entry/[current='yes']")
        current_version = current_entry.find("./version").text
        self._logger.debug("Found current version: %s" % current_version)
        return current_version

    def download_install(self, version, load_config=None, sync=False):
        """Download and install the requested PAN-OS version.

        Like a combination of the ``check()``, ``download()``, and
        ``install()`` methods, but with some additional checks. For example, it will
        not act if the requested version is already running, and it will skip
        to the install if it is already downloaded.

        Does not perform the required reboot after the install.

        Args:
            version (string): PAN-OS version (eg. "10.0.2")
            load_config (string, optional): Configuration to use for booting new software. Defaults to None.
            sync (bool, optional): Run jobs synchronously and return the result. Defaults to False.

        Raises:
            err.PanDeviceError: problem found in pre-download checks

        Returns:
            If sync, returns result of PAN-OS install job

        """
        if isstring(version):
            version = PanOSVersion(version)
        # Get list of software if needed
        if not self.versions:
            self.check()
        # A real list (not a one-shot ``map`` iterator) so that membership
        # tests stay valid no matter how often they are evaluated
        available_versions = [PanOSVersion(v) for v in self.versions]
        target_version = PanOSVersion(str(version))
        current_version = PanOSVersion(self.pandevice.version)
        if target_version not in available_versions:
            raise err.PanDeviceError(
                "Error upgrading to unknown version: %s" % target_version
            )
        # Check if already on the target version
        if current_version == target_version:
            raise err.PanDeviceError(
                "Requested upgrade to version %s which is already running on device %s"
                % (target_version, self.pandevice.id)
            )
        # Download the software upgrade
        if not self.versions[str(target_version)]["downloaded"]:
            self.download(target_version, sync=True)
        # Install the software upgrade
        return self.install(target_version, load_config=load_config, sync=sync)

    def download_install_reboot(self, version, load_config=None, sync=False):
        """Download and install the requested PAN-OS version, then reboot.

        Like a combination of the ``check()``, ``download()``, and
        ``install()`` methods with a reboot at the end. It has additional checks.
        For example, it will not act if the requested version is already running,
        and it will skip to the install if it is already downloaded.

        Args:
            version (string): PAN-OS version (eg. "10.0.2")
            load_config (string, optional): Configuration to use for booting new software. Defaults to None.
            sync (bool, optional): Wait for the reboot and verify the running version. Defaults to False.

        Raises:
            err.PanDeviceError: problem found in pre-download checks or after reboot

        Returns:
            If sync, the version reported after the reboot, otherwise None

        """
        if isstring(version):
            version = PanOSVersion(version)
        # The install must have finished before rebooting, so it always
        # runs synchronously regardless of ``sync``
        self.download_install(version, load_config, sync=True)
        # Reboot the device
        self._logger.info(
            "Device %s is rebooting after upgrading to version %s. This will take a while."
            % (self.pandevice.id, version)
        )
        self.pandevice.restart()
        if not sync:
            return None
        new_version = self.pandevice.syncreboot()
        if version != new_version:
            raise err.PanDeviceError(
                "Attempt to upgrade to version %s failed. "
                "Device %s is on version %s after reboot."
                % (version, self.pandevice.id, new_version)
            )
        self.pandevice.version = new_version
        return new_version

    def upgrade_to_version(self, target_version, dryrun=False):
        """Upgrade to the target version, completing all intermediate upgrades.

        For example, if firewall is running version 9.0.5 and target version is 10.0.2,
        then this method will proceed through the following steps:

         - Upgrade to 9.1.0 and reboot
         - Upgrade to 10.0.0 and reboot
         - Upgrade to 10.0.2 and reboot

        Does not account for HA pairs.

        Example:
            This shows how to upgrade a firewall to version 10.0.2. This will work
            regardless of which version the firewall is currently running::

                from panos.firewall import Firewall
                fw = Firewall("10.0.0.5", "admin", "password")
                fw.software.upgrade_to_version("10.0.2")

        Args:
            target_version (string): PAN-OS version (eg. "10.0.2") or "latest"
            dryrun (bool, optional): Log what steps would be taken, but don't make any changes
                to the live device. Defaults to False.

        Raises:
            err.PanDeviceError: any problem during the upgrade process

        Returns:
            True once the device is running the target (or latest) version

        """
        # Get list of software if needed
        if not self.versions:
            self.check()

        # For a dry run, need to record the starting version
        starting_version = self.pandevice.version

        # Materialize the versions as a list: a ``map`` iterator would be
        # exhausted by ``max()`` below and make the later membership test
        # always report "not available"
        available_versions = [PanOSVersion(v) for v in self.versions]
        current_version = PanOSVersion(self.pandevice.version)
        latest_version = max(available_versions)
        next_minor_version = self._next_minor_version(current_version)

        # Check that this is an upgrade, not a downgrade
        if current_version > target_version:
            raise err.PanDeviceError(
                "Device %s upgrade failed: Can't upgrade from %s to %s."
                % (self.pandevice.id, self.pandevice.version, target_version)
            )

        # Determine the next version to upgrade to
        if target_version == "latest":
            next_version = min(latest_version, next_minor_version)
        elif latest_version < target_version:
            next_version = next_minor_version
        elif not self._direct_upgrade_possible(current_version, target_version):
            next_version = next_minor_version
        else:
            next_version = PanOSVersion(str(target_version))

        # Only ask the update servers again when the next hop is unknown
        if next_version not in available_versions and not dryrun:
            self._logger.info(
                "Device %s upgrading to %s, currently on %s. Checking for newer versions."
                % (self.pandevice.id, target_version, self.pandevice.version)
            )
            self.check()
            available_versions = [PanOSVersion(v) for v in self.versions]
            latest_version = max(available_versions)

        # Check if done upgrading
        if current_version == target_version:
            self._logger.info(
                "Device %s is running target version: %s"
                % (self.pandevice.id, target_version)
            )
            return True
        elif target_version == "latest" and current_version == latest_version:
            self._logger.info(
                "Device %s is running latest version: %s"
                % (self.pandevice.id, latest_version)
            )
            if dryrun:
                self._logger.info(
                    "NOTE: dryrun with 'latest' does not show all upgrades,"
                )
                self._logger.info(
                    "as new versions are learned through the upgrade process,"
                )
                self._logger.info(
                    "so results may be different than dryrun output when using 'latest'."
                )
            return True

        # Ensure the content pack is upgraded to the latest.  Skipped on a
        # dry run, which must not change the live device.
        if not dryrun:
            self.pandevice.content.download_and_install_latest(sync=True)

        # Upgrade to the next version
        self._logger.info(
            "Device %s will be upgraded to version: %s"
            % (self.pandevice.id, next_version)
        )
        if dryrun:
            # Pretend the hop succeeded so the recursion can plan the next
            # one; always restore the real version, even on error
            self.pandevice.version = str(next_version)
            try:
                return self.upgrade_to_version(target_version, dryrun=dryrun)
            finally:
                self.pandevice.version = starting_version

        self.download_install_reboot(next_version, sync=True)
        self.check()
        return self.upgrade_to_version(target_version, dryrun=dryrun)

    def _next_major_version(self, version):
        """Return ``<major + 1>.0.0`` for the given version (7.0.1 for 7.x)."""
        if isstring(version):
            version = PanOSVersion(version)
        next_version = PanOSVersion(str(version.major + 1) + ".0.0")
        # Account for lack of PAN-OS 7.0.0
        if next_version == "7.0.0":
            next_version = PanOSVersion("7.0.1")
        return next_version

    def _next_minor_version(self, version):
        """Return the next feature release that follows ``version``.

        ``X.1.*`` maps to ``(X+1).0.0``; everything else maps to ``X.1.0``,
        except firewalls on 5.0.x, which map to 6.0.0.

        NOTE(review): a minor number greater than 1 (eg. 10.2.x) falls into
        the final branch and yields ``X.1.0`` -- confirm whether such
        releases need dedicated handling.
        """
        # Imported here rather than at module level -- presumably to avoid
        # a circular import with panos.firewall; TODO confirm
        from panos.firewall import Firewall

        if isstring(version):
            # Parse into ``version`` itself; the attribute accesses below
            # would fail on a plain string
            version = PanOSVersion(version)
        if version.minor == 1:
            next_version = PanOSVersion(str(version.major + 1) + ".0.0")
        # There is no PAN-OS 5.1 for firewalls, so next minor release from 5.0.x is 6.0.0.
        elif (
            version.major == 5
            and version.minor == 0
            and isinstance(self.pandevice, Firewall)
        ):
            next_version = PanOSVersion("6.0.0")
        else:
            next_version = PanOSVersion(str(version.major) + ".1.0")
        # Account for lack of PAN-OS 7.0.0
        if next_version == "7.0.0":
            next_version = PanOSVersion("7.0.1")
        return next_version

    def _next_patch_version(self, version):
        """Return ``<major>.<minor>.<patch + 1>`` for the given version."""
        if isstring(version):
            version = PanOSVersion(version)
        # Components must be dot-separated to form a valid version string
        return PanOSVersion(
            ".".join(str(part) for part in (version.major, version.minor, version.patch + 1))
        )

    def _direct_upgrade_possible(self, current_version, target_version):
        """Check if current version can directly upgrade to target version

        :returns True if a direct upgrade is possible, False if not

        """
        # Imported here rather than at module level -- presumably to avoid
        # a circular import with panos.firewall; TODO confirm
        from panos.firewall import Firewall

        if isstring(current_version):
            current_version = PanOSVersion(current_version)
        if isstring(target_version):
            target_version = PanOSVersion(target_version)

        # Upgrade the patch version
        # eg. 6.0.2 -> 6.0.3
        if (
            current_version.major == target_version.major
            and current_version.minor == target_version.minor
        ):
            return True

        # Upgrade the minor version
        # eg. 6.0.2 -> 6.1.0
        if (
            current_version.major == target_version.major
            and current_version.minor == 0
            and target_version.minor == 1
            and target_version.patch == 0
        ):
            return True

        # Upgrade the major version
        # eg. 6.1.2 -> 7.0.0
        if (
            current_version.major + 1 == target_version.major
            and current_version.minor == 1
            and target_version.minor == 0
            and target_version.patch == 0
        ):
            return True

        # Upgrading a firewall from PAN-OS 5.0.x to 6.0.x
        # This is a special case because there is no PAN-OS 5.1.x
        if (
            current_version.major == 5
            and current_version.minor == 0
            and target_version == "6.0.0"
            and isinstance(self.pandevice, Firewall)
        ):
            return True

        return False
class ContentUpdater(Updater):
    """Updater for the content package of a device.

    ``self.versions`` maps a content version string to the entry parsed
    from the device's ``request content upgrade info/check`` response.
    """

    def info(self):
        """Fetch version list from live device.

        Synchronizes this current updater state with the live device.

        """
        reply = self._op("request content upgrade info")
        self.pandevice.content_version = self._parse_current_version(reply)
        self.versions = self._parse_version_list(reply)

    def check(self):
        """Trigger PAN-OS to get versions, then synchronize this object instance.

        First, PAN-OS will reach out to the upgrade servers to get the list of
        all versions that can be upgraded to.

        Then synchronizes this current updater state with the live device.

        """
        reply = self._op("request content upgrade check")
        self.pandevice.content_version = self._parse_current_version(reply)
        self.versions = self._parse_version_list(reply)

    def download(self, sync_to_peer=None, sync=False):
        """Download the latest content version.

        Returns without doing anything (``None``) when the latest known
        version is already downloaded.

        Args:
            sync_to_peer (bool, optional): Send a copy to HA peer. Defaults to None.
            sync (bool, optional): Run jobs synchronously and return the result. Defaults to False.

        Raises:
            err.PanDeviceError: on unsuccessful download

        Returns:
            If sync, returns result of download job, otherwise True

        """
        if not self.versions:
            self.check()
        newest = max(PanOSVersion(key) for key in self.versions.keys())
        if self.versions[str(newest)]["downloaded"]:
            return
        self._logger.info(
            "Device %s downloading content version: %s"
            % (self.pandevice.id, newest)
        )
        # No sync-to-peer clause at all when the caller did not specify one
        if sync_to_peer is None:
            peer_clause = ""
        else:
            peer_clause = ' "" sync-to-peer "%s"' % ("yes" if sync_to_peer else "no")
        reply = self._op("request content upgrade download latest" + peer_clause)
        if not sync:
            return True
        outcome = self.pandevice.syncjob(reply)
        if outcome["success"]:
            return outcome
        raise err.PanDeviceError(
            "Device %s attempt to download content version %s failed: %s"
            % (self.pandevice.id, newest, outcome["messages"])
        )

    def install(
        self, version="latest", sync_to_peer=True, skip_commit=False, sync=False
    ):
        """Install the requested content version.

        Returns without doing anything (``None``) when the latest known
        version is already the current one.

        Args:
            version (string): Content version (eg. "8357-6464"). Defaults to "latest".
            sync_to_peer (bool, optional): Send a copy to HA peer. Defaults to True.
            skip_commit (bool, optional): Do not perform a commit after install. Defaults to False.
            sync (bool, optional): Run jobs synchronously and return the result. Defaults to False.

        Raises:
            err.PanDeviceError: on unsuccessful install

        Returns:
            If sync, returns result of install job, otherwise True

        """
        if not self.versions:
            self.check()
        newest = max(PanOSVersion(key) for key in self.versions.keys())
        if self.versions[str(newest)]["current"]:
            return
        self._logger.info(
            "Device %s installing content version: %s" % (self.pandevice.id, version)
        )
        commit_flag = "no" if skip_commit else "yes"
        peer_flag = "yes" if sync_to_peer else "no"
        reply = self._op(
            'request content upgrade install commit "%s" sync-to-peer "%s" version "%s"'
            % (commit_flag, peer_flag, version)
        )
        if not sync:
            return True
        outcome = self.pandevice.syncjob(reply)
        if outcome["success"]:
            return outcome
        raise err.PanDeviceError(
            "Device %s attempt to install content version %s failed: %s"
            % (self.pandevice.id, version, outcome["messages"])
        )

    def download_and_install_latest(self, sync=False):
        """Call ``download()`` and then ``install()`` with default arguments.

        Args:
            sync (bool, optional): Passed through to both calls. Defaults to False.

        """
        self.download(sync=sync)
        self.install(sync=sync)

    def downgrade(self, sync=False):
        """Return to the previous content version.

        Args:
            sync (bool, optional): Run jobs synchronously and return the result. Defaults to False.

        Returns:
            If sync, returns result of install job, otherwise True

        """
        reply = self._op('request content downgrade install "previous"')
        return self.pandevice.syncjob(reply) if sync else True

    def _parse_version_list(self, response_element):
        """Return a dict of version string -> parsed entry from a response."""
        parsed = {}
        for node in response_element.findall(".//content-updates/entry"):
            # This line doesn't work correctly in pan-python < 0.7.0.
            entry = PanConfig(node).python("./")
            parsed[entry["version"]] = entry
        return parsed

    def _parse_current_version(self, response_element):
        """Return the version marked ``current='yes'``, or None if there is none."""
        marked = response_element.find(".//content-updates/entry/[current='yes']")
        if marked is None:
            return
        found = marked.find("./version").text
        self._logger.debug("Found current version: %s" % found)
        return found

    def download_install(
        self, version="latest", sync_to_peer=False, skip_commit=False, sync=False
    ):
        """Download and install the requested content version.

        Like a combination of the ``check()``, ``download()``, and
        ``install()`` methods. The download always runs synchronously;
        ``sync`` applies to the install only.

        Args:
            version (string): Content version (eg. "8357-6464"). Defaults to "latest".
            sync_to_peer (bool, optional): Send a copy to HA peer. Defaults to False.
            skip_commit (bool, optional): Do not perform a commit after install. Defaults to False.
            sync (bool, optional): Run jobs synchronously and return the result. Defaults to False.

        """
        # Populate the version table first when nothing is cached yet
        if not self.versions:
            self.check()
        # Fetch the package and wait for the job to finish
        self.download(sync_to_peer=sync_to_peer, sync=True)
        # Apply the requested package
        self.install(
            version, sync_to_peer=sync_to_peer, skip_commit=skip_commit, sync=sync
        )