#!/usr/bin/env python
# Copyright (c) 2015, Palo Alto Networks
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""User-ID and Dynamic Address Group updates using the User-ID API"""
import xml.etree.ElementTree as ET
from copy import deepcopy
from pan.xapi import PanXapiError
import panos.errors as err
from panos import getlogger, string_or_list, string_or_list_or_none
from panos.updater import PanOSVersion
logger = getlogger(__name__)
[docs]class UserId(object):
"""User-ID Subsystem of Firewall
A member of a firewall.Firewall object that has special methods for
interacting with the User-ID API. This includes login/logout of a user,
user/group mappings, and dynamic address group tags.
This class is typically not instantiated by anything but the
base.PanDevice class itself. There is an instance of this UserId class
inside every instantiated base.PanDevice class.
**Support:** UserId API is supported on Panorama starting with Panorama 8.0
UserId API is supported on all firewall PAN-OS versions but with varying
features as noted in the documentation for each method.
Args:
device (base.PanDevice): The firewall or Panorama this user-id subsystem leverages
prefix (str): Prefix to use in all IP tag operations for Dynamic Address Groups
ignore_dup_errors (bool): Devices produce errors when a tag is registered that already
exists. Set to true to ignore these errors. (Default: True)
"""
def __init__(self, device, prefix="", ignore_dup_errors=True):
# Create a class logger
self._logger = getlogger(__name__ + "." + self.__class__.__name__)
self.device = device
self.prefix = prefix
self.ignore_dup_errors = ignore_dup_errors
# Build the initial uid-message
self._uidmessage = ET.fromstring(
"<uid-message>"
+ "<version>1.0</version>"
+ "<type>update</type>"
+ "<payload/>"
+ "</uid-message>"
)
# Batch state
self._batch = False
self._batch_uidmessage = deepcopy(self._uidmessage)
def _create_uidmessage(self):
if self._batch:
payload = self._batch_uidmessage.find("payload")
return self._batch_uidmessage, payload
else:
root = deepcopy(self._uidmessage)
payload = root.find("payload")
return root, payload
[docs] def batch_start(self):
"""Start creating an API call
The API call will not be sent to the firewall until batch_end() is
called. This allows multiple operations to be added to a single API
call.
"""
self._batch = True
self._batch_uidmessage = deepcopy(self._uidmessage)
[docs] def batch_end(self):
"""End a batched API call and send it to the firewall
This method usually follows a batch_start() and several other
operations.
The API call will not be sent to the firewall until batch_end() is
called. This allows multiple operations to be added to a single API
call.
"""
uid_message, payload = self._create_uidmessage()
self._batch = False
# Only send the API call if there was actually a command added to the payload
if len(payload) > 0:
self.send(uid_message)
self._batch_uidmessage = deepcopy(self._uidmessage)
[docs] def send(self, uidmessage):
"""Send a uidmessage to the User-ID API of a firewall
Used for adhoc User-ID API calls that are not supported by other
methods in this class. This method cannot be batched.
Args:
uidmessage (str): The UID Message in XML to send to the firewall
"""
if self._batch:
return
else:
cmd = ET.tostring(uidmessage)
try:
self.device.xapi.user_id(cmd=cmd, vsys=self.device.vsys)
except (err.PanDeviceXapiError, PanXapiError) as e:
# Check if this is just an error about duplicates or nonexistant tags
# If so, ignore the error. Most operations don't care about this.
message = str(e)
if self.ignore_dup_errors and (
message.endswith("already exists, ignore")
or message.endswith("does not exist, ignore unreg")
):
return
else:
raise e
[docs] def login(self, user, ip, timeout=None):
"""Login a single user
Maps a user to an IP address
This method can be batched with batch_start() and batch_end().
Args:
user (str): a username
ip (str): an ip address
timeout (int): timeout in minutes to remove this mapping
"""
root, payload = self._create_uidmessage()
login = payload.find("login")
if login is None:
login = ET.SubElement(payload, "login")
entry = ET.SubElement(login, "entry", {"name": user, "ip": ip})
if timeout:
entry.set("timeout", str(timeout))
self.send(root)
[docs] def logins(self, users):
"""Login multiple users in the same API call
This method can be batched with batch_start() and batch_end().
Args:
users: a list of sets of user/ip mappings with optional timeout in minutes
eg. [('user1', '10.0.1.1'), ('user2', '10.0.1.2', 60)]
"""
if not users:
return
root, payload = self._create_uidmessage()
login = payload.find("login")
if login is None:
login = ET.SubElement(payload, "login")
for user in users:
entry = ET.SubElement(login, "entry", {"name": user[0], "ip": user[1]})
try:
entry.set("timeout", str(user[2]))
except IndexError:
# No timeout specified
pass
self.send(root)
[docs] def logout(self, user, ip):
"""Logout a single user
Removes a mapping of a user to an IP address
This method can be batched with batch_start() and batch_end().
Args:
user (str): a username
ip (str): an ip address
"""
root, payload = self._create_uidmessage()
logout = payload.find("logout")
if logout is None:
logout = ET.SubElement(payload, "logout")
ET.SubElement(logout, "entry", {"name": user, "ip": ip})
self.send(root)
[docs] def logouts(self, users):
"""Logout multiple users in the same API call
This method can be batched with batch_start() and batch_end().
Arguments:
users: a list of sets of user/ip mappings
eg. [(user1, 10.0.1.1), (user2, 10.0.1.2)]
"""
if not users:
return
root, payload = self._create_uidmessage()
logout = payload.find("logout")
if logout is None:
logout = ET.SubElement(payload, "logout")
for user in users:
ET.SubElement(logout, "entry", {"name": user[0], "ip": user[1]})
self.send(root)
[docs] def register(self, ip, tags, timeout=None):
"""Register an ip tag for a Dynamic Address Group.
This method can be batched with batch_start() and batch_end().
Args:
ip (:obj:`list` or :obj:`str`): IP address(es) to tag
tags (:obj:`list` or :obj:`str`): The tag(s) for the IP address
timeout (string): The optional timeout value in seconds. (Max is 2,592,000 sec (30 days))
"""
root, payload = self._create_uidmessage()
register = payload.find("register")
if register is None:
register = ET.SubElement(payload, "register")
ip = list(set(string_or_list(ip)))
tags = list(set(string_or_list(tags)))
if not tags:
return
tags = [self.prefix + t for t in tags]
for c_ip in ip:
tagelement = register.find("./entry[@ip='%s']/tag" % c_ip)
if tagelement is None:
entry = ET.SubElement(register, "entry", {"ip": c_ip})
tagelement = ET.SubElement(entry, "tag")
for tag in tags:
member = ET.SubElement(tagelement, "member")
if timeout is not None:
member.set("timeout", str(timeout))
member.text = tag
self.send(root)
[docs] def unregister(self, ip, tags):
"""Unregister an ip tag for a Dynamic Address Group
This method can be batched with batch_start() and batch_end().
Args:
ip (:obj:`list` or :obj:`str`): IP address(es) with the tag to remove
tags (:obj:`list` or :obj:`str`): The tag(s) to remove from the IP address
"""
root, payload = self._create_uidmessage()
unregister = payload.find("unregister")
if unregister is None:
unregister = ET.SubElement(payload, "unregister")
ip = list(set(string_or_list(ip)))
tags = list(set(string_or_list(tags)))
if not tags:
return
tags = [self.prefix + t for t in tags]
for c_ip in ip:
tagelement = unregister.find("./entry[@ip='%s']/tag" % c_ip)
if tagelement is None:
entry = ET.SubElement(unregister, "entry", {"ip": c_ip})
tagelement = ET.SubElement(entry, "tag")
for tag in tags:
member = ET.SubElement(tagelement, "member")
member.text = tag
self.send(root)
[docs] def get_registered_ip(self, ip=None, tags=None, prefix=None):
"""Return registered/tagged addresses
When called without arguments, retrieves all registered addresses.
Note: Passing a single ip and/or single tag to this method results in a response
from the firewall that contains only the relevant entries. ie. the filtering is done on
the firewall before it responds. Passing a list of multiple ip addresses or tags will
result in retreival of the entire tag database from the firewall which is then filtered and
returned with only the relevant entries. Therefor, using a single ip or tag is more efficient.
**Support:** PAN-OS 6.0 and higher
Args:
ip (:obj:`list` or :obj:`str`): IP address(es) to get tags for
tags (:obj:`list` or :obj:`str`): Tag(s) to get
prefix (str): Override class tag prefix
Returns:
dict: ip addresses as keys with tags as values
Raises:
PanDeviceError if running PAN-OS < 8.0 and a logfile is returned
instead of IP/tag mapings.
"""
if self.device is None:
raise err.PanDeviceNotSet("No device set for this userid instance")
version = self.device.retrieve_panos_version()
if prefix is None:
prefix = self.prefix
# Build up the command.
limit = 0
start_elm = None
start_offset = 1
root = ET.Element("show")
cmd = ET.SubElement(root, "object")
if version >= (6, 1, 0):
cmd = ET.SubElement(cmd, "registered-ip")
if version >= (8, 0, 0):
# PAN-OS 8.0+ supports paging.
limit = 500
ET.SubElement(cmd, "limit").text = "{0}".format(limit)
start_elm = ET.SubElement(cmd, "start-point")
start_elm.text = "{0}".format(start_offset)
else:
cmd = ET.SubElement(cmd, "registered-address")
# Add ip/tag filter arguments to command.
ip = list(set(string_or_list_or_none(ip)))
tags = list(set(string_or_list_or_none(tags)))
tags = [prefix + t for t in tags]
if len(tags) == 1:
tag_element = ET.SubElement(cmd, "tag")
ET.SubElement(tag_element, "entry", {"name": tags[0]})
if len(ip) == 1:
ip_element = ET.SubElement(cmd, "ip")
ip_element.text = ip[0]
addresses = {}
while True:
resp = self.device.op(
cmd=ET.tostring(root, encoding="utf-8"),
vsys=self.device.vsys,
cmd_xml=False,
)
# PAN-OS 7.1 and lower can return "outfile" instead of actual results.
outfile = resp.find("./result/msg/line/outfile")
if outfile is not None:
msg = [
'PAN-OS returned "{0}" instead of IP/tag mappings'.format(
outfile.text
),
"please upgrade to PAN-OS 8.0+",
]
raise err.PanDeviceError(", ".join(msg))
entries = resp.findall("./result/entry")
for entry in entries:
c_ip = entry.get("ip")
if ip and c_ip not in ip:
continue
members = entry.findall("./tag/member")
c_tags = []
for member in members:
tag = member.text
if not prefix or tag.startswith(prefix):
if not tags or tag in tags:
c_tags.append(tag)
if c_tags:
addresses[c_ip] = c_tags
if start_elm is None or limit == 0 or len(entries) < limit:
break
start_offset += len(entries)
start_elm.text = "{0}".format(start_offset)
# Done.
return addresses
[docs] def clear_registered_ip(self, ip=None, tags=None, prefix=None):
"""Unregister registered/tagged addresses
Removes registered addresses used by dynamic address groups.
When called without arguments, removes all registered addresses
Note: Passing a single ip and/or single tag to this method results in a response
from the firewall that contains only the relevant entries. ie. the filtering is done on
the firewall before it responds. Passing a list of multiple ip addresses or tags will
result in retreival of the entire tag database from the firewall which is then filtered and
returned with only the relevant entries. Therefor, using a single ip or tag is more efficient.
**Support:** PAN-OS 6.0 and higher
Warning:
This will clear any batch without it being sent, and can't be used as part of a batch.
Args:
ip (:obj:`list` or :obj:`str`): IP address(es) to remove tags for
tags (:obj:`list` or :obj:`str`): Tag(s) to remove
prefix (str): Override class tag prefix
"""
addresses = self.get_registered_ip(ip, tags, prefix)
self.batch_start()
for ip, tags in addresses.items():
self.unregister(ip, tags)
self.batch_end()
[docs] def audit_registered_ip_for_tag(self, tag, ip_addresses, timeout=None):
"""Synchronize the current registered-ip tag to tag only the specificied IP addresses.
Sets the registered-ip list for a single tag on the device. Regardless
of the current state of the registered-ip tag list when this method is
called, at the end of the method the list for the specified tag will
contain only the ip addresses passed in the argument. The current state
of the list is retrieved to reduce the number of operations needed. If
the list for this tag is currently in the requested state, no API call
is made after retrieving the list.
**Support:** PAN-OS 6.0 and higher
Warning: This will clear any batch without it being sent, and can't be
used as part of a batch.
Args:
tag (string): Tag to audit
ip_addresses(list): List of IP addresses that should have the tag
timeout (string): The optional timeout value in seconds.
"""
device_list = self.get_registered_ip(tags=tag, prefix=self.prefix)
self.batch_start()
registered_ips = device_list.keys()
tag = self.prefix + tag
for ip in registered_ips:
if ip not in ip_addresses:
# The IP is not requested, unregister it for this tag
self.unregister(ip, tag)
for ip in ip_addresses:
if ip not in registered_ips:
# The IP is requested, register it with this tag
self.register(ip, tag, timeout)
self.batch_end()
[docs] def audit_registered_ip(self, ip_tags_pairs, timeout=None):
"""Synchronize the current registered-ip tag list to this exact set of ip-tags
Sets the registered-ip tag list on the device.
Regardless of the current state of the registered-ip tag list when this method is
called, at the end of the method the list will contain only the ip-tags passed in the
argument. The current state of the list is retrieved to reduce the number of operations
needed. If the list is currently in the requested state, no API call is made after
retrieving the list.
**Support:** PAN-OS 6.0 and higher
Warning:
This will clear any batch without it being sent, and can't be used as part of a batch.
Args:
ip_tags_pairs (dict): dictionary where keys are ip addresses and values or tuples of tags
timeout (string): The optional timeout value in seconds.
"""
device_list = self.get_registered_ip()
requested_list = deepcopy(ip_tags_pairs)
self.batch_start()
# Handle unregistrations
for ip, tags in device_list.items():
if ip not in requested_list:
# The IP is not requested, unregister it and all its tags
self.unregister(ip, tags)
else:
# Convert requested tags from tuple to list
requested_list[ip] = list(requested_list[ip])
# The IP is requested, audit its tags
for tag in tags:
if tag not in requested_list[ip]:
# Tag is not requested, unregister it
self.unregister(ip, tag)
else:
# Tag already exists on device, so don't re-register it
requested_list[ip].remove(tag)
# Remove ip's with no tags left to register
requested_list = {ip: tags for ip, tags in requested_list.items() if tags}
# Handle registrations
for ip, tags in requested_list.items():
self.register(ip, tags, timeout)
self.batch_end()
[docs] def set_group(self, group, users):
"""
Set a group's membership to the specified users.
This method can be batched with batch_start() and batch_end().
Args:
group: The group name.
users (list): The users to be in this group.
"""
root, payload = self._create_uidmessage()
# Find the groups section.
groups = payload.find("./groups")
if groups is None:
groups = ET.SubElement(payload, "groups")
# Find the group.
entries = groups.findall("./entry")
for entry in entries:
if entry.attrib["name"] == group:
ge = entry.find("./members")
break
else:
entry = ET.SubElement(groups, "entry", {"name": group})
ge = ET.SubElement(entry, "members")
# Now add in the users to this group.
for user in users:
ET.SubElement(ge, "entry", {"name": user})
# Done.
self.send(root)
[docs] def get_groups(self, style=None):
"""
Get a list of groups.
Args:
style: The type of groups to retrieve. If unspecified, returns a list of
all groups. Can be "custom-group", "dynamic", or "xmlapi".
Returns:
list
"""
msg = [
"<show><user><group><list>",
]
if style is not None:
msg.append("<entry name='{0}'/>".format(style))
msg.append("</list></group></user></show>")
cmd = "".join(msg)
vsys = self.device.vsys or "vsys1"
resp = self.device.op(cmd, vsys=self.device.vsys, cmd_xml=False)
if resp is None:
return
"""
Example returned XML:
9.1:
<response status="success"><result><![CDATA[\nmalicious_users \ncn=contractors,cn=users,dc=nam,dc=local \ntemp_contractors_dynamic_group \nspecial_project \nrisky_users \ncn=employees,cn=users,dc=nam,dc=local \nhigh_risk_users \n\nTotal: 7\n* : Custom Group\n\n]]></result></response>
<response status="success"><result><![CDATA[\n\nTotal: 0\n* : Custom Group\n\n]]></result></response>
<response status="success"><result><![CDATA[\nmalicious_users \ntemp_contractors_dynamic_group \nspecial_project \nrisky_users \nhigh_risk_users \n\nTotal: 5\n* : Custom Group\n\n]]></result></response>
"""
data = resp.find("./result")
if data is None:
return
lines = data.text.split("\n")
ans = []
for line in lines:
if line.startswith("Total: "):
break
val = line.strip()
if val:
ans.append(val)
return ans
[docs] def get_group_members(self, group):
"""
Returns a list of users in the given group.
Args:
group: The name of the group.
Returns:
list
"""
cmd = "<show><user><group><name>" + group + "</name></group></user></show>"
vsys = self.device.vsys or "vsys1"
resp = self.device.op(cmd, vsys=vsys, cmd_xml=False)
if resp is None:
return
"""
Example returned XML:
9.1:
<response status="success"><result><![CDATA[\nUser group \'blah\' does not exist or does not have members\n]]></result></response>
<response status="success"><result><![CDATA[\n\nsource type: xmlapi\nGroup type: Dynamic\n\n[1 ] nam\\jsmith\n[2 ] panw\\garfield\n\n]]></result></response>
"""
data = resp.find("./result")
if data is None:
return
lines = data.text.split("\n")
ans = [x.split("]")[1].strip() for x in lines if len(x.split("]")) == 2]
return ans
[docs] def tag_user(self, user, tags, timeout=None, prefix=None):
"""
Tags the user with the specified tags.
This method can be batched with batch_start() and batch_end().
Note: PAN-OS 9.1+
Args:
user: The user.
tags (list): The list of tags to apply.
timeout (int): (Optional) The timeout for the given tags.
prefix: Override class tag prefix.
"""
if timeout is not None:
timeout = int(timeout)
if prefix is None:
prefix = self.prefix or ""
root, payload = self._create_uidmessage()
# Find the register user tags section.
ru = payload.find("./register-user")
if ru is None:
ru = ET.SubElement(payload, "register-user")
# Find the tags section for this specific user.
entries = ru.findall("./entry")
for entry in entries:
if entry.attrib["user"] == user:
te = entry.find("./tag")
break
else:
entry = ET.SubElement(ru, "entry", {"user": user,})
te = ET.SubElement(entry, "tag")
# Now add in the tags with the specified timeout.
props = {}
if timeout is not None:
props["timeout"] = "{0}".format(timeout)
for tag in tags:
ET.SubElement(te, "member", props).text = prefix + tag
# Done.
self.send(root)
[docs] def untag_user(self, user, tags=None, prefix=None):
"""
Removes tags associated with a user.
This method can be batched with batch_start() and batch_end().
Note: PAN-OS 9.1+
Args:
user: The user.
tags (list): (Optional) Remove only these tags instead of all tags.
prefix: Override class tag prefix.
"""
root, payload = self._create_uidmessage()
if prefix is None:
prefix = self.prefix or ""
# Find the unregister user tags section.
uu = payload.find("./unregister-user")
if uu is None:
uu = ET.SubElement(payload, "unregister-user")
# Find the tags section for this specific user.
entries = uu.findall("./entry")
for entry in entries:
if entry.attrib["user"] == user:
break
else:
entry = ET.SubElement(uu, "entry", {"user": user,})
# Do tag removal.
te = entry.find("./tag")
if tags is not None:
if te is None:
te = ET.SubElement(entry, "tag")
for tag in tags:
ET.SubElement(te, "member").text = prefix + tag
elif te is not None:
entry.remove(te)
# Done.
self.send(root)