Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 48 additions & 20 deletions panos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@

import panos
import panos.errors as err

# Defined before sub-module imports below: panos.userid imports _xpath_safe
# from this module, so the symbol must exist by the time `from panos import
# userid` triggers userid's module body.
SELF = "/%s"
ENTRY = "/entry[@name=%s]"
MEMBER = "/member[text()=%s]"


def _xpath_safe(val):
"""Return val as an XPath 1.0 string literal, safe to inject into a predicate.

XPath 1.0 has no escape syntax for quotes inside string literals, so a value
containing quotes must be wrapped in the opposite quote, or split into a
concat() expression when both quote types are present.
"""
val = "" if val is None else str(val)
if "'" not in val:
return "'" + val + "'"
if '"' not in val:
return '"' + val + '"'
parts = val.split("'")
return "concat('" + "', \"'\", '".join(parts) + "')"


from panos import (
chunk_instances_for_delete_similar,
isstring,
Expand All @@ -48,9 +73,6 @@
logger = panos.getlogger(__name__)

Root = panos.enum("DEVICE", "VSYS", "MGTCONFIG", "PANORAMA", "PANORAMA_VSYS")
SELF = "/%s"
ENTRY = "/entry[@name='%s']"
MEMBER = "/member[text()='%s']"


# PanObject type
Expand Down Expand Up @@ -340,7 +362,7 @@ def xpath(self, root=None):
# xpath was asked for.
addon = p.XPATH
if p.SUFFIX is not None:
addon += p.SUFFIX % (p.uid,)
addon += p.SUFFIX % (_xpath_safe(p.uid),)
path.insert(0, addon)
if p.__class__.__name__ == "Firewall" and p.parent is not None:
if p.parent.__class__.__name__ == "DeviceGroup":
Expand Down Expand Up @@ -412,7 +434,7 @@ def _root_xpath_vsys(self, vsys, label="vsys"):
xpath = "/config/shared"
else:
xpath = "/config/devices/entry[@name='localhost.localdomain']"
xpath += "/{0}/entry[@name='{1}']".format(label, vsys or "vsys1")
xpath += "/{0}/entry[@name={1}]".format(label, _xpath_safe(vsys or "vsys1"))

return xpath

Expand Down Expand Up @@ -476,7 +498,7 @@ def element(self, with_children=True, comparable=False):
regex,
matchedvar.path
+ "/"
+ "entry[@name='%s']" % entry_value[0],
+ "entry[@name=%s]" % _xpath_safe(entry_value[0]),
section,
)
entryvar = matchedvar
Expand Down Expand Up @@ -860,7 +882,9 @@ def _get_param_specific_info(self, variable):
entry_value = panos.string_or_list(getattr(self, matchedvar.variable))
varpath = re.sub(
regex,
matchedvar.path + "/" + "entry[@name='%s']" % entry_value[0],
matchedvar.path
+ "/"
+ "entry[@name=%s]" % _xpath_safe(entry_value[0]),
varpath,
)
else:
Expand Down Expand Up @@ -1448,7 +1472,9 @@ def _parse_xml(cls, xml, variables=None):
replacement = replacement[0]
path = re.sub(
regex,
matchedvar.path + "/" + "entry[@name='%s']" % replacement,
matchedvar.path
+ "/"
+ "entry[@name=%s]" % _xpath_safe(replacement),
path,
)
else:
Expand Down Expand Up @@ -1983,10 +2009,10 @@ def delete_similar(self):
prefix = ""
xpath = self.xpath_nosuffix()
if self.SUFFIX == ENTRY:
joiner = "@name='{0}'"
joiner = "@name={0}"
prefix = "entry"
elif self.SUFFIX == MEMBER:
joiner = "text()='{0}'"
joiner = "text()={0}"
prefix = "member"

