Skip to content

Commit a819721

Browse files
committed
[chores] Updated DeviceCertificate model and tests
Added selenium tests and refactored DeviceCertificate model
1 parent 4711fa3 commit a819721

4 files changed

Lines changed: 136 additions & 101 deletions

File tree

openwisp_controller/config/base/device_certificate.py

Lines changed: 55 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
from openwisp_controller.config import settings as app_settings
1010
from openwisp_utils.base import TimeStampedEditableModel
1111

12+
MAC_ADDRESS_OID = "1.3.6.1.4.1.65901.1"
13+
DEVICE_UUID_OID = "1.3.6.1.4.1.65901.2"
14+
1215

1316
class AbstractDeviceCertificate(TimeStampedEditableModel):
1417
config = models.ForeignKey(
@@ -84,63 +87,69 @@ def _get_common_name(self):
8487
common_name = f"{common_name}-{unique_slug}"
8588
return common_name
8689

87-
def _auto_create_cert(self, name, common_name):
88-
"""
89-
Automatically creates and assigns a client x509 certificate
90-
using Blueprint cloning and custom hardware OID injection.
91-
"""
90+
def _build_cert(self, name, common_name):
91+
"""Build (but do not save) a Cert instance from template + blueprint."""
9292
ca = self.template.ca
9393
blueprint = self.template.blueprint_cert
94-
device = self.config.device
9594
cert_model = self.__class__.cert.field.related_model
9695

97-
# blueprint property cloning with CA fallback
98-
key_length = blueprint.key_length if blueprint else ca.key_length
99-
digest = blueprint.digest if blueprint else str(ca.digest)
100-
country_code = blueprint.country_code if blueprint else ca.country_code
101-
state = blueprint.state if blueprint else ca.state
102-
city = blueprint.city if blueprint else ca.city
103-
organization_name = (
104-
blueprint.organization_name if blueprint else ca.organization_name
96+
attrs = self._clone_blueprint_attrs(ca, blueprint)
97+
extensions = self._build_extensions(blueprint)
98+
cert = cert_model(
99+
name=name,
100+
ca=ca,
101+
common_name=common_name,
102+
extensions=extensions,
103+
**attrs,
104+
)
105+
return self._auto_create_cert_extra(cert)
106+
107+
def _clone_blueprint_attrs(self, ca, blueprint):
108+
"""
109+
Extracts base X.509 attributes (such as key length, digest, and
110+
location data) from the provided blueprint certificate.
111+
"""
112+
source = blueprint or ca
113+
digest = str(source.digest) if not blueprint else source.digest
114+
return dict(
115+
key_length=source.key_length,
116+
digest=digest,
117+
country_code=source.country_code,
118+
state=source.state,
119+
city=source.city,
120+
organization_name=source.organization_name,
121+
email=source.email,
105122
)
106-
email = blueprint.email if blueprint else ca.email
107123

124+
def _build_extensions(self, blueprint):
125+
"""Compiles the list of X.509 extensions for the new certificate."""
108126
if blueprint and blueprint.extensions:
109127
extensions = copy.deepcopy(blueprint.extensions)
110128
else:
111129
extensions = [{"name": "nsCertType", "value": "client", "critical": False}]
130+
extensions.extend(self._get_hardware_oid_extensions())
131+
return extensions
112132

113-
# inject MAC and UUID as custom OIDs, prerequisite: #228 in django-x509
114-
mac_oid = "1.3.6.1.4.1.65901.1"
115-
uuid_oid = "1.3.6.1.4.1.65901.2"
116-
extensions.extend(
117-
[
118-
{
119-
"oid": mac_oid,
120-
"value": f"ASN1:UTF8:string:{device.mac_address}",
121-
"critical": False,
122-
},
123-
{
124-
"oid": uuid_oid,
125-
"value": f"ASN1:UTF8:string:{device.id}",
126-
"critical": False,
127-
},
128-
]
129-
)
130-
cert = cert_model(
131-
name=name,
132-
ca=ca,
133-
key_length=key_length,
134-
digest=digest,
135-
country_code=country_code,
136-
state=state,
137-
city=city,
138-
organization_name=organization_name,
139-
email=email,
140-
common_name=common_name,
141-
extensions=extensions,
142-
)
143-
cert = self._auto_create_cert_extra(cert)
133+
def _get_hardware_oid_extensions(self):
134+
device = self.config.device
135+
return [
136+
{
137+
"oid": MAC_ADDRESS_OID,
138+
"value": f"ASN1:UTF8:string:{device.mac_address}",
139+
"critical": False,
140+
},
141+
{
142+
"oid": DEVICE_UUID_OID,
143+
"value": f"ASN1:UTF8:string:{device.id}",
144+
"critical": False,
145+
},
146+
]
147+
148+
def _auto_create_cert(self, name, common_name):
149+
"""
150+
Automatically creates and assigns a client x509 certificate
151+
"""
152+
cert = self._build_cert(name=name, common_name=common_name)
144153
cert.full_clean()
145154
cert.save()
146155
self.cert = cert

openwisp_controller/config/handlers.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ def device_registered_notification(sender, instance, is_new, **kwargs):
4545
)
4646

4747

48+
def _hardware_fields_changed(update_fields):
49+
return update_fields is None or (
50+
"name" in update_fields or "mac_address" in update_fields
51+
)
52+
53+
4854
@receiver(pre_save, sender=Device, dispatch_uid="capture_old_hardware_properties")
4955
def capture_old_hardware_properties(sender, instance, **kwargs):
5056
"""
@@ -53,10 +59,8 @@ def capture_old_hardware_properties(sender, instance, **kwargs):
5359
"""
5460
if not instance.pk:
5561
return
56-
update_fields = kwargs.get("update_fields")
57-
if update_fields is not None:
58-
if "name" not in update_fields and "mac_address" not in update_fields:
59-
return
62+
if not _hardware_fields_changed(kwargs.get("update_fields")):
63+
return
6064
try:
6165
old_instance = sender.objects.only("name", "mac_address").get(pk=instance.pk)
6266
instance._old_name = old_instance.name
@@ -72,10 +76,8 @@ def detect_hardware_drift(sender, instance, created, **kwargs):
7276
"""
7377
if created or not app_settings.REGENERATE_CERTS_ON_HARDWARE_CHANGE:
7478
return
75-
update_fields = kwargs.get("update_fields")
76-
if update_fields is not None:
77-
if "name" not in update_fields and "mac_address" not in update_fields:
78-
return
79+
if not _hardware_fields_changed(kwargs.get("update_fields")):
80+
return
7981
name_changed = getattr(instance, "_old_name", instance.name) != instance.name
8082
mac_changed = (
8183
getattr(instance, "_old_mac", instance.mac_address) != instance.mac_address

openwisp_controller/config/tasks.py

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import copy
21
import logging
32

43
import requests
@@ -233,7 +232,6 @@ def regenerate_device_certificates_task(device_id, expected_cert_ids=None):
233232
return
234233
Device = load_model("config", "Device")
235234
DeviceCertificate = load_model("config", "DeviceCertificate")
236-
Cert = load_model("django_x509", "Cert")
237235
try:
238236
device = Device.objects.get(id=device_id)
239237
except Device.DoesNotExist:
@@ -262,60 +260,19 @@ def regenerate_device_certificates_task(device_id, expected_cert_ids=None):
262260
continue
263261
old_cert = dc.cert
264262
old_cert.revoke()
265-
blueprint = dc.template.blueprint_cert
266-
if blueprint and blueprint.extensions:
267-
extensions = copy.deepcopy(blueprint.extensions)
268-
else:
269-
extensions = [
270-
{"name": "nsCertType", "value": "client", "critical": False}
271-
]
272-
ca = dc.template.ca
273-
key_length = blueprint.key_length if blueprint else ca.key_length
274-
digest = blueprint.digest if blueprint else str(ca.digest)
275-
country_code = blueprint.country_code if blueprint else ca.country_code
276-
state = blueprint.state if blueprint else ca.state
277-
city = blueprint.city if blueprint else ca.city
278-
organization_name = (
279-
blueprint.organization_name if blueprint else ca.organization_name
280-
)
281-
email = blueprint.email if blueprint else ca.email
282-
extensions.extend(
283-
[
284-
{
285-
"oid": "1.3.6.1.4.1.65901.1",
286-
"value": f"ASN1:UTF8:string:{device.mac_address}",
287-
"critical": False,
288-
},
289-
{
290-
"oid": "1.3.6.1.4.1.65901.2",
291-
"value": f"ASN1:UTF8:string:{device.id}",
292-
"critical": False,
293-
},
294-
]
295-
)
296-
new_cert = Cert(
297-
name=device.name,
298-
ca=ca,
299-
key_length=key_length,
300-
digest=digest,
301-
country_code=country_code,
302-
state=state,
303-
city=city,
304-
organization_name=organization_name,
305-
email=email,
306-
common_name=dc._get_common_name(),
307-
extensions=extensions,
263+
new_cert = dc._build_cert(
264+
name=device.name, common_name=dc._get_common_name()
308265
)
309-
new_cert = dc._auto_create_cert_extra(new_cert)
310266
new_cert.full_clean()
311267
new_cert.save()
312268
dc.cert = new_cert
313269
dc.save()
314270
configs_to_update.add(dc.config)
315271
certs_regenerated += 1
316272
for config in configs_to_update:
273+
config.refresh_from_db()
317274
config.update_status_if_checksum_changed()
318-
if certs_regenerated:
275+
if certs_regenerated > 0:
319276
try:
320277
message = _(
321278
"Hardware drift detected on device {device_name}. "

openwisp_controller/config/tests/test_selenium.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
Device = load_model("config", "Device")
2323
DeviceGroup = load_model("config", "DeviceGroup")
2424
Cert = load_model("django_x509", "Cert")
25+
Ca = load_model("django_x509", "Ca")
26+
DeviceCertificate = load_model("config", "DeviceCertificate")
27+
Notification = load_model("openwisp_notifications", "Notification")
2528

2629

2730
class SeleniumTestMixin(BaseSeleniumTestMixin):
@@ -486,6 +489,70 @@ def test_relevant_templates_duplicates(self):
486489
self.find_element(by=By.NAME, value="_save").click()
487490
self.wait_for_presence(By.CSS_SELECTOR, ".messagelist .success", timeout=5)
488491

492+
def test_e2e_certificate_provisioning(self):
493+
"""
494+
End-to-end flow: create CA, create certificate
495+
template, assign to device, verify certificate generation.
496+
"""
497+
org = self._get_org()
498+
ca = Ca.objects.create(name="test-ca", organization=org)
499+
template = self._create_template(
500+
organization=org, type="cert", ca=ca, auto_cert=True
501+
)
502+
device = self._create_device(organization=org, name="e2e-router")
503+
self._create_config(device=device)
504+
505+
self.login()
506+
self.open(
507+
reverse(f"admin:{self.config_app_label}_device_change", args=[device.id])
508+
+ "#config-group"
509+
)
510+
self.hide_loading_overlay()
511+
self.find_element(by=By.XPATH, value=f'//*[@value="{template.id}"]').click()
512+
self.web_driver.execute_script(
513+
'document.querySelector("#ow-user-tools").style.display="none"'
514+
)
515+
self.find_element(by=By.NAME, value="_continue").click()
516+
self.wait_for_presence(By.CSS_SELECTOR, ".messagelist .success", timeout=5)
517+
device.config.refresh_from_db()
518+
device_cert = DeviceCertificate.objects.get(
519+
config=device.config, template=template
520+
)
521+
self.assertIsNotNone(
522+
device_cert.cert, "Certificate was not generated after UI assignment!"
523+
)
524+
self.assertFalse(device_cert.cert.revoked)
525+
self.assertEqual(device_cert.cert.name, "e2e-router")
526+
527+
def test_hardware_drift_notification(self):
528+
org = self._get_org()
529+
ca = Ca.objects.create(name="test-ca", organization=org)
530+
template = self._create_template(
531+
organization=org, type="cert", ca=ca, auto_cert=True
532+
)
533+
device = self._create_device(organization=org, name="old-router-name")
534+
config = self._create_config(device=device)
535+
config.templates.add(template)
536+
self.login()
537+
self.open(
538+
reverse(f"admin:{self.config_app_label}_device_change", args=[device.id])
539+
)
540+
self.hide_loading_overlay()
541+
name_input = self.find_element(by=By.NAME, value="name", wait_for="presence")
542+
name_input.clear()
543+
name_input.send_keys("renamed-router")
544+
# Hide user tools because it covers the save button
545+
self.web_driver.execute_script(
546+
'document.querySelector("#ow-user-tools").style.display="none"'
547+
)
548+
self.find_element(by=By.NAME, value="_save").click()
549+
self.wait_for_presence(By.CSS_SELECTOR, ".messagelist .success", timeout=5)
550+
self.find_element(by=By.ID, value="openwisp_notifications").click()
551+
notification = self.wait_for_visibility(
552+
By.CLASS_NAME, "ow-notification-elem", timeout=10
553+
)
554+
self.assertIn("Hardware drift detected", notification.text)
555+
489556

490557
@tag("selenium_tests")
491558
class TestDeviceGroupAdmin(

0 commit comments

Comments
 (0)