From 4e23b585572ca86e97cdb6d56a238bb56eb548ed Mon Sep 17 00:00:00 2001 From: Brian Torres-Gil Date: Wed, 6 May 2026 09:50:10 -0700 Subject: [PATCH 1/2] fix: Escape user-controlled values in constructed XML commands UserId.get_user_tags, UserId.get_groups, UserId.get_group_members, and VsysOperations.create_import built XML payloads via str.format / string concatenation. Special characters in caller-supplied names (& < > ') either crashed ET.fromstring on the receiving end or altered the structure of the command sent to the device. Wrap the interpolated values with xml.sax.saxutils.escape (element text) and quoteattr (attribute values), and add regression tests. --- panos/base.py | 3 ++- panos/userid.py | 15 +++++++---- tests/test_base.py | 27 ++++++++++++++++++++ tests/test_userid.py | 59 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 6 deletions(-) diff --git a/panos/base.py b/panos/base.py index 03aae887..675b48e8 100644 --- a/panos/base.py +++ b/panos/base.py @@ -29,6 +29,7 @@ import time import xml.dom.minidom as minidom import xml.etree.ElementTree as ET +from xml.sax.saxutils import escape as xml_escape import pan.commit import pan.xapi @@ -3386,7 +3387,7 @@ def create_import(self, vsys=None): if vsys != "shared" and vsys is not None and self.XPATH_IMPORT is not None: xpath = self.xpath_import_base(vsys) - element = "{0}".format(self.uid) + element = "{0}".format(xml_escape(self.uid)) device = self.nearest_pandevice() device.active().xapi.set(xpath, element, retry_on_peer=True) diff --git a/panos/userid.py b/panos/userid.py index 4e42580f..7e06d5a3 100644 --- a/panos/userid.py +++ b/panos/userid.py @@ -19,6 +19,7 @@ import xml.etree.ElementTree as ET from copy import deepcopy +from xml.sax.saxutils import escape, quoteattr from pan.xapi import PanXapiError @@ -550,7 +551,7 @@ def get_groups(self, style=None): "", ] if style is not None: - msg.append("".format(style)) + msg.append("".format(quoteattr(style))) msg.append("") cmd = "".join(msg) vsys = self.device.vsys or "vsys1" @@ -594,7 +595,11 @@ def get_group_members(self, group): list """ - cmd = "" + group + "" + cmd = ( + "" + + escape(group) + + "" + ) vsys = self.device.vsys or "vsys1" resp = self.device.op(cmd, vsys=vsys, cmd_xml=False) @@ -644,12 +649,12 @@ def get_user_tags(self, user=None, prefix=None): if user is None: msg.append( "" - + "{0}".format(limit) - + "{0}".format(start) + + "{0}".format(escape(str(limit))) + + "{0}".format(escape(str(start))) + "" ) else: - msg.append("{0}".format(user)) + msg.append("{0}".format(escape(user))) msg.append("") cmd = ET.fromstring("".join(msg)) diff --git a/tests/test_base.py b/tests/test_base.py index 1eb0c6db..9f6f3780 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -23,6 +23,8 @@ import pan.xapi import panos.base as Base import panos.errors as Err +import panos.firewall +import panos.network OBJECT_NAME = "MyObjectName" @@ -1701,5 +1703,30 @@ def test_times_out(self, mocksleep): assert mocksleep.call_count == 0 +class TestVsysOperationsCreateImport(unittest.TestCase): + def test_create_import_escapes_uid(self): + evil_name = "zone&" + fw = panos.firewall.Firewall( + "fw1", "user", "passwd", "authkey", serial="S", vsys="vsys1" + ) + vlan = panos.network.Vlan(evil_name) + fw.add(vlan) + + with mock.patch.object( + panos.network.Vlan, + "XPATH_IMPORT", + new_callable=mock.PropertyMock, + return_value="/network/vlan", + ): + fw.xapi.set = mock.Mock() + vlan.create_import("vsys1") + + args, _ = fw.xapi.set.call_args + element = args[1] + self.assertEqual(element, "zone&<bad>") + parsed = ET.fromstring(element) + self.assertEqual(parsed.text, evil_name) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_userid.py b/tests/test_userid.py index a3a2052a..ec644479 100644 --- a/tests/test_userid.py +++ b/tests/test_userid.py @@ -17,6 +17,7 @@ import mock import sys import unittest +import xml.etree.ElementTree as ET import panos.firewall import panos.panorama @@ -100,5 +101,63 @@ def test_batch_untag_user(self): ) + def test_get_user_tags_escapes_user(self): + evil = "admin999999x" + fw = panos.firewall.Firewall( + "fw1", "user", "passwd", "authkey", serial="Serial", vsys="vsys1" + ) + empty_response = ET.fromstring( + b"" + ) + fw.op = mock.Mock(return_value=empty_response) + + fw.userid.get_user_tags(user=evil) + + sent = fw.op.call_args[1]["cmd"] + parsed = ET.fromstring(sent) + users = parsed.findall("./object/registered-user/user") + self.assertEqual(len(users), 1) + self.assertEqual(users[0].text, evil) + self.assertIsNone(parsed.find("./object/registered-user/all")) + + def test_get_groups_escapes_style(self): + evil = "x'/>\nTotal: 0\n" + ) + fw.op = mock.Mock(return_value=empty_response) + + fw.userid.get_groups(style=evil) + + sent = fw.op.call_args[0][0] + parsed = ET.fromstring(sent) + entries = parsed.findall("./user/group/list/entry") + self.assertEqual(len(entries), 1) + self.assertEqual(entries[0].attrib["name"], evil) + self.assertIsNone(parsed.find(".//evil")) + + def test_get_group_members_escapes_group(self): + evil = "gx" + fw = panos.firewall.Firewall( + "fw1", "user", "passwd", "authkey", serial="Serial", vsys="vsys1" + ) + empty_response = ET.fromstring( + b"\nTotal: 0\n" + ) + fw.op = mock.Mock(return_value=empty_response) + + fw.userid.get_group_members(evil) + + sent = fw.op.call_args[0][0] + parsed = ET.fromstring(sent) + names = parsed.findall("./user/group/name") + self.assertEqual(len(names), 1) + self.assertEqual(names[0].text, evil) + self.assertIsNone(parsed.find(".//evil")) + + if __name__ == "__main__": unittest.main() From 0ca6b77cb741c07261c77eaf74d2402385bfc6e7 Mon Sep 17 00:00:00 2001 From: Brian Torres-Gil Date: Wed, 6 May 2026 10:56:55 -0700 Subject: [PATCH 2/2] chore: fix formatting issues --- examples/bulk_address_objects.py | 1 - examples/ensure_security_rule.py | 1 - examples/prisma_access_create_remote_network.py | 1 + examples/prisma_access_list_RN_regions_bw.py | 3 ++- examples/prisma_access_show_jobs_status.py | 3 ++- examples/prisma_access_show_remote_net_per_tenant.py | 3 ++- panos/base.py | 8 +++++--- tests/live/conftest.py | 1 - tests/test_base.py | 1 - tests/test_classic_objects.py | 3 +-- tests/test_device_profile_xpaths.py | 1 - tests/test_opstate.py | 1 - tests/test_predefined.py | 1 - tests/test_standards.py | 1 - tests/test_userid.py | 1 - 15 files changed, 13 insertions(+), 17 deletions(-) diff --git a/examples/bulk_address_objects.py b/examples/bulk_address_objects.py index a63244f1..3d886c5c 100755 --- a/examples/bulk_address_objects.py +++ b/examples/bulk_address_objects.py @@ -36,7 +36,6 @@ import panos.firewall import panos.objects - HOSTNAME = "127.0.0.1" USERNAME = "admin" PASSWORD = "admin" diff --git a/examples/ensure_security_rule.py b/examples/ensure_security_rule.py index cafbf314..c5dcdf0c 100755 --- a/examples/ensure_security_rule.py +++ b/examples/ensure_security_rule.py @@ -33,7 +33,6 @@ import panos.firewall import panos.policies - HOSTNAME = "127.0.0.1" USERNAME = "admin" PASSWORD = "admin" diff --git a/examples/prisma_access_create_remote_network.py b/examples/prisma_access_create_remote_network.py index c43a20fd..83906409 100644 --- a/examples/prisma_access_create_remote_network.py +++ b/examples/prisma_access_create_remote_network.py @@ -25,6 +25,7 @@ To use the script, you need to replace the variables below with desired values. """ + import logging import os import sys diff --git a/examples/prisma_access_list_RN_regions_bw.py b/examples/prisma_access_list_RN_regions_bw.py index 4d5985aa..ddf4fe8f 100644 --- a/examples/prisma_access_list_RN_regions_bw.py +++ b/examples/prisma_access_list_RN_regions_bw.py @@ -20,10 +20,11 @@ prisma_access_list_RN_regions_bw.py ========== -This script is an example on how to retrieve list of prisma access +This script is an example on how to retrieve list of prisma access remote networks locations and bandwidth allocation and print it. """ + __author__ = "bmigette" diff --git a/examples/prisma_access_show_jobs_status.py b/examples/prisma_access_show_jobs_status.py index 24779e7f..081a262f 100644 --- a/examples/prisma_access_show_jobs_status.py +++ b/examples/prisma_access_show_jobs_status.py @@ -20,10 +20,11 @@ prisma_access_show_jobs_status.py ========== -This script is an example on how to retrieve list of prisma access +This script is an example on how to retrieve list of prisma access jobs (commit and push), and how to get details of a specific job """ + __author__ = "bmigette" diff --git a/examples/prisma_access_show_remote_net_per_tenant.py b/examples/prisma_access_show_remote_net_per_tenant.py index 4c904b5b..2263bf97 100644 --- a/examples/prisma_access_show_remote_net_per_tenant.py +++ b/examples/prisma_access_show_remote_net_per_tenant.py @@ -20,10 +20,11 @@ prisma_access_show_remote_net_per_tenant.py ========== -This script is an example on how to retrieve list of prisma access +This script is an example on how to retrieve list of prisma access tenants and their remote networks """ + __author__ = "bmigette" diff --git a/panos/base.py b/panos/base.py index 675b48e8..350dfa1b 100644 --- a/panos/base.py +++ b/panos/base.py @@ -3831,9 +3831,11 @@ def make_method(cls, super_method_name, super_method): def method(self, *args, **kwargs): retry_on_peer = kwargs.pop( "retry_on_peer", - True - if super_method_name not in ("keygen", "op", "ad_hoc", "export") - else False, + ( + True + if super_method_name not in ("keygen", "op", "ad_hoc", "export") + else False + ), ) apply_on_peer = kwargs.pop("apply_on_peer", False) ha_peer = self.pan_device.ha_peer diff --git a/tests/live/conftest.py b/tests/live/conftest.py index 8d379d35..6ed5b27f 100644 --- a/tests/live/conftest.py +++ b/tests/live/conftest.py @@ -6,7 +6,6 @@ from panos import firewall from panos import panorama - live_devices = {} one_fw_per_version = [] one_device_type_per_version = [] diff --git a/tests/test_base.py b/tests/test_base.py index 9f6f3780..2ba1437d 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -26,7 +26,6 @@ import panos.firewall import panos.network - OBJECT_NAME = "MyObjectName" VSYS = "vsys1" diff --git a/tests/test_classic_objects.py b/tests/test_classic_objects.py index dfae9f29..7411772e 100644 --- a/tests/test_classic_objects.py +++ b/tests/test_classic_objects.py @@ -1,4 +1,4 @@ -""" Tests specifically for classic objects. +"""Tests specifically for classic objects. Note: All tests in this file are for classic objects. These are to try and make sure that the fix for classic objects with a self.NAME == None still @@ -6,7 +6,6 @@ """ - from panos import device diff --git a/tests/test_device_profile_xpaths.py b/tests/test_device_profile_xpaths.py index 05bcc01a..19075c26 100644 --- a/tests/test_device_profile_xpaths.py +++ b/tests/test_device_profile_xpaths.py @@ -57,7 +57,6 @@ from panos.panorama import Panorama from panos.panorama import Template - OBJECTS = { SnmpServerProfile: [None, SnmpV2cServer, SnmpV3Server], EmailServerProfile: [ diff --git a/tests/test_opstate.py b/tests/test_opstate.py index bf579e2f..da8f08b3 100644 --- a/tests/test_opstate.py +++ b/tests/test_opstate.py @@ -12,7 +12,6 @@ from panos.policies import SecurityRule from panos.policies import AuditCommentLog - HIT_COUNT_PREFIX = """ diff --git a/tests/test_predefined.py b/tests/test_predefined.py index 1a1b913a..992e302a 100644 --- a/tests/test_predefined.py +++ b/tests/test_predefined.py @@ -13,7 +13,6 @@ from panos.objects import ServiceObject from panos.objects import Tag - PREDEFINED_CONFIG = { ApplicationContainer: { "single": "refresh_application", diff --git a/tests/test_standards.py b/tests/test_standards.py index c284b448..373632d3 100644 --- a/tests/test_standards.py +++ b/tests/test_standards.py @@ -18,7 +18,6 @@ from panos import predefined from panos import plugins - # Versioning constants. # # When checking versioning, this is the highest major version to check for diff --git a/tests/test_userid.py b/tests/test_userid.py index ec644479..c2d0bf6d 100644 --- a/tests/test_userid.py +++ b/tests/test_userid.py @@ -100,7 +100,6 @@ def test_batch_untag_user(self): ], ) - def test_get_user_tags_escapes_user(self): evil = "admin999999x" fw = panos.firewall.Firewall(