|
31 | 31 | from phonenumber_field.modelfields import PhoneNumberField |
32 | 32 | from private_storage.fields import PrivateFileField |
33 | 33 |
|
34 | | -def sanitize_mac_address(mac): |
35 | | - """ |
36 | | - Sanitize a MAC address string to the colon-separated lowercase format. |
37 | | - If the input is not a valid MAC address, return it unchanged. |
38 | | - |
39 | | - Handles various MAC address formats: |
40 | | - - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e |
41 | | - - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e |
42 | | - - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e |
43 | | - - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e |
44 | | - """ |
45 | | - # Return empty string or non-string input as is |
46 | | - if not mac or not isinstance(mac, str): |
47 | | - return mac |
48 | | - |
49 | | - # Check if it's an IP address (IPv4) - preserve unchanged |
50 | | - if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', mac): |
51 | | - return mac |
52 | | - |
53 | | - # Try to extract MAC address from the string |
54 | | - # Look for MAC address patterns, including those with additional text |
55 | | - mac_patterns = [ |
56 | | - r'([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}', # Standard MAC with : or - |
57 | | - r'([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}', # Cisco format |
58 | | - r'[0-9A-Fa-f]{12}' # No separators |
59 | | - ] |
60 | | - |
61 | | - for pattern in mac_patterns: |
62 | | - match = re.search(pattern, mac) |
63 | | - if match: |
64 | | - mac_candidate = match.group(0) |
65 | | - try: |
66 | | - # Try to parse as EUI to validate |
67 | | - eui = EUI(mac_candidate) |
68 | | - # Return sanitized MAC address in colon-separated lowercase format |
69 | | - return ':'.join(['%02x' % x for x in eui.words]).lower() |
70 | | - except (AddrFormatError, ValueError, TypeError): |
71 | | - continue |
72 | | - |
73 | | - # If no valid MAC found, return original string unchanged |
74 | | - return mac |
75 | | - |
76 | | -import swapper |
77 | 34 | from openwisp_radius.registration import ( |
78 | 35 | REGISTRATION_METHOD_CHOICES, |
79 | 36 | get_registration_choices, |
@@ -223,6 +180,53 @@ def sanitize_mac_address(mac): |
223 | 180 | OPTIONAL_SETTINGS = app_settings.OPTIONAL_REGISTRATION_FIELDS |
224 | 181 |
|
225 | 182 |
|
| 183 | +def sanitize_mac_address(mac): |
| 184 | + """ |
| 185 | + Sanitize a MAC address string to the colon-separated lowercase format. |
| 186 | + If the input is not a valid MAC address, return it unchanged. |
| 187 | +
|
| 188 | + Handles various MAC address formats: |
| 189 | + - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e |
| 190 | + - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e |
| 191 | + - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e |
| 192 | + - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e |
| 193 | + """ |
| 194 | + if not mac or not isinstance(mac, str): |
| 195 | + return mac |
| 196 | + try: |
| 197 | + ipaddress.ip_address(mac) |
| 198 | + return mac |
| 199 | + except ValueError: |
| 200 | + pass |
| 201 | + mac_patterns = [ |
| 202 | + r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}", |
| 203 | + r"([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}", |
| 204 | + r"[0-9A-Fa-f]{12}", |
| 205 | + ] |
| 206 | + for pattern in mac_patterns: |
| 207 | + if re.fullmatch(pattern, mac): |
| 208 | + try: |
| 209 | + eui = EUI(mac) |
| 210 | + return ":".join([f"{x:02x}" for x in eui.words]).lower() |
| 211 | + except (AddrFormatError, ValueError, TypeError): |
| 212 | + continue |
| 213 | + return mac |
| 214 | + |
| 215 | + |
| 216 | +class SanitizedMacCharField(models.CharField): |
| 217 | + """ |
| 218 | + A CharField that automatically sanitizes MAC addresses |
| 219 | + to the canonical lowercase colon-separated format on save. |
| 220 | + """ |
| 221 | + |
| 222 | + def pre_save(self, model_instance, add): |
| 223 | + value = getattr(model_instance, self.attname) |
| 224 | + if value: |
| 225 | + value = sanitize_mac_address(value) |
| 226 | + setattr(model_instance, self.attname, value) |
| 227 | + return value |
| 228 | + |
| 229 | + |
226 | 230 | class AutoUsernameMixin(object): |
227 | 231 | def clean(self): |
228 | 232 | """ |
@@ -495,15 +499,15 @@ class AbstractRadiusAccounting(OrgMixin, models.Model): |
495 | 499 | null=True, |
496 | 500 | blank=True, |
497 | 501 | ) |
498 | | - called_station_id = models.CharField( |
| 502 | + called_station_id = SanitizedMacCharField( |
499 | 503 | verbose_name=_("called station ID"), |
500 | 504 | max_length=253, |
501 | 505 | db_column="calledstationid", |
502 | 506 | db_index=True, |
503 | 507 | blank=True, |
504 | 508 | null=True, |
505 | 509 | ) |
506 | | - calling_station_id = models.CharField( |
| 510 | + calling_station_id = SanitizedMacCharField( |
507 | 511 | verbose_name=_("calling station ID"), |
508 | 512 | max_length=253, |
509 | 513 | db_column="callingstationid", |
@@ -573,8 +577,6 @@ class AbstractRadiusAccounting(OrgMixin, models.Model): |
573 | 577 | ) |
574 | 578 |
|
575 | 579 | def save(self, *args, **kwargs): |
576 | | - if self.called_station_id: |
577 | | - self.called_station_id = sanitize_mac_address(self.called_station_id) |
578 | 580 | if not self.start_time: |
579 | 581 | self.start_time = now() |
580 | 582 | super(AbstractRadiusAccounting, self).save(*args, **kwargs) |
@@ -624,8 +626,9 @@ def _close_stale_sessions_on_nas_boot(cls, called_station_id): |
624 | 626 | """ |
625 | 627 | if not called_station_id: |
626 | 628 | return 0 |
| 629 | + sanitized_called_station_id = sanitize_mac_address(called_station_id) |
627 | 630 | stale_sessions = cls.objects.filter( |
628 | | - called_station_id=called_station_id, |
| 631 | + called_station_id=sanitized_called_station_id, |
629 | 632 | stop_time__isnull=True, |
630 | 633 | ) |
631 | 634 | closed_count = stale_sessions.update( |
@@ -877,14 +880,14 @@ class AbstractRadiusPostAuth(OrgMixin, UUIDModel): |
877 | 880 | verbose_name=_("password"), max_length=64, db_column="pass", blank=True |
878 | 881 | ) |
879 | 882 | reply = models.CharField(verbose_name=_("reply"), max_length=32) |
880 | | - called_station_id = models.CharField( |
| 883 | + called_station_id = SanitizedMacCharField( |
881 | 884 | verbose_name=_("called station ID"), |
882 | 885 | max_length=253, |
883 | 886 | db_column="calledstationid", |
884 | 887 | blank=True, |
885 | 888 | null=True, |
886 | 889 | ) |
887 | | - calling_station_id = models.CharField( |
| 890 | + calling_station_id = SanitizedMacCharField( |
888 | 891 | verbose_name=_("calling station ID"), |
889 | 892 | max_length=253, |
890 | 893 | db_column="callingstationid", |
|
0 commit comments