diff --git a/examples/bulk_address_objects.py b/examples/bulk_address_objects.py
index a63244f1..3d886c5c 100755
--- a/examples/bulk_address_objects.py
+++ b/examples/bulk_address_objects.py
@@ -36,7 +36,6 @@
import panos.firewall
import panos.objects
-
HOSTNAME = "127.0.0.1"
USERNAME = "admin"
PASSWORD = "admin"
diff --git a/examples/ensure_security_rule.py b/examples/ensure_security_rule.py
index cafbf314..c5dcdf0c 100755
--- a/examples/ensure_security_rule.py
+++ b/examples/ensure_security_rule.py
@@ -33,7 +33,6 @@
import panos.firewall
import panos.policies
-
HOSTNAME = "127.0.0.1"
USERNAME = "admin"
PASSWORD = "admin"
diff --git a/examples/prisma_access_create_remote_network.py b/examples/prisma_access_create_remote_network.py
index c43a20fd..83906409 100644
--- a/examples/prisma_access_create_remote_network.py
+++ b/examples/prisma_access_create_remote_network.py
@@ -25,6 +25,7 @@
To use the script, you need to replace the variables below with desired values.
"""
+
import logging
import os
import sys
diff --git a/examples/prisma_access_list_RN_regions_bw.py b/examples/prisma_access_list_RN_regions_bw.py
index 4d5985aa..ddf4fe8f 100644
--- a/examples/prisma_access_list_RN_regions_bw.py
+++ b/examples/prisma_access_list_RN_regions_bw.py
@@ -20,10 +20,11 @@
prisma_access_list_RN_regions_bw.py
==========
-This script is an example on how to retrieve list of prisma access
+This script is an example on how to retrieve list of prisma access
remote networks locations and bandwidth allocation and print it.
"""
+
__author__ = "bmigette"
diff --git a/examples/prisma_access_show_jobs_status.py b/examples/prisma_access_show_jobs_status.py
index 24779e7f..081a262f 100644
--- a/examples/prisma_access_show_jobs_status.py
+++ b/examples/prisma_access_show_jobs_status.py
@@ -20,10 +20,11 @@
prisma_access_show_jobs_status.py
==========
-This script is an example on how to retrieve list of prisma access
+This script is an example on how to retrieve list of prisma access
jobs (commit and push), and how to get details of a specific job
"""
+
__author__ = "bmigette"
diff --git a/examples/prisma_access_show_remote_net_per_tenant.py b/examples/prisma_access_show_remote_net_per_tenant.py
index 4c904b5b..2263bf97 100644
--- a/examples/prisma_access_show_remote_net_per_tenant.py
+++ b/examples/prisma_access_show_remote_net_per_tenant.py
@@ -20,10 +20,11 @@
prisma_access_show_remote_net_per_tenant.py
==========
-This script is an example on how to retrieve list of prisma access
+This script is an example on how to retrieve list of prisma access
tenants and their remote networks
"""
+
__author__ = "bmigette"
diff --git a/panos/base.py b/panos/base.py
index 03aae887..350dfa1b 100644
--- a/panos/base.py
+++ b/panos/base.py
@@ -29,6 +29,7 @@
import time
import xml.dom.minidom as minidom
import xml.etree.ElementTree as ET
+from xml.sax.saxutils import escape as xml_escape
import pan.commit
import pan.xapi
@@ -3386,7 +3387,7 @@ def create_import(self, vsys=None):
if vsys != "shared" and vsys is not None and self.XPATH_IMPORT is not None:
xpath = self.xpath_import_base(vsys)
- element = "{0}".format(self.uid)
+ element = "{0}".format(xml_escape(self.uid))
device = self.nearest_pandevice()
device.active().xapi.set(xpath, element, retry_on_peer=True)
@@ -3830,9 +3831,11 @@ def make_method(cls, super_method_name, super_method):
def method(self, *args, **kwargs):
retry_on_peer = kwargs.pop(
"retry_on_peer",
- True
- if super_method_name not in ("keygen", "op", "ad_hoc", "export")
- else False,
+ (
+ True
+ if super_method_name not in ("keygen", "op", "ad_hoc", "export")
+ else False
+ ),
)
apply_on_peer = kwargs.pop("apply_on_peer", False)
ha_peer = self.pan_device.ha_peer
diff --git a/panos/userid.py b/panos/userid.py
index 4e42580f..7e06d5a3 100644
--- a/panos/userid.py
+++ b/panos/userid.py
@@ -19,6 +19,7 @@
import xml.etree.ElementTree as ET
from copy import deepcopy
+from xml.sax.saxutils import escape, quoteattr
from pan.xapi import PanXapiError
@@ -550,7 +551,7 @@ def get_groups(self, style=None):
"",
]
if style is not None:
- msg.append("".format(style))
+ msg.append("".format(quoteattr(style)))
msg.append("
")
cmd = "".join(msg)
vsys = self.device.vsys or "vsys1"
@@ -594,7 +595,11 @@ def get_group_members(self, group):
list
"""
- cmd = "" + group + ""
+ cmd = (
+ ""
+ + escape(group)
+ + ""
+ )
vsys = self.device.vsys or "vsys1"
resp = self.device.op(cmd, vsys=vsys, cmd_xml=False)
@@ -644,12 +649,12 @@ def get_user_tags(self, user=None, prefix=None):
if user is None:
msg.append(
""
- + "{0}".format(limit)
- + "{0}".format(start)
+ + "{0}".format(escape(str(limit)))
+ + "{0}".format(escape(str(start)))
+ ""
)
else:
- msg.append("{0}".format(user))
+ msg.append("{0}".format(escape(user)))
msg.append("")
cmd = ET.fromstring("".join(msg))
diff --git a/tests/live/conftest.py b/tests/live/conftest.py
index 8d379d35..6ed5b27f 100644
--- a/tests/live/conftest.py
+++ b/tests/live/conftest.py
@@ -6,7 +6,6 @@
from panos import firewall
from panos import panorama
-
live_devices = {}
one_fw_per_version = []
one_device_type_per_version = []
diff --git a/tests/test_base.py b/tests/test_base.py
index 1eb0c6db..2ba1437d 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -23,7 +23,8 @@
import pan.xapi
import panos.base as Base
import panos.errors as Err
-
+import panos.firewall
+import panos.network
OBJECT_NAME = "MyObjectName"
VSYS = "vsys1"
@@ -1701,5 +1702,30 @@ def test_times_out(self, mocksleep):
assert mocksleep.call_count == 0
+class TestVsysOperationsCreateImport(unittest.TestCase):
+ def test_create_import_escapes_uid(self):
+ evil_name = "zone&"
+ fw = panos.firewall.Firewall(
+ "fw1", "user", "passwd", "authkey", serial="S", vsys="vsys1"
+ )
+ vlan = panos.network.Vlan(evil_name)
+ fw.add(vlan)
+
+ with mock.patch.object(
+ panos.network.Vlan,
+ "XPATH_IMPORT",
+ new_callable=mock.PropertyMock,
+ return_value="/network/vlan",
+ ):
+ fw.xapi.set = mock.Mock()
+ vlan.create_import("vsys1")
+
+ args, _ = fw.xapi.set.call_args
+ element = args[1]
+ self.assertEqual(element, "zone&<bad>")
+ parsed = ET.fromstring(element)
+ self.assertEqual(parsed.text, evil_name)
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_classic_objects.py b/tests/test_classic_objects.py
index dfae9f29..7411772e 100644
--- a/tests/test_classic_objects.py
+++ b/tests/test_classic_objects.py
@@ -1,4 +1,4 @@
-""" Tests specifically for classic objects.
+"""Tests specifically for classic objects.
Note: All tests in this file are for classic objects. These are to try and
make sure that the fix for classic objects with a self.NAME == None still
@@ -6,7 +6,6 @@
"""
-
from panos import device
diff --git a/tests/test_device_profile_xpaths.py b/tests/test_device_profile_xpaths.py
index 05bcc01a..19075c26 100644
--- a/tests/test_device_profile_xpaths.py
+++ b/tests/test_device_profile_xpaths.py
@@ -57,7 +57,6 @@
from panos.panorama import Panorama
from panos.panorama import Template
-
OBJECTS = {
SnmpServerProfile: [None, SnmpV2cServer, SnmpV3Server],
EmailServerProfile: [
diff --git a/tests/test_opstate.py b/tests/test_opstate.py
index bf579e2f..da8f08b3 100644
--- a/tests/test_opstate.py
+++ b/tests/test_opstate.py
@@ -12,7 +12,6 @@
from panos.policies import SecurityRule
from panos.policies import AuditCommentLog
-
HIT_COUNT_PREFIX = """
diff --git a/tests/test_predefined.py b/tests/test_predefined.py
index 1a1b913a..992e302a 100644
--- a/tests/test_predefined.py
+++ b/tests/test_predefined.py
@@ -13,7 +13,6 @@
from panos.objects import ServiceObject
from panos.objects import Tag
-
PREDEFINED_CONFIG = {
ApplicationContainer: {
"single": "refresh_application",
diff --git a/tests/test_standards.py b/tests/test_standards.py
index c284b448..373632d3 100644
--- a/tests/test_standards.py
+++ b/tests/test_standards.py
@@ -18,7 +18,6 @@
from panos import predefined
from panos import plugins
-
# Versioning constants.
#
# When checking versioning, this is the highest major version to check for
diff --git a/tests/test_userid.py b/tests/test_userid.py
index a3a2052a..c2d0bf6d 100644
--- a/tests/test_userid.py
+++ b/tests/test_userid.py
@@ -17,6 +17,7 @@
import mock
import sys
import unittest
+import xml.etree.ElementTree as ET
import panos.firewall
import panos.panorama
@@ -99,6 +100,63 @@ def test_batch_untag_user(self):
],
)
+ def test_get_user_tags_escapes_user(self):
+ evil = "admin999999x"
+ fw = panos.firewall.Firewall(
+ "fw1", "user", "passwd", "authkey", serial="Serial", vsys="vsys1"
+ )
+ empty_response = ET.fromstring(
+ b""
+ )
+ fw.op = mock.Mock(return_value=empty_response)
+
+ fw.userid.get_user_tags(user=evil)
+
+ sent = fw.op.call_args[1]["cmd"]
+ parsed = ET.fromstring(sent)
+ users = parsed.findall("./object/registered-user/user")
+ self.assertEqual(len(users), 1)
+ self.assertEqual(users[0].text, evil)
+ self.assertIsNone(parsed.find("./object/registered-user/all"))
+
+ def test_get_groups_escapes_style(self):
+ evil = "x'/>\nTotal: 0\n"
+ )
+ fw.op = mock.Mock(return_value=empty_response)
+
+ fw.userid.get_groups(style=evil)
+
+ sent = fw.op.call_args[0][0]
+ parsed = ET.fromstring(sent)
+ entries = parsed.findall("./user/group/list/entry")
+ self.assertEqual(len(entries), 1)
+ self.assertEqual(entries[0].attrib["name"], evil)
+ self.assertIsNone(parsed.find(".//evil"))
+
+ def test_get_group_members_escapes_group(self):
+ evil = "gx"
+ fw = panos.firewall.Firewall(
+ "fw1", "user", "passwd", "authkey", serial="Serial", vsys="vsys1"
+ )
+ empty_response = ET.fromstring(
+ b"\nTotal: 0\n"
+ )
+ fw.op = mock.Mock(return_value=empty_response)
+
+ fw.userid.get_group_members(evil)
+
+ sent = fw.op.call_args[0][0]
+ parsed = ET.fromstring(sent)
+ names = parsed.findall("./user/group/name")
+ self.assertEqual(len(names), 1)
+ self.assertEqual(names[0].text, evil)
+ self.assertIsNone(parsed.find(".//evil"))
+
if __name__ == "__main__":
unittest.main()