diff --git a/examples/bulk_address_objects.py b/examples/bulk_address_objects.py index a63244f1..3d886c5c 100755 --- a/examples/bulk_address_objects.py +++ b/examples/bulk_address_objects.py @@ -36,7 +36,6 @@ import panos.firewall import panos.objects - HOSTNAME = "127.0.0.1" USERNAME = "admin" PASSWORD = "admin" diff --git a/examples/ensure_security_rule.py b/examples/ensure_security_rule.py index cafbf314..c5dcdf0c 100755 --- a/examples/ensure_security_rule.py +++ b/examples/ensure_security_rule.py @@ -33,7 +33,6 @@ import panos.firewall import panos.policies - HOSTNAME = "127.0.0.1" USERNAME = "admin" PASSWORD = "admin" diff --git a/examples/prisma_access_create_remote_network.py b/examples/prisma_access_create_remote_network.py index c43a20fd..83906409 100644 --- a/examples/prisma_access_create_remote_network.py +++ b/examples/prisma_access_create_remote_network.py @@ -25,6 +25,7 @@ To use the script, you need to replace the variables below with desired values. """ + import logging import os import sys diff --git a/examples/prisma_access_list_RN_regions_bw.py b/examples/prisma_access_list_RN_regions_bw.py index 4d5985aa..ddf4fe8f 100644 --- a/examples/prisma_access_list_RN_regions_bw.py +++ b/examples/prisma_access_list_RN_regions_bw.py @@ -20,10 +20,11 @@ prisma_access_list_RN_regions_bw.py ========== -This script is an example on how to retrieve list of prisma access +This script is an example on how to retrieve list of prisma access remote networks locations and bandwidth allocation and print it. """ + __author__ = "bmigette" diff --git a/examples/prisma_access_show_jobs_status.py b/examples/prisma_access_show_jobs_status.py index 24779e7f..081a262f 100644 --- a/examples/prisma_access_show_jobs_status.py +++ b/examples/prisma_access_show_jobs_status.py @@ -20,10 +20,11 @@ prisma_access_show_jobs_status.py ========== -This script is an example on how to retrieve list of prisma access +This script is an example on how to retrieve list of prisma access jobs (commit and push), and how to get details of a specific job """ + __author__ = "bmigette" diff --git a/examples/prisma_access_show_remote_net_per_tenant.py b/examples/prisma_access_show_remote_net_per_tenant.py index 4c904b5b..2263bf97 100644 --- a/examples/prisma_access_show_remote_net_per_tenant.py +++ b/examples/prisma_access_show_remote_net_per_tenant.py @@ -20,10 +20,11 @@ prisma_access_show_remote_net_per_tenant.py ========== -This script is an example on how to retrieve list of prisma access +This script is an example on how to retrieve list of prisma access tenants and their remote networks """ + __author__ = "bmigette" diff --git a/panos/base.py b/panos/base.py index 03aae887..350dfa1b 100644 --- a/panos/base.py +++ b/panos/base.py @@ -29,6 +29,7 @@ import time import xml.dom.minidom as minidom import xml.etree.ElementTree as ET +from xml.sax.saxutils import escape as xml_escape import pan.commit import pan.xapi @@ -3386,7 +3387,7 @@ def create_import(self, vsys=None): if vsys != "shared" and vsys is not None and self.XPATH_IMPORT is not None: xpath = self.xpath_import_base(vsys) - element = "{0}".format(self.uid) + element = "{0}".format(xml_escape(self.uid)) device = self.nearest_pandevice() device.active().xapi.set(xpath, element, retry_on_peer=True) @@ -3830,9 +3831,11 @@ def make_method(cls, super_method_name, super_method): def method(self, *args, **kwargs): retry_on_peer = kwargs.pop( "retry_on_peer", - True - if super_method_name not in ("keygen", "op", "ad_hoc", "export") - else False, + ( + True + if super_method_name not in ("keygen", "op", "ad_hoc", "export") + else False + ), ) apply_on_peer = kwargs.pop("apply_on_peer", False) ha_peer = self.pan_device.ha_peer diff --git a/panos/userid.py b/panos/userid.py index 4e42580f..7e06d5a3 100644 --- a/panos/userid.py +++ b/panos/userid.py @@ -19,6 +19,7 @@ import xml.etree.ElementTree as ET from copy import deepcopy +from xml.sax.saxutils import escape, quoteattr from pan.xapi import PanXapiError @@ -550,7 +551,7 @@ def get_groups(self, style=None): "", ] if style is not None: - msg.append("".format(style)) + msg.append("".format(quoteattr(style))) msg.append("") cmd = "".join(msg) vsys = self.device.vsys or "vsys1" @@ -594,7 +595,11 @@ def get_group_members(self, group): list """ - cmd = "" + group + "" + cmd = ( + "" + + escape(group) + + "" + ) vsys = self.device.vsys or "vsys1" resp = self.device.op(cmd, vsys=vsys, cmd_xml=False) @@ -644,12 +649,12 @@ def get_user_tags(self, user=None, prefix=None): if user is None: msg.append( "" - + "{0}".format(limit) - + "{0}".format(start) + + "{0}".format(escape(str(limit))) + + "{0}".format(escape(str(start))) + "" ) else: - msg.append("{0}".format(user)) + msg.append("{0}".format(escape(user))) msg.append("") cmd = ET.fromstring("".join(msg)) diff --git a/tests/live/conftest.py b/tests/live/conftest.py index 8d379d35..6ed5b27f 100644 --- a/tests/live/conftest.py +++ b/tests/live/conftest.py @@ -6,7 +6,6 @@ from panos import firewall from panos import panorama - live_devices = {} one_fw_per_version = [] one_device_type_per_version = [] diff --git a/tests/test_base.py b/tests/test_base.py index 1eb0c6db..2ba1437d 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -23,7 +23,8 @@ import pan.xapi import panos.base as Base import panos.errors as Err - +import panos.firewall +import panos.network OBJECT_NAME = "MyObjectName" VSYS = "vsys1" @@ -1701,5 +1702,30 @@ def test_times_out(self, mocksleep): assert mocksleep.call_count == 0 +class TestVsysOperationsCreateImport(unittest.TestCase): + def test_create_import_escapes_uid(self): + evil_name = "zone&" + fw = panos.firewall.Firewall( + "fw1", "user", "passwd", "authkey", serial="S", vsys="vsys1" + ) + vlan = panos.network.Vlan(evil_name) + fw.add(vlan) + + with mock.patch.object( + panos.network.Vlan, + "XPATH_IMPORT", + new_callable=mock.PropertyMock, + return_value="/network/vlan", + ): + fw.xapi.set = mock.Mock() + vlan.create_import("vsys1") + + args, _ = fw.xapi.set.call_args + element = args[1] + self.assertEqual(element, "zone&<bad>") + parsed = ET.fromstring(element) + self.assertEqual(parsed.text, evil_name) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_classic_objects.py b/tests/test_classic_objects.py index dfae9f29..7411772e 100644 --- a/tests/test_classic_objects.py +++ b/tests/test_classic_objects.py @@ -1,4 +1,4 @@ -""" Tests specifically for classic objects. +"""Tests specifically for classic objects. Note: All tests in this file are for classic objects. These are to try and make sure that the fix for classic objects with a self.NAME == None still @@ -6,7 +6,6 @@ """ - from panos import device diff --git a/tests/test_device_profile_xpaths.py b/tests/test_device_profile_xpaths.py index 05bcc01a..19075c26 100644 --- a/tests/test_device_profile_xpaths.py +++ b/tests/test_device_profile_xpaths.py @@ -57,7 +57,6 @@ from panos.panorama import Panorama from panos.panorama import Template - OBJECTS = { SnmpServerProfile: [None, SnmpV2cServer, SnmpV3Server], EmailServerProfile: [ diff --git a/tests/test_opstate.py b/tests/test_opstate.py index bf579e2f..da8f08b3 100644 --- a/tests/test_opstate.py +++ b/tests/test_opstate.py @@ -12,7 +12,6 @@ from panos.policies import SecurityRule from panos.policies import AuditCommentLog - HIT_COUNT_PREFIX = """ diff --git a/tests/test_predefined.py b/tests/test_predefined.py index 1a1b913a..992e302a 100644 --- a/tests/test_predefined.py +++ b/tests/test_predefined.py @@ -13,7 +13,6 @@ from panos.objects import ServiceObject from panos.objects import Tag - PREDEFINED_CONFIG = { ApplicationContainer: { "single": "refresh_application", diff --git a/tests/test_standards.py b/tests/test_standards.py index c284b448..373632d3 100644 --- a/tests/test_standards.py +++ b/tests/test_standards.py @@ -18,7 +18,6 @@ from panos import predefined from panos import plugins - # Versioning constants. # # When checking versioning, this is the highest major version to check for diff --git a/tests/test_userid.py b/tests/test_userid.py index a3a2052a..c2d0bf6d 100644 --- a/tests/test_userid.py +++ b/tests/test_userid.py @@ -17,6 +17,7 @@ import mock import sys import unittest +import xml.etree.ElementTree as ET import panos.firewall import panos.panorama @@ -99,6 +100,63 @@ def test_batch_untag_user(self): ], ) + def test_get_user_tags_escapes_user(self): + evil = "admin999999x" + fw = panos.firewall.Firewall( + "fw1", "user", "passwd", "authkey", serial="Serial", vsys="vsys1" + ) + empty_response = ET.fromstring( + b"" + ) + fw.op = mock.Mock(return_value=empty_response) + + fw.userid.get_user_tags(user=evil) + + sent = fw.op.call_args[1]["cmd"] + parsed = ET.fromstring(sent) + users = parsed.findall("./object/registered-user/user") + self.assertEqual(len(users), 1) + self.assertEqual(users[0].text, evil) + self.assertIsNone(parsed.find("./object/registered-user/all")) + + def test_get_groups_escapes_style(self): + evil = "x'/>\nTotal: 0\n" + ) + fw.op = mock.Mock(return_value=empty_response) + + fw.userid.get_groups(style=evil) + + sent = fw.op.call_args[0][0] + parsed = ET.fromstring(sent) + entries = parsed.findall("./user/group/list/entry") + self.assertEqual(len(entries), 1) + self.assertEqual(entries[0].attrib["name"], evil) + self.assertIsNone(parsed.find(".//evil")) + + def test_get_group_members_escapes_group(self): + evil = "gx" + fw = panos.firewall.Firewall( + "fw1", "user", "passwd", "authkey", serial="Serial", vsys="vsys1" + ) + empty_response = ET.fromstring( + b"\nTotal: 0\n" + ) + fw.op = mock.Mock(return_value=empty_response) + + fw.userid.get_group_members(evil) + + sent = fw.op.call_args[0][0] + parsed = ET.fromstring(sent) + names = parsed.findall("./user/group/name") + self.assertEqual(len(names), 1) + self.assertEqual(names[0].text, evil) + self.assertIsNone(parsed.find(".//evil")) + if __name__ == "__main__": unittest.main()