#!/usr/bin/env python
# Copyright (c) 2015, Palo Alto Networks
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""High availability objects to configure HA for a firewall or Panorama"""
import inspect
import logging
import xml.etree.ElementTree as ET
import pan.xapi
import panos.errors as err
from panos import firewall, getlogger, isstring, network
from panos.base import ENTRY, MEMBER, PanDevice, PanObject, Root
from panos.base import VarPath as Var
from panos.base import VersionedPanObject, VersionedParamPath
logger = getlogger(__name__)
[docs]class HighAvailabilityInterface(PanObject):
"""Base class for high availability interface classes
Do not instantiate this class. Use its subclasses.
"""
NAME = None
HA_SYNC = False
# TODO: Support encryption
def __init__(self, *args, **kwargs):
# Store the 'port' variable
# This is necessary because 'port' is a property
# so that self.old_port can work correctly
# XXX: better to remove the need for old_port in a future version
try:
args = list(args)
port = args.pop(2)
except IndexError:
port = kwargs.pop("port", None)
if type(self) == HighAvailabilityInterface:
raise AssertionError(
"Do not instantiate a HighAvailabilityInterface. Please use a subclass."
)
super(HighAvailabilityInterface, self).__init__(*args, **kwargs)
self._port = port
# This is used by setup_interface method to remove old interfaces
self.old_port = None
@property
def port(self):
return self._port
@port.setter
def port(self, value):
if hasattr(self, "_port"):
if value != self._port:
self.old_port = self._port
self._port = value
[docs] @classmethod
def variables(cls):
return (
Var("ip-address"),
Var("netmask"),
Var("port"),
Var("gateway"),
Var("link-speed"),
Var("link-duplex"),
)
[docs] def setup_interface(self):
"""Setup the data interface as an HA interface
Use this method to automatically convert the data interface
to 'ha' mode. This must be done *before* this HA interface
is created on the firewall.
"""
pandevice = self.nearest_pandevice()
if pandevice is None:
return None
if isstring(self.port):
intname = self.port
else:
intname = str(self.port)
intconfig_needed = False
inttype = None
if intname.startswith("ethernet"):
intprefix = "ethernet"
inttype = network.EthernetInterface
intconfig_needed = True
elif intname.startswith("ae"):
intprefix = "ae"
inttype = network.AggregateInterface
intconfig_needed = True
elif intname.startswith("management"):
self.link_speed = None
self.link_duplex = None
if intconfig_needed:
apply_needed = False
interface = panos.find(
intname, (network.EthernetInterface, network.AggregateInterface)
)
if interface is None:
interface = panos.add(inttype(name=intname, mode="ha"))
apply_needed = True
elif interface.mode != "ha":
interface.mode = "ha"
apply_needed = True
if inttype == network.EthernetInterface:
if self.link_speed is not None:
# Transfer the link_speed to the eth interface
if interface.link_speed != self.link_speed:
interface.link_speed = self.link_speed
apply_needed = True
if self.link_duplex is not None:
# Transfer the link_duplex to the eth interface
if interface.link_duplex != self.link_duplex:
interface.link_duplex = self.link_duplex
apply_needed = True
self.link_speed = None
self.link_duplex = None
if apply_needed:
interface.apply()
return interface
[docs] def delete_old_interface(self):
"""Delete the data interface previously used by this HA interface
Use this if the 'port' of an HA interface was changed and the old
interface needs to be cleaned up.
"""
if self.old_port is not None:
self.delete_interface(self.old_port)
self.old_port = None
[docs] def delete_interface(self, interface=None, pan_device=None):
"""Delete the data interface used by this HA interface
Args:
interface (HighAvailabilityInterface): The HA interface (HA1, HA2, etc)
pan_device (PanDevice): The PanDevice object to apply the change
"""
if pan_device is None:
pan_device = self.nearest_pandevice()
if pan_device is None:
return None
port = interface if interface is not None else self.port
if isstring(port):
intname = port
else:
intname = str(port)
if intname.startswith("ethernet"):
interface = pan_device.find(intname, network.EthernetInterface)
if interface is None:
# Already deleted
return
elif interface.mode == "ha":
interface.delete()
elif intname.startswith("ae"):
interface = pan_device.find(intname, network.AggregateInterface)
if interface is None:
# Already deleted
return
elif interface.mode == "ha":
interface.mode = "tap"
interface.apply()
[docs]class HA1(HighAvailabilityInterface):
"""HA1 interface
Args:
ip_address (str): IP of the interface
netmask (str): Netmask of the interface
port (str): Interface to use for this HA interface (eg. ethernet1/5)
gateway (str): Default gateway of the interface
link_speed (str): Link speed
link_duplex (str): Link duplex
monitor_hold_time (int): Monitor hold time
"""
# TODO: Encryption
XPATH = "/interface/ha1"
[docs] @classmethod
def variables(cls):
return super(HA1, HA1).variables() + (Var("monitor-hold-time", vartype="int"),)
[docs]class HA1Backup(HighAvailabilityInterface):
"""HA1 Backup interface
Args:
ip_address (str): IP of the interface
netmask (str): Netmask of the interface
port (str): Interface to use for this HA interface (eg. ethernet1/5)
gateway (str): Default gateway of the interface
link_speed (str): Link speed
link_duplex (str): Link duplex
"""
XPATH = "/interface/ha1-backup"
[docs]class HA2(HighAvailabilityInterface):
"""HA2 interface
Args:
ip_address (str): IP of the interface
netmask (str): Netmask of the interface
port (str): Interface to use for this HA interface (eg. ethernet1/5)
gateway (str): Default gateway of the interface
link_speed (str): Link speed
link_duplex (str): Link duplex
"""
XPATH = "/interface/ha2"
[docs]class HA2Backup(HighAvailabilityInterface):
"""HA2 Backup interface
Args:
ip_address (str): IP of the interface
netmask (str): Netmask of the interface
port (str): Interface to use for this HA interface (eg. ethernet1/5)
gateway (str): Default gateway of the interface
link_speed (str): Link speed
link_duplex (str): Link duplex
"""
XPATH = "/interface/ha2-backup"
[docs]class HA3(HighAvailabilityInterface):
"""HA3 interface
Args:
port (str): Interface to use for this HA interface (eg. ethernet1/5)
link_speed (str): Link speed
link_duplex (str): Link duplex
"""
XPATH = "/interface/ha3"
[docs] @classmethod
def variables(cls):
return (
Var("port"),
Var("link_speed"),
Var("link_duplex"),
)
[docs]class HighAvailability(VersionedPanObject):
"""High availability configuration base object
All high availability configuration is in this object or is a child of this object
Args:
name: (unused, and may be omitted)
enabled (bool): Enable HA (Default: True)
group_id (int): The group identifier
description (str): Description for HA pairing
config_sync (bool): Enabled configuration synchronization (Default: True)
peer_ip (str): HA Peer's HA1 IP address
mode (str): Mode of HA: 'active-passive' or 'active-active' (Default: 'active-passive')
passive_link_state (str): Passive link state
state_sync (bool): Enabled state synchronization (Default: False)
ha2_keepalive (bool): Enable HA2 keepalives
ha2_keepalive_action (str): HA2 keepalive action
ha2_keepalive_threshold (int): HA2 keepalive threshold
peer_ip_backup (str): HA Peer's HA1 backup IP address
device_id (int): HA3 device id (0 or 1)
session_owner_selection (str): active-active session owner mode
session_setup (str): active-active session setup mode
tentative_hold_time (int): active-active tentative hold timer
sync_qos (bool): active-active network sync qos
sync_virtual_router (bool): active-active network sync virtual router
ip_hash_key (str): active-active hash key used by ip-hash algorithm
"""
ROOT = Root.DEVICE
SUFFIX = None
HA_SYNC = False
CHILDTYPES = (
"ha.HA1",
"ha.HA1Backup",
"ha.HA2",
"ha.HA2Backup",
"ha.HA3",
)
ACTIVE_PASSIVE = "active-passive"
ACTIVE_ACTIVE = "active-active"
def _setup(self):
# xpaths
self._xpaths.add_profile(value="/deviceconfig/high-availability")
self._xpaths.add_profile(
value="{0}/deviceconfig/high-availability".format(
self._TEMPLATE_DEVICE_XPATH
),
parents=("Template", "TemplateStack"),
)
# params
params = []
params.append(
VersionedParamPath("enabled", default=True, vartype="yesno", path="enabled")
)
params.append(
VersionedParamPath("group_id", default=1, vartype="entry", path="group")
)
params[-1].add_profile("8.1.0", vartype="int", path="group/group-id")
params.append(
VersionedParamPath("description", path="group/entry group_id/description")
)
params[-1].add_profile("8.1.0", path="group/description")
params.append(
VersionedParamPath(
"config_sync",
vartype="yesno",
path="group/entry group_id/configuration-synchronization/enabled",
)
)
params[-1].add_profile(
"8.1.0", vartype="yesno", path="group/configuration-synchronization/enabled"
)
params.append(
VersionedParamPath("peer_ip", path="group/entry group_id/peer-ip")
)
params[-1].add_profile("8.1.0", path="group/peer-ip")
params.append(
VersionedParamPath(
"mode",
default="active-passive",
values=("active-passive", "active-active"),
path="group/entry group_id/mode/{mode}",
)
)
params[-1].add_profile(
"8.1.0",
values=("active-passive", "active-active"),
path="group/mode/{mode}",
)
params.append(
VersionedParamPath(
"passive_link_state",
condition={"mode": "active-passive"},
path="group/entry group_id/mode/{mode}/passive-link-state",
)
)
params[-1].add_profile(
"8.1.0",
condition={"mode": "active-passive"},
path="group/mode/{mode}/passive-link-state",
)
params.append(
VersionedParamPath(
"state_sync",
vartype="yesno",
default=False,
path="group/entry group_id/state-synchronization/enabled",
)
)
params[-1].add_profile(
"8.1.0", vartype="yesno", path="group/state-synchronization/enabled"
)
params.append(
VersionedParamPath(
"ha2_keepalive",
vartype="yesno",
path="group/entry group_id/state-synchronization/ha2-keep-alive/enabled",
)
)
params[-1].add_profile(
"8.1.0",
vartype="yesno",
path="group/state-synchronization/ha2-keep-alive/enabled",
)
params.append(
VersionedParamPath(
"ha2_keepalive_action",
values=("log-only", "split-datapath"),
path="group/entry group_id/state-synchronization/ha2-keep-alive/action",
)
)
params[-1].add_profile(
"8.1.0",
values=("log-only", "split-datapath"),
path="group/state-synchronization/ha2-keep-alive/action",
)
params.append(
VersionedParamPath(
"ha2_keepalive_threshold",
vartype="int",
path="group/entry group_id/state-synchronization/ha2-keep-alive/threshold",
)
)
params[-1].add_profile(
"8.1.0",
vartype="int",
path="group/state-synchronization/ha2-keep-alive/threshold",
)
params.append(
VersionedParamPath(
"peer_ip_backup", path="group/entry group_id/peer-ip-backup"
)
)
params[-1].add_profile("8.1.0", path="group/peer-ip-backup")
params.append(
VersionedParamPath(
"device_id",
condition={"mode": "active-active"},
values=(0, 1),
vartype="int",
path="group/entry group_id/mode/{mode}/device-id",
)
)
params[-1].add_profile(
"8.1.0",
condition={"mode": "active-active"},
values=(0, 1),
vartype="int",
path="group/mode/{mode}/device-id",
)
params.append(
VersionedParamPath(
"session_owner_selection",
condition={
"mode": "active-active",
"session_owner_selection": "primary-device",
},
values=("primary-device", "first-packet"),
path="group/entry group_id/mode/{mode}/session-owner-selection/{session_owner_selection}",
)
)
params[-1].add_profile(
"8.1.0",
condition={
"mode": "active-active",
"session_owner_selection": "primary-device",
},
values=("primary-device", "first-packet"),
path="group/mode/{mode}/session-owner-selection/{session_owner_selection}",
)
params.append(
VersionedParamPath(
"session_setup",
condition={
"mode": "active-active",
"session_owner_selection": "first-packet",
},
values=("first-packet", "ip-modulo", "ip-hash", "primary-device"),
path="group/entry group_id/mode/{mode}/session-owner-selection/first-packet/session-setup/{session_setup}",
)
)
params[-1].add_profile(
"8.1.0",
condition={
"mode": "active-active",
"session_owner_selection": "first-packet",
},
values=("first-packet", "ip-modulo", "ip-hash", "primary-device"),
path="group/mode/{mode}/session-owner-selection/first-packet/session-setup/{session_setup}",
)
params.append(
VersionedParamPath(
"tentative_hold_time",
condition={"mode": "active-active"},
vartype="int",
path="group/entry group_id/mode/{mode}/tentative-hold-time",
)
)
params[-1].add_profile(
"8.1.0",
condition={"mode": "active-active"},
vartype="int",
path="group/mode/{mode}/tentative-hold-time",
)
params.append(
VersionedParamPath(
"sync_qos",
condition={"mode": "active-active"},
vartype="yesno",
path="group/entry group_id/mode/{mode}/network-configuration/sync/qos",
)
)
params[-1].add_profile(
"8.1.0",
condition={"mode": "active-active"},
vartype="yesno",
path="group/mode/{mode}/network-configuration/sync/qos",
)
params.append(
VersionedParamPath(
"sync_virtual_router",
condition={"mode": "active-active"},
vartype="yesno",
path="group/entry group_id/mode/{mode}/network-configuration/sync/virtual-router",
)
)
params[-1].add_profile(
"8.1.0",
condition={"mode": "active-active"},
vartype="yesno",
path="group/mode/{mode}/network-configuration/sync/virtual-router",
)
params.append(
VersionedParamPath(
"ip_hash_key",
condition={
"mode": "active-active",
"session_owner_selection": "first-packet",
"session_setup": "ip-hash",
},
values=("source", "source-and-destination"),
path="group/entry group_id/mode/{mode}/session-owner-selection/first-packet/session-setup/{session_setup}/hash-key",
)
)
params[-1].add_profile(
"8.1.0",
condition={
"mode": "active-active",
"session_owner_selection": "first-packet",
"session_setup": "ip-hash",
},
values=("source", "source-and-destination"),
path="group/mode/{mode}/session-owner-selection/first-packet/session-setup/{session_setup}/hash-key",
)
self._params = tuple(params)
# stubs
self._stubs.add_profile(
"0.0.0", "interface/ha1",
)