diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..71e2868c --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,57 @@ +# AGENTS.md + +This file provides guidance to AI agents when working with code in this +repository. + +## Commands + +Dependencies are managed with Poetry. Prefix commands with `poetry run` (or +activate the venv). + +- `poetry install` — set up the dev environment +- `poetry run make test` — run pytest +- `poetry run pytest tests/test_firewall.py::TestClass::test_name` — run a + single test +- `poetry run make lint` — flake8 over `panos` and `tests` +- `poetry run make check-format` / `make format` — check / apply black + isort +- `poetry run make bandit` — security scan +- `poetry run make test-all` — run the full tox matrix + +## Architecture + +`pan-os-python` is an object-oriented SDK that mirrors the PAN-OS configuration +tree. Users build a tree of objects rooted at a device, then call CRUD methods +that translate to XML API calls against a firewall or Panorama. + +Core abstractions live in `panos/base.py`: + +- `PanObject` — base node. Every config object has an `XPATH`, optional `SUFFIX` + (`ENTRY`/`MEMBER`), a `ROOT` (DEVICE / VSYS / PANORAMA / …), and a + `CHILDTYPES` tuple declaring which classes may be added under it. `add()` / + `remove()` / `find()` build and traverse the tree; `xpath()` is composed by + walking parents up to the `PanDevice` at the root. +- `VersionedPanObject` — subclass used for almost all real config objects. + Parameters are declared via `_setup()` using `VersionedParamPath` entries, + each with per-PAN-OS-version XML paths. The object renders different XML + depending on the connected device's version. +- `VsysOperations` — mixin behavior for objects that live inside a vsys and need + import/export handling. +- `PanDevice` — base for `firewall.Firewall` and `panorama.Panorama`. Owns the + `pan.xapi` connection, version detection, HA state, commit/op helpers, and is + always the root of the tree. + +Module layout follows the PAN-OS config hierarchy: `device.py`, `network.py`, +`objects.py`, `policies.py`, `ha.py`, `panorama.py`, `predefined.py`, +`plugins.py`, `userid.py`, `updater.py`, `errors.py`. Adding a new config node +usually means subclassing `VersionedPanObject` (or `VsysOperations`) in the +right module, defining `_setup()`, and adding it to the parent's `CHILDTYPES`. + +Tests under `tests/` are mostly offline unit tests that mock `pan.xapi`; +`tests/live/` and `test_integration.py` hit real devices and aren't run by +default. + +## Releases & commit style + +This repo uses semantic-release driven by Conventional Commits (`feat:`, `fix:`, +`chore:`, …). Commit messages determine version bumps and changelog entries — +keep the prefix correct. diff --git a/panos/base.py b/panos/base.py index d8505e91..0f3d9f77 100644 --- a/panos/base.py +++ b/panos/base.py @@ -1506,42 +1506,14 @@ def _convert_var(cls, value, vartype): elif vartype == "bool": return yesno(value) - def _set_reference( - self, - reference_name, - reference_type, - reference_var, - var_type, - exclusive, - refresh, - update, - running_config, - return_type, - name_only, - **kwargs + def _get_all_objects_by_type( + self, reference_type, refresh, running_config, name_only, reference_var ): - """Used by helper methods to set references between objects - - For example, set_zone() would set the zone for an interface by creating a reference from - the zone to the interface. If the desired reference already exists then nothing happens. - - This function has two modes: refresh=True and refresh=False. You - should only ever use refresh=False if: - - 1) all reference objects are in the current pan-os-python object tree - 2) all reference objects are children attached to nearest_pandevice() - 3) this is for firewall only, not a template / template stack - 4) you're using firewall.vsys, not the device.Vsys object - - If any of the above do not apply, you should be using refresh=True. - + """ + Returns all of the objects of the given type, ensuring that `reference_var` is refreshed and available to be + set, and the parent pandevice """ parent = None - update_needed = False - - if return_type not in ("bool", "object"): - raise ValueError("Unknown return_type specified: {0}".format(return_type)) - if refresh: """ pan-os-python is too flexible: users can use simple vsys mode or a @@ -1616,6 +1588,22 @@ def _set_reference( parent = self.nearest_pandevice() allobjects = parent.findall(reference_type) + return parent, allobjects + + def _update_reference_in_objects( + self, + parent, + allobjects, + reference_name, + reference_type, + reference_var, + var_type, + exclusive, + update, + return_type, + **kwargs + ): + update_needed = False # Find any current references to self and remove them, unless it is the desired reference if exclusive: for obj in allobjects: @@ -1678,6 +1666,57 @@ def _set_reference( if return_type == "bool": return update_needed + def _set_reference( + self, + reference_name, + reference_type, + reference_var, + var_type, + exclusive, + refresh, + update, + running_config, + return_type, + name_only, + **kwargs + ): + """Used by helper methods to set references between objects + + For example, set_zone() would set the zone for an interface by creating a reference from + the zone to the interface. If the desired reference already exists then nothing happens. + + This function has two modes: refresh=True and refresh=False. You + should only ever use refresh=False if: + + 1) all reference objects are in the current pan-os-python object tree + 2) all reference objects are children attached to nearest_pandevice() + 3) this is for firewall only, not a template / template stack + 4) you're using firewall.vsys, not the device.Vsys object + + If any of the above do not apply, you should be using refresh=True. + + """ + if return_type not in ("bool", "object"): + raise ValueError("Unknown return_type specified: {0}".format(return_type)) + + # Get all the objects and the parent + parent, allobjects = self._get_all_objects_by_type( + reference_type, refresh, running_config, name_only, reference_var + ) + + return self._update_reference_in_objects( + parent, + allobjects, + reference_name, + reference_type, + reference_var, + var_type, + exclusive, + update, + return_type, + **kwargs + ) + def xml_merge(self, root, elements): """Merges other elements into the root element. @@ -3479,6 +3518,7 @@ def set_vsys( "vlan": "vlans", "virtual-wire": "virtual_wires", "virtual-router": "virtual_routers", + "logical-router": "logical_routers", "interface": "interface", } for key, param_name in import_to_vsys_param.items(): @@ -4900,7 +4940,7 @@ def _commit( except AttributeError: if exception: raise err.PanCommitNotNeeded("Commit not needed", pan_device=self) - else: + elif commit_response.find("./msg/line") is not None: # By getting here, there was no "./result/job" in the commit response, # and there was no exception raised either, so capture the response message commit_response_msg = commit_response.find("./msg/line").text @@ -4921,6 +4961,7 @@ def _commit( "messages": [commit_response_msg], } return log_collector_group_push_result + else: return if not sync: # Don't synchronize, just return diff --git a/panos/device.py b/panos/device.py index 704c9aa6..2a288a90 100644 --- a/panos/device.py +++ b/panos/device.py @@ -99,6 +99,7 @@ class Vsys(VersionedPanObject): vlans (list): A list of strings of VLANs virtual_wires (list): A list of strings of virtual wires virtual_routers (list): A list of strings of virtual routers + logical_routers (list): (10.2+) A list of strings of logical routers visible_vsys (list): A list of strings of the vsys visible dns_proxy (str): DNS Proxy server decrypt_forwarding (bool): Allow forwarding of decrypted content @@ -188,6 +189,10 @@ def _setup(self): path="import/network/virtual-router", ) ) + params.append(VersionedParamPath("logical_routers", exclude=True)) + params[-1].add_profile( + "10.2.0", path="import/network/logical-router", vartype="member" + ) params.append( VersionedParamPath( "visible_vsys", vartype="member", path="import/visible-vsys" diff --git a/panos/firewall.py b/panos/firewall.py index aafb7564..5349f019 100644 --- a/panos/firewall.py +++ b/panos/firewall.py @@ -591,7 +591,6 @@ def is_partial(self): self.exclude_device_and_network, self.exclude_shared_objects, self.exclude_policy_and_objects, - self.force, ] return any(x for x in pp_list) @@ -622,11 +621,9 @@ def element(self): ET.SubElement(partial, "shared-object").text = "excluded" if self.exclude_policy_and_objects: ET.SubElement(partial, "policy-and-objects").text = "excluded" + fe.append(partial) - if self.force: - fe = ET.SubElement(root, "force") - fe.append(partial) - else: - root.append(partial) + if self.force: + fe = ET.SubElement(root, "force") return root diff --git a/panos/network.py b/panos/network.py index 1a9e1ad3..b37bebab 100644 --- a/panos/network.py +++ b/panos/network.py @@ -522,6 +522,83 @@ def set_vlan( False, ) + def set_logical_router( + self, + lr_name, + refresh=False, + update=False, + running_config=False, + return_type="object", + vrf_name="default", + **kwargs + ): + """adds the given interface to the VRF by name. + + This is more complicated than `set_virtual_router` as the logical routers have child VRF child elements, which + is where the interfaces are configured. + + This will use the VRF name 'default' by default. + + Args: + lr_name (str): The name of the LogicalRouter or + a :class:`panos.network.LogicalRouter` instance + refresh (bool): Refresh the relevant current state of the device + before taking action (Default: False) + update (bool): Apply the changes to the device (Default: False) + running_config: If refresh is True, refresh from the running + configuration (Default: False) + vrf_name (str): Sets the vrf inside the LR. (Default: 'default') + return_type (str): Specify what this function returns, can be + either 'object' (the default) or 'bool'. If this is 'object', + then the return value is the LogicalRouter in question. If + this is 'bool', then the return value is a boolean that tells + you about if the live device needs updates (update=False) or + was updated (update=True). + """ + + # First we get all the logical routers + parent, all_logical_routers = self._get_all_objects_by_type( + LogicalRouter, + refresh, + running_config, + name_only=False, + reference_var="vrf", + ) + target_lr: LogicalRouter | None + + target_lr = next((lr for lr in all_logical_routers if lr.uid == lr_name), None) + if not target_lr: + # If the LR isn't found, create it instead + target_lr = LogicalRouter(name=lr_name) + parent.add(target_lr) + vrf = Vrf(name=vrf_name) + target_lr.add(vrf) + target_lr.create() + + # Remove interface from other LRs first + for lr in all_logical_routers: + Vrf.refreshall(lr) + if lr.name != lr_name: + for vrf in lr.findall(Vrf): + if vrf.interface: + if self.name in vrf.interface: + vrf.interface.remove(self.name) + if update: + vrf.update("interface") + + return self._update_reference_in_objects( + target_lr, + target_lr.findall(Vrf), + reference_name=vrf_name, + reference_var="interface", + reference_type=Vrf, + var_type="list", + return_type=return_type, + update=update, + exclusive=True, + **kwargs + ) + def get_counters(self): """Pull the counters for an interface @@ -3017,10 +3094,9 @@ class BgpPeerGroup(VersionedPanObject): aggregated_confed_as_path (bool): the peers understand aggregated confederation AS path soft_reset_with_stored_info (bool): soft reset with stored info type (str): peer group type I('ebgp')/I('ibgp')/I('ebgp-confed')/I('ibgp-confed') - export_nexthop (str): export locally resolved nexthop I('resolve')/I('use-self') - import_nexthop (str): override nexthop with peer address I('original')/I('use-peer'), only with 'ebgp' remove_private_as (bool): remove private AS when exporting route, only with 'ebgp' - + import_nexthop (str): override nexthop with peer address I('original')/I('use-peer'), only with 'ebgp' + export_nexthop (str): export locally resolved nexthop I('resolve')/I('use-self') """ SUFFIX = ENTRY @@ -3046,9 +3122,10 @@ def _setup(self): ) params.append( VersionedParamPath( - "export_nexthop", - path="type/{type}/export-nexthop", - values=("resolve", "use-self"), + "remove_private_as", + condition={"type": "ebgp"}, + path="type/{type}/remove-private-as", + vartype="yesno", ) ) params.append( @@ -3061,10 +3138,9 @@ def _setup(self): ) params.append( VersionedParamPath( - "remove_private_as", - condition={"type": "ebgp"}, - path="type/{type}/remove-private-as", - vartype="yesno", + "export_nexthop", + path="type/{type}/export-nexthop", + values=("resolve", "use-self"), ) ) diff --git a/panos/panorama.py b/panos/panorama.py index 64600ecf..16e36055 100644 --- a/panos/panorama.py +++ b/panos/panorama.py @@ -990,7 +990,6 @@ def is_partial(self): self.log_collector_groups, self.exclude_device_and_network, self.exclude_shared_objects, - self.force, ] return any(x for x in pp_list) @@ -1031,12 +1030,10 @@ def element(self): ET.SubElement(partial, "device-and-network").text = "excluded" if self.exclude_shared_objects: ET.SubElement(partial, "shared-object").text = "excluded" + root.append(partial) - if self.force: - fe = ET.SubElement(root, "force") - fe.append(partial) - else: - root.append(partial) + if self.force: + fe = ET.SubElement(root, "force") return root diff --git a/tests/live/test_network.py b/tests/live/test_network.py index 52bab2a1..8975d999 100644 --- a/tests/live/test_network.py +++ b/tests/live/test_network.py @@ -1,6 +1,9 @@ import random +import traceback +from inspect import trace from panos import device, network +from panos.network import Interface from tests.live import testlib @@ -587,6 +590,44 @@ def cleanup_dependencies(self, fw, state): pass +class TestVirtualRouterInterfaces(testlib.FwFlow): + def create_dependencies(self, fw, state): + # Disable ARE on the device so we're just using VR setup + advanced_routing_engine = device.AdvancedRoutingEngine(enable=False) + fw.add(advanced_routing_engine) + advanced_routing_engine.create() + + state.eth_obj = None + state.eth = testlib.get_available_interfaces(fw, max_interfaces=7)[0] + + state.eth_obj = network.EthernetInterface( + state.eth, "layer3", testlib.random_ip("/24") + ) + fw.add(state.eth_obj) + state.eth_obj.create() + + state.obj = network.VirtualRouter(testlib.random_name()) + fw.add(state.obj) + + def setup_state_obj(self, fw, state): + pass + + def test_10_set_vr(self, fw, state_map): + state = self.sanity(fw, state_map) + state.eth_obj.set_virtual_router(state.obj.name) + + def cleanup_dependencies(self, fw, state): + try: + state.vr.delete() + except Exception: + pass + + try: + state.eth_obj.delete() + except Exception: + pass + + class TestStaticRouteV6(testlib.FwFlow): def create_dependencies(self, fw, state): state.eth_obj = None @@ -1659,13 +1700,20 @@ def create_dependencies(self, fw, state): state.advanced_routing_engine_obj.create() state.eth_obj = None - state.eth = testlib.get_available_interfaces(fw)[0] + available_interfaces = testlib.get_available_interfaces(fw, 2, 7) + state.eth = available_interfaces[0] + state.eth_2 = available_interfaces[1] state.eth_obj = network.EthernetInterface( state.eth, "layer3", testlib.random_ip("/24") ) + state.eth_obj_2 = network.EthernetInterface( + state.eth_2, "layer3", testlib.random_ip("/24") + ) fw.add(state.eth_obj) + fw.add(state.eth_obj_2) state.eth_obj.create() + state.eth_obj_2.create() def setup_state_obj(self, fw, state): vrf = network.Vrf( @@ -1690,8 +1738,37 @@ def setup_state_obj(self, fw, state): ) lr = network.LogicalRouter(testlib.random_name()) lr.add(vrf) + + lr_2 = network.LogicalRouter(testlib.random_name()) + vrf2 = network.Vrf("default") + lr_2.add(vrf2) + state.obj = lr + state.obj_2 = lr_2 fw.add(state.obj) + fw.add(state.obj_2) + state.obj.create() + state.obj_2.create() + + lr_2.refresh() + + def test_10_set_lr_for_interface(self, fw, state_map): + """Test setting the LR for an interface instead of the other way around""" + state = self.sanity(fw, state_map) + eth: Interface = state.eth_obj_2 + eth.set_logical_router(state.obj_2.name, update=True) + + def test_11_change_lr_for_interface(self, fw, state_map): + """Test setting the LR for an interface instead of the other way around""" + state = self.sanity(fw, state_map) + eth: Interface = state.eth_obj_2 + eth.set_logical_router(state.obj.name, update=True, refresh=True) + + def test_11_change_lr_for_interface_add_new_lr(self, fw, state_map): + """Test setting the LR for an interface instead of the other way around""" + state = self.sanity(fw, state_map) + eth: Interface = state.eth_obj + eth.set_logical_router("test-new-lr", update=True, refresh=True) def update_state_obj(self, fw, state): state.obj.ad_static = random.randint(10, 240) @@ -1699,9 +1776,16 @@ def update_state_obj(self, fw, state): def cleanup_dependencies(self, fw, state): try: + fw.add(state.obj) + fw.add(state.obj_2) + lrs = network.LogicalRouter.refreshall(fw) + for lr in lrs: + lr.delete() state.eth_obj.delete() - state.advanced_routing_engine_obj.delete() - except Exception: + state.eth_obj_2.delete() + except Exception as e: + print(traceback.format_exc()) + print(e) pass diff --git a/tests/live/testlib.py b/tests/live/testlib.py index 959f3781..e0c86935 100644 --- a/tests/live/testlib.py +++ b/tests/live/testlib.py @@ -1,4 +1,5 @@ import random +import traceback import pytest @@ -47,11 +48,11 @@ def random_mac(): return ":".join("{0:02x}".format(random.randint(0, 255)) for x in range(6)) -def get_available_interfaces(con, num=1): +def get_available_interfaces(con, num=1, max_interfaces=10): ifaces = network.EthernetInterface.refreshall(con, add=False) ifaces = set(x.name for x in ifaces) - all_interfaces = set("ethernet1/{0}".format(x) for x in range(1, 10)) + all_interfaces = set("ethernet1/{0}".format(x) for x in range(1, max_interfaces)) available = all_interfaces.difference(ifaces) ans = [] @@ -73,6 +74,8 @@ def test_01_setup_dependencies(self, fw, state_map): except Exception as e: print("SETUP ERROR: {0}".format(e)) state.err = True + print(traceback.format_exc()) + pytest.skip("Setup failed") def create_dependencies(self, fw, state):