#!/usr/bin/env python
# Copyright (c) 2014, Palo Alto Networks
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""Base object classes for inheritence by other classes"""
import base64
import collections
import copy
import datetime
import hashlib
import inspect
import itertools
import re
import sys
import time
import xml.dom.minidom as minidom
import xml.etree.ElementTree as ET
import pan.commit
import pan.xapi
from pan.config import PanConfig
import panos
import panos.errors as err
from panos import (
chunk_instances_for_delete_similar,
isstring,
string_or_list,
updater,
userid,
yesno,
)
logger = panos.getlogger(__name__)
Root = panos.enum("DEVICE", "VSYS", "MGTCONFIG", "PANORAMA", "PANORAMA_VSYS")
SELF = "/%s"
ENTRY = "/entry[@name='%s']"
MEMBER = "/member[text()='%s']"
# PanObject type
[docs]class PanObject(object):
"""Base class for all package objects
This class defines an object that can be placed in a tree to generate configuration.
Args:
name (str): The name of this object
Attributes:
uid (str): The unique identifier for this object if it has one. If it
doesn't have one, then this returns the class name.
vsys (str): The vsys id for this object (eg. 'vsys2') or 'shared' if no vsys
"""
XPATH = ""
SUFFIX = None
ROOT = Root.DEVICE
NAME = "name"
CHILDTYPES = ()
CHILDMETHODS = ()
HA_SYNC = True
TEMPLATE_NATIVE = False
_UNKNOWN_PANOS_VERSION = (sys.maxsize, 0, 0)
OPSTATES = {}
def __init__(self, *args, **kwargs):
# Set the 'name' variable
idx_start = 0
if self.NAME is not None:
try:
name = args[0]
idx_start = 1
except IndexError:
name = kwargs.pop(self.NAME, None)
setattr(self, self.NAME, name)
# Initialize other common variables
self.parent = None
self.children = []
# Gather all the variables from the 'variables' class method
# from the args/kwargs into instance variables.
variables = kwargs.pop("variables", None)
if variables is None:
variables = type(self).variables()
# Sort the variables by order
variables = sorted(variables, key=lambda x: x.order)
for idx, var in enumerate(variables, idx_start):
varname = var.variable
try:
# Try to get the variables from 'args' first
varvalue = args[idx]
except IndexError:
# If it's not in args, get it from 'kwargs', or store a None in the variable
try:
varvalue = kwargs.pop(varname)
except KeyError:
# If None was stored in the variable, check if
# there's a default value, and store that instead
if var.default is not None:
setattr(self, varname, var.default)
else:
setattr(self, varname, None)
continue
# For member variables, store a list containing the value instead of the individual value
if var.vartype in ("member", "entry"):
varvalue = panos.string_or_list(varvalue)
# Store the value in the instance variable
setattr(self, varname, varvalue)
self._setups()
def _setups(self):
"""The various setup functions that will be called on object creation."""
funcs = [
"_setup",
"_setup_opstate",
]
for func in funcs:
if hasattr(self, func):
f = getattr(self, func)
if callable(f):
f()
def _setup_opstate(self):
self.opstate = OpStateContainer(self, self.OPSTATES)
def __str__(self):
return self.uid
def __repr__(self):
return "<{0}{1} {2:#x}>".format(
type(self).__name__, " {0}".format(self.uid) if self.uid else "", id(self)
)
[docs] @classmethod
def variables(cls):
"""Defines the variables that exist in this object. Override in each subclass."""
return ()
@property
def vsys(self):
"""Return the vsys for this object
Traverses the tree to determine the vsys from a :class:`panos.firewall.Firewall`
or :class:`panos.device.Vsys` instance somewhere before this node in the tree.
Returns:
str: The vsys id (eg. vsys2)
"""
if self.parent is not None:
vsys = self.parent.vsys
if vsys is None and self.ROOT == Root.VSYS:
return getattr(self.parent, "DEFAULT_VSYS", None)
else:
return vsys
@vsys.setter
def vsys(self, value):
raise err.PanDeviceError("Cannot set vsys on non-vsys object")
@property
def uid(self):
"""Returns the unique identifier of this object as a string."""
if hasattr(self, "id"):
return self.id
elif self.NAME is not None:
return str(getattr(self, self.NAME))
else:
return ""
[docs] def add(self, child):
"""Add a child node to this node
Args:
child (PanObject): Node to add as a child
Returns:
PanObject: Child node
"""
child.parent = self
self.children.append(child)
return child
[docs] def insert(self, index, child):
"""Insert a child node at a specific index
This is useful for ordering or reordering security policy rules
Args:
index (int): The index where the child obj should be inserted
child (PanObject): Node to add as a child
Returns:
PanObject: Child node
"""
child.parent = self
self.children.insert(index, child)
return child
[docs] def extend(self, children):
"""Add a list of child nodes to this node
Args:
children (list): List of PanObject instances
"""
for child in children:
child.parent = self
self.children.extend(children)
[docs] def pop(self, index):
"""Remove and return the object at an index
Args:
index (int): Index of the object to remove and return
Returns:
PanObject: The object removed from the children of this node
"""
child = self.children.pop(index)
child.parent = None
return child
[docs] def remove(self, child):
"""Remove the child from this node
Args:
child (PanObject): Child to remove
"""
self.children.remove(child)
child.parent = None
[docs] def remove_by_name(self, name, cls=None):
"""Remove a child node by name
If the class is not specified, then it defaults to type(self).
Args:
name (str): Name of the child node
Keyword Args:
cls (class): Restrict removal to instances of this class
Returns:
PanObject: The removed node, otherwise None
"""
# Get the index of the first matching child
index = self.find_index(name, cls)
if index is not None:
return self.pop(index)
[docs] def removeall(self, cls=None):
"""Remove all children of a type
Not recursive.
Args:
cls (class): The class of objects to remove
Returns:
list: List of PanObjects that were removed
"""
if not self.children:
return
if cls is not None:
children = [child for child in self.children if isinstance(child, cls)]
for child in children:
self.children.remove(child)
return children
else:
children = self.children
for child in children:
child.parent = None
self.children = []
return children
[docs] def xpath(self, root=None):
"""Return the full xpath for this object
Xpath in the form: parent's xpath + this object's xpath + entry or member if applicable.
Args:
root: The root to use for this object (default: this object's root)
Returns:
str: The full xpath to this object
"""
path = []
p = self
if root is None:
root = self.ROOT
vsys = self.vsys
label = getattr(self, "VSYS_LABEL", "vsys")
while True:
if isinstance(p, PanDevice) and p != self:
# Stop on the first pandevice encountered, unless the
# panos.PanDevice object is the object whose xpath
# was asked for.
# If the object whose xpath we are creating is directly
# attached to a Panorama and the root is PANORAMA
if root == Root.PANORAMA_VSYS:
if p.__class__.__name__ == "Panorama":
root = Root.PANORAMA
else:
root = Root.VSYS
path.insert(0, p.xpath_root(root, vsys, label))
break
elif p.__class__.__name__ == "Predefined":
# Stop on predefined namespace.
path.insert(0, p.XPATH)
break
elif not hasattr(p, "VSYS_LABEL") or p == self:
# Add on the xpath of this object, unless it is a
# device.Vsys, unless the device.Vsys is the object whose
# xpath was asked for.
addon = p.XPATH
if p.SUFFIX is not None:
addon += p.SUFFIX % (p.uid,)
path.insert(0, addon)
if p.__class__.__name__ == "Firewall" and p.parent is not None:
if p.parent.__class__.__name__ == "DeviceGroup":
root = Root.VSYS
p = p.parent
if p is None:
break
if hasattr(p, "VSYS_LABEL"):
# Either panorama.DeviceGroup or device.Vsys.
label = p.VSYS_LABEL
vsys = p.vsys
elif p.__class__.__name__ in ("Template", "TemplateStack"):
if not self.TEMPLATE_NATIVE:
# Hit a template, make sure that the appropriate /config/...
# xpath has been saved.
if not path[0].startswith("/config/"):
if root == Root.PANORAMA_VSYS:
root = Root.VSYS
path.insert(0, self.xpath_root(root, vsys, label))
vsys = p.vsys
root = p.ROOT
return "".join(path)
[docs] def xpath_nosuffix(self):
"""Return the xpath without the suffix
This is used by refreshall().
Returns:
str: The xpath without entry or member on the end
"""
if self.SUFFIX is None:
return self.xpath()
else:
return self.xpath_short()
[docs] def xpath_short(self, root=None):
"""Return an xpath for this object without the final segment
Xpath in the form: parent's xpath + this object's xpath. Used for set API calls.
Args:
root: The root to use for this object (default: this object's root)
Returns:
str: The xpath without the final segment
"""
xpath = self.xpath(root)
xpath = re.sub(r"/(?=[^/']*'[^']*'[^/']*$|[^/]*$).*$", "", xpath)
return xpath
def xpath_root(self, root_type, vsys, label="vsys"):
if self.parent:
return self.parent.xpath_root(root_type, vsys, label)
def xpath_vsys(self):
if self.parent is not None:
return self.parent.xpath_vsys()
def xpath_panorama(self):
if self.parent is not None:
return self.parent.xpath_panorama()
def _root_xpath_vsys(self, vsys, label="vsys"):
if vsys == "shared":
xpath = "/config/shared"
else:
xpath = "/config/devices/entry[@name='localhost.localdomain']"
xpath += "/{0}/entry[@name='{1}']".format(label, vsys or "vsys1")
return xpath
[docs] def element(self, with_children=True, comparable=False):
"""Construct an ElementTree for this PanObject and all its children
Args:
with_children (bool): Include children in element.
comparable (bool): Element will be used in a comparison with another.
Returns:
xml.etree.ElementTree: An ElementTree instance representing the
xml form of this object and its children
"""
root = self._root_element()
variables = self.variables()
for var in variables:
missing_replacement = False
if var.vartype == "none":
value = "nonetype"
else:
value = getattr(self, var.variable)
if value is None:
continue
if var.condition is not None:
condition = var.condition.split(":")
if str(getattr(self, condition[0])) != condition[1]:
continue
path = var.path.split("/")
nextelement = root
for section in path:
if section.find("|") != -1:
# This is an element variable, so create an element containing
# the variables's value
section = re.sub(r"\([\w\d|-]*\)", str(value), section)
# Search for variable replacements in path
matches = re.findall(r"{{(.*?)}}", section)
entryvar = None
# Do variable replacement, ie. {{ }}
for match in matches:
regex = r"{{" + re.escape(match) + r"}}"
# Ignore variables that are None
if getattr(self, match) is None:
missing_replacement = True
break
# Find the discovered replacement in the list of vars
for nextvar in variables:
if nextvar.variable == match:
matchedvar = nextvar
break
if matchedvar.vartype == "entry":
# If it's an 'entry' variable
# XXX: this is using a quick patch. Should handle array-based entry vars better.
entry_value = panos.string_or_list(
getattr(self, matchedvar.variable)
)
section = re.sub(
regex,
matchedvar.path
+ "/"
+ "entry[@name='%s']" % entry_value[0],
section,
)
entryvar = matchedvar
else:
# Not an 'entry' variable
section = re.sub(
regex, getattr(self, matchedvar.variable), section
)
if missing_replacement:
break
found = nextelement.find(section)
if found is not None:
# Existing element
nextelement = found
else:
# Create elements
if entryvar is not None:
# for vartype="entry" with replacement from above
nextelement = ET.SubElement(nextelement, entryvar.path)
nextelement = ET.SubElement(
nextelement,
"entry",
{"name": getattr(self, entryvar.variable)},
)
else:
# for entry vartypes that are empty
if var.vartype == "entry" and not value:
continue
# non-entry vartypes
nextelement = ET.SubElement(nextelement, section)
if missing_replacement:
continue
var._set_inner_xml_tag_text(nextelement, value, comparable)
if with_children:
self.xml_merge(root, self._subelements())
return root
[docs] def element_str(self, pretty_print=False):
"""The XML representation of this PanObject and all its children.
Args:
pretty_print (bool): Return the resulting string pretty_printed with indentation.
Returns:
str: XML form of this object and its children
"""
if pretty_print:
raw = ET.tostring(self.element(), encoding="utf-8")
parsed = minidom.parseString(raw)
return parsed.toprettyxml(indent="\t", encoding="utf-8")
return ET.tostring(self.element(), encoding="utf-8")
def _root_element(self):
if self.SUFFIX == ENTRY:
return ET.Element("entry", {"name": self.uid})
elif self.SUFFIX == MEMBER:
root = ET.Element("member")
root.text = self.uid
return root
elif self.SUFFIX is None:
# Get right of last / in xpath
tag = self.XPATH.rsplit("/", 1)[-1]
return ET.Element(tag)
raise ValueError(
"No suffix or XPATH defined for {0}".format(self.__class__.__name__)
)
def _subelements(self, comparable=False):
"""Generator function to turn children into XML objects.
Yields:
xml.etree.ElementTree: The next child as an ``ElementTree`` object.
"""
for child in self.children:
root = self._root_element()
# Paths have a leading slash to get rid of
xpath_sections = child.XPATH.split("/")[1:]
if child.SUFFIX is None:
# If not suffix, remove the last xpath section
# because it will be part of the element
xpath_sections = xpath_sections[:-1]
e = root
for path in xpath_sections:
if path == "entry[@name='localhost.localdomain']":
e = ET.SubElement(e, "entry", {"name": "localhost.localdomain"})
else:
e = ET.SubElement(e, path)
e.append(child.element(comparable=comparable))
yield root
def _check_child_methods(self, method):
if method in self.CHILDMETHODS:
getattr(self, "child_" + method)()
for child in self.children:
child._check_child_methods(method)
[docs] def equal(self, panobject, force=False, compare_children=True):
"""Compare this object to another object
Equality of the objects is determined by the XML they generate, not by the
values of their variables.
Args:
panobject (PanObject): The object to compare with this object
force (bool): Do not raise a PanObjectError if the objects are different classes
compare_children (bool): Not supported in this object, use True
Raises:
PanObjectError: Raised if the objects are different types that
would not normally be comparable
Returns:
bool: True if the XML of the objects is equal, False if not
"""
if not panobject:
return False
if not force and type(self) != type(panobject):
raise err.PanObjectError(
"Object {0} is not comparable to {1}".format(
type(self), type(panobject)
)
)
return self.element_str() == panobject.element_str()
[docs] def apply(self):
"""Apply this object to the device, replacing any existing object of the same name
**Modifies the live device**
"""
device = self.nearest_pandevice()
logger.debug(
device.id + ': apply called on %s object "%s"' % (type(self), self.uid)
)
device.set_config_changed()
if self.HA_SYNC:
device.active().xapi.edit(
self.xpath(), self.element_str(), retry_on_peer=self.HA_SYNC
)
else:
device.xapi.edit(
self.xpath(), self.element_str(), retry_on_peer=self.HA_SYNC
)
for child in self.children:
child._check_child_methods("apply")
[docs] def create(self):
"""Create this object on the device
**Modifies the live device**
This method is nondestructive. If the object exists, the variables are added to the device
without changing existing variables on the device. If a variables already exists on the
device and this object has a different value, the value on the firewall is changed to
the value in this object.
"""
device = self.nearest_pandevice()
logger.debug(
device.id + ': create called on %s object "%s"' % (type(self), self.uid)
)
device.set_config_changed()
element = self.element_str()
if self.HA_SYNC:
device.active().xapi.set(
self.xpath_short(), element, retry_on_peer=self.HA_SYNC
)
else:
device.xapi.set(self.xpath_short(), element, retry_on_peer=self.HA_SYNC)
for child in self.children:
child._check_child_methods("create")
[docs] def delete(self):
"""Delete this object from the firewall
**Modifies the live device**
"""
device = self.nearest_pandevice()
logger.debug(
device.id + ': delete called on %s object "%s"' % (type(self), self.uid)
)
device.set_config_changed()
for child in self.children:
child._check_child_methods("delete")
if self.HA_SYNC:
device.active().xapi.delete(self.xpath(), retry_on_peer=self.HA_SYNC)
else:
device.xapi.delete(self.xpath(), retry_on_peer=self.HA_SYNC)
if self.parent is not None:
self.parent.remove(self)
[docs] def update(self, variable):
"""Change the value of a variable
**Modifies the live device**
Do not attempt this on an element variable (|) or variable with replacement {{}}
If the variable's value is None, then a delete API call is attempted.
Args:
variable (str): The name of an instance variable to update on the device
"""
device = self.nearest_pandevice()
logger.debug(
device.id
+ ': update called on %s object "%s" and variable "%s"'
% (type(self), self.uid, variable)
)
device.set_config_changed()
path, attr, value, var_path = self._get_param_specific_info(variable)
if var_path.vartype == "attrib":
raise NotImplementedError("Cannot update 'attrib' style params")
xpath = "{0}/{1}".format(self.xpath(), path)
if value is None:
# Value is None, so delete it from the live device.
device.xapi.delete(xpath, retry_on_peer=self.HA_SYNC)
else:
# Variable has a new value.
element_tag = path.split("/")[-1]
element = ET.Element(element_tag)
var_path._set_inner_xml_tag_text(element, value)
device.xapi.edit(
xpath,
ET.tostring(element, encoding="utf-8"),
retry_on_peer=self.HA_SYNC,
)
[docs] def rename(self, new_name):
"""Change the name of this object.
**Modifies the live device**
NOTE: This does not change any references that may exist in your
pan-os-python object hierarchy, but it does update the name of the
object itself.
Args:
new_name (str): The new UID for this object.
"""
dev = self.nearest_pandevice()
logger.debug(
'{0}: rename called on {1} object "{2}"'.format(
dev.id, type(self), self.uid
)
)
dev.set_config_changed()
dev.xapi.rename(self.xpath(), new_name)
setattr(self, self.NAME, new_name)
[docs] def move(self, location, ref=None, update=True):
"""Moves the current object.
**Modifies the live device**
This is useful for stuff like moving one security policy above another.
If this object's parent is a rulebase object, then this object is also
moved to the appropriate position in the local pan-os-python object tree.
Args:
location (str): Any of the following: before, after, top, or bottom
ref (PanObject/str): If location is "before" or "after", move this object before/after the ref object. If this is a string, then the string should just be the name of the object.
update (bool): If this is set to False, then only move this object in the pan-os-python object tree, do not actually perform the MOVE operation on the live device. Note that in order for this object to be moved in the pan-os-python object tree, the parent object must be a rulebase object.
Raises:
ValueError
"""
d = self.nearest_pandevice()
dst = None
new_index = None
rbs = ("Rulebase", "PreRulebase", "PostRulebase")
ref_locs = ("before", "after")
standalone_locs = ("top", "bottom")
parent = self.parent
# Sanity checks + determine move location.
if parent is None:
raise ValueError("No parent for object {0}".format(self.uid))
elif location in standalone_locs:
if ref is not None:
raise ValueError("ref should be None for {0} move".format(location))
if parent.__class__.__name__ in rbs:
new_index = 0 if location == "top" else len(parent.children) - 1
elif location in ref_locs:
if ref is None:
raise ValueError("ref must be specified for {0} move".format(location))
dst = str(ref)
if self.uid == dst:
raise ValueError("Cannot move rule in relation to self")
if parent.__class__.__name__ in rbs:
offset = 0
for i, x in enumerate(parent.children):
if self == x:
offset = 1
elif type(x) == type(self) and x.uid == dst:
new_index = (
i - offset if location == "before" else i - offset + 1
)
break
else:
raise ValueError(
"Location must be one of: {0} or {1}".format(ref_locs, standalone_locs)
)
logger.debug('{0}: move called on {1} "{2}"'.format(d.id, type(self), self.uid))
# Move the rule in the pan-os-python object tree, if applicable.
if new_index is not None:
parent.remove(self)
parent.insert(new_index, self)
# Done if we're not updating.
if not update:
return
# Perform the move on the nearest pandevice.
d.set_config_changed()
d.xapi.move(self.xpath(), location, dst)
def _get_param_specific_info(self, variable):
"""Gets a tuple of info for the given parameter.
This is to aid in things like updates or refreshes of a specific
parameter attached to this PanObject / VersionedPanObject.
Returns:
A three element tuple of the variable's xpath (str), the value of
the variable, and the full ``VarPath`` or ``ParamPath`` object that
is responsible for handling this variable.
Raises:
PanDeviceError: If the variable specified does not exist.
"""
variables = type(self).variables()
value = getattr(self, variable)
# Get the requested variable from the class' variables tuple
var = next((x for x in variables if x.variable == variable), None)
if var is None:
raise err.PanDeviceError(
"Variable %s does not exist in variable tuple" % variable
)
varpath = var.path
# Do replacements on variable path
if varpath.find("|") != -1:
# This is an element variable, so create an element containing
# the variables's value
varpath = re.sub(r"\([\w\d|-]*\)", str(value), varpath)
# Search for variable replacements in path
matches = re.findall(r"{{(.*?)}}", varpath)
entryvar = None
# Do variable replacement, ie. {{ }}
for match in matches:
regex = r"{{" + re.escape(match) + r"}}"
# Ignore variables that are None
if getattr(self, match) is None:
raise ValueError(
"While updating variable %s, missing replacement variable %s in path"
% (variable, match)
)
# Find the discovered replacement in the list of vars
for nextvar in variables:
if nextvar.variable == match:
matchedvar = nextvar
break
if matchedvar.vartype == "entry":
# If it's an 'entry' variable
# XXX: this is using a quick patch. Should handle array-based entry vars better.
entry_value = panos.string_or_list(getattr(self, matchedvar.variable))
varpath = re.sub(
regex,
matchedvar.path + "/" + "entry[@name='%s']" % entry_value[0],
varpath,
)
else:
# Not an 'entry' variable
varpath = re.sub(regex, getattr(self, matchedvar.variable), varpath)
# For vartype=attrib params, we need the containing XML element.
attr = None
if var.vartype == "attrib":
tokens = varpath.rsplit("/", 1)
attr = tokens[-1]
if len(tokens) == 1:
varpath = None
else:
varpath = tokens[0]
return (varpath, attr, value, var)
[docs] def refresh(
self, running_config=False, refresh_children=True, exceptions=True, xml=None
):
"""Refresh all variables and child objects from the device.
Args:
running_config (bool): Set to True to refresh from the running
configuration (Default: False)
xml (xml.etree.ElementTree): XML from a configuration to use
instead of refreshing from a live device
refresh_children (bool): Set to False to prevent refresh of child
objects (Default: True)
exceptions (bool): Set to False to prevent exceptions on failure
(Default: True)
"""
# Either retrieve the xml or use what is passed in
if xml is None:
xml = self._refresh_xml(running_config, exceptions, refresh_children)
else:
logger.debug(
'refresh called using xml on {0} object "{1}"'.format(
type(self), self.uid
)
)
if xml is None:
return
# Refresh this object
if hasattr(self, "parse_xml"):
# Versioned object
self.parse_xml(xml)
else:
# Classic object
variables = type(self)._parse_xml(xml)
for var, value in variables.items():
setattr(self, var, value)
# Refresh children objects if requested
if refresh_children:
self._refresh_children(xml=xml)
[docs] def refresh_variable(self, variable, running_config=False, exceptions=False):
"""Refresh a single variable of an object.
**Don't use for variables with replacements or selections in path.**
Args:
variable (str): Variable name to refresh.
running_config (bool): Set to True to refresh from the running
configuration (Default: False)
exceptions (bool): Set to False to prevent exceptions on failure
(Default: True)
Returns:
New value of the refreshed variable.
Raises:
PanObjectMissing: When the object this variable is connected to
does not exist.
"""
device = self.nearest_pandevice()
msg = '{0}: refresh_variable({1}) called on {2} object "{3}"'
logger.debug(msg.format(device.id, variable, self.__class__.__name__, self.uid))
path, attr, value, var_path = self._get_param_specific_info(variable)
xpath = self.xpath()
if path is not None:
xpath += "/{0}".format(path)
err_msg = "Object doesn't exist: {0}".format(xpath)
setattr(self, variable, [] if var_path.vartype in ("member", "entry") else None)
# Query to get the variable's XML from the device
if running_config:
api_action = device.xapi.show
else:
api_action = device.xapi.get
try:
root = api_action(xpath, retry_on_peer=self.HA_SYNC)
except (pan.xapi.PanXapiError, err.PanNoSuchNode) as e:
if exceptions:
raise err.PanObjectMissing(err_msg, pan_device=device)
return
# Determine the first element to look for in the XML
lasttag = xpath.rsplit("/", 1)[-1]
obj = root.find("result/" + lasttag)
if obj is None:
if exceptions:
raise err.PanObjectMissing(err_msg, pan_device=device)
return
if hasattr(var_path, "parse_value_from_xml_last_tag"):
# Versioned class
settings = {}
var_path.parse_value_from_xml_last_tag(obj, settings, attr)
setattr(self, variable, settings.get(variable))
else:
# Classic class
# Rebuild the elements that are lost by refreshing the
# variable directly
sections = xpath.split("/")[:-1]
root = ET.Element("root")
next_element = root
for section in sections:
next_element = ET.SubElement(next_element, section)
next_element.append(obj)
# Refresh the requested variable
variables = type(self)._parse_xml(root)
for var, value in variables.items():
if var == variable:
setattr(self, var, value)
return getattr(self, variable)
def _refresh_children(self, running_config=False, xml=None):
# Retrieve the xml if we weren't given it
if xml is None:
xml = self._refresh_xml(running_config, True)
if xml is None:
return
# Remove all the current child instances first
self.removeall()
# Check for children in the remaining XML
for child_type_string in self.CHILDTYPES:
module_name, class_name = child_type_string.split(".")
if module_name == "device":
import panos.device
elif module_name == "firewall":
import panos.firewall
elif module_name == "ha":
import panos.ha
elif module_name == "network":
import panos.network
elif module_name == "objects":
import panos.objects
elif module_name == "panorama":
import panos.panorama
elif module_name == "policies":
import panos.policies
elif module_name == "plugins":
import panos.plugins
child = getattr(getattr(panos, module_name), class_name)()
# Versioned objects need a PanDevice to get the version from, so
# set the child's parent before accessing XPATH.
child.parent = self
childroot = xml.find(child.XPATH[1:])
if childroot is not None:
l = child.refreshall_from_xml(childroot)
self.extend(l)
return self.children
def _refresh_xml(self, running_config, exceptions, refresh_children=True):
"""Get the XML for a single PanObject."""
# Get the root of the xml to parse
optimized = False
dev = self.nearest_pandevice()
msg = "{0}: refreshing xml on {1} object {2}".format(
dev.id, type(self), self.uid
)
logger.debug(msg)
api_action = dev.xapi.show if running_config else dev.xapi.get
if running_config or refresh_children:
xpath = self.xpath()
else:
optimized = True
info = self._build_element_info()
paths, settings = info[0], info[2]
query_paths = list(
set(p.path.split("/")[0].format(**settings) for p in paths)
)
xpath = "|".join("{0}/{1}".format(self.xpath(), x) for x in query_paths)
err_msg = "Object doesn't exist: {0}".format(xpath)
# Query the live device
try:
root = api_action(xpath, retry_on_peer=self.HA_SYNC)
except (pan.xapi.PanXapiError, err.PanNoSuchNode) as e:
if exceptions:
raise err.PanObjectMissing(err_msg, pan_device=dev)
else:
return
# Determine the first element to look for in the XML
if not optimized:
# Normal XML recovery for parsing
if self.SUFFIX is None:
lasttag = self.XPATH.rsplit("/", 1)[-1]
else:
lasttag = re.match(r"^/(\w*?)\[", self.SUFFIX).group(1)
elm = root.find("result/" + lasttag)
else:
# Construct the XML for parsing.
elm = self._root_element()
results = root.find("./result")
if results is not None:
for se in results:
elm.append(se)
if elm is None and exceptions:
raise err.PanObjectMissing(err_msg, pan_device=dev)
return elm
[docs] def nearest_pandevice(self):
"""The nearest :class:`panos.base.PanDevice` object to.
This method is used to determine the device to apply this object.
Returns:
PanDevice: The PanDevice object closest to this object in
the configuration tree.
Raises:
PanDeviceNotSet: There is no PanDevice object in the tree.
"""
return self._nearest_pandevice()
def _nearest_pandevice(self):
if self.parent is not None:
return self.parent._nearest_pandevice()
raise err.PanDeviceNotSet("No PanDevice set for object tree")
[docs] def panorama(self):
"""The nearest :class:`panos.panorama.Panorama` object.
This method is used to determine the device to apply this object to.
Returns:
Panorama: The Panorama object closest to this object in the
configuration tree
Raises:
PanDeviceNotSet: There is no Panorama object in the tree.
"""
if self.parent is not None:
return self.parent.panorama()
raise err.PanDeviceNotSet("No Panorama set for object tree")
[docs] def devicegroup(self):
"""The nearest :class:`panos.panorama.DeviceGroup` object.
This method is used to determine the device to apply this object to.
Returns:
DeviceGroup: The DeviceGroup object closest to this object in the
configuration tree, or None if there is no DeviceGroup in the path
to this node.
"""
if self.parent is not None:
return self.parent.devicegroup()
[docs] def find(self, name, class_type=None, recursive=False):
"""Find an object in the configuration tree by name
Args:
name (str): Name of the object to find
class_type: Class to look for
recursive (bool): Find recursively (Default: False)
Returns:
PanObject: The object in the tree that fits the criteria, or None if no object is found
"""
if class_type is None:
# Find the matching object or return None
result = next((child for child in self.children if child.uid == name), None)
else:
# Find the matching object or return None
result = next(
(
child
for child in self.children
if child.uid == name and isinstance(child, class_type)
),
None,
)
# Search recursively in children
if result is None and recursive:
for child in self.children:
result = child.find(name, class_type, recursive)
if result is not None:
break
return result
[docs] def findall(self, class_type, recursive=False):
"""Find all objects of a class in configuration tree
Args:
class_type: Class to look for
recursive (bool): Find recursively (Default: False)
Returns:
list: List of 'class_type' objects
"""
result = [child for child in self.children if isinstance(child, class_type)]
# Search recursively in children
if recursive:
for child in self.children:
result.extend(child.findall(class_type, recursive))
return result
[docs] def find_or_create(self, name, class_type, *args, **kwargs):
"""Find an object in the configuration tree by name, and create it if it doesn't exist
If the object does not exist, it is created and added to the current object.
Args:
name (str): Name of the object to find
class_type: Class to look for or create
*args: Arguments to pass to the __init__ method of class_type
*kwargs: Keyworkd arguments to pass to the __init__ method of class_type
Returns:
PanObject: The object in the tree that fits the criteria, or None if no object is found
"""
result = self.find(name, class_type)
if result is not None:
return result
else:
if name is not None:
return self.add(class_type(name, *args, **kwargs))
else:
return self.add(class_type(*args, **kwargs))
[docs] def findall_or_create(self, class_type, *args, **kwargs):
"""Find all object in the configuration tree by class, and create a new object if none exist
If no objects of this type exist, one is created and added to the current object.
Args:
class_type: Class to look for or create
*args: Arguments to pass to the __init__ method of class_type
*kwargs: Keyworkd arguments to pass to the __init__ method of class_type
Returns:
list: List of 'class_type' objects
"""
result = self.findall(class_type)
if result:
return result
else:
return [self.add(class_type(*args, **kwargs))]
[docs] def find_index(self, name=None, class_type=None):
"""Finds the first index of the given name and class type.
If name is None, just find the first instance of class_type.
If class_type is unspecified, it defaults to the current class type.
Args:
name (str): Name of the child node
class_type (class): Restrict the find to children of this type
Returns:
int: the index of the first matching child
"""
if class_type is None:
class_type = type(self)
for num, child in enumerate(self.children):
if (name is None or child.uid == name) and type(child) == class_type:
return num
[docs] @classmethod
def refreshall(
cls, parent, running_config=False, add=True, exceptions=False, name_only=False
):
"""Factory method to instantiate class from live device.
This method is a factory for the class. It takes an firewall or
Panorama and gets the xml config from the live device. It generates
instances of this class for each item this class represents in the xml
config. For example, if the class is AddressObject and there are 5
address objects on the firewall, then this method will generate 5
instances of the class AddressObject.
Args:
parent (PanObject): A PanDevice, or a PanObject subclass with a
PanDevice as its parental root.
running_config (bool): False for candidate config, True for running
config.
add (bool): Update the objects of this type in pan-os-python with
the refreshed values.
exceptions (bool): If False, exceptions are ignored if the xpath
can't be found.
name_only (bool): If True, refresh only the name of the object, but
not its variables. This results in a smaller response to the
API call when only the object name is needed.
Returns:
list: created instances of class
"""
if not running_config and exceptions:
# This is because get api calls don't produce exceptions when the
# node doesn't exist
raise ValueError("exceptions requires running_config to be True")
if name_only and running_config:
raise ValueError("can't get name_only from running_config")
if name_only and cls.SUFFIX != ENTRY:
raise ValueError(
"name_only is invalid, can only be used on entry type objects"
)
# Versioned objects need a PanDevice to get the version from, so
# set the child's parent before accessing XPATH.
class_instance = cls()
class_instance.parent = parent
device = class_instance.nearest_pandevice()
logger.debug(device.id + ": refreshall called on %s type" % cls)
# Set api_action and xpath
api_action = device.xapi.show if running_config else device.xapi.get
xpath = class_instance.xpath_nosuffix()
if name_only:
xpath = xpath + "/entry/@name"
try:
root = api_action(xpath, retry_on_peer=cls.HA_SYNC)
except (err.PanNoSuchNode, pan.xapi.PanXapiError) as e:
if exceptions:
raise e
if not str(e).startswith("No such node"):
raise e
else:
return []
if name_only:
obj = root.find("result")
else:
lasttag = class_instance.XPATH.rsplit("/", 1)[-1]
obj = root.find("result/" + lasttag)
if obj is None:
return []
# Refresh each object
instances = class_instance.refreshall_from_xml(obj)
if add:
# Remove current children of this type from parent
parent.removeall(cls=cls)
# Add the new children that were just refreshed from the device
parent.extend(instances)
return instances
[docs] def refreshall_from_xml(self, xml, refresh_children=True, variables=None):
"""Factory method to instantiate class from firewall config.
This method is a factory for the class. It takes an xml config
from a firewall and generates instances of this class for each item
this class represents in the xml config. For example, if the class is
AddressObject and there are 5 address objects on the firewall, then
this method will generate 5 instances of the class AddressObject.
Args:
xml (xml.etree.ElementTree): A section of XML configuration from a
firewall or Panorama. It should not contain the response or
result tags.
refresh_children (bool): Refresh children objects or not.
variables (iterable): A list or tuple of the variables to parse
from the XML. Note that this is only used when invoked
against classes not derived from ``VersionedPanObject``.
Returns:
list: created instances of class
"""
instances = []
if xml is None:
return []
# Get the objects from the xml at this level
if self.SUFFIX is None:
objects = [xml]
else:
lasttag = re.match(r"^/(\w*?)\[", self.SUFFIX).group(1)
objects = xml.findall(lasttag)
# Refresh each object
for obj in objects:
# Create the object instance
if hasattr(self, "parse_xml"):
# Versioned object handling
instance = type(self)()
instance.parent = self.parent
instance.parse_xml(obj)
else:
# Classic object handling
objvars = self._parse_xml(obj, variables=variables)
if self.SUFFIX is not None:
name = obj.get("name")
if name is not None:
objvars[self.NAME] = name
instance = type(self)(variables=variables, **objvars)
# Add this instance to the list
instances.append(instance)
# Refresh the children of these instances
if refresh_children:
instance._refresh_children(xml=obj)
return instances
@classmethod
def _parse_xml(cls, xml, variables=None):
"""Classic class method to parse XML to variables.
Args:
xml (xml.etree.ElementTree): the xml to retrieve variables from.
variables (list): a list of ``VarPath`` instances to parse
from the given XML. If this is not specified, then all of the
variables that this ``PanObject`` contains are parsed.
Returns:
A dict of ``VarPath`` objects.
"""
vardict = {}
# Parse each variable
if variables:
allvars = variables
else:
allvars = cls.variables()
for var in allvars:
missing_replacement = False
# Determine if variable is part of __init__ args
if var.vartype == "none":
continue
# Search for variable replacements in path
path = var.path
matches = re.findall(r"{{(.*?)}}", path)
for match in matches:
regex = r"{{" + re.escape(match) + r"}}"
# Find the discovered replacement in the list of vars
matchedvar = next(
(x for x in cls.variables() if x.variable == match), None
)
replacement = vardict[match]
if replacement is None:
missing_replacement = True
break
if matchedvar.vartype == "entry":
# If it's an 'entry' variable
if len(replacement) == 1:
replacement = replacement[0]
path = re.sub(
regex,
matchedvar.path + "/" + "entry[@name='%s']" % replacement,
path,
)
else:
# Not an 'entry' variable
path = re.sub(regex, replacement, path)
if missing_replacement:
continue
# Determine the type of variable
if var.vartype == "member":
members = xml.findall(path + "/member")
vardict[var.variable] = [m.text for m in members]
elif var.vartype == "entry":
entries = xml.findall(path + "/entry")
entries = [e.get("name") for e in entries]
if len(entries) == 1:
entries = entries[0]
vardict[var.variable] = entries
elif var.vartype == "exist":
match = xml.find(path)
vardict[var.variable] = True if match is not None else False
else:
if path.find("|") != -1:
# This is an element variable
# Get the different options in a list
options = re.search(r"\(([\w\d|-]*)\)", path).group(1).split("|")
# Create a list of all the possible paths
option_paths = {
opt: re.sub(r"\([\w\d|-]*\)", opt, path) for opt in options
}
found = False
for opt, opt_path in option_paths.items():
match = xml.find(opt_path)
if match is not None:
vardict[var.variable] = cls._convert_var(opt, var.vartype)
found = True
break
if not found:
vardict[var.variable] = None
else:
# This is a text variable
# Save the variable if it exists in the xml
vardict[var.variable] = cls._convert_var(
xml.findtext(path), var.vartype
)
return vardict
@classmethod
def _convert_var(cls, value, vartype):
if value is None:
return None
elif vartype is None:
return value
elif vartype == "int":
return int(value)
elif vartype == "bool":
return yesno(value)
def _set_reference(
self,
reference_name,
reference_type,
reference_var,
var_type,
exclusive,
refresh,
update,
running_config,
return_type,
name_only,
**kwargs
):
"""Used by helper methods to set references between objects
For example, set_zone() would set the zone for an interface by creating a reference from
the zone to the interface. If the desired reference already exists then nothing happens.
This function has two modes: refresh=True and refresh=False. You
should only ever use refresh=False if:
1) all reference objects are in the current pan-os-python object tree
2) all reference objects are children attached to nearest_pandevice()
3) this is for firewall only, not a template / template stack
4) you're using firewall.vsys, not the device.Vsys object
If any of the above do not apply, you should be using refresh=True.
"""
parent = None
update_needed = False
if return_type not in ("bool", "object"):
raise ValueError("Unknown return_type specified: {0}".format(return_type))
if refresh:
"""
pan-os-python is too flexible: users can use simple vsys mode or a
device.Vsys object, which means vsys importables can be attached
to a Vsys object or a Firewall. But a Vsys object can also be
attached to a Firewall or a Template or a TemplateStack. So
create a separate pan-os-python object tree to operate on, leaving
the user's tree alone, but making it so we know where things are.
Basically, we need a pan-os-python object tree where all objects are
are sibling objects, just like refresh=False assumes. Doing
this allows the rest of this function to operate as before.
"""
from panos.device import Vsys
from panos.firewall import Firewall
from panos.panorama import Panorama, Template, TemplateStack
new_tree = None
if reference_type.ROOT == Root.VSYS:
# If the reference type belongs in a vsys (Zone), then
# initialize the new tree with a Vsys object. Otherwise do not
# have a vsys specified as we don't care where an object is
# or is not imported into.
parent = Vsys(self.vsys or "vsys1")
new_tree = parent
p = self
while p is not None:
new_obj = None
if isinstance(p, Firewall):
new_obj = Firewall(
hostname=p.hostname,
port=p.port,
api_username=p._api_username,
api_password=p._api_password,
api_key=p._api_key,
serial=p.serial,
)
elif isinstance(p, Template):
new_obj = Template(p.name)
elif isinstance(p, TemplateStack):
new_obj = TemplateStack(p.name)
elif isinstance(p, Panorama):
new_obj = Panorama(
hostname=p.hostname,
port=p.port,
api_username=p._api_username,
api_password=p._api_password,
api_key=p._api_key,
)
if new_obj is not None:
if parent is None:
parent = new_obj
new_tree = new_obj
else:
new_obj.add(new_tree)
new_tree = new_obj
p = p.parent
if parent is None or isinstance(parent, Panorama):
raise err.PanDeviceError("Improper pan-os-python object tree")
allobjects = reference_type.refreshall(
parent, name_only=name_only, running_config=running_config
)
if name_only:
for obj in allobjects:
obj.refresh_variable(reference_var)
else:
parent = self.nearest_pandevice()
allobjects = parent.findall(reference_type)
# Find any current references to self and remove them, unless it is the desired reference
if exclusive:
for obj in allobjects:
references = getattr(obj, reference_var)
if not references:
continue
elif reference_name is not None and obj.uid == reference_name:
continue
elif isinstance(references, list) and self in references:
update_needed = True
references.remove(self)
if update:
obj.update(reference_var)
elif isinstance(references, list) and str(self) in references:
update_needed = True
references.remove(str(self))
if update:
obj.update(reference_var)
elif references == self or references == str(self):
update_needed = True
setattr(obj, reference_var, None)
if update:
obj.update(reference_var)
# Add new reference to self in requested object
if reference_name is not None:
obj = parent.find_or_create(reference_name, reference_type, **kwargs)
var = getattr(obj, reference_var)
if var_type == "list":
if var is None:
update_needed = True
setattr(obj, reference_var, [self,])
if update:
obj.update(reference_var)
elif not isinstance(var, list):
if var != self and var != str(self):
update_needed = True
setattr(obj, reference_var, [var, self])
if update:
obj.update(reference_var)
elif self not in var and str(self) not in var:
update_needed = True
var.append(self)
if update:
obj.update(reference_var)
elif var != self and var != str(self):
update_needed = True
setattr(obj, reference_var, self)
if update:
obj.update(reference_var)
if return_type == "object":
return obj
if return_type == "bool":
return update_needed
[docs] def xml_merge(self, root, elements):
"""Merges other elements into the root element.
This differs from xml_combine in a few important ways:
1) The base tag of root is valid
2) The root element must be a valid ElementTree object
3) Individual Nones in the elements iterable are ignored
Args:
root (xml.etree.ElementTree): The root element.
elements (iterable): Other xml.etree.ElementTree instances (or
None) that should be merged into ``root`` as well.
Returns:
xml.etree.ElementTree: The final merged root element.
"""
for e in elements:
if e is not None:
self._merge_elements(root, e)
return root
def _merge_elements(self, root, elm):
class dicthash(dict):
def __hash__(self):
return hash(tuple(sorted(self.items())))
# Copy text only if it isn't set already
if root.tag == elm.tag and root.text is None:
root.text = elm.text
mapping = dict(((e.tag, dicthash(e.attrib)), e) for e in root)
for e in elm:
hashed_attribs = dicthash(e.attrib)
if len(e) == 0:
try:
# Copy text only if it isn't set already
if mapping[e.tag, hashed_attribs].text is None:
# Tag doesn't have text, but another element does
mapping[e.tag, hashed_attribs].text = e.text
if (
mapping[e.tag, hashed_attribs].tag == e.tag
and mapping[e.tag, hashed_attribs].text
and e.text
and mapping[e.tag, hashed_attribs].text != e.text
):
# Member vartypes, so append this element
raise KeyError
except KeyError:
# Add new element to the mapping
mapping[e.tag, hashed_attribs] = e
root.append(e)
else:
try:
# Merge subelements together
self._merge_elements(mapping[e.tag, hashed_attribs], e)
except KeyError:
# Add new element to the mapping
mapping[e.tag, hashed_attribs] = e
root.append(e)
[docs] def about(self, parameter=None):
"""Return information about this object or the given parameter.
If no parameter is specified, then invoking this function is similar to
doing `vars(obj)`: it will return a dict of key/value pairs, with the
difference being that the keys are all specifically parameters attached
to this `VersionedPanObject`, and the values being what the current
settings are for those keys.
If a parameter is specified and this object is connected to a
parent PanDevice, then version specific information on the parameter
is returned.
If a parameter is specified but this object is not connected to a
PanDevice instance, then all versioning information for the given
parameter is returned.
Args:
parameter (str): The parameter to get info for.
Returns:
dict: An informational dict about either the object as a whole
or the specified parameter.
Raises:
AttributeError: If a parameter is specified that does not exist
on this object.
"""
if parameter is None:
return self._about_object()
else:
return self._about_parameter(parameter)
def _about_object(self):
ans = {}
# Get the variables for this object
for v in type(self).variables():
ans[v.variable] = getattr(self, v.variable)
# Add the object's uid if applicable
if self.NAME is not None:
ans[self.NAME] = self.uid
return ans
def _about_parameter(self, parameter):
parameter = str(parameter)
ans = {
"Parameter": parameter,
"Current Value": getattr(self, parameter, None),
}
for v in type(self).variables():
if parameter == v.variable:
ans["About"] = v.about()
break
else:
if parameter == self.NAME:
ans["About"] = "This is the object's unique identifier"
else:
raise AttributeError(parameter)
return ans
def _requires_import_consideration(self):
if self.vsys == "shared" or not hasattr(self, "XPATH_IMPORT"):
return False
return True
def _gather_bulk_info(self, func=None):
"""Returns info for the bulk functions to operate on.
This function gets a single instance which will act as xpath scope,
but goes back to the nearest pandevice to collect all instances of
cType with the same xpath, as we need to be aware that instances could
share path but be in different vsys.
Args:
func (str): The function calling this function
Returns:
3 element tuple:
* nearest PanDevice
* list of instances of cType that share single instance's scope
* dict: vsys key with value of dict:
* import path key with value of list of PanObject instances
"""
dev = self.nearest_pandevice()
logger.debug(
'{0}: {1} called on {2} object "{3}"'.format(dev.id, func, self, self.uid)
)
dev.set_config_changed()
# Determine base xpath to match against.
xpath = self.xpath_short()
# Now, find all PanObjects with a similar xpath.
tree = [
dev,
]
instances = []
for node in itertools.chain(tree):
tree.extend(node.children)
if node.xpath_short() == xpath:
instances.append(node)
# Now find all the objects that need to be imported.
vsys_dict = {}
all_objects = instances[:]
for node in itertools.chain(all_objects):
all_objects.extend(node.children)
if node._requires_import_consideration():
vsys = node.vsys
if vsys is None and node.ALWAYS_IMPORT:
if getattr(node, "mode", None) in ("ha", "aggregate-group"):
continue
vsys = "vsys1"
vsys_dict.setdefault(vsys, {})
vsys_dict[vsys].setdefault(node.xpath_import_base(), [])
vsys_dict[vsys][node.xpath_import_base()].append(node)
return dev, instances, vsys_dict
[docs] def create_similar(self):
"""Bulk create all objects similar to this one.
**Modifies the live device**
This is similar to create(), except instead of calling create only
on this object, it calls create for all objects that share the same
xpath as this object, recursively searching the entire object tree
from the nearest firewall or panorama instance.
As an example, if you called create_similar on an object representing
ethernet1/5.42, all of the subinterfaces for ethernet1/5 would be
included in the resulting XML document, regardless of which vsys
those subinterfaces existed in.
"""
dev, instances, vsys_dict = self._gather_bulk_info("create_similar")
if not instances:
return
# The new root tag is the last tag in the xpath, while the new xpath
# is what remains.
xpath_tokens = self.xpath_short().split("/")
new_root = xpath_tokens.pop()
xpath = "/".join(xpath_tokens)
# Append all similar children.
shared_root = ET.Element(new_root)
for x in instances:
shared_root.append(x.element())
# Perform the create.
dev.xapi.set(
xpath,
ET.tostring(shared_root, encoding="utf-8"),
retry_on_peer=self.HA_SYNC,
)
# Do all necessary imports, per vsys, per import xpath.
self._perform_vsys_dict_import_set(dev, vsys_dict)
[docs] def apply_similar(self):
"""Bulk apply all objects similar to this one.
**Modifies the live device**
This is similar to apply(), except instead of calling apply only
on this object, it calls apply for all objects that share the same
xpath as this object, recursively searching the entire object tree
from the nearest firewall or panorama instance.
As an example, if you called apply_similar on an object representing
ethernet1/5.42, all of the subinterfaces for ethernet1/5 would be
included in the resulting XML document, regardless of which vsys
those subinterfaces existed in.
Since apply does a replace of the config at the given xpath, please
be careful when using this function that all objects, whether they
be updated or not, exist in your pan-os-python object tree.
"""
dev, instances, vsys_dict = self._gather_bulk_info("apply_similar")
if not instances:
return
# The new root tag is the last tag in the xpath, while the new xpath
# is what remains.
xpath = self.xpath_short()
new_root = xpath.split("/")[-1]
# Append all children of type cType.
shared_root = ET.Element(new_root)
for x in instances:
shared_root.append(x.element())
# Perform the create.
dev.xapi.edit(
xpath,
ET.tostring(shared_root, encoding="utf-8"),
retry_on_peer=self.HA_SYNC,
)
# Do all necessary imports, per vsys, per import xpath.
self._perform_vsys_dict_import_set(dev, vsys_dict)
[docs] def delete_similar(self):
"""Bulk delete all objects similar to this one.
**Modifies the live device**
This is similar to delete(), except instead of calling delete only
on this object, it calls delete for all objects that share the same
xpath as this object, recursively searching the entire object tree
from the nearest firewall or panorama instance.
As an example, if you called delete_similar on an object representing
ethernet1/5.42, all of the subinterfaces in your pan-os-python object
tree for ethernet1/5 would be removed.
"""
dev, instances, vsys_dict = self._gather_bulk_info("delete_similar")
if not instances:
return
# This operation is only supported for entry/member objects.
if self.SUFFIX not in (ENTRY, MEMBER):
raise ValueError("delete_similar requires member or entry")
# Do all necessary unimports, per vsys, per xpath.
self._perform_vsys_dict_import_delete(dev, vsys_dict)
# Now perform the bulk delete.
joiner = ""
prefix = ""
xpath = self.xpath_nosuffix()
if self.SUFFIX == ENTRY:
joiner = "@name='{0}'"
prefix = "entry"
elif self.SUFFIX == MEMBER:
joiner = "text()='{0}'"
prefix = "member"
# After some testing, PAN-OS seems to be able to handle a DELETE API call
# with up to 25k characters in around 3.3sec while not under stress, but
# this can balloon up to 15sec with PAN-OS under load. So we'll need to
# break delete calls into chunks that will complete within 30sec instead
# of trying to specify everything all at once.
for chunk in chunk_instances_for_delete_similar(instances):
dev.xapi.delete(
"{0}/{1}[{2}]".format(
xpath, prefix, " or ".join(joiner.format(x.uid) for x in chunk),
),
retry_on_peer=self.HA_SYNC,
)
# Remove each object from self, just like delete().
for x in instances:
x.parent.remove(x)
def _perform_vsys_dict_import_set(self, dev, vsys_dict):
"""Iterates of a vsys_dict, doing imports for all instances."""
for vsys, vsys_spec in vsys_dict.items():
if vsys is None:
continue
for xpath_import_base, objs in vsys_spec.items():
xpath_tokens = xpath_import_base.split("/")
new_root = xpath_tokens.pop()
# Form the xpath from what remains of the xpath.
xpath = "/".join(xpath_tokens)
# Append objects as members to the new root.
shared_root = ET.Element(new_root)
for x in objs:
ET.SubElement(shared_root, "member").text = x.uid
# Perform the import.
dev.xapi.set(
xpath,
ET.tostring(shared_root, encoding="utf-8"),
retry_on_peer=self.HA_SYNC,
)
def _perform_vsys_dict_import_delete(self, dev, vsys_dict):
"""Iterates over a vsys_dict, deleting the import for all instances."""
for vsys_spec in vsys_dict.values():
for objs in vsys_spec.values():
members = " or ".join("text()='{0}'".format(x.uid) for x in objs)
xpath = "{0}/member[{1}]".format(objs[0].xpath_import_base(), members)
# API complains if you try to do this in one delete statement,
# so do one delete per vsys per path, just like when we set the
# imports.
dev.xapi.delete(xpath, retry_on_peer=self.HA_SYNC)
def dot(self):
result = (
"digraph configtree {graph [rankdir=LR, fontsize=10, margin=0.001];"
"node [shape=box, fontsize=10, height=0.001, margin=0.1, ordering=out];"
)
result += self._dot(root_node=True)
result += "}"
return result
def _dot(self, root_node=False):
node = type(self).__name__
module = type(self).__module__.split(".")[-1]
result = (
'"{node_name}" [style=filled fillcolor={color} '
'URL="{url}'
'/module-{module}.html#panos.{module}.{node}" '
'target="_blank"];'
)
result = result.format(
node_name=node + " : " + self.uid,
node=node,
module=module,
url=panos.DOCUMENTATION_URL,
color=panos.node_color(module),
)
# Make recursive call to children
for child in self.children:
result += child._dot()
# Build relationship with parent
if not root_node and self.parent is not None:
if self.parent is not None:
result += '"{0}" -> "{1}";'.format(
type(self.parent).__name__ + " : " + self.parent.uid,
type(self).__name__ + " : " + self.uid,
)
return result
[docs] def tree(self):
"""Display a graph of the configuration tree
The tree includes this object and its children, recursively.
This method is only for use in Jupyder Notebooks
"""
import graphviz
return graphviz.Source(self.dot())
[docs] def fulltree(self):
"""Display a graph of the entire configuration tree
This method is only for use in Jupyder Notebooks
"""
if self.parent is not None:
return self.parent.fulltree()
return self.tree()
[docs] def retrieve_panos_version(self):
"""Gets the panos_version of the closest PanDevice.
If this object is not attached to a PanDevice, then a very large
number is returned to ensure that the newest version of the
object and xpath is presented to the user.
Returns:
tuple: The version as (x, y, z)
"""
try:
device = self.nearest_pandevice()
panos_version = device.get_device_version()
except (err.PanDeviceNotSet, err.PanApiKeyNotSet, AttributeError):
panos_version = self._UNKNOWN_PANOS_VERSION
return panos_version
[docs] def hierarchy_info(self):
"""This function returns hierarchical information about this object.
All objects in pan-os-python can be added as children to other objects,
so this function details what configurations are valid for this
particular object.
Returns:
dict: Hierarchy information about this object.
"""
from panos.firewall import Firewall
from panos.panorama import DeviceGroup
from panos.panorama import Panorama
from panos.panorama import Template
from panos.panorama import TemplateStack
classes = panos.object_classes()
configs = [
[self.__class__,],
]
updated_configs = []
# Find all possible config trees.
while True:
for num, chain in enumerate(configs):
parents = panos.parents_for(chain[-1], classes)
if parents:
configs.pop(num)
for p in parents:
configs.append(
chain + [p,]
)
break
else:
break
# Because Firewall objects can be children of Panorama objects,
# we need to do another pass to check for multi-PanDevice configs
# because Panorama is not strictly necessary.
for num in range(len(configs)):
chain = configs[num]
if Firewall in chain and Panorama in chain:
configs.append(chain[: chain.index(Firewall) + 1])
# Remove dupes.
updated_configs = []
for chain in configs:
if chain not in updated_configs:
updated_configs.append(chain)
configs = updated_configs
# Remove any DeviceGroup > Firewall hierarchies.
updated_configs = []
for chain in configs:
fw_index = -1
dg_index = -1
for num, x in enumerate(chain):
if x == Firewall:
fw_index = num
elif x == DeviceGroup:
dg_index = num
if fw_index == -1 or dg_index == -1 or fw_index + 1 != dg_index:
updated_configs.append(chain)
configs = updated_configs
# Remove Template / TemplateStack hierarchies if there is a DeviceGroup
# hierarchy.
for chain in configs:
if DeviceGroup in chain:
configs = [
x for x in configs if Template not in x and TemplateStack not in x
]
break
# Get the current config tree.
cur_tree = []
p = self
while p is not None:
cur_tree.append(p)
p = p.parent
# Reverse the trees to match reality.
for x in configs:
x.reverse()
cur_tree.reverse()
return {
"configurations": configs,
"current": cur_tree,
"valid": cur_tree in configs,
}
[docs]class VersioningSupport(object):
"""A class that supports getting version specific values of something.
Versions of the value are added in ascending order using ``add_profile()``,
then can be retrieved by using ``_get_versioned_value()``. You can specify
how the retrieved value is cast by overriding ``_cast_version_value()``.
"""
def __init__(self):
self.__profiles = []
[docs] def add_profile(self, version=None, value=None):
"""Add support for version ``version`` that returns ``value``.
**Version support must be added in ascending order.**
Args:
version (str): The version to add support for. If this is
unspecified, then the version defaults to '0.0.0'.
value: The value to be retrieved for this version.
Raises:
ValueError: If the given version is lower than the most recent
version.
"""
# TODO(gfreeman): use pan-os-python versioning
if version is None:
version_tuple = (0, 0, 0)
else:
version_tuple = tuple(int(x) for x in version.split("-")[0].split("."))
if len(version_tuple) != 3:
msg = "{0} profile version {1} not formatted as X.Y.Z"
raise ValueError(msg.format(self.param, version))
# Make sure that this new profile is not a version lower
if self.__profiles:
if self.__profiles[0][0] > version_tuple:
msg = "Cannot add version {0} support after version {1}"
raise ValueError(msg.format(version_tuple, self.__profiles[0][0]))
# Add the profile
self.__profiles.insert(0, (version_tuple, value))
# Return self for chained invocations
return self
def _get_versioned_value(self, panos_version):
"""Returns version specific value.
Args:
panos_version (tuple): The version as (x, y, z) tuple
Returns:
The casted value stored for this version.
"""
value = None
for version_number, value in self.__profiles:
if panos_version >= version_number:
break
return self._cast_version_value(value)
def __iter__(self):
for version_number, value in self.__profiles:
yield version_number, self._cast_version_value(value)
def _cast_version_value(self, value):
"""Defines any special handling for the value before returning it."""
return value
[docs]class VersionedStubs(VersioningSupport):
[docs] def add_profile(self, version=None, *paths):
"""Adds the following stubs for the specified version.
Args:
version (str): The version to add support for.
*paths (str): Variable length arg list of paths for this version.
"""
return super(VersionedStubs, self).add_profile(version, paths)
def _cast_version_value(self, value):
"""Turn the list of strings into a list of stubs."""
if value is None:
return []
ans = []
for path in value:
ans.append(ParamPath(None, path, "stub"))
return ans
[docs]class ParentAwareXpath(object):
"""Class to handle xpaths of objects.
Some objects have a different xpath based on where in the tree they are
located. This class allows you configure various xpaths that can vary
both on version and what the parent class is.
If no explicit parent is specified, then the global parent of `None' is
assumed.
"""
def __init__(self):
self.settings = {}
self.parent_params = []
[docs] def add_profile(
self,
version=None,
value=None,
parents=None,
parent_param=None,
parent_param_values=None,
):
"""Adds support for the given versions, specific to the parents.
If no parents are specified, then a parent of ``None`` is assumed,
which is the global parent type.
**Version support per parent must be in ascending order.**
Args:
version (str): The version number (default: '0.0.0').
value (str): The xpath setting.
parents (list/tuple): The parent classes this version/value is valid for.
parent_param (str): Parent param to key off of.
parent_param_values (list): Values of the parent param to key off of.
"""
if parents is None:
parents = (None,)
if parent_param not in self.parent_params:
# None is always a fallback, so make sure None as a
# parent param is last.
index = -1 if parent_param is not None else len(self.parent_params)
self.parent_params.insert(index, parent_param)
if parent_param_values is None:
parent_param_values = [
None,
]
for p in parents:
for ppv in parent_param_values:
combo = (p, parent_param, ppv)
self.settings.setdefault(combo, VersioningSupport())
self.settings[combo].add_profile(version, value)
def _get_versioned_value(self, panos_version, parent):
"""Gets the xpath for this version/parent combination.
Args:
panos_version (tuple): The version as (x, y, z) tuple.
parent: The self.parent for this VersionedPanObject.
Returns:
string. The xpath.
Raises:
ValueError if no applicable xpath is found.
"""
parents = [
None,
]
parent_settings = {}
if parent is not None:
parents = [parent.__class__.__name__, None]
try:
parent_settings = parent._about_object()
except AttributeError:
parent_settings = vars(parent)
for p in parents:
for parent_param in self.parent_params:
combo = (p, parent_param, parent_settings.get(parent_param, None))
try:
return self.settings[combo]._get_versioned_value(panos_version)
except KeyError:
pass
raise ValueError("No applicable combination found for xpath")
[docs]class VersionedPanObject(PanObject):
"""Base class for all versioned package objects.
This class is an extention of :class:`panos.base.PanObject` that
supports versioning.
Args:
name (str): The name of this object.
*args: Variable length list of values to initialize this object.
**kwargs: Keyword args to initialize this object.
Attributes:
uid (str): The unique identifier for this object if it has one. If it
doesn't have one, then this returns the class name.
vsys (str): The vsys id for this object (e.g. 'vsys2') or 'shared' if
no vsys.
XPATH (str): The xpath for this object, based on where in the tree it
currently resides, as well as the versioning.
"""
_DEFAULT_NAME = None
_TEMPLATE_DEVICE_XPATH = "/config/devices/entry[@name='localhost.localdomain']"
_TEMPLATE_VSYS_XPATH = _TEMPLATE_DEVICE_XPATH + "/vsys/entry[@name='{vsys}']"
_TEMPLATE_MGTCONFIG_XPATH = "/config/mgt-config"
def __init__(self, *args, **kwargs):
if self.NAME is not None:
try:
name = args[0]
args = args[1:]
except IndexError:
name = kwargs.pop(self.NAME, None)
setattr(self, self.NAME, name or self._DEFAULT_NAME)
self.parent = None
self.children = []
self._xpaths = ParentAwareXpath()
self._stubs = VersionedStubs()
self._setups()
try:
params = super(VersionedPanObject, self).__getattribute__("_params")
except AttributeError:
params = ()
# Sanity check: there shouldn't be more args than params
if len(args) > len(params):
msg = 'Args "{0}" exceeds params "{1}"'
raise ValueError(msg.format(args, params))
# Set all params to their default values initially
for param in params:
param.value = param.default
# Handle positional params
for value, param in zip(args, params):
param.value = value
# Handle kwargs params
for name, value in kwargs.items():
for param in params:
if param.name == name:
param.value = value
break
else:
raise ValueError('No param "{0}" exists'.format(name))
def _setup(self):
"""Setup the object here.
The setup includes configuring the following:
* _xpaths
* _xpath_imports (VsysOperations objects only)
* _params
* _stubs
If you want this to have versioned parameters, be sure to
set a `_params` variable here. It should be a tuple of
:class:`panos.base.VersionedParamPath` objects.
"""
pass
def _about_object(self):
try:
ans = dict((p.name, p.value) for p in self._params)
except AttributeError:
ans = {}
finally:
# If the object has a self.NAME, include that in the result
if self.NAME is not None:
ans[self.NAME] = self.uid
return ans
def _about_parameter(self, parameter):
parameter = str(parameter)
ans = {
"Parameter": parameter,
"Current Value": getattr(self, parameter, None),
}
# Make sure the param exists or raise AttributeError
try:
for param in self._params:
if param.name == parameter:
break
else:
raise AttributeError
except AttributeError:
# Check if the parameter is the object's uid
if parameter == self.NAME:
ans["About"] = "This is the object's unique identifier"
return ans
else:
raise AttributeError(parameter)
version_info = []
panos_version = self.retrieve_panos_version()
if panos_version == self._UNKNOWN_PANOS_VERSION:
# No parent, return all versioning info for this parameter
version_info = []
for version_number, var_path in param:
version_info.append(var_path.about(version_number))
ans["About"] = version_info
else:
# Display parameter's version specific info
var_path = param._get_versioned_value(panos_version)
if var_path:
ans["About"] = var_path.about()
else:
ans["About"] = "No VarPath for this version"
return ans
def __dir__(self):
"""This is for tab-complete options."""
ans = set([])
# Get standard stuff: methods and variables/properties
try:
ans.update(super(VersionedPanObject, self).__dir__())
except AttributeError:
# Get variables
ans.update(self.__dict__.keys())
# Get functions
ans.update(dir(type(self)))
# Get the _params stuff if it's present
try:
ans.update(x.name for x in self._params)
except Exception:
pass
return list(ans)
def _build_element_info(self):
panos_version = self.retrieve_panos_version()
settings = {}
params = ()
try:
params = self._params
except AttributeError:
pass
paths = []
for param in params:
settings[param.name] = param.value
var_path = param._get_versioned_value(panos_version)
if var_path:
paths.append(var_path)
stubs = []
try:
stubs = self._stubs._get_versioned_value(panos_version)
except AttributeError:
pass
return (paths, stubs, settings)
[docs] def element(self, with_children=True, comparable=False):
"""Return an xml.etree.ElementTree for this object and its children.
Args:
with_children (bool): Include the children objects.
comparable (bool): Element will be used in a comparison with another.
Returns:
xml.etree.ElementTree for this object.
"""
ans = self._root_element()
paths, stubs, settings = self._build_element_info()
iterchain = (
(p.element(self._root_element(), settings, comparable) for p in paths),
(s.element(self._root_element(), settings, comparable) for s in stubs),
)
if with_children:
iterchain += (self._subelements(comparable),)
self.xml_merge(ans, itertools.chain(*iterchain))
# Now that the whole element is built, mixin an attrib vartypes.
#
# We do this here instead of in xml_merge() because attributes are considered
# part of the identity in that function, and I'm not sure we want to manage
# a list of what attributes are considered part of an element's identity and
# what should be mixed in.
for p in paths:
if p.vartype != "attrib":
continue
attrib_path = p.path.split("/")
attrib_name = attrib_path.pop()
attrib_value = settings[p.param]
if attrib_value is None or p.exclude:
continue
e = ans
for ap in attrib_path:
if not ap:
continue
finder = None
tag = None
attribs = {}
if ap.startswith("entry "):
junk, var_to_use = ap.split()
sol_value = panos.string_or_list(settings[var_to_use])[0]
finder = "entry[@name='{0}']".format(sol_value)
tag = "entry"
attribs["name"] = sol_value
elif ap == "entry[@name='localhost.localdomain']":
finder = ap
tag = "entry"
attribs["name"] = "localhost.localdomain"
else:
finder = ap.format(**settings)
tag = finder
e2 = e.find("./{0}".format(finder))
if e2 is None:
e = ET.SubElement(e, tag, attribs)
else:
e = e2
e.attrib[attrib_name] = attrib_value
return ans
[docs] def equal(self, panobject, force=False, compare_children=True):
"""Compare this object to another object
Equality of the objects is determined by the XML they generate, not by the
values of their variables.
Args:
panobject (VersionedPanObject): The object to compare with this object
force (bool): Do not raise a PanObjectError if the objects are different classes
compare_children (bool): Include children of the PanObject in the comparison
Raises:
PanObjectError: Raised if the objects are different types that
would not normally be comparable
Returns:
bool: True if the XML of the objects is equal, False if not
"""
if not panobject:
return False
if type(self) != type(panobject) and not force:
msg = "Object {0} is not compareable to {1}"
raise err.PanObjectError(msg.format(self, panobject))
xml_self = ET.tostring(self.element(compare_children, True), encoding="utf-8")
xml_other = ET.tostring(
panobject.element(compare_children, True), encoding="utf-8"
)
return xml_self == xml_other
def _get_param_specific_info(self, param):
"""Gets a tuple of info for the given parameter.
This is to aid in things like updates or refreshes of a specific
parameter attached to this PanObject / VersionedPanObject.
Returns:
A four element tuple of the variable's xpath (str), the attribute
name (if this is vartype="attrib"), the value of
the variable, and the full ``VarPath`` or ``ParamPath`` object that
is responsible for handling this variable.
Raises:
ValueError: If the param does not exist in this object.
PanDeviceError: If the param does not exist in the XML.
"""
paths, stubs, settings = self._build_element_info()
value = settings[param]
# Find the VarPath to use
for var_path in paths:
if var_path.param == param:
break
else:
msg = "Variable {0} is not present in this version"
raise ValueError(msg.format(param))
# Build up the xpath
xpath = []
for token in var_path.path.split("/"):
if not token:
continue
p = None
if token.startswith("entry "):
junk, var_to_use = token.split()
p = "entry[name='{0}']".format(
*(x for x in self._value_as_list(settings[var_to_use]))
)
else:
p = None
try:
p = token.format(**{})
except KeyError as ke:
param_ref = ke.args[0]
if settings[param_ref] is None:
msg = " ".join(
[
"While updating variable {0},",
"missing replacement variable {1} in path",
]
)
raise ValueError(msg.format(param, param_ref))
p = token.format(**settings)
xpath.append(p)
# Remove the last part of vartype=attrib variable xpath parts.
attr = None
if var_path.vartype == "attrib":
attr = xpath.pop()
return ("/".join(xpath) or None, attr, value, var_path)
[docs] def parse_xml(self, xml):
"""Parse the given XML into this object's parameters.
Args:
xml (xml.etree.ElementTree): The XML to parse values from.
"""
settings = {}
panos_version = self.retrieve_panos_version()
params = ()
try:
params = self._params
except AttributeError:
return
# Build up the paths and the possibilities for each param.
paths = []
possibilities = {}
for param in params:
var_path = param._get_versioned_value(panos_version)
if var_path:
paths.append(var_path)
if var_path.param and var_path.values:
possibilities[param.name] = var_path.values
# Get the stubs and append those to the paths to parse as well. We
# do this because a stub could sometimes help us find the value of
# another param that might not otherwise be present.
stubs = []
try:
stubs = self._stubs._get_versioned_value(panos_version)
except AttributeError:
pass
for stub in stubs:
if stub:
paths.append(stub)
# Retrieve the uid (if applicable)
if self.SUFFIX == ENTRY:
setattr(self, self.NAME, xml.attrib["name"])
# Parse out all VarPaths
for var_path in paths:
var_path.parse_xml(xml, settings, possibilities)
# Save results from the settings dict
for param in params:
param.value = settings.get(param.name)
def __getattr__(self, name):
params = super(VersionedPanObject, self).__getattribute__("_params")
for param in params:
if name == param.name:
return param.value
raise AttributeError(
"'{0}' object has no attribute '{1}'".format(
self.__class__.__name__, str(name),
)
)
def __setattr__(self, name, value):
params = ()
try:
params = super(VersionedPanObject, self).__getattribute__("_params")
except AttributeError:
pass
for param in params:
if name == param.name:
param.value = value
break
else:
super(VersionedPanObject, self).__setattr__(name, value)
@property
def XPATH(self):
"""Returns the version specific xpath of this object."""
panos_version = self.retrieve_panos_version()
val = self._xpaths._get_versioned_value(panos_version, self.parent)
return val.format(vsys=self.vsys or "vsys1")
[docs]class VersionedParamPath(VersioningSupport):
"""A wrapper class for ParamPath objects.
Specifying any kwargs will be interpreted as args for the first profile to
add for this parameter. If there are no kwargs specified, then any version
that may or may not have been passed in is ignored.
The ``values`` stored in each profile added are the kwargs used to
initialize the ``ParamPath`` object. The ``name`` should not be specified,
as that will be passed in positionally for you.
Args:
name (str): The parameter name. Any hyphens in the name are replaced
with underscores, as hyphens are not a valid variable character.
default: The default value this parameter should take when the user
is creating a ``VersionedPanObject``, but doesn't specify a value.
version (str): A version string like '1.2.3' or None. If the version
is None, then the version is set to '0.0.0'.
**kwargs: Various ``ParamPath`` parameters for the given version.
"""
def __init__(self, name, default=None, version=None, **kwargs):
super(VersionedParamPath, self).__init__()
self.name = name.replace("-", "_")
self.default = default
self.value = None
if kwargs:
self.add_profile(version, **kwargs)
[docs] def add_profile(self, version=None, **kwargs):
"""Add support for version ``version``.
Args:
version (str): The version to add support for. If this is
unspecified, then the version defaults to '0.0.0'.
**kwargs: The various ``ParamPath`` arguments to use for the
given version. Note that if your kwargs do not contain
a ``path``, then this means that the variable will only be
present in the resulting XML if another ``VersionedParamPath``
references this parameter in it's ``path``.
"""
return super(VersionedParamPath, self).add_profile(version, kwargs)
def _cast_version_value(self, value):
if value is None:
value = {}
return ParamPath(self.name, **value)
def __repr__(self):
return "<{0} {1}={2} default={3} {4:#x}>".format(
self.__class__.__name__, self.name, self.value, self.default, id(self)
)
[docs]class ValueEntry(VersionedPanObject):
"""Base class for objects that only have a value element.
"""
ROOT = Root.VSYS
SUFFIX = ENTRY
LOCATION = None
def _setup(self):
if self.LOCATION is None:
raise Exception("{0}.LOCATION is unset".format(self.__class__))
# xpath
self._xpaths.add_profile(value=self.LOCATION)
# params
self._params = (VersionedParamPath("value", path="value"),)
[docs]class VarPath(object):
"""Configuration variable within the object
Args:
path (str): The relative xpath to the variable
variable (str): The name of the instance variable in the class
vartype (str): The type of variable (None, 'member', 'entry', 'bool', 'int', 'exist', 'none')
default: The default value if no value is specified during __init__ of the object
xmldefault (bool): The default value if no value exists in the xml from a device
condition (str): In the format othervariable:value where this variable is only
considered if othervariable equals value
order (int): The order of this variable relative to other variables in this constructor of the
class that contains this variables. Defaults to 100, set variable order to less than or
greater than 100 to alter the order of the variables.
"""
def __init__(
self,
path,
variable=None,
vartype=None,
default=None,
xmldefault=None,
condition=None,
order=100,
):
self.path = path
self.vartype = vartype
self.default = default
self.xmldefault = xmldefault
self.condition = condition
self.order = order
if variable is None:
self.variable = self.path.rsplit("/", 1)[-1].replace("-", "_")
else:
self.variable = variable
def __repr__(self):
return "<%s %s at 0x%x>" % (type(self).__name__, repr(self.variable), id(self))
[docs] def about(self):
"""Returns information about this VarPath as a dict."""
return {
"Type": self.vartype or "string",
"Condition": self.condition,
"Default": self.default,
"XML Path": self.path,
}
def _set_inner_xml_tag_text(self, elm, value, comparable=False):
"""Sets the final elm's .text as appropriate given the vartype.
Args:
elm (xml.etree.ElementTree.Element): The element whose .text to set.
value (various): The value to put in the .text, conforming to the vartype of this parameter.
comparable (bool): Make updates for element string comparisons. For entry and member vartypes, sort the entries (True) or leave them as-is (False).
"""
# Create an element containing the value in the instance variable
if self.vartype == "member":
values = panos.string_or_list(value)
if comparable:
values = sorted(values)
for member in values:
ET.SubElement(elm, "member").text = str(member)
elif self.vartype == "entry":
values = panos.string_or_list(value)
if comparable:
values = sorted(values)
for entry in values:
ET.SubElement(elm, "entry", {"name": str(entry)})
elif self.vartype == "exist":
if value:
ET.SubElement(elm, self.variable)
elif self.vartype == "bool":
elm.text = yesno(value)
elif self.path.find("|") != -1:
# This is an element variable,
# it has already been created
# so do nothing
pass
elif self.vartype == "none":
# There is no variable, so don't try to populate it
pass
elif self.vartype == "attrib":
raise ValueError("attrib not yet supported for classic objects")
else:
elm.text = str(value)
[docs]class ParamPath(object):
"""Configuration parameter within the object.
Args:
param (str): The name of the instance parameter in the class
path: The relative xpath to the variable.
vartype: The type of variable (None, 'member', 'entry', 'yesno',
'int', 'exist').
condition (dict): Other settings that must be true for this param
to appear in the XML. The keys of the condition should be other
parameter names, with the value being what the necessary value of
that parameter should be.
values (list): Valid values this param can be set to. This is not
enforced in any way from the user's perspective when setting
parameters, but these values are referenced when parsing any XML
returned from a live device.
exclude (bool): Exclude this param from the resultant XML.
"""
def __init__(
self, param, path=None, vartype=None, condition=None, values=None, exclude=False
):
self.param = param
self.path = path
self.vartype = vartype
self.condition = condition or {}
self.values = values or []
self.exclude = exclude
if self.path is None:
self.path = self.param.replace("_", "-")
[docs] def about(self, version_header=None):
"""Returns information about this ParamPath as a dict."""
info = {
"Type": self.vartype or "string",
"Values": self.values,
"Condition": self.condition,
"XML Path": self.path,
}
if version_header is not None:
info["Versioning"] = version_header
return info
def __repr__(self):
return "<{0} '{1}' at {2:#x}>".format(
self.__class__.__name__, self.param, id(self)
)
def _value_as_list(self, value):
if isstring(value):
yield value
elif hasattr(value, "__iter__"):
for v in value:
yield str(v)
else:
yield str(value)
[docs] def element(self, elm, settings, comparable=False):
"""Create the xml.etree.ElementTree for this parameter.
Args:
elm (xml.etree.ElementTree): the root node for which to append
onto this param's XML.
settings (dict): All parameter settings for the
``VersionedPanObject``.
comparable (bool): Make necessary adjustments to the XML for comparison's sake.
Returns:
xml.etree.ElementTree: The ``elm`` passed in, modified to contain
this parameter in the XML. If this param should not be contained
in the full ``VersionedPanObject``'s XML, then None is returned.
"""
value = settings.get(self.param)
# Check if this should return None instead of an element
if self.exclude:
return None
elif self.vartype == "attrib":
return None
elif value is None and self.vartype != "stub":
return None
for condition_key, condition_value in self.condition.items():
try:
if settings[condition_key] not in condition_value:
return None
except TypeError:
if settings[condition_key] != condition_value:
return None
except KeyError:
# This condition references a param that does not exist and it is
# thus not needed
return None
e = elm
attr = None
# Build the element
tokens = self.path.split("/")
if self.vartype == "exist":
del tokens[-1]
elif self.vartype == "attrib":
attr = tokens.pop()
for token in tokens:
if not token:
continue
if token.startswith("entry "):
junk, var_to_use = token.split()
sol_val = panos.string_or_list(settings[var_to_use])[0]
child = ET.Element("entry", {"name": str(sol_val)})
elif token == "entry[@name='localhost.localdomain']":
child = ET.Element("entry", {"name": "localhost.localdomain"})
else:
child = ET.Element(token.format(**settings))
if child.tag == "None":
return None
e.append(child)
e = child
self._set_inner_xml_tag_text(e, value, comparable, attr)
return elm
@staticmethod
def _sha1_hash(string):
# Check if this string is cleartext or encrypted
if string.startswith("-"):
# Get sha1 part of encrypted string
return string[5:33]
else:
# Sha1 hash the cleartext value
# Python3: encode for sha1, decode for XML serialization.
sha1 = hashlib.sha1(string.encode("utf-8"))
return base64.b64encode(sha1.digest()).decode("utf-8")
[docs] def parse_xml(self, xml, settings, possibilities):
"""Parse the XML to find this parameter's value.
Both this parameter, and any other parameters that may be discovered
during the parsing of this parameter, will be saved in the ``settings``
dict passed in to this function.
Args:
xml (xml.etree.ElementTree): The XML to parse.
settings (dict): Current known values for this object's parameters.
possibilities (dict): A dict where the key is a parameter's name,
and the value is a list of strings that that param could be
in the XML.
"""
if not self.path:
# No path, so this is just a parameter ParamPath
return
# Check that conditional is met
for condition_key, condition_value in self.condition.items():
try:
if settings[condition_key] not in condition_value:
return
except TypeError:
if settings[condition_key] != condition_value:
return
except KeyError:
# This condition references a param that does not exist and it is
# thus not needed
return None
attr = None
e = xml
tokens = self.path.split("/")
if self.vartype == "exist":
del tokens[-1]
elif self.vartype == "attrib":
attr = tokens.pop()
for p in tokens:
# Skip this path part if there is no path part
if not p:
continue
path_str = None
if p.startswith("entry "):
# Entry path part
entry_var = p.split()[1]
if entry_var not in settings:
# Entry's name is not yet known, try to find it
ans = e.find("./entry")
if ans is None:
return
settings[entry_var] = ans.attrib["name"]
sol_val = panos.string_or_list(settings[entry_var])[0]
path_str = "entry[@name='{0}']".format(sol_val)
else:
# Standard path part
try:
# If we don't have all the settings necessary to format
# this string, a KeyError will be raised
path_str = p.format(**settings)
except KeyError as ke:
# Missing a parameter's setting, check all of that param's
# possibilities against the XML to see which one it is
missing_variable = ke.args[0]
if missing_variable not in possibilities:
return
possibility_settings = settings.copy()
for pos in possibilities[missing_variable]:
possibility_settings[missing_variable] = pos
path_str = p.format(**possibility_settings)
ans = e.find("./{0}".format(path_str))
if ans is not None:
settings[missing_variable] = pos
break
else:
return
ans = e.find("./{0}".format(path_str))
if ans is None:
return
e = ans
# Pull the value, properly formatted, from this last element
self.parse_value_from_xml_last_tag(e, settings, attr)
def _set_inner_xml_tag_text(self, elm, value, comparable=False, attr=None):
"""Sets the final elm's .text as appropriate given the vartype.
Args:
elm (xml.etree.ElementTree.Element): The element whose .text to set.
value (various): The value to put in the .text, conforming to the vartype of this parameter.
comparable (bool): Make updates for element string comparisons. For encrypted fields, if the text should be set to a password hash (True) or left as a basestring (False). For entry and member vartypes, sort the entries (True) or leave them as-is (False).
attr (str): For `vartype="attrib"`, the attribute name.
"""
# Format the element text appropriately
if self.vartype == "member":
values = self._value_as_list(value)
if comparable:
values = sorted(values)
for v in values:
ET.SubElement(elm, "member").text = v
elif self.vartype == "entry":
values = self._value_as_list(value)
if comparable:
values = sorted(values)
for v in values:
ET.SubElement(elm, "entry", {"name": v})
elif self.vartype == "exist":
if value:
exist_tag = self.path.split("/")[-1]
ET.SubElement(elm, exist_tag)
elif self.vartype == "yesno":
elm.text = "yes" if value else "no"
elif (
self.vartype == "stub"
or "{{{0}}}".format(self.param) == self.path.split("/")[-1]
):
pass
elif self.vartype == "int":
elm.text = str(int(value))
elif self.vartype == "encrypted" and comparable:
elm.text = self._sha1_hash(str(value))
elif self.vartype == "attrib":
elm.attrib[attr] = value
else:
elm.text = str(value)
[docs] def parse_value_from_xml_last_tag(self, elm, settings, attr):
"""Actually do the parsing for this parameter.
The value parsed is saved into the ``settings`` dict.
Args:
elm (xml.etree.ElementTree): The final (deepest) tag in the XML
document passed in to ``parse_xml()`` that contains the actual
value to parse out for this parameter.
settings (dict): The dict where the parsed value will be saved.
attr (str): For `vartype="attrib"`, the attribute name.
Raises:
ValueError: If a param is in an incorrect format.
"""
# Do vartype processing
if self.vartype == "member":
settings[self.param] = [x.text for x in elm.findall("member")]
elif self.vartype == "entry":
settings[self.param] = [x.attrib["name"] for x in elm.findall("entry")]
elif self.vartype == "exist":
exist_tag = self.path.split("/")[-1]
ans = elm.find("./{0}".format(exist_tag))
settings[self.param] = True if ans is not None else False
elif self.vartype == "yesno":
if elm.text == "yes":
settings[self.param] = True
elif elm.text == "no":
settings[self.param] = False
else:
raise ValueError('{0} "{1}" is not yes/no'.format(self.param, elm.text))
elif (
self.vartype == "stub"
or "{{{0}}}".format(self.param) == self.path.split("/")[-1]
):
pass
elif self.vartype == "int":
settings[self.param] = int(elm.text)
elif self.vartype == "attrib":
settings[self.param] = elm.attrib.get(attr, None)
else:
settings[self.param] = elm.text
[docs]class VsysOperations(VersionedPanObject):
"""Modify PanObject methods to set vsys import configuration."""
CHILDMETHODS = ("create", "apply", "delete")
ALWAYS_IMPORT = False
def __init__(self, *args, **kwargs):
self._xpath_imports = ParentAwareXpath()
super(VsysOperations, self).__init__(*args, **kwargs)
@property
def XPATH_IMPORT(self):
"""Returns the version specific xpath import for this object."""
panos_version = self.retrieve_panos_version()
return self._xpath_imports._get_versioned_value(panos_version, self.parent)
[docs] def create(self):
super(VsysOperations, self).create()
self.child_create()
[docs] def apply(self):
super(VsysOperations, self).apply()
self.child_apply()
[docs] def delete(self):
self.child_delete()
super(VsysOperations, self).delete()
def child_create(self):
return self._create_apply_child()
def child_apply(self):
return self._create_apply_child()
def _create_apply_child(self):
# Remove vsys import if this object has an interface in ha or ag mode
if str(getattr(self, "mode", None)) in ("ha", "aggregate-group"):
self.set_vsys(None, refresh=True, update=True)
elif self.ALWAYS_IMPORT and self.vsys is None:
self.create_import("vsys1")
else:
self.create_import()
def child_delete(self):
if self.ALWAYS_IMPORT and self.vsys is None:
self.delete_import("vsys1")
else:
self.delete_import()
[docs] def create_import(self, vsys=None):
"""Create a vsys import for the object
Args:
vsys (str): Override the vsys
"""
if vsys is None:
vsys = self.vsys
# There are no vsys imports in template stacks.
p = self
while p is not None:
if p.__class__.__name__ == "TemplateStack":
return
p = p.parent
if vsys != "shared" and vsys is not None and self.XPATH_IMPORT is not None:
xpath = self.xpath_import_base(vsys)
element = "<member>{0}</member>".format(self.uid)
device = self.nearest_pandevice()
device.active().xapi.set(xpath, element, retry_on_peer=True)
def xpath_import_base(self, vsys=None):
template = ""
p = self
while p is not None:
if p.__class__.__name__ in ("Template", "TemplateStack"):
template = p.xpath()
break
p = p.parent
vsys_xpath = self._root_xpath_vsys(vsys or self.vsys or "vsys1")
return "{0}{1}/import{2}".format(template, vsys_xpath, self.XPATH_IMPORT)
[docs] def delete_import(self, vsys=None):
"""Delete a vsys import for the object
Args:
vsys (str): Override the vsys
"""
if vsys is None:
vsys = self.vsys
# There are no vsys imports in template stacks.
p = self
while p is not None:
if p.__class__.__name__ == "TemplateStack":
return
p = p.parent
if vsys != "shared" and vsys is not None and self.XPATH_IMPORT is not None:
xpath = "{0}/member[text()='{1}']".format(
self.xpath_import_base(vsys), self.uid
)
device = self.nearest_pandevice()
device.active().xapi.delete(xpath, retry_on_peer=True)
[docs] def set_vsys(
self,
vsys_id,
refresh=False,
update=False,
running_config=False,
return_type="object",
):
"""Set the vsys for this interface.
Creates a reference to this interface in the specified vsys and
removes references to this interface from all other vsys. The vsys
will be created if it doesn't exist.
Args:
vsys_id (str): The vsys id to set for this object (eg. vsys2)
refresh (bool): Refresh the relevant current state of the device
before taking action (Default: False)
update (bool): Apply the changes to the device (Default: False)
running_config (bool): If refresh is True, refresh from the running
configuration (Default: False)
return_type (str): Specify what this function returns, can be
either 'object' (the default) or 'bool'. If this is 'object',
then the return value is the device.Vsys in question. If
this is 'bool', then the return value is a boolean that tells
you about if the live device needs updates (update=False) or
was updated (update=True).
Returns:
Vsys: The vsys for this interface after the operation completes
"""
if refresh and running_config:
msg = "Can't refresh vsys from running config in set_vsys"
raise ValueError(msg)
# Don't import HA or aggregate-group interfaces.
if getattr(self, "mode", "") in ("ha", "aggregate-group"):
return False
# There are no vsys imports in template stacks.
p = self
while p is not None:
if p.__class__.__name__ == "TemplateStack":
if return_type == "bool":
return False
return
p = p.parent
import_to_vsys_param = {
"vlan": "vlans",
"virtual-wire": "virtual_wires",
"virtual-router": "virtual_routers",
"interface": "interface",
}
for key, param_name in import_to_vsys_param.items():
if self.XPATH_IMPORT.endswith("/{0}".format(key)):
break
else:
raise ValueError("Unknown import type: {0}".format(self.XPATH_IMPORT))
from panos.device import Vsys
return self._set_reference(
vsys_id,
Vsys,
param_name,
"list",
True,
refresh,
update,
running_config,
return_type,
True,
)
[docs] @classmethod
def refreshall(
cls,
parent,
running_config=False,
add=True,
exceptions=False,
name_only=False,
matching_vsys=True,
):
instances = super(VsysOperations, cls).refreshall(
parent,
running_config,
add=False,
exceptions=exceptions,
name_only=name_only,
)
if not matching_vsys:
return instances
# Versioned objects need a PanDevice to get the version from, so
# set the child's parent before accessing XPATH.
class_instance = cls()
class_instance.parent = parent
# Filter out instances that are not in this vsys's imports
device = parent.nearest_pandevice()
api_action = device.xapi.show if running_config else device.xapi.get
if (
parent.vsys != "shared"
and parent.vsys is not None
and class_instance.XPATH_IMPORT is not None
):
imports = []
xpath = class_instance.xpath_import_base()
try:
imports_xml = api_action(xpath, retry_on_peer=True)
except (err.PanNoSuchNode, pan.xapi.PanXapiError) as e:
if not str(e).startswith("No such node"):
raise e
else:
imports = imports_xml.findall(".//member")
if imports is not None:
imports = [member.text for member in imports]
if imports is not None:
instances = [
instance for instance in instances if instance.name in imports
]
if add:
# Remove current children of this type from parent
parent.removeall(cls=cls)
# Add the new children that were just refreshed from the device
parent.extend(instances)
return instances
[docs]class OpStateContainer(object):
"""Container for all opstate namespaces.
The name "opstate" is short for "operational state" and acts as a container
for non-configuration functionality to exist.
"""
def __init__(self, obj, config):
for namespace, cls in config.items():
setattr(self, namespace, cls(obj))
[docs] def about(self):
"""Returns information about this object's opstate namespaces.
Returns:
dict: Keys are the opstate's namespace, values are the classes.
"""
return vars(self)
[docs]class OpState(object):
"""Parent class for all opstate namespaces."""
def __init__(self, obj, *args, **kwargs):
self.obj = obj
self._setup(*args, **kwargs)
def _setup(self, *args, **kwargs):
"""Called during __init__."""
pass
def _str(self, elm, field):
if elm is not None:
val = elm.find("./{0}".format(field))
if val is not None:
return val.text
def _int(self, elm, field):
val = self._str(elm, field)
if val is not None:
return int(val)
def _datetime(self, elm, field, fmt):
val = self._str(elm, field)
if val is not None:
try:
return datetime.datetime.strptime(val, fmt)
except ValueError:
pass
return val
[docs]class PanDevice(PanObject):
"""A Palo Alto Networks device
The device can be of any type (currently supported devices are firewall,
or panorama). The class handles common device functions that apply
to all device types.
Usually this class is not instantiated directly. It is the base class for a
firewall.Firewall object or a panorama.Panorama object.
Args:
hostname: Hostname or IP of device for API connections
api_username: Username of administrator to access API
api_password: Password of administrator to access API
api_key: The API Key for connecting to the device's API
port: Port of device for API connections
is_virtual (bool): Physical or Virtual firewall
timeout: The timeout for asynchronous jobs
interval: The interval to check asynchronous jobs
Attributes:
ha_peer (PanDevice): The HA peer device of this PanDevice
"""
NAME = "hostname"
def __init__(
self,
hostname,
api_username=None,
api_password=None,
api_key=None,
port=443,
is_virtual=None,
timeout=1200,
interval=0.5,
*args,
**kwargs
):
"""Initialize PanDevice"""
super(PanDevice, self).__init__(*args, **kwargs)
# create a class logger
self._logger = panos.getlogger(__name__ + "." + self.__class__.__name__)
self.hostname = hostname
self.port = port
self._api_username = api_username
self._api_password = api_password
self._api_key = api_key
self.is_virtual = is_virtual
self.timeout = timeout
self.interval = interval
self.serial = None
self._xapi_private = None
self.config_locked = False
self.commit_locked = False
self.lock_before_change = False
self.shared_lock_before_change = False
self.config_changed = []
# Create a PAN-OS updater subsystem
self.software = updater.SoftwareUpdater(self)
# Create a content updater subsystem
self.content = updater.ContentUpdater(self)
# State variables
self.version = None
self._version_info = None
self.content_version = None
self.platform = None
# HA Pair Firewall or Panorama
self._ha_peer = None
self._ha_active = True
self.ha_failed = None
# Create a User-ID subsystem
self.userid = userid.UserId(self)
"""User-ID subsystem
See Also: :class:`panos.userid`
"""
# create a predefined object subsystem
# avoid a premature import
from panos import predefined
self.predefined = predefined.Predefined(self)
"""Predefined object subsystem
See Also: :class:`panos.predefined`
"""
[docs] def get_device_version(self):
"""Gets the current version on the PanDevice."""
# If it's already known, return the version info
if self._version_info is not None:
return self._version_info
# If the version is unknown but we have an API key, get the version
# from the device.
if self._api_key is not None:
self.refresh_system_info()
return self._version_info
# The version is unknown and there is not yet an API key, so there
# is no permission to touch the live device yet.
raise err.PanApiKeyNotSet("Please retrieve an API KEY first.")
[docs] @classmethod
def create_from_device(
cls, hostname, api_username=None, api_password=None, api_key=None, port=443,
):
"""Factory method to create a :class:`panos.firewall.Firewall`
or :class:`panos.panorama.Panorama` object from a live device
Connects to the device and detects its type and current state
in order to create a PanDevice subclass.
Args:
hostname: Hostname or IP of device for API connections
api_username: Username of administrator to access API
api_password: Password of administrator to access API
api_key: The API Key for connecting to the device's API
port: Port of device for API connections
Returns:
PanDevice: New subclass instance (Firewall or Panorama instance)
"""
# Create generic PanDevice to connect and get information
from panos import firewall, panorama
device = PanDevice(hostname, api_username, api_password, api_key, port,)
system_info = device.refresh_system_info()
version = system_info[0]
model = system_info[1]
if model == "Panorama" or model.startswith("M-"):
instance = panorama.Panorama(
hostname, api_username, api_password, device.api_key, port,
)
else:
serial = system_info[2]
instance = firewall.Firewall(
hostname, api_username, api_password, device.api_key, serial, port,
)
instance._set_version_and_version_info(version)
return instance
class XapiWrapper(pan.xapi.PanXapi):
# This is a confusing class used for catching exceptions and faults.
# TODO: comment this class
CONNECTION_EXCEPTIONS = (
err.PanConnectionTimeout,
err.PanURLError,
err.PanOutdatedSslError,
err.PanSessionTimedOut,
)
def __init__(self, *args, **kwargs):
self.pan_device = kwargs.pop("pan_device", None)
pan.xapi.PanXapi.__init__(self, *args, **kwargs)
pred = lambda x: inspect.ismethod(x) or inspect.isfunction(
x
) # inspect.ismethod needed for Python2, inspect.isfunction needed for Python3
for name, method in inspect.getmembers(pan.xapi.PanXapi, pred):
# Ignore hidden methods
if name[0] == "_":
continue
# Ignore non-api methods
if name in ("xml_result", "xml_root", "cmd_xml"):
continue
# Wrapper method. This is used to create
# methods in this class that match the methods in the
# super class, and call the super class methods inside
# a try/except block, which allows us to check and
# analyze the exceptions and convert them to more
# useful exceptions than generic PanXapiErrors.
wrapper_method = PanDevice.XapiWrapper.make_method(name, method)
# Create method matching each public method of the base class
setattr(PanDevice.XapiWrapper, name, wrapper_method)
@classmethod
def make_method(cls, super_method_name, super_method):
def method(self, *args, **kwargs):
retry_on_peer = kwargs.pop(
"retry_on_peer",
True
if super_method_name not in ("keygen", "op", "ad_hoc", "export")
else False,
)
apply_on_peer = kwargs.pop("apply_on_peer", False)
ha_peer = self.pan_device.ha_peer
# Check if apply to both devices
# Note: An exception will not be raised if one device could not be accessed
# An exception will be raised on other errors on either device, or if both
# devices could not be accessed.
if apply_on_peer:
# Apply to peer first
connection_failures = 0
if ha_peer is not None and not ha_peer.ha_failed:
try:
kwargs["retry_on_peer"] = False
result = getattr(ha_peer.xapi, super_method_name)(
*args, **kwargs
)
except pan.xapi.PanXapiError as e:
the_exception = self.classify_exception(e)
if type(the_exception) in self.CONNECTION_EXCEPTIONS:
# passive firewall connection failed
connection_failures += 1
else:
raise the_exception
if not self.pan_device.ha_failed:
try:
super_method(self, *args, **kwargs)
result = copy.deepcopy(self.element_root)
except pan.xapi.PanXapiError as e:
the_exception = self.classify_exception(e)
if type(the_exception) in self.CONNECTION_EXCEPTIONS:
# passive firewall connection failed
connection_failures += 1
else:
raise the_exception
elif (
self.pan_device.ha_failed
and ha_peer is not None
and not ha_peer.ha_failed
and retry_on_peer
):
# This device is failed, use the other
logger.debug("Current device is failed, starting with other device")
kwargs["retry_on_peer"] = True
result = getattr(ha_peer.xapi, super_method_name)(*args, **kwargs)
elif (
not self.pan_device.is_active()
and ha_peer is not None
and retry_on_peer
):
# I'm not active, call the peer
kwargs["retry_on_peer"] = True
result = getattr(ha_peer.xapi, super_method_name)(*args, **kwargs)
# Copy result from peer xapi to this xapi
result_vars = (
"status",
"status_detail",
"status_code",
"element_root",
"element_result",
"export_result",
"xml_document",
"text_document",
)
for var in result_vars:
setattr(self, var, getattr(ha_peer.xapi, var))
else:
try:
# This device has not failed, or both have failed
# and this device is active
# First get the superclass method
super_method(self, *args, **kwargs)
result = copy.deepcopy(self.element_root)
except pan.xapi.PanXapiError as e:
the_exception = self.classify_exception(e)
if type(the_exception) in self.CONNECTION_EXCEPTIONS:
# The attempt on the active failed with a connection error
new_active = self.pan_device.set_failed()
if retry_on_peer and new_active is not None:
logger.debug(
"Connection to device '%s' failed, using HA peer '%s'"
% (self.pan_device.id, new_active.hostname)
)
# The active failed, apply on passive (which is now active)
kwargs["retry_on_peer"] = False
getattr(new_active.xapi, super_method_name)(
*args, **kwargs
)
result = copy.deepcopy(new_active.xapi.element_root)
else:
raise the_exception
else:
raise the_exception
return result
return method
def classify_exception(self, e):
if str(e) == "Invalid credentials.":
return err.PanInvalidCredentials(str(e), pan_device=self.pan_device,)
elif str(e).startswith("URLError:"):
if str(e).endswith("timed out"):
return err.PanConnectionTimeout(str(e), pan_device=self.pan_device,)
else:
# This could be that we have an old version of OpenSSL
# that doesn't support TLSv1.1, so check for that and give
# a more explicit error if so.
if (
str(e)
== "URLError: reason: [Errno 54] Connection reset by peer"
):
min_openssl_version = ["1", "0", "1"]
help_url = "http://pan-os-python.readthedocs.io/en/latest/usage.html#connecting-to-pan-os-8-0"
try:
# Examples:
# OpenSSL 1.0.2j 26 Sep 2016
# OpenSSL 0.9.8zh 14 Jan 2016
import ssl
vs = ssl.OPENSSL_VERSION.split()[1].split(".")
except (ImportError, IndexError):
pass
else:
if vs < min_openssl_version:
msg = " ".join(
(
"You are attempting to connect to PANOS",
"8.0 or higher with an outdated OpenSSL",
"library({0}). Please update to OpenSSL",
"{1} or higher. Refer to the following",
"URL for more information: {2}",
)
)
return err.PanOutdatedSslError(
msg.format(
ssl.OPENSSL_VERSION,
".".join(min_openssl_version),
help_url,
),
pan_device=self.pan_device,
)
return err.PanURLError(str(e), pan_device=self.pan_device)
elif str(e).startswith("timeout waiting for job"):
return err.PanJobTimeout(str(e), pan_device=self.pan_device)
elif str(e).startswith(
"Another commit/validate is in" " progress. Please try again later"
):
return err.PanCommitInProgress(str(e), pan_device=self.pan_device)
elif str(e).startswith("A commit is in progress."):
return err.PanCommitInProgress(str(e), pan_device=self.pan_device)
elif str(e).startswith(
"You cannot commit while an install is in progress. Please try again later."
):
return err.PanInstallInProgress(str(e), pan_device=self.pan_device)
elif str(e).startswith("Session timed out"):
return err.PanSessionTimedOut(str(e), pan_device=self.pan_device)
elif str(e).startswith("No such node"):
return err.PanNoSuchNode(str(e), pan_device=self.pan_device)
elif str(e).startswith(
"Failed to synchronize running configuration with HA peer"
):
return err.PanHAConfigSyncFailed(str(e), pan_device=self.pan_device)
elif str(e).startswith("Configuration is locked by"):
return err.PanLockError(str(e), pan_device=self.pan_device)
elif str(e).startswith(
"Another sync is in progress. Please try again later"
):
return err.PanHASyncInProgress(str(e), pan_device=self.pan_device)
else:
return err.PanDeviceXapiError(str(e), pan_device=self.pan_device)
# Properties
@property
def id(self):
return str(getattr(self, self.NAME, "<no-id>"))
@property
def api_key(self):
if self._api_key is None:
self._api_key = self._retrieve_api_key()
return self._api_key
@property
def xapi(self):
if self._xapi_private is None:
self._xapi_private = self.generate_xapi()
return self._xapi_private
[docs] def op(
self,
cmd=None,
vsys=None,
xml=False,
cmd_xml=True,
extra_qs=None,
retry_on_peer=False,
quote='"',
):
"""Perform operational command on this device
Operational commands are most any command that is not a debug or config
command. These include many 'show' commands such as ``show system info``.
When passing the cmd as a command string (not XML) you must include any
non-keyword strings in the command inside double quotes (``"``). Here's some
examples::
# The string "facebook-base" must be in quotes because it is not a keyword
fw.op('clear session all filter application "facebook-base"')
# The string "ethernet1/1" must be in quotes because it is not a keyword
fw.op('show interface "ethernet1/1"')
# Using an alternative quote character to get DHCP info on ethernet1/1
fw.op('show dhcp client state `ethernet1/1`', quote='`')
Args:
cmd (str): The operational command to execute
vsys (str): Vsys id.
xml (bool): Return value should be a string (Default: False)
cmd_xml (bool): True: cmd is not XML, False: cmd is XML (Default: True)
extra_qs: Extra parameters for API call
retry_on_peer (bool): Try on active Firewall first, then try on passive Firewall
quote (str): The quote character when the supplied `cmd` is a string and `cmd_xml=True`
Returns:
xml.etree.ElementTree: The result of the operational command. May also return a string of XML if xml=True
"""
if cmd_xml:
cmd = panos.string_to_xml(cmd, quote)
element = self.xapi.op(cmd, vsys, False, extra_qs, retry_on_peer=retry_on_peer)
if xml:
return ET.tostring(element, encoding="utf-8")
else:
return element
[docs] def update_connection_method(self):
"""Regenerate the xapi object used to connect to the device
This is only necessary if the API key, password, hostname, or other
connectivity information in this object has changed. In this case,
the xapi object used to communicate with the firewall must be regenerated
to use the new connectivity information.
The new xapi is stored in the PanDevice object and returned.
Returns:
XapiWrapper: The xapi object which is also stored in self.xapi.
"""
self._xapi_private = self.generate_xapi()
return self._xapi_private
def generate_xapi(self):
kwargs = {
"api_key": self.api_key,
"hostname": self.hostname,
"port": self.port,
"timeout": self.timeout,
"pan_device": self,
}
xapi_constructor = PanDevice.XapiWrapper
return xapi_constructor(**kwargs)
[docs] def set_config_changed(self, scope=None):
"""Set flag that configuration of this device has changed
This is useful for checking if a commit is necessary by knowing
if the configuration was actually changed. This method is already
used by every pan-os-python package method that makes a configuration
change. But this method could also by run directly to force
a 'dirty' configuration state in a PanDevice object.
Args:
scope: vsys in which configuration was changed, or 'shared'
"""
# TODO: enhance to support device-group and template scope
if scope is None:
scope = getattr(self, "vsys", None)
if scope is None:
scope = "shared"
if self.lock_before_change:
if not self.config_locked:
self.add_config_lock(scope=scope, exceptions=True)
elif self.shared_lock_before_change:
if not self.config_locked:
self.add_config_lock(scope="shared", exceptions=True)
if scope not in self.config_changed:
self.config_changed.append(scope)
def _build_xpath(self, root, vsys):
return self.xpath_root(root, vsys or self.vsys)
def xpath_root(self, root_type, vsys, label="vsys"):
if root_type == Root.DEVICE:
xpath = self.xpath_device()
elif root_type == Root.VSYS:
xpath = self._root_xpath_vsys(vsys, label)
elif root_type == Root.MGTCONFIG:
xpath = self.xpath_mgtconfig()
elif root_type == Root.PANORAMA:
xpath = self.xpath_panorama()
else:
xpath = self.XPATH
return xpath
def xpath_mgtconfig(self):
return "/config/mgt-config"
def xpath_device(self):
return "/config/devices/entry[@name='localhost.localdomain']"
def xpath_vsys(self):
raise NotImplementedError
def xpath_panorama(self):
raise NotImplementedError
def _retrieve_api_key(self):
"""Return an API key for a username and password
Given a username and password, return the API key of that user for
this PAN Device. The username and password are not stored, and the
API key is returned. It is up to the caller to store it in an
instance variable if desired.
Returns:
A string containing the API key
"""
self._logger.debug(
"Getting API Key from %s for user %s" % (self.hostname, self._api_username)
)
xapi = PanDevice.XapiWrapper(
pan_device=self,
api_username=self._api_username,
api_password=self._api_password,
hostname=self.hostname,
port=self.port,
timeout=self.timeout,
)
xapi.keygen(retry_on_peer=False)
return xapi.api_key
def devices(self):
return self
[docs] def show_system_info(self):
"""Returns the data from "show system info".
Returns:
dict
"""
root = self.xapi.op(cmd="show system info", cmd_xml=True)
pconf = PanConfig(root)
system_info = pconf.python()
return system_info["response"]["result"]
[docs] def refresh_system_info(self):
"""Refresh system information variables.
Variables refreshed:
- PAN-OS version
- platform
- serial
- content version (if this is a :class:`panos.firewall.Firewall`)
- multi_vsys (if this is a :class:`panos.firewall.Firewall`)
Returns:
namedtuple: version, platform, serial
"""
# This section is commented because version api cannot be targeted
# on Panorama. When this feature is added, ok to uncomment this.
# try:
# # For PANOS >= 7.1, this is faster than the op command.
# ans = self.xapi.ad_hoc('type=version', modify_qs=True)
# except err.PanDeviceXapiError as e:
# # If this is an error other than "version" isn't supported,
# # reraise the exception.
# if str(e) != 'Illegal value for parameter "type" [version].':
# raise
#
# # Otherwise, this is PANOS < 7.1, so do the (slower) op command.
# system_info = self.show_system_info()
# else:
# # The `show_system_info()` returns way more information than
# # `refresh_system_info()` cares about, so to share the same parsing
# # code, we'll create our own dict to pass `_save_system_info()`
# # that contains the keys we care about. Doing the above
# # `xapi.ad_hoc()` returns the things we care about, both for
# # panorama and the firewall's cases, so we don't need to do any
# # extra processing or tweaking than just formatting the response.
# system_info = {'system': {}}
# for e in ans.find('./result'):
# system_info['system'][e.tag] = e.text
system_info = self.show_system_info()
# Save the system info to this object
self._save_system_info(system_info)
# Return the important fields as a namedtuple
SystemInfo = collections.namedtuple(
"SystemInfo", ["version", "platform", "serial"]
)
return SystemInfo(self.version, self.platform, self.serial)
def _save_system_info(self, system_info):
"""Save information about the PanDevice to the object itself.
This function has a few purposes:
* Save the PANOS version so that we can make versioning decisions
* Save the platform
* Save the serial number
Subclasses may super() this function to get the shared functionality,
then save anything specific to them.
Args:
system_info (dict): A dict of system info passed from the
"refresh_system_info()" function.
"""
self._set_version_and_version_info(system_info["system"]["sw-version"])
self.platform = system_info["system"]["model"]
self.serial = system_info["system"]["serial"]
def _set_version_and_version_info(self, version):
"""Sets the version and the specially formatted versioning version."""
self.version = version
if version:
# Example PAN-OS versions: 9.0.3-h1, 9.0.3.xfr
tokens = self.version.split(".")[:3]
tokens[2] = tokens[2].split("-")[0]
self._version_info = tuple(int(x) for x in tokens)
else:
# Cases where version is not known (ie : firewall pre-registered on Panorama but never connected yet)
self._version_info = version
[docs] def refresh_version(self):
"""Refresh version of PAN-OS
Version is stored in self.version and returned
returns:
str: version of PAN-OS
"""
system_info = self.refresh_system_info()
self.version = system_info[0]
return self.version
[docs] def set_hostname(self, hostname):
"""Set the device hostname
Convenience method to set the firewall or Panorama hostname
Args:
hostname (str): hostname to set (should never be None)
"""
if hostname is None:
raise ValueError("hostname should not be None")
import panos.device
self._logger.debug("Set hostname: %s" % str(hostname))
system = self.findall_or_create(panos.device.SystemSettings)[0]
if system.hostname != hostname:
system.hostname = hostname
# This handles addition and deletion
system.update("hostname")
[docs] def set_dns_servers(self, primary, secondary=None):
"""Set the device DNS Servers
Convenience method to set the firewall or Panorama dns servers
Args:
primary (str): IP address of primary DNS server
secondary (str): IP address of secondary DNS server
"""
import panos.device
self._logger.debug(
"Set dns-servers: primary:%s secondary:%s" % (primary, secondary)
)
system = self.findall_or_create(panos.device.SystemSettings)[0]
if system.dns_primary != primary:
system.dns_primary = primary
# This handles addition and deletion
system.update("dns_primary")
if system.dns_secondary != secondary:
system.dns_secondary = secondary
system.update("dns_secondary")
[docs] def set_ntp_servers(self, primary, secondary=None):
"""Set the device NTP Servers
Convenience method to set the firewall or Panorama NTP servers
Args:
primary (str): IP address of primary DNS server
secondary (str): IP address of secondary DNS server
"""
import panos.device
self._logger.debug(
"Set ntp-servers: primary:%s secondary:%s" % (primary, secondary)
)
system = self.findall_or_create(panos.device.SystemSettings)[0]
if primary is None:
ntp1 = system.findall(panos.device.NTPServerPrimary)
if ntp1:
ntp1[0].delete()
else:
ntp1 = system.findall_or_create(panos.device.NTPServerPrimary)[0]
if ntp1.address != primary:
ntp1.address = primary
ntp1.create()
if secondary is None:
ntp2 = system.findall(panos.device.NTPServerSecondary)
if ntp2:
ntp2[0].delete()
else:
ntp2 = system.findall_or_create(panos.device.NTPServerSecondary)[0]
if ntp2.address != secondary:
ntp2.address = secondary
ntp2.create()
[docs] def pending_changes(self, retry_on_peer=True):
"""Check if there are pending changes on the live device
Args:
retry_on_peer (bool): Try on active Firewall first, if connection error try on passive Firewall
Returns:
bool: True if pending changes, False if not
"""
self.xapi.op(
cmd="check pending-changes", cmd_xml=True, retry_on_peer=retry_on_peer
)
pconf = PanConfig(self.xapi.element_result)
response = pconf.python()
return response["result"]
def add_commit_lock(
self, comment=None, scope="shared", exceptions=True, retry_on_peer=True
):
self._logger.debug(
"%s: Add commit lock requested for scope %s" % (self.id, scope)
)
cmd = ET.Element("request")
subel = ET.SubElement(cmd, "commit-lock")
subel = ET.SubElement(subel, "add")
if comment is not None:
subel = ET.SubElement(subel, "comment")
subel.text = comment
try:
self.xapi.op(
ET.tostring(cmd, encoding="utf-8"),
vsys=scope,
retry_on_peer=retry_on_peer,
)
except (pan.xapi.PanXapiError, err.PanDeviceXapiError) as e:
if not re.match(r"Commit lock is already held", str(e)):
raise
else:
if exceptions:
raise err.PanLockError(str(e), pan_device=self)
else:
self._logger.debug(str(e))
return False
self.commit_locked = True
return True
def remove_commit_lock(
self, admin=None, scope="shared", exceptions=True, retry_on_peer=True
):
self._logger.debug(
"%s: Remove commit lock requested for scope %s" % (self.id, scope)
)
cmd = ET.Element("request")
subel = ET.SubElement(cmd, "commit-lock")
subel = ET.SubElement(subel, "remove")
if admin is not None:
subel = ET.SubElement(subel, "admin")
subel.text = admin
try:
self.xapi.op(
ET.tostring(cmd, encoding="utf-8"),
vsys=scope,
retry_on_peer=retry_on_peer,
)
except (pan.xapi.PanXapiError, err.PanDeviceXapiError) as e:
if not re.match(r"Commit lock is not currently held", str(e)):
raise
else:
if exceptions:
raise err.PanLockError(str(e), pan_device=self)
else:
self._logger.debug(str(e))
return False
self.commit_locked = False
return True
def add_config_lock(
self, comment=None, scope="shared", exceptions=True, retry_on_peer=True
):
self._logger.debug(
"%s: Add config lock requested for scope %s" % (self.id, scope)
)
cmd = ET.Element("request")
subel = ET.SubElement(cmd, "config-lock")
subel = ET.SubElement(subel, "add")
if comment is not None:
subel = ET.SubElement(subel, "comment")
subel.text = comment
try:
self.xapi.op(
ET.tostring(cmd, encoding="utf-8"),
vsys=scope,
retry_on_peer=retry_on_peer,
)
except (pan.xapi.PanXapiError, err.PanDeviceXapiError) as e:
if not re.match(
r"Config for scope (shared|vsys\d) is currently locked", str(e)
) and not re.match(r"You already own a config lock for scope", str(e)):
raise
else:
if exceptions:
raise err.PanLockError(str(e), pan_device=self)
else:
self._logger.debug(str(e))
return False
self.config_locked = True
return True
def remove_config_lock(self, scope="shared", exceptions=True, retry_on_peer=True):
self._logger.debug(
"%s: Remove config lock requested for scope %s" % (self.id, scope)
)
cmd = ET.Element("request")
subel = ET.SubElement(cmd, "config-lock")
subel = ET.SubElement(subel, "remove")
try:
self.xapi.op(
ET.tostring(cmd, encoding="utf-8"),
vsys=scope,
retry_on_peer=retry_on_peer,
)
except (pan.xapi.PanXapiError, err.PanDeviceXapiError) as e:
if not re.match(
r"Config is not currently locked for scope (shared|vsys\d)", str(e)
):
raise
else:
if exceptions:
raise err.PanLockError(str(e), pan_device=self)
else:
self._logger.debug(str(e))
return False
self.config_locked = False
return True
def remove_all_locks(self, scope="shared", retry_on_peer=True):
self.remove_config_lock(
scope=scope, exceptions=False, retry_on_peer=retry_on_peer
)
self.remove_commit_lock(
scope=scope, exceptions=False, retry_on_peer=retry_on_peer
)
def check_commit_locks(self, retry_on_peer=True):
self.xapi.op("show commit-locks", cmd_xml=True, retry_on_peer=retry_on_peer)
response = self.xapi.element_result.find(".//entry")
return True if response is not None else False
def check_config_locks(self, retry_on_peer=True):
self.xapi.op("show config-locks", cmd_xml=True, retry_on_peer=retry_on_peer)
response = self.xapi.element_result.find(".//entry")
return True if response is not None else False
def revert_to_running_configuration(self, retry_on_peer=True):
self._logger.debug("%s: Revert to running configuration" % self.id)
self.xapi.op(
"<load><config><from>" "running-config.xml" "</from></config></load>",
retry_on_peer=retry_on_peer,
)
def restart(self):
self._logger.debug("Requesting restart on device: %s" % (self.id,))
try:
self.xapi.op("request restart system", cmd_xml=True)
except pan.xapi.PanXapiError as e:
if not str(e).startswith("Command succeeded with no output"):
raise e
# High Availability Methods
@property
def ha_peer(self):
return self._ha_peer
[docs] def set_ha_peers(self, device):
"""Establish an HA peer relationship between two PanDevice objects
Args:
device: The HA peer device
"""
self._ha_peer = device
self.ha_peer._ha_peer = self
# If both are active or both are passive,
# set self to active and ha_peer to passive
if self._ha_active == self.ha_peer._ha_active:
self._ha_active = True
self.ha_peer._ha_active = False
[docs] def ha_pair(self):
"""List containing this firewall and its HA peer
Returns:
list: self and self.ha_peer in a list. If there is not ha_peer, then
a single item list containing only self is returned.
"""
return [fw for fw in [self, self.ha_peer] if fw is not None]
[docs] def active(self):
"""Return the active device in the HA Pair"""
if self._ha_active:
return self
else:
return self.ha_peer
[docs] def passive(self):
"""Return the passive device in the HA Pair"""
if self._ha_active:
return self.ha_peer
else:
return self
[docs] def is_active(self):
"""Return True if this device is active"""
return self._ha_active
[docs] def activate(self):
"""Make this PanDevice active and the other passive"""
self._ha_active = True
if self.ha_peer is not None:
self.ha_peer._ha_active = False
[docs] def toggle_ha_active(self):
"""Switch the active device in this HA Pair"""
if self.ha_peer is not None:
self._ha_active = not self._ha_active
self.ha_peer._ha_active = not self.ha_peer._ha_active
def update_ha_active(self):
# TODO: Implement this
raise NotImplementedError
[docs] def set_failed(self):
"""Set this PanDevice as a failed HA Peer
API calls will no longer be attempted to this device until one of
the following conditions:
1. self.ha_failed is set to False
2. self.ha_failed is set to True on the peer device
Returns:
PanDevice: The HA Peer device
"""
if self.ha_peer is None:
return None
self.ha_failed = True
if self.ha_peer is not None:
self.ha_peer.activate()
return self.ha_peer
[docs] def map_ha(self, method_name, *args, **kwargs):
"""Apply to both devices in HA Pair
Invoke a method of this class on both this instance and its HA peer
Args:
method_name: The name of the method in this class (or subclass) to invoke
*args: Arguments to pass to the method
**kwargs: Keyword arguments to pass to the method
Returns:
A tuple of the return values of invoking the method on each device. The
first item in the tuple is always from invoking the method on self, and
the second item is from invoking the method on the ha_peer. The second
item is None if there is no HA Peer.
"""
result1 = getattr(self, method_name)(*args, **kwargs)
result2 = None
if self.ha_peer is not None:
result2 = getattr(self.ha_peer, method_name)(*args, **kwargs)
return result1, result2
def show_highavailability_state(self):
from panos.panorama import Panorama
ha_state = self.op("show high-availability state")
enabled = ha_state.findtext("result/enabled")
p = self
if enabled is None or enabled == "no":
return "disabled", None
else:
if isinstance(p, Panorama):
xpath = "result/local-info/state"
else:
xpath = "result/group/local-info/state"
return ha_state.findtext(xpath), ha_state
[docs] def refresh_ha_active(self):
"""Refresh which device is active using the live device
Returns:
str: Current HA state of this device
"""
logger.debug("Refreshing active firewall in HA Pair")
if self.ha_peer is None:
return
self_state = self.show_highavailability_state()[0]
peer_state = self.ha_peer.show_highavailability_state()[0]
states = (self_state, peer_state)
if "disabled" in states:
return
elif "initial" in states:
logger.debug("HA is initializing on one or both devices, try again soon")
return "initial"
else:
for fw, state in ((self, self_state), (self.ha_peer, peer_state)):
fw._ha_active = state == "active"
return self_state
[docs] def synchronize_config(self):
"""Force configuration synchronization from this device to its HA peer"""
# TODO: Fix return value, too many types
state = self.config_sync_state()
if state is None:
return
elif state == "synchronization in progress":
# Wait until synchronization done
return self.watch_op(
"show high-availability state", "group/running-sync", "synchronized"
)
elif state != "synchronized":
logger.debug("Synchronizing configuration with HA peer")
response = self.active().op(
"request high-availability sync-to-remote running-config", "shared"
)
line = response.find("./msg/line")
if line is None:
raise err.PanDeviceError(
"Unable to syncronize configuration, no response from firewall"
)
elif line.text.startswith(
"successfully sync'd running configuration to HA peer"
):
# PAN-OS 7.0
return True
elif line.text.startswith(
"HA synchronization job has been queued on peer. "
"Please check job status on peer."
):
# PAN-OS 7.1
# Wait until synchronization done
return self.watch_op(
"show high-availability state", "group/running-sync", "synchronized"
)
else:
raise err.PanDeviceError(
"Unable to syncronize configuration: %s" % line.text
)
else:
logger.debug("Config synchronization is not required, already synchronized")
return True
[docs] def config_sync_state(self):
"""Get the current configuration synchronization state from the live device
Returns:
str: Current configuration sync state, or None if HA is not enabled
"""
# TODO: What if HA is on, but HA config sync is off?
logger.debug("Checking configuration sync state")
ha_state = self.active().op("show high-availability state")
enabled = ha_state.find("./result/enabled")
if enabled is None or enabled.text == "no":
logger.debug("HA is not enabled on firewall")
return
if enabled.text == "yes":
sync_enabled = ha_state.find("./result/group/running-sync-enabled")
if sync_enabled is None or sync_enabled.text != "yes":
logger.debug("HA config sync is not enabled on firewall")
return
else:
state = ha_state.find("./result/group/running-sync")
if state is None:
logger.debug("HA or config sync is not enabled on firewall")
return
logger.debug("Current config sync state is: %s" % state.text)
return state.text
[docs] def config_synced(self):
"""Check if configuration is synchronized between HA peers
Returns:
bool: True if synchronized, False if not
"""
state = self.config_sync_state()
if state is None:
return False
elif state != "synchronized":
return False
else:
return True
# Commit methods
[docs] def commit(
self, sync=False, exception=False, cmd=None, admins=None, sync_all=False
):
"""Trigger a commit
Args:
sync (bool): Block until the commit is finished (Default: False)
exception (bool): Create an exception on commit errors (Default: False)
cmd (str): Commit options in XML format
admins (str/list): name or list of admins whose changes need to be committed
sync_all (bool): If this is a Panorama commit, wait for firewalls jobs to finish (Default: False)
Returns:
dict: Commit results
"""
self._logger.debug("Commit initiated on device: %s" % (self.id,))
return self._commit(
sync=sync, exception=exception, cmd=cmd, admins=admins, sync_all=sync_all
)
def _commit(
self,
cmd=None,
exclude=None,
commit_all=False,
sync=False,
sync_all=False,
exception=False,
admins=None,
):
"""Internal use commit helper method.
:param exclude:
Can be:
device-and-network
policy-and-objects
:param admins:
string or list containing specific admin user(s) whose changes need to be committed
:param sync:
Synchronous commit, ie. wait for job to finish
:return:
Result of commit as dict if synchronous. JobID if asynchronous.
In either case, if no commit is needed, return None.
Most important fields in dict:
success: True or False
result: OK or FAIL
messages: list of warnings or errors
"""
action = None
# Adding in handling for the commit normalizations.
if (
cmd is not None
and hasattr(cmd, "element")
and hasattr(cmd, "commit_action")
):
action = cmd.commit_action
cmd = cmd.element()
# TODO: Support per-vsys commit
if isinstance(cmd, pan.commit.PanCommit):
cmd = cmd.cmd()
elif isinstance(cmd, ET.Element):
cmd = ET.tostring(cmd, encoding="utf-8")
elif isstring(cmd):
pass
else:
cmd = ET.Element("commit")
if exclude is not None or admins is not None:
partial = ET.SubElement(cmd, "partial")
if admins is not None:
partial_admin = ET.SubElement(partial, "admin")
admins = panos.string_or_list(admins)
for admin in admins:
admin_xml = ET.SubElement(partial_admin, "member")
admin_xml.text = admin
if exclude is not None:
excluded = ET.SubElement(partial, exclude)
cmd = ET.tostring(cmd, encoding="utf-8")
logger.debug(
self.id
+ ": commit requested: commit_all:%s sync:%s sync_all:%s cmd:%s"
% (str(commit_all), str(sync), str(sync_all), cmd,)
)
if commit_all:
action = "all"
self._logger.debug("Initiating commit")
commit_response = self.xapi.commit(
cmd=cmd,
action=action,
sync=False,
interval=self.interval,
timeout=self.timeout,
retry_on_peer=True,
)
# Set locks off
self.config_changed = []
self.config_locked = False
self.commit_locked = False
# Determine if a commit was needed and get the job id
try:
jobid = commit_response.find("./result/job").text
except AttributeError:
if exception:
raise err.PanCommitNotNeeded("Commit not needed", pan_device=self)
else:
return
if not sync:
# Don't synchronize, just return
self._logger.debug("Commit initiated (async), job id: %s" % (jobid,))
return jobid
else:
result = self.syncjob(commit_response, sync_all=sync_all)
if exception and not result["success"]:
self._logger.debug(
"Commit failed - device: %s, job: %s, messages: %s, warnings: %s"
% (self.id, result["jobid"], result["messages"], result["warnings"])
)
raise err.PanCommitFailed(pan_device=self, result=result)
else:
if result["success"]:
self._logger.debug(
"Commit succeeded - device: %s, job: %s, messages: %s, warnings: %s"
% (
self.id,
result["jobid"],
result["messages"],
result["warnings"],
)
)
else:
self._logger.debug(
"Commit failed - device: %s, job: %s, messages: %s, warnings: %s"
% (
self.id,
result["jobid"],
result["messages"],
result["warnings"],
)
)
return result
[docs] def syncjob(self, job_id, sync_all=False, interval=0.5):
"""Block until job completes and return result
Args:
job_id (int): job ID, or response XML from job creation
sync_all (bool): Wait for all devices to complete if commit all operation
interval (float): Interval in seconds to check if job is complete
Returns:
dict: Job result
"""
try:
import http.client as httplib
except ImportError:
import httplib
if interval is not None:
try:
interval = float(interval)
if interval < 0:
raise ValueError
except ValueError:
raise err.PanDeviceError("Invalid interval: %s" % interval)
try:
job = job_id.find("./result/job")
if job is None:
return False
job = job.text
except AttributeError:
job = job_id
cmd = 'show jobs id "%s"' % job
start_time = time.time()
self._logger.debug("Waiting for job to finish...")
attempts = 0
while True:
try:
attempts += 1
job_xml = self.xapi.op(cmd=cmd, cmd_xml=True, retry_on_peer=True)
except (pan.xapi.PanXapiError, err.PanDeviceError) as e:
# Connection errors (URLError) are ok, this can happen in PAN-OS 7.0.1 and 7.0.2
# if the hostname is changed
# Invalid cred errors are ok because FW auth system takes longer to start up in these cases
# Other errors should be raised
if not str(e).startswith("URLError:") and not str(e).startswith(
"Invalid credentials."
):
# Error not related to connection issue. Raise it.
raise e
else:
# self._logger.debug2("Sleep %.2f seconds" % interval)
time.sleep(interval)
continue
except httplib.BadStatusLine as e:
# Connection issue. The firewall is currently restarting the API service or rebooting
# self._logger.debug2("Sleep %.2f seconds" % interval)
time.sleep(interval)
continue
status = job_xml.find("./result/job/status")
if status is None:
raise pan.xapi.PanXapiError(
"No status element in " + "'%s' response" % cmd
)
if status.text == "FIN" and sync_all:
# Check the status of each device commit
device_commits_finished = True
device_results = job_xml.findall("./result/job/devices/entry/result")
for device_result in device_results:
if device_result.text == "PEND":
device_commits_finished = False
break # One device isn't finished, so stop checking others
if device_results and device_commits_finished:
return self._parse_job_results(job_xml, get_devices=True)
elif not device_results:
return self._parse_job_results(job_xml, get_devices=False)
elif status.text == "FIN":
# Job completed, parse the results
return self._parse_job_results(job_xml, get_devices=False)
logger.debug("Job %s status %s" % (job, status.text))
if (
self.timeout is not None
and self.timeout != 0
and time.time() > start_time + self.timeout
):
raise pan.xapi.PanXapiError(
"Timeout waiting for " + "job %s completion" % job
)
# self._logger.debug2("Sleep %.2f seconds" % interval)
time.sleep(interval)
[docs] def syncreboot(self, interval=5.0, timeout=600):
"""Block until reboot completes and return version of device"""
try:
import http.client as httplib
except ImportError:
import httplib
# Validate interval and convert it to float
if interval is not None:
try:
interval = float(interval)
if interval < 0:
raise ValueError
except ValueError:
raise err.PanDeviceError("Invalid interval: %s" % interval)
self._logger.debug("Syncing reboot...")
# Record start time to gauge timeout
start_time = time.time()
attempts = 0
is_rebooting = False
time.sleep(interval)
while True:
try:
# Try to get the device version (ie. test to see if firewall is up)
attempts += 1
version = self.refresh_version()
except (pan.xapi.PanXapiError, err.PanDeviceXapiError) as e:
# Connection errors (URLError) are ok
# Invalid cred errors are ok because FW auth system takes longer to start up
# Other errors should be raised
if not str(e).startswith("URLError:") and not str(e).startswith(
"Invalid credentials."
):
# Error not related to connection issue. Raise it.
raise e
else:
# Connection issue. The firewall is currently rebooting.
is_rebooting = True
self._logger.debug("Connection attempted: %s" % str(e))
self._logger.debug(
"Device is not available yet. Connection attempts: %s"
% str(attempts)
)
except httplib.BadStatusLine as e:
# Connection issue. The firewall is currently rebooting.
is_rebooting = True
self._logger.debug("Connection attempted: %s" % str(e))
self._logger.debug(
"Device is not available yet. Connection attempts: %s"
% str(attempts)
)
else:
# No exception... connection succeeded and device is up!
# This could mean reboot hasn't started yet, so check that we had
# a connection error prior to this success.
if is_rebooting:
self._logger.debug("Device is up! Running version %s" % version)
return version
else:
self._logger.debug(
"Device is up, but it probably hasn't started rebooting yet."
)
# Check to see if we hit timeout
if (
self.timeout is not None
and self.timeout != 0
and time.time() > start_time + self.timeout
):
raise err.PanDeviceError("Timeout waiting for device to reboot")
# Sleep and try again
self._logger.debug("Sleep %.2f seconds" % interval)
time.sleep(interval)
def _parse_job_results(self, show_job_xml, get_devices=True):
# Parse the final results
pconf = PanConfig(show_job_xml)
job_response = pconf.python()
try:
job = job_response["response"]["result"]["job"]
except KeyError:
raise err.PanDeviceError("Can't get job results, error parsing results xml")
devices_results = {}
devices_success = True
# Determine if this was a commit all job
devices = show_job_xml.findall("./result/job/devices/entry")
if devices and get_devices:
devices = job["devices"]["entry"]
for device in devices:
dev_success = True if device["result"] == "OK" else False
if not dev_success:
devices_success = False
devices_results[device["serial-no"]] = {
"success": dev_success,
"serial": device["serial-no"],
"name": device["devicename"],
"result": device["result"],
"starttime": device["tstart"],
"endtime": device["tfin"],
}
# Errors and warnings might not have a full structure. If it is just a string, then
# a TypeError will be produced, so in that case, just grab the string.
try:
devices_results[device["serial-no"]]["warnings"] = device[
"details"
]["msg"]["warnings"]["line"]
except (TypeError, KeyError) as e:
try:
devices_results[device["serial-no"]]["warnings"] = device[
"details"
]["msg"]["warnings"]
except (TypeError, KeyError) as e:
devices_results[device["serial-no"]]["warnings"] = ""
except (TypeError, KeyError) as e:
devices_results[device["serial-no"]]["warnings"] = ""
try:
devices_results[device["serial-no"]]["messages"] = device[
"details"
]["msg"]["errors"]["line"]
except (TypeError, KeyError) as e:
devices_results[device["serial-no"]]["messages"] = device["details"]
success = True if job["result"] == "OK" and devices_success else False
if get_devices:
messages = []
else:
try:
messages = job["details"]["line"]
except KeyError:
messages = []
if isstring(messages):
messages = string_or_list(messages)
# Create the results dict
result = {
"success": success,
"result": job["result"],
"jobid": job["id"],
"user": job["user"],
"warnings": job["warnings"] if "warnings" in job else None,
"starttime": job["tenq"],
"endtime": job["tfin"],
"messages": messages,
"devices": devices_results,
"xml": show_job_xml,
}
return result
[docs] def watch_op(self, cmd, path, value, vsys=None, cmd_xml=True, interval=1.0):
"""Watch an operational command for an expected value
Blocks script execution until the value exists or timeout expires
Args:
cmd (str): Operational command to run
path (str): XPath to the value to watch
value (str): The value expected before method completes
vsys (str): Vsys id for the operational command
cmd_xml (bool): True: cmd is not XML, False: cmd is XML (Default: True)
interval (float): Interval in seconds to check if the value exists
"""
if interval is not None:
try:
interval = float(interval)
if interval < 0:
raise ValueError
except ValueError:
raise err.PanDeviceError("Invalid interval: %s" % interval)
if vsys is None:
vsys = self.vsys
self._logger.debug("Waiting for value %s..." % value)
start_time = time.time()
attempts = 0
while True:
attempts += 1
xml = self.xapi.op(cmd=cmd, cmd_xml=cmd_xml)
status = xml.find("./result/%s" % path)
if status is None:
raise err.PanNoSuchNode("No element at path")
current_value = status.text
logger.debug("Current value %s" % current_value)
if current_value == value:
return True
if (
self.timeout is not None
and self.timeout != 0
and time.time() > start_time + self.timeout
):
raise err.PanJobTimeout("Timeout waiting for value: %s" % value)
logger.debug("Sleep %.2f seconds" % interval)
time.sleep(interval)
[docs] def nearest_pandevice(self):
"""The nearest :class:`panos.base.PanDevice` object.
This method is used to determine the device to apply this object to.
Returns:
PanDevice: The PanDevice object closest to this object in
the configuration tree.
Raises:
PanDeviceNotSet: There is no PanDevice object in the tree.
"""
if self.parent is not None:
return self.parent._nearest_pandevice()
else:
return self._nearest_pandevice()
def _nearest_pandevice(self):
return self
def _format_result_as_license_list(self, result):
"""Formats the ElementTree as a list of License namedtuples."""
ans = []
License = collections.namedtuple(
"License",
[
"feature",
"description",
"serial",
"issued",
"expires",
"expired",
"authcode",
],
)
def _parse_license_date(value):
"""Turns a string into a datetime.date object.
If the value is "Never", this function returns None.
If the value can't be parsed, then the string itself is returned.
"""
if value is None or value.text is None or value.text == "Never":
return None
date_format = "%B %d, %Y"
months = {
"January": 1,
"February": 2,
"March": 3,
"April": 4,
"May": 5,
"June": 6,
"July": 7,
"August": 8,
"September": 9,
"October": 10,
"November": 11,
"December": 12,
}
tokens = value.text.split()
try:
return datetime.date(
int(tokens[2]), months[tokens[0]], int(tokens[1][:-1])
)
except (ValueError, KeyError, IndexError):
return value.text
for x in result.findall("./result/licenses/entry"):
ans.append(
License(
x.find("./feature").text,
x.find("./description").text,
x.find("./serial").text,
_parse_license_date(x.find("./issued")),
_parse_license_date(x.find("./expires")),
x.find("./expired").text == "yes",
x.find("./authcode").text,
)
)
return ans
[docs] def request_license_info(self):
"""Returns the licenses currently installed on this device.
**Touches the live device**
Note: For namedtuple objects, you can access the variables via
its index like a normal tuple or via name like a class.
Returns:
list: A list of namedtuples of the licenses with the following attributes:
- feature (str): the feature name
- description (str): description
- serial (str): the license's serial number
- issued (datetime.date/None): issue date
- expires (datetime.date/None): expiration date, or None if the license does not expire
- expired (bool): True if the license is currently expired
- authcode (str/None): license's authcode
"""
result = self.op("request license info")
return self._format_result_as_license_list(result)
[docs] def fetch_licenses_from_license_server(self):
"""Fetches licenses from the license server.
**Modifies the live device**
Note: For namedtuple objects, you can access the variables via
its index like a normal tuple or via name like a class.
Returns:
list: A list of namedtuples of the licenses with the following attributes:
- feature (str): the feature name
- description (str): description
- serial (str): the license's serial number
- issued (datetime.date/None): issue date
- expires (datetime.date/None): expiration date, or None if the license does not expire
- expired (bool): True if the license is currently expired
- authcode (str/None): license's authcode
"""
result = self.op("request license fetch")
return self._format_result_as_license_list(result)
[docs] def activate_feature_using_authorization_code(self, code):
"""Updates a license using the given auth code.
**Modifies the live device**
Args:
code (str): The authorization code.
Raises:
PanActivateFeatureAuthCodeError
"""
try:
result = self.op('request license fetch auth-code "{0}"'.format(code))
except pan.xapi.PanXapiError as e:
"""
pan-python can handle both XML responses & plaintext responses from
a PAN-OS, and it makes this determination based on headers that are
sent back from any given action. Raw XML text returned is stored
in pan.xapi.PanXapi.xml_document, and the raw plain text is stored
in pan.xapi.PanXapi.text_document.
When it comes to licensing, it's been observed that PAN-OS can
send back a response with Content-Type: application/xml, but the
content isn't actually XML, it's plain text. When this happens,
pan-python wraps the xml.etree.ElementTree error and returns
a PanXapiError instead that mentions the parsing problem.
So, check the not-actually-XML response sent back to see if the
licensing operation was actually successful.
"""
err_msg = "{0}".format(e)
if err_msg.startswith("ElementTree.fromstring ParseError:"):
acceptable_errors = (
"VM Device License installed. Restarting pan services.",
)
for msg in acceptable_errors:
if msg in self.xapi.xml_document:
return
raise pan.xapi.PanXapiError(
"{0} | xml_document={1}".format(err_msg, self.xapi.xml_document)
)
else:
raise
if result.attrib.get("status") != "success":
raise err.PanActivateFeatureAuthCodeError(
result.get("./msg/line").text, pan_device=self
)
[docs] def request_password_hash(self, value):
"""Request a password hash from the live device.
This function does not modify the live device, but it does
interact with the live device to generate the password hash.
Args:
value (str): The password
Returns:
str: A hashed version of the password provided.
Raises:
ValueError: If the password hash is not found.
"""
result = self.op('request password-hash password "{0}"'.format(value))
elm = result.find("./result/phash")
if elm is None:
raise ValueError("No password hash in response")
return elm.text
[docs] def test_security_policy_match(
self,
source,
destination,
protocol,
application=None,
category=None,
port=None,
user=None,
from_zone=None,
to_zone=None,
show_all=False,
):
"""Test security policy match using the given criteria.
This function will always return a list for its results. If `show_all`
is set to False, then the list will only have one entry in it. The
keys in each dict are as follows:
* name (str): rule's name
* index (int): the index of the security rule
* action (str): the security rule's action
Args:
source (str): Source IP address.
destination (str): Destination IP address.
protocol (int): IP protocol value (1-255).
application (str): Application name.
category (str): Category name.
port (int): Destination port.
user (str): Source user.
from_zone (str): Source zone name.
to_zone (str): Destination zone name.
show_all (bool): Show all potential match rules until first allow.
Returns:
List of dicts
"""
extras = (
("application", application),
("category", category),
("destination-port", port),
("source-user", user),
("from", from_zone),
("to", to_zone),
("show-all", show_all),
)
# Build up the XML document.
root = ET.Element("test")
elm = ET.SubElement(root, "security-policy-match")
# Add in required params.
ET.SubElement(elm, "source").text = source
ET.SubElement(elm, "destination").text = destination
ET.SubElement(elm, "protocol").text = str(int(protocol))
# Add in the optional params.
for desc, val in extras:
if val is None:
continue
if desc == "destination-port":
ET.SubElement(elm, desc).text = str(int(val))
elif desc == "show-all":
ET.SubElement(elm, desc).text = "yes" if val else "no"
else:
ET.SubElement(elm, desc).text = val
# Run the test operation.
res = self.op(ET.tostring(root, encoding="utf-8"), cmd_xml=False)
# Build up the answer.
#
# Side note here: the XML document returned here does not follow the
# rules of the API, so we can't use the SecurityRule module to parse
# the results. For this reason, we won't parse everything, just
# name, index, and action.
ans = []
for elm in res.findall("./result/rules/entry"):
if "name" in elm.attrib:
val = {
"name": elm.attrib["name"],
}
e = elm.find("./index")
val["index"] = 0 if e is None else int(e.text)
e = elm.find("./action")
val["action"] = "" if e is None else e.text
ans.append(val)
else:
tokens = elm.text.split(";")
if len(tokens) == 2 and tokens[1].startswith(" index: "):
ans.append(
{"name": tokens[0], "index": int(tokens[1].split(":")[1])}
)
else:
raise err.PanDeviceError(
"Not sure how to parse response: {0}".format(elm.text)
)
# Done.
return ans
[docs] def clock(self):
"""Gets the current time on PAN-OS.
Returns:
datetime.datetime
"""
ans = self.op("<show><clock/></show>", cmd_xml=False)
res = ans.find("./result")
if res is None:
return None
fmt = "%a %b %d %H:%M:%S %Z %Y"
text = res.text.strip()
return datetime.datetime.strptime(text, fmt)
[docs] def plugins(self):
"""Returns plugin information.
Each dict in the list returned has the following keys:
* name
* version
* release_date
* release_note_url
* package_file
* size
* platform
* installed
* downloaded
Returns:
list of dicts
"""
# Older versions of PAN-OS do not have this command, so if we get an
# exception, just return None.
try:
res = self.op("<show><plugins><packages/></plugins></show>", cmd_xml=False)
except err.PanDeviceError:
return None
ans = []
for o in res.findall("./result/plugins/entry"):
ans.append(
{
"name": o.find("./name").text,
"version": o.find("./version").text,
"release_date": o.find("./release-date").text,
"release_note_url": (
o.find("./release-note-url").text or ""
).strip(),
"package_file": o.find("./pkg-file").text,
"size": o.find("./size").text,
"platform": o.find("./platform").text,
"installed": o.find("./installed").text,
"downloaded": o.find("./downloaded").text,
}
)
return ans
[docs] def whoami(self):
"""Returns which user you're currently authenticated as.
NOTE: PAN-OS 10.0+
Returns:
string
"""
res = self.op("<show><admins/></show>", cmd_xml=False)
for o in res.findall("./result/admins/entry"):
name = None
is_self = False
for child in o:
if child.tag == "admin":
name = child.text
elif child.tag == "self":
is_self = True
if name is not None and is_self:
return name