Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# AGENTS.md

This file provides guidance to AI agents when working with code in this
repository.

## Commands

Dependencies are managed with Poetry. Prefix commands with `poetry run` (or
activate the venv).

- `poetry install` — set up the dev environment
- `poetry run make test` — run pytest
- `poetry run pytest tests/test_firewall.py::TestClass::test_name` — run a
single test
- `poetry run make lint` — flake8 over `panos` and `tests`
- `poetry run make check-format` / `make format` — check / apply black + isort
- `poetry run make bandit` — security scan
- `poetry run make test-all` — run the full tox matrix

## Architecture

`pan-os-python` is an object-oriented SDK that mirrors the PAN-OS configuration
tree. Users build a tree of objects rooted at a device, then call CRUD methods
that translate to XML API calls against a firewall or Panorama.

Core abstractions live in `panos/base.py`:

- `PanObject` — base node. Every config object has an `XPATH`, optional `SUFFIX`
(`ENTRY`/`MEMBER`), a `ROOT` (DEVICE / VSYS / PANORAMA / …), and a
`CHILDTYPES` tuple declaring which classes may be added under it. `add()` /
`remove()` / `find()` build and traverse the tree; `xpath()` is composed by
walking parents up to the `PanDevice` at the root.
- `VersionedPanObject` — subclass used for almost all real config objects.
Parameters are declared via `_setup()` using `VersionedParamPath` entries,
each with per-PAN-OS-version XML paths. The object renders different XML
depending on the connected device's version.
- `VsysOperations` — mixin behavior for objects that live inside a vsys and need
import/export handling.
- `PanDevice` — base for `firewall.Firewall` and `panorama.Panorama`. Owns the
`pan.xapi` connection, version detection, HA state, commit/op helpers, and is
always the root of the tree.

Module layout follows the PAN-OS config hierarchy: `device.py`, `network.py`,
`objects.py`, `policies.py`, `ha.py`, `panorama.py`, `predefined.py`,
`plugins.py`, `userid.py`, `updater.py`, `errors.py`. Adding a new config node
usually means subclassing `VersionedPanObject` (or `VsysOperations`) in the
right module, defining `_setup()`, and adding it to the parent's `CHILDTYPES`.

Tests under `tests/` are mostly offline unit tests that mock `pan.xapi`;
`tests/live/` and `test_integration.py` hit real devices and aren't run by
default.

## Releases & commit style

This repo uses semantic-release driven by Conventional Commits (`feat:`, `fix:`,
`chore:`, …). Commit messages determine version bumps and changelog entries —
keep the prefix correct.
109 changes: 75 additions & 34 deletions panos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1506,42 +1506,14 @@ def _convert_var(cls, value, vartype):
elif vartype == "bool":
return yesno(value)