# After some testing, PAN-OS seems to be able to handle a DELETE API call
Expand All @@ -1999,7 +2025,7 @@ def delete_similar(self):
"{0}/{1}[{2}]".format(
xpath,
prefix,
" or ".join(joiner.format(x.uid) for x in chunk),
" or ".join(joiner.format(_xpath_safe(x.uid)) for x in chunk),
),
retry_on_peer=self.HA_SYNC,
)
Expand Down Expand Up @@ -2036,7 +2062,9 @@ def _perform_vsys_dict_import_delete(self, dev, vsys_dict):
"""Iterates over a vsys_dict, deleting the import for all instances."""
for vsys_spec in vsys_dict.values():
for objs in vsys_spec.values():
members = " or ".join("text()='{0}'".format(x.uid) for x in objs)
members = " or ".join(
"text()={0}".format(_xpath_safe(x.uid)) for x in objs
)
xpath = "{0}/member[{1}]".format(objs[0].xpath_import_base(), members)
# API complains if you try to do this in one delete statement,
# so do one delete per vsys per path, just like when we set the
Expand Down Expand Up @@ -2431,7 +2459,7 @@ class VersionedPanObject(PanObject):

_DEFAULT_NAME = None
_TEMPLATE_DEVICE_XPATH = "/config/devices/entry[@name='localhost.localdomain']"
_TEMPLATE_VSYS_XPATH = _TEMPLATE_DEVICE_XPATH + "/vsys/entry[@name='{vsys}']"
_TEMPLATE_VSYS_XPATH = _TEMPLATE_DEVICE_XPATH + "/vsys/entry[@name={vsys}]"
_TEMPLATE_MGTCONFIG_XPATH = "/config/mgt-config"

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -2638,7 +2666,7 @@ def element(self, with_children=True, comparable=False):
if ap.startswith("entry "):
junk, var_to_use = ap.split()
sol_value = panos.string_or_list(settings[var_to_use])[0]
finder = "entry[@name='{0}']".format(sol_value)
finder = "entry[@name={0}]".format(_xpath_safe(sol_value))
tag = "entry"
attribs["name"] = sol_value
elif ap == "entry[@name='localhost.localdomain']":
Expand Down Expand Up @@ -2725,8 +2753,8 @@ def _get_param_specific_info(self, param):
p = None
if token.startswith("entry "):
junk, var_to_use = token.split()
p = "entry[name='{0}']".format(
*(x for x in self._value_as_list(settings[var_to_use]))
p = "entry[name={0}]".format(
*(_xpath_safe(x) for x in self._value_as_list(settings[var_to_use]))
)
else:
p = None
Expand Down Expand Up @@ -2835,7 +2863,7 @@ def XPATH(self):
"""Returns the version specific xpath of this object."""
panos_version = self.retrieve_panos_version()
val = self._xpaths._get_versioned_value(panos_version, self.parent)
return val.format(vsys=self.vsys or "vsys1")
return val.format(vsys=_xpath_safe(self.vsys or "vsys1"))


class VersionedParamPath(VersioningSupport):
Expand Down Expand Up @@ -3198,7 +3226,7 @@ def parse_xml(self, xml, settings, possibilities):
return
settings[entry_var] = ans.attrib["name"]
sol_val = panos.string_or_list(settings[entry_var])[0]
path_str = "entry[@name='{0}']".format(sol_val)
path_str = "entry[@name={0}]".format(_xpath_safe(sol_val))
else:
# Standard path part
try:
Expand Down Expand Up @@ -3420,8 +3448,8 @@ def delete_import(self, vsys=None):
p = p.parent

if vsys != "shared" and vsys is not None and self.XPATH_IMPORT is not None:
xpath = "{0}/member[text()='{1}']".format(
self.xpath_import_base(vsys), self.uid
xpath = "{0}/member[text()={1}]".format(
self.xpath_import_base(vsys), _xpath_safe(self.uid)
)
device = self.nearest_pandevice()
device.active().xapi.delete(xpath, retry_on_peer=True)
Expand Down
7 changes: 4 additions & 3 deletions panos/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from panos import device, getlogger, yesno
from panos.base import ENTRY, PanDevice, Root
from panos.base import VarPath as Var
from panos.base import _xpath_safe

logger = getlogger(__name__)

Expand Down Expand Up @@ -333,7 +334,7 @@ def delete(self):
devices_xpath = self.devicegroup().xpath() + self.XPATH
devices_xml = panorama.xapi.get(devices_xpath)
dg_vsys = devices_xml.findall(
"result/devices/entry[@name='%s']/vsys/entry" % self.serial
"result/devices/entry[@name=%s]/vsys/entry" % _xpath_safe(self.serial)
)
if dg_vsys:
if len(dg_vsys) == 1:
Expand All @@ -344,7 +345,7 @@ def delete(self):
# It's not the only vsys, just delete the vsys
panorama.set_config_changed()
panorama.xapi.delete(
self.xpath() + "/vsys/entry[@name='%s']" % self.vsys
self.xpath() + "/vsys/entry[@name=%s]" % _xpath_safe(self.vsys)
)
else:
# This is a firewall under a panorama
Expand Down Expand Up @@ -392,7 +393,7 @@ def refreshall_from_xml(self, xml, refresh_children=False, variables=None):
)
# Add system settings to firewall instances
for fw in firewall_instances:
entry = xml.find("entry[@name='%s']" % fw.serial)
entry = xml.find("entry[@name=%s]" % _xpath_safe(fw.serial))
system = fw.find_or_create(None, device.SystemSettings)
system.hostname = entry.findtext("hostname")
system.ip_address = entry.findtext("ip-address")
Expand Down
9 changes: 7 additions & 2 deletions panos/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
from panos import device, getlogger, string_or_list
from panos.base import ENTRY, MEMBER, PanObject, Root
from panos.base import VarPath as Var
from panos.base import VersionedPanObject, VersionedParamPath, VsysOperations
from panos.base import (
VersionedPanObject,
VersionedParamPath,
VsysOperations,
_xpath_safe,
)

logger = getlogger(__name__)

Expand Down Expand Up @@ -741,7 +746,7 @@ def XPATH(self):
if self._BASE_INTERFACE_NAME in path:
base = self.uid.split(".")[0]
path = path.replace(
self._BASE_INTERFACE_NAME, "entry[@name='{0}']".format(base)
self._BASE_INTERFACE_NAME, "entry[@name={0}]".format(_xpath_safe(base))
)

return path
Expand Down
28 changes: 17 additions & 11 deletions panos/panorama.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from panos import base, firewall, getlogger, policies, yesno
from panos.base import ENTRY, MEMBER, OpState, PanObject, Root
from panos.base import VarPath as Var
from panos.base import VersionedPanObject, VersionedParamPath
from panos.base import VersionedPanObject, VersionedParamPath, _xpath_safe

logger = getlogger(__name__)

Expand Down Expand Up @@ -643,7 +643,7 @@ def refresh_devices(
serial = str(device)
if serial is None:
continue
entry = devices_xml.find("entry[@name='%s']" % serial)
entry = devices_xml.find("entry[@name=%s]" % _xpath_safe(serial))
if entry is None:
if only_connected:
raise err.PanNotConnectedOnPanorama(
Expand All @@ -661,7 +661,10 @@ def refresh_devices(
except AttributeError:
continue
# Create entry if needed
if filtered_devices_xml.find("entry[@name='%s']" % serial) is None:
if (
filtered_devices_xml.find("entry[@name=%s]" % _xpath_safe(serial))
is None
):
entry_copy = deepcopy(entry)
# If looking for specific vsys, erase all vsys in filtered entry
if vsys != "shared" and vsys is not None:
Expand All @@ -670,15 +673,15 @@ def refresh_devices(
filtered_devices_xml.append(entry_copy)
# Get specific vsys
if vsys != "shared" and vsys is not None:
vsys_entry = entry.find("vsys/entry[@name='%s']" % vsys)
vsys_entry = entry.find("vsys/entry[@name=%s]" % _xpath_safe(vsys))
if vsys_entry is None:
raise err.PanNotAttachedOnPanorama(
"Can't find device with serial %s and"
" vsys %s attached to Panorama at %s"
% (serial, vsys, self.id)
)
vsys_section = filtered_devices_xml.find(
"entry[@name='%s']/vsys" % serial
"entry[@name=%s]/vsys" % _xpath_safe(serial)
)
vsys_section.append(vsys_entry)
devices_xml = filtered_devices_xml
Expand Down Expand Up @@ -733,7 +736,8 @@ def refresh_devices(
continue
for fw_entry in dg_entry.find("devices"):
fw_entry_op = devicegroup_opxml.find(
"entry/devices/entry[@name='%s']" % fw_entry.get("name")
"entry/devices/entry[@name=%s]"
% _xpath_safe(fw_entry.get("name"))
)
if fw_entry_op is not None:
panos.xml_combine(fw_entry, fw_entry_op)
Expand All @@ -748,7 +752,7 @@ def refresh_devices(
dg_serials = [
entry.get("name")
for entry in devicegroup_configxml.findall(
"entry[@name='%s']/devices/entry" % dg.name
"entry[@name=%s]/devices/entry" % _xpath_safe(dg.name)
)
]
# Find firewall with each serial
Expand All @@ -759,13 +763,14 @@ def refresh_devices(
all_dg_vsys = [
entry.get("name")
for entry in devicegroup_configxml.findall(
"entry[@name='%s']/devices/entry[@name='%s']/vsys/entry"
% (dg.name, dg_serial)
"entry[@name=%s]/devices/entry[@name=%s]/vsys/entry"
% (_xpath_safe(dg.name), _xpath_safe(dg_serial))
)
]
# Collect the firewall serial entry to get current status information
fw_entry = devicegroup_configxml.find(
"entry[@name='%s']/devices/entry[@name='%s']" % (dg.name, dg_serial)
"entry[@name=%s]/devices/entry[@name=%s]"
% (_xpath_safe(dg.name), _xpath_safe(dg_serial))
)
if not all_dg_vsys:
# This is a single-context firewall, assume vsys1
Expand Down Expand Up @@ -807,7 +812,8 @@ def refresh_devices(
shared_policy_status = fw_entry.findtext("shared-policy-status")
if shared_policy_status is None:
shared_policy_status = fw_entry.findtext(
"vsys/entry[@name='%s']/shared-policy-status" % dg_vsys
"vsys/entry[@name=%s]/shared-policy-status"
% _xpath_safe(dg_vsys)
)
fw.state.set_shared_policy_synced(shared_policy_status)

Expand Down
5 changes: 3 additions & 2 deletions panos/predefined.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import panos.errors as err
from panos import getlogger, objects
from panos.base import _xpath_safe
from panos.updater import PanOSVersion

logger = getlogger(__name__)
Expand All @@ -43,7 +44,7 @@ class Predefined(object):

# xpath
XPATH = "/config/predefined"
SINGLE_ENTRY_XPATH = "/entry[@name='{0}']"
SINGLE_ENTRY_XPATH = "/entry[@name={0}]"
ALL_ENTRIES_XPATH = "/entry"
CHILDTYPES = (
"objects.ApplicationContainer",
Expand Down Expand Up @@ -76,7 +77,7 @@ def _refresh(self, decisions, name=None):
x.parent = self
xpath = x.xpath_nosuffix()
if name is not None:
xpath += self.SINGLE_ENTRY_XPATH.format(name)
xpath += self.SINGLE_ENTRY_XPATH.format(_xpath_safe(name))
else:
xpath += self.ALL_ENTRIES_XPATH

Expand Down
5 changes: 3 additions & 2 deletions panos/userid.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import panos.errors as err
from panos import getlogger, string_or_list, string_or_list_or_none
from panos.base import _xpath_safe
from panos.updater import PanOSVersion

logger = getlogger(__name__)
Expand Down Expand Up @@ -244,7 +245,7 @@ def register(self, ip, tags, timeout=None):
return
tags = [self.prefix + t for t in tags]
for c_ip in ip:
tagelement = register.find("./entry[@ip='%s']/tag" % c_ip)
tagelement = register.find("./entry[@ip=%s]/tag" % _xpath_safe(c_ip))
if tagelement is None:
entry = ET.SubElement(register, "entry", {"ip": c_ip})
tagelement = ET.SubElement(entry, "tag")
Expand Down Expand Up @@ -275,7 +276,7 @@ def unregister(self, ip, tags):
return
tags = [self.prefix + t for t in tags]
for c_ip in ip:
tagelement = unregister.find("./entry[@ip='%s']/tag" % c_ip)
tagelement = unregister.find("./entry[@ip=%s]/tag" % _xpath_safe(c_ip))
if tagelement is None:
entry = ET.SubElement(unregister, "entry", {"ip": c_ip})
tagelement = ET.SubElement(entry, "tag")
Expand Down
Loading