Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions openwisp_controller/subnet_division/rule_types/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,36 @@ def _provision_receiver():
def destroyer_receiver(cls, instance, **kwargs):
cls.destroy_provisioned_subnets_ips(instance, **kwargs)

@staticmethod
def post_provision_handler(instance, provisioned, **kwargs):
@classmethod
def post_provision_handler(cls, instance, provisioned, **kwargs):
"""
This method should be overridden in inherited rule types to
perform any operation on provisioned subnets and IP addresses.
:param instance: object that triggered provisioning
:param provisioned: dictionary containing subnets and IP addresses
provisioned, None if nothing is provisioned
Hook for post-provisioning actions on subnets and IP addresses.

This method is intended to be extended by subclasses of rule types
to perform custom operations after subnets and IPs are provisioned.

Subnet provisioning is executed asynchronously in Celery workers.
If the device configuration references variables provided by the
subnet division rule, the current checksum may have been computed
using variable names instead of their provisioned values. In such cases,
`Config.checksum_db` (which tracks persisted configuration changes)
must be updated to reflect the actual provisioned values, and the
checksum cache invalidated to avoid stale data.

:param instance: The object that triggered the provisioning.
:param provisioned: Dictionary containing provisioned subnets and IPs,
or None if no provisioning occurred.
"""
pass
if not provisioned:
return
config = cls.get_config(instance)
config._invalidate_backend_instance_cache()
current_checksum = config.checksum
if current_checksum != config.checksum_db:
# Update checksum using the UPDATE query to avoid sending
# unnecessary signals that may be triggered by `save()` method.
config._update_checksum_db(current_checksum)
config.invalidate_checksum_cache()

@staticmethod
def subnet_provisioned_signal_emitter(instance, provisioned):
Expand Down
5 changes: 3 additions & 2 deletions openwisp_controller/subnet_division/rule_types/vpn.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ def provision_for_existing_objects(cls, rule_obj):
for vpn_client in qs:
cls.provision_receiver(instance=vpn_client, created=True)

@staticmethod
def post_provision_handler(instance, provisioned, **kwargs):
@classmethod
def post_provision_handler(cls, instance, provisioned, **kwargs):
super().post_provision_handler(instance, provisioned, **kwargs)
# Assign the first provisioned IP address to the VPNClient
# only when subnets and IPs have been provisioned
if provisioned and provisioned["ip_addresses"]:
Expand Down
140 changes: 137 additions & 3 deletions openwisp_controller/subnet_division/tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
from unittest.mock import patch

from django.test import TestCase
from django.test import TestCase, TransactionTestCase
from django.urls import reverse
from swapper import load_model

from openwisp_controller.config.tests.utils import TestWireguardVpnMixin
from openwisp_controller.config.tests.test_admin import TestDeviceAdminMixin
from openwisp_controller.config.tests.utils import (
TestVpnX509Mixin,
TestWireguardVpnMixin,
)
from openwisp_users.tests.utils import TestMultitenantAdminMixin

from .helpers import SubnetDivisionAdminTestMixin
from .helpers import SubnetDivisionAdminTestMixin, SubnetDivisionTestMixin

Subnet = load_model("openwisp_ipam", "Subnet")
Device = load_model("config", "Device")
Config = load_model("config", "Config")


class TestSubnetAdmin(
Expand Down Expand Up @@ -257,3 +262,132 @@ def test_delete_device(self):
)
self.assertEqual(subnet_response.status_code, 200)
self.assertContains(subnet_response, self.config.device.name, 1)


class TestTransactionDeviceAdmin(
SubnetDivisionTestMixin,
TestVpnX509Mixin,
TestDeviceAdminMixin,
TransactionTestCase,
):
ipam_label = "openwisp_ipam"
config_label = "config"

def test_vpn_template_switch_checksum_db(self):
admin = self._create_admin()
self.client.force_login(admin)
org = self._get_org()
vpn1_subnet = self._get_master_subnet(organization=org, subnet="10.0.0.0/24")
self._get_vpn_subdivision_rule(
number_of_ips=1,
number_of_subnets=1,
organization=org,
master_subnet=vpn1_subnet,
label="VPN1",
)
vpn1 = self._create_vpn(name="vpn1", organization=org, subnet=vpn1_subnet)
vpn2_subnet = self._get_master_subnet(organization=org, subnet="10.0.1.0/24")
self._get_vpn_subdivision_rule(
number_of_ips=1,
number_of_subnets=1,
organization=org,
master_subnet=vpn2_subnet,
label="VPN2",
)
vpn2 = self._create_vpn(name="vpn2", organization=org, subnet=vpn2_subnet)
vpn1_template = self._create_template(
organization=org,
name="vpn1-template",
type="vpn",
vpn=vpn1,
default_values={
"VPN1_subnet1_ip1": "10.0.0.1",
"VPN1_prefix": "24",
"ifname": "tun0",
},
auto_cert=True,
config={},
)
vpn1_template.config["openvpn"][0]["dev"] = "{{ ifname }}"
vpn1_template.config.update(
{
"network": [
{
"config_name": "interface",
"config_value": "lan",
"ipaddr": "{{ VPN1_subnet1_ip1 }}",
"netmask": "255.255.255.240",
}
],
}
)
vpn1_template.full_clean()
vpn1_template.save()
vpn2_template = self._create_template(
organization=org,
name="vpn2-template",
type="vpn",
vpn=vpn2,
default_values={
"VPN2_subnet1_ip1": "10.0.1.1",
"VPN2_prefix": "32",
"ifname": "tun1",
},
auto_cert=True,
config={},
)
vpn2_template.config["openvpn"][0]["dev"] = "{{ ifname }}"
vpn2_template.config.update(
{
"network": [
{
"config_name": "interface",
"config_value": "lan",
"ipaddr": "{{ VPN2_subnet1_ip1 }}",
"netmask": "255.255.255.240",
}
],
}
)
vpn2_template.full_clean()
vpn2_template.save()
default_template = self._create_template(
name="default-template",
default=True,
)
path = reverse(f"admin:{self.config_label}_device_add")
params = self._get_device_params(org=org)
params.update(
{"config-0-templates": f"{default_template.pk},{vpn1_template.pk}"}
)
response = self.client.post(path, data=params, follow=True)
self.assertEqual(response.status_code, 200)
config = Config.objects.get(device__name=params["name"])
config.refresh_from_db()
config._invalidate_backend_instance_cache()
initial_checksum = config.checksum
self.assertEqual(config.checksum_db, initial_checksum)
self.assertEqual(config.vpnclient_set.count(), 1)
self.assertEqual(config.vpnclient_set.first().vpn, vpn1)

path = reverse(
f"admin:{self.config_label}_device_change", args=[config.device_id]
)
params.update(
{
"config-0-templates": f"{default_template.pk},{vpn2_template.pk}",
"config-0-id": str(config.pk),
"config-0-device": str(config.device_id),
"config-INITIAL_FORMS": 1,
"_continue": True,
}
)
response = self.client.post(path, data=params, follow=True)
self.assertEqual(response.status_code, 200)
config.refresh_from_db()
config._invalidate_backend_instance_cache()
self.assertEqual(config.status, "modified")
self.assertEqual(config.vpnclient_set.count(), 1)
self.assertEqual(config.vpnclient_set.first().vpn, vpn2)
self.assertNotEqual(config.checksum, initial_checksum)
self.assertEqual(config.checksum, config.checksum_db)
Loading