def _set_reference(
self,
reference_name,
reference_type,
reference_var,
var_type,
exclusive,
refresh,
update,
running_config,
return_type,
name_only,
**kwargs
def _get_all_objects_by_type(
self, reference_type, refresh, running_config, name_only, reference_var
):
"""Used by helper methods to set references between objects

For example, set_zone() would set the zone for an interface by creating a reference from
the zone to the interface. If the desired reference already exists then nothing happens.

This function has two modes: refresh=True and refresh=False. You
should only ever use refresh=False if:

1) all reference objects are in the current pan-os-python object tree
2) all reference objects are children attached to nearest_pandevice()
3) this is for firewall only, not a template / template stack
4) you're using firewall.vsys, not the device.Vsys object

If any of the above do not apply, you should be using refresh=True.

"""
Returns all of the objects of the given type, ensuring that `reference_var` is refreshed and available to be
set, and the parent pandevice
"""
parent = None
update_needed = False

if return_type not in ("bool", "object"):
raise ValueError("Unknown return_type specified: {0}".format(return_type))

if refresh:
"""
pan-os-python is too flexible: users can use simple vsys mode or a
Expand Down Expand Up @@ -1616,6 +1588,22 @@ def _set_reference(
parent = self.nearest_pandevice()
allobjects = parent.findall(reference_type)

return parent, allobjects

def _update_reference_in_objects(
self,
parent,
allobjects,
reference_name,
reference_type,
reference_var,
var_type,
exclusive,
update,
return_type,
**kwargs
):
update_needed = False
# Find any current references to self and remove them, unless it is the desired reference
if exclusive:
for obj in allobjects:
Expand Down Expand Up @@ -1678,6 +1666,57 @@ def _set_reference(
if return_type == "bool":
return update_needed

def _set_reference(
self,
reference_name,
reference_type,
reference_var,
var_type,
exclusive,
refresh,
update,
running_config,
return_type,
name_only,
**kwargs
):
"""Used by helper methods to set references between objects

For example, set_zone() would set the zone for an interface by creating a reference from
the zone to the interface. If the desired reference already exists then nothing happens.

This function has two modes: refresh=True and refresh=False. You
should only ever use refresh=False if:

1) all reference objects are in the current pan-os-python object tree
2) all reference objects are children attached to nearest_pandevice()
3) this is for firewall only, not a template / template stack
4) you're using firewall.vsys, not the device.Vsys object

If any of the above do not apply, you should be using refresh=True.

"""
if return_type not in ("bool", "object"):
raise ValueError("Unknown return_type specified: {0}".format(return_type))

# Get all the objects and the parent
parent, allobjects = self._get_all_objects_by_type(
reference_type, refresh, running_config, name_only, reference_var
)

return self._update_reference_in_objects(
parent,
allobjects,
reference_name,
reference_type,
reference_var,
var_type,
exclusive,
update,
return_type,
**kwargs
)

def xml_merge(self, root, elements):
"""Merges other elements into the root element.

Expand Down Expand Up @@ -3479,6 +3518,7 @@ def set_vsys(
"vlan": "vlans",
"virtual-wire": "virtual_wires",
"virtual-router": "virtual_routers",
"logical-router": "logical_routers",
"interface": "interface",
}
for key, param_name in import_to_vsys_param.items():
Expand Down Expand Up @@ -4900,7 +4940,7 @@ def _commit(
except AttributeError:
if exception:
raise err.PanCommitNotNeeded("Commit not needed", pan_device=self)
else:
elif commit_response.find("./msg/line") is not None:
# By getting here, there was no "./result/job" in the commit response,
# and there was no exception raised either, so capture the response message
commit_response_msg = commit_response.find("./msg/line").text
Expand All @@ -4921,6 +4961,7 @@ def _commit(
"messages": [commit_response_msg],
}
return log_collector_group_push_result
else:
return
if not sync:
# Don't synchronize, just return
Expand Down
5 changes: 5 additions & 0 deletions panos/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class Vsys(VersionedPanObject):
vlans (list): A list of strings of VLANs
virtual_wires (list): A list of strings of virtual wires
virtual_routers (list): A list of strings of virtual routers
logical_routers (list): (10.2+) A list of strings of logical routers
visible_vsys (list): A list of strings of the vsys visible
dns_proxy (str): DNS Proxy server
decrypt_forwarding (bool): Allow forwarding of decrypted content
Expand Down Expand Up @@ -188,6 +189,10 @@ def _setup(self):
path="import/network/virtual-router",
)
)
params.append(VersionedParamPath("logical_routers", exclude=True))
params[-1].add_profile(
"10.2.0", path="import/network/logical-router", vartype="member"
)
params.append(
VersionedParamPath(
"visible_vsys", vartype="member", path="import/visible-vsys"
Expand Down
9 changes: 3 additions & 6 deletions panos/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,6 @@ def is_partial(self):
self.exclude_device_and_network,
self.exclude_shared_objects,
self.exclude_policy_and_objects,
self.force,
]

return any(x for x in pp_list)
Expand Down Expand Up @@ -622,11 +621,9 @@ def element(self):
ET.SubElement(partial, "shared-object").text = "excluded"
if self.exclude_policy_and_objects:
ET.SubElement(partial, "policy-and-objects").text = "excluded"
fe.append(partial)

if self.force:
fe = ET.SubElement(root, "force")
fe.append(partial)
else:
root.append(partial)
if self.force:
fe = ET.SubElement(root, "force")

return root
96 changes: 86 additions & 10 deletions panos/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,83 @@ def set_vlan(
False,
)

def set_logical_router(
self,
lr_name,
refresh=False,
update=False,
running_config=False,
return_type="object",
vrf_name="default",
**kwargs
):
"""adds the given interface to the VRF by name.

This is more complicated than `set_virtual_router` as the logical routers have child VRF child elements, which
is where the interfaces are configured.

This will use the VRF name 'default' by default.

Args:
lr_name (str): The name of the LogicalRouter or
a :class:`panos.network.LogicalRouter` instance
refresh (bool): Refresh the relevant current state of the device
before taking action (Default: False)
update (bool): Apply the changes to the device (Default: False)
running_config: If refresh is True, refresh from the running
configuration (Default: False)
vrf_name (str): Sets the vrf inside the LR. (Default: 'default')
return_type (str): Specify what this function returns, can be
either 'object' (the default) or 'bool'. If this is 'object',
then the return value is the LogicalRouter in question. If
this is 'bool', then the return value is a boolean that tells
you about if the live device needs updates (update=False) or
was updated (update=True).
"""

# First we get all the logical routers
parent, all_logical_routers = self._get_all_objects_by_type(
LogicalRouter,
refresh,
running_config,
name_only=False,
reference_var="vrf",
)
target_lr: LogicalRouter | None

target_lr = next((lr for lr in all_logical_routers if lr.uid == lr_name), None)
if not target_lr:
# If the LR isn't found, create it instead
target_lr = LogicalRouter(name=lr_name)
parent.add(target_lr)
vrf = Vrf(name=vrf_name)
target_lr.add(vrf)
target_lr.create()

# Remove interface from other LRs first
for lr in all_logical_routers:
Vrf.refreshall(lr)
if lr.name != lr_name:
for vrf in lr.findall(Vrf):
if vrf.interface:
if self.name in vrf.interface:
vrf.interface.remove(self.name)
if update:
vrf.update("interface")

return self._update_reference_in_objects(
target_lr,
target_lr.findall(Vrf),
reference_name=vrf_name,
reference_var="interface",
reference_type=Vrf,
var_type="list",
return_type=return_type,
update=update,
exclusive=True,
**kwargs
)

def get_counters(self):
"""Pull the counters for an interface

Expand Down Expand Up @@ -3017,10 +3094,9 @@ class BgpPeerGroup(VersionedPanObject):
aggregated_confed_as_path (bool): the peers understand aggregated confederation AS path
soft_reset_with_stored_info (bool): soft reset with stored info
type (str): peer group type I('ebgp')/I('ibgp')/I('ebgp-confed')/I('ibgp-confed')
export_nexthop (str): export locally resolved nexthop I('resolve')/I('use-self')
import_nexthop (str): override nexthop with peer address I('original')/I('use-peer'), only with 'ebgp'
remove_private_as (bool): remove private AS when exporting route, only with 'ebgp'

import_nexthop (str): override nexthop with peer address I('original')/I('use-peer'), only with 'ebgp'
export_nexthop (str): export locally resolved nexthop I('resolve')/I('use-self')
"""

SUFFIX = ENTRY
Expand All @@ -3046,9 +3122,10 @@ def _setup(self):
)
params.append(
VersionedParamPath(
"export_nexthop",
path="type/{type}/export-nexthop",
values=("resolve", "use-self"),
"remove_private_as",
condition={"type": "ebgp"},
path="type/{type}/remove-private-as",
vartype="yesno",
)
)
params.append(
Expand All @@ -3061,10 +3138,9 @@ def _setup(self):
)
params.append(
VersionedParamPath(
"remove_private_as",
condition={"type": "ebgp"},
path="type/{type}/remove-private-as",
vartype="yesno",
"export_nexthop",
path="type/{type}/export-nexthop",
values=("resolve", "use-self"),
)
)

Expand Down
Loading
Loading