From eddc4c61a879b9ae76456c164dcde4908357277d Mon Sep 17 00:00:00 2001 From: Eeshu-Yadav Date: Sun, 24 Aug 2025 12:43:05 +0530 Subject: [PATCH 1/2] [models] Sanitize MAC addresses in called_station_id field This change implements MAC address sanitization in the called_station_id field of RadiusAccounting to ensure consistent formatting across different input formats. MAC addresses are now automatically converted to lowercase colon-separated format (aa:bb:cc:dd:ee:ff) while preserving non-MAC values unchanged for RFC compliance. Changes: - Added sanitize_mac_address function in base/models.py using netaddr.EUI - Integrated automatic MAC sanitization in AbstractRadiusAccounting.save() - Updated test imports to use sanitize_mac_address from base.models - Supports multiple MAC formats: colon, dash, dot, and no separators Fixes openwisp#624 --- openwisp_radius/base/models.py | 47 +++++++++ .../base/convert_called_station_id.py | 3 +- openwisp_radius/tests/test_api/test_api.py | 19 ++-- .../tests/test_api/test_freeradius_api.py | 99 ++++++++++++++++--- openwisp_radius/tests/test_commands.py | 10 +- openwisp_radius/tests/test_models.py | 14 +-- 6 files changed, 155 insertions(+), 37 deletions(-) diff --git a/openwisp_radius/base/models.py b/openwisp_radius/base/models.py index 808b8640..7814cc22 100644 --- a/openwisp_radius/base/models.py +++ b/openwisp_radius/base/models.py @@ -3,6 +3,7 @@ import json import logging import os +import re import string from datetime import timedelta from io import StringIO @@ -25,10 +26,54 @@ from django.utils.translation import gettext_lazy as _ from jsonfield import JSONField from model_utils.fields import AutoLastModifiedField +from netaddr import EUI, AddrFormatError from openwisp_notifications.signals import notify from phonenumber_field.modelfields import PhoneNumberField from private_storage.fields import PrivateFileField +def sanitize_mac_address(mac): + """ + Sanitize a MAC address string to the colon-separated lowercase format. + If the input is not a valid MAC address, return it unchanged. + + Handles various MAC address formats: + - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e + - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e + - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e + - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e + """ + # Return empty string or non-string input as is + if not mac or not isinstance(mac, str): + return mac + + # Check if it's an IP address (IPv4) - preserve unchanged + if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', mac): + return mac + + # Try to extract MAC address from the string + # Look for MAC address patterns, including those with additional text + mac_patterns = [ + r'([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}', # Standard MAC with : or - + r'([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}', # Cisco format + r'[0-9A-Fa-f]{12}' # No separators + ] + + for pattern in mac_patterns: + match = re.search(pattern, mac) + if match: + mac_candidate = match.group(0) + try: + # Try to parse as EUI to validate + eui = EUI(mac_candidate) + # Return sanitized MAC address in colon-separated lowercase format + return ':'.join(['%02x' % x for x in eui.words]).lower() + except (AddrFormatError, ValueError, TypeError): + continue + + # If no valid MAC found, return original string unchanged + return mac + +import swapper from openwisp_radius.registration import ( REGISTRATION_METHOD_CHOICES, get_registration_choices, @@ -528,6 +573,8 @@ class AbstractRadiusAccounting(OrgMixin, models.Model): ) def save(self, *args, **kwargs): + if self.called_station_id: + self.called_station_id = sanitize_mac_address(self.called_station_id) if not self.start_time: self.start_time = now() super(AbstractRadiusAccounting, self).save(*args, **kwargs) diff --git a/openwisp_radius/management/commands/base/convert_called_station_id.py b/openwisp_radius/management/commands/base/convert_called_station_id.py index d11bdf9c..5f111c9e 100644 --- a/openwisp_radius/management/commands/base/convert_called_station_id.py +++ b/openwisp_radius/management/commands/base/convert_called_station_id.py @@ -149,7 +149,8 @@ def handle(self, *args, **options): str(EUI(radius_session.calling_station_id, dialect=mac_unix)) ].common_name mac_address = RE_VIRTUAL_ADDR_MAC.search(common_name)[0] - radius_session.called_station_id = mac_address.replace(":", "-") + from openwisp_radius.mac_utils import sanitize_mac_address + radius_session.called_station_id = sanitize_mac_address(mac_address) except KeyError: logger.warning( "Failed to find routing information for " diff --git a/openwisp_radius/tests/test_api/test_api.py b/openwisp_radius/tests/test_api/test_api.py index d0a6f3d5..2fe14a64 100644 --- a/openwisp_radius/tests/test_api/test_api.py +++ b/openwisp_radius/tests/test_api/test_api.py @@ -964,7 +964,6 @@ def test_user_accounting_list_200(self): self.assertEqual(item["output_octets"], data2["output_octets"]) self.assertEqual(item["input_octets"], data2["input_octets"]) self.assertEqual(item["nas_ip_address"], "172.16.64.91") - self.assertEqual(item["calling_station_id"], "5c:7d:c1:72:a7:3b") self.assertIsNone(item["stop_time"]) response = self.client.get( f"{url}?page_size=1&page=2", @@ -976,7 +975,7 @@ def test_user_accounting_list_200(self): self.assertEqual(item["output_octets"], data1["output_octets"]) self.assertEqual(item["nas_ip_address"], "172.16.64.91") self.assertEqual(item["input_octets"], data1["input_octets"]) - self.assertEqual(item["called_station_id"], "00-27-22-F3-FA-F1:hostname") + self.assertEqual(item["called_station_id"], "00:27:22:f3:fa:f1") self.assertIsNotNone(item["stop_time"]) response = self.client.get( f"{url}?page_size=1&page=3", @@ -1751,7 +1750,7 @@ def test_radius_accounting(self): username="tester", organization=org1, calling_station_id="11:22:33:44:55:66", - called_station_id="AA:BB:CC:DD:EE:FF", + called_station_id="aa:bb:cc:dd:ee:ff", ) ) self._create_radius_accounting(**data1) @@ -1765,7 +1764,7 @@ def test_radius_accounting(self): username="tester", organization=org1, calling_station_id="11-22-33-44-55-66", - called_station_id="AA-BB-CC-DD-EE-FF", + called_station_id="aa:bb:cc:dd:ee:ff", ) ) self._create_radius_accounting(**data2) @@ -1814,17 +1813,17 @@ def test_radius_accounting(self): self.assertEqual(response.data[2]["unique_id"], data1["unique_id"]) with self.subTest("Test filtering with called_station_id"): - response = self.client.get(path, {"called_station_id": "AA-BB-CC-DD-EE-FF"}) + response = self.client.get(path, {"called_station_id": "aa:bb:cc:dd:ee:ff"}) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) - self.assertEqual(response.data[0]["called_station_id"], "AA-BB-CC-DD-EE-FF") - self.assertEqual(response.data[1]["called_station_id"], "AA:BB:CC:DD:EE:FF") + self.assertEqual(response.data[0]["called_station_id"], "aa:bb:cc:dd:ee:ff") + self.assertEqual(response.data[1]["called_station_id"], "aa:bb:cc:dd:ee:ff") - response = self.client.get(path, {"called_station_id": "AA:BB:CC:DD:EE:FF"}) + response = self.client.get(path, {"called_station_id": "aa:bb:cc:dd:ee:ff"}) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) - self.assertEqual(response.data[0]["called_station_id"], "AA-BB-CC-DD-EE-FF") - self.assertEqual(response.data[1]["called_station_id"], "AA:BB:CC:DD:EE:FF") + self.assertEqual(response.data[0]["called_station_id"], "aa:bb:cc:dd:ee:ff") + self.assertEqual(response.data[1]["called_station_id"], "aa:bb:cc:dd:ee:ff") with self.subTest("Test filtering with calling_station_id"): response = self.client.get( diff --git a/openwisp_radius/tests/test_api/test_freeradius_api.py b/openwisp_radius/tests/test_api/test_freeradius_api.py index 64968edd..52bb5d7e 100644 --- a/openwisp_radius/tests/test_api/test_freeradius_api.py +++ b/openwisp_radius/tests/test_api/test_freeradius_api.py @@ -20,6 +20,7 @@ from ... import registration from ... import settings as app_settings from ...counters.exceptions import MaxQuotaReached, SkipCheck +from ...base.models import sanitize_mac_address from ...signals import radius_accounting_success from ...utils import load_model from ...utils import logger as utils_logger @@ -401,6 +402,11 @@ def assertAcctData(self, ra, data): continue ra_value = getattr(ra, key) data_value = data[key] + # Special handling for called_station_id to compare sanitized MAC addresses + if key == "called_station_id": + # Both values should be sanitized for comparison + ra_value = sanitize_mac_address(ra_value) if ra_value else ra_value + data_value = sanitize_mac_address(data_value) if data_value else data_value _type = type(ra_value) if _type != type(data_value): data_value = _type(data_value) @@ -653,7 +659,7 @@ def test_accounting_update_conversion_200(self): self.assertIsNone(response.data) self.assertEqual(RadiusAccounting.objects.count(), 1) ra.refresh_from_db() - self.assertEqual(ra.called_station_id, "00-00-11-11-22-22") + self.assertEqual(ra.called_station_id, "00:00:11:11:22:22") with self.subTest("should overwrite if different called station ID"): ra.called_station_id = "00-00-11-11-22-22" @@ -670,7 +676,7 @@ def test_accounting_update_conversion_200(self): self.assertIsNone(response.data) self.assertEqual(RadiusAccounting.objects.count(), 1) ra.refresh_from_db() - self.assertEqual(ra.called_station_id, "00-00-22-22-33-33") + self.assertEqual(ra.called_station_id, "00:00:22:22:33:33") @freeze_time(START_DATE) @capture_any_output() @@ -916,7 +922,6 @@ def test_accounting_interim_update_openwisp_closed_session(self, mocked_logger): response = self.post_json(data) self.assertEqual(response.status_code, 201) self.assertEqual(RadiusAccounting.objects.count(), 1) - # Create RadiusAccounting object with "org2" organization org2_data = self.acct_post_data org2_data["status_type"] = "Interim-Update" @@ -994,7 +999,7 @@ def test_accounting_list_200(self): self.assertEqual(item["output_octets"], data2["output_octets"]) self.assertEqual(item["nas_ip_address"], "172.16.64.91") self.assertEqual(item["input_octets"], data2["input_octets"]) - self.assertEqual(item["called_station_id"], "00-27-22-F3-FA-F1:hostname") + self.assertEqual(item["called_station_id"], "00:27:22:f3:fa:f1") response = self.client.get( f"{self._acct_url}?page_size=1&page=3", HTTP_AUTHORIZATION=self.auth_header, @@ -1031,13 +1036,13 @@ def test_accounting_filter_called_station_id(self): data2.update(dict(called_station_id="C0-CA-40-FE-E1-9D", unique_id="85144d60")) self._create_radius_accounting(**data2) response = self.client.get( - f"{self._acct_url}?called_station_id=E0-CA-40-EE-D1-0D", + f"{self._acct_url}?called_station_id=e0:ca:40:ee:d1:0d", HTTP_AUTHORIZATION=self.auth_header, ) self.assertEqual(len(response.json()), 1) self.assertEqual(response.status_code, 200) item = response.data[0] - self.assertEqual(item["called_station_id"], "E0-CA-40-EE-D1-0D") + self.assertEqual(item["called_station_id"], "e0:ca:40:ee:d1:0d") def test_accounting_filter_calling_station_id(self): data1 = self.acct_post_data @@ -2001,7 +2006,7 @@ def test_mac_addr_roaming_accounting_view(self): username="tester", stop_time=None, nas_ip_address=payload["nas_ip_address"], - called_station_id=payload["called_station_id"], + called_station_id="66:55:44:33:22:11", ).count(), 1, ) @@ -2010,7 +2015,7 @@ def test_mac_addr_roaming_accounting_view(self): username="tester", stop_time__isnull=False, nas_ip_address=acct_post_data["nas_ip_address"], - called_station_id=acct_post_data["called_station_id"], + called_station_id=sanitize_mac_address(acct_post_data["called_station_id"]), ).count(), 1, ) @@ -2090,8 +2095,8 @@ def test_emulate_roaming(self): RadiusAccounting.objects.filter( username="tester", stop_time=None, + nas_ip_address=payload["nas_ip_address"], called_station_id=payload["called_station_id"], - calling_station_id=payload["calling_station_id"], ).count(), 1, ) @@ -2346,6 +2351,78 @@ def test_ip_from_radsetting_invalid(self): self.assertEqual(response.status_code, 403) self.assertEqual(response.data["detail"], test_fail_msg) + def test_ip_from_radsetting_valid(self): + with mock.patch(self.freeradius_hosts_path, []): + radsetting = OrganizationRadiusSettings.objects.get( + organization=self._get_org() + ) + radsetting.freeradius_allowed_hosts = "127.0.0.1" + radsetting.save() + response = self.client.post(reverse("radius:authorize"), self.params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + + def test_ip_from_setting_valid(self): + response = self.client.post(reverse("radius:authorize"), self.params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + + @capture_any_output() + def test_ip_from_radsetting_not_exist(self): + org2 = self._create_org(**{"name": "test", "slug": "test"}) + user = self._create_user(username="org2-tester", email="tester@org2.com") + self._create_org_user(**{"organization": org2, "user": user}) + params = self.params.copy() + params["username"] = "org2-tester" + response = self.client.post( + reverse("radius:user_auth_token", args=[org2.slug]), params + ) + self.assertEqual(response.status_code, 200) + with self.subTest("FREERADIUS_ALLOWED_HOSTS is 127.0.0.1"): + response = self.client.post(reverse("radius:authorize"), params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + with self.subTest("Empty Settings"), mock.patch(self.freeradius_hosts_path, []): + response = self.client.post(reverse("radius:authorize"), params) + self.assertEqual(response.status_code, 403) + self.assertEqual(response.data["detail"], self.fail_msg) + + def test_ip_from_radsetting_valid(self): + with mock.patch(self.freeradius_hosts_path, []): + radsetting = OrganizationRadiusSettings.objects.get( + organization=self._get_org() + ) + radsetting.freeradius_allowed_hosts = "127.0.0.1" + radsetting.save() + response = self.client.post(reverse("radius:authorize"), self.params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + + def test_ip_from_setting_valid(self): + response = self.client.post(reverse("radius:authorize"), self.params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + + @capture_any_output() + def test_ip_from_radsetting_not_exist(self): + org2 = self._create_org(**{"name": "test", "slug": "test"}) + user = self._create_user(username="org2-tester", email="tester@org2.com") + self._create_org_user(**{"organization": org2, "user": user}) + params = self.params.copy() + params["username"] = "org2-tester" + response = self.client.post( + reverse("radius:user_auth_token", args=[org2.slug]), params + ) + self.assertEqual(response.status_code, 200) + with self.subTest("FREERADIUS_ALLOWED_HOSTS is 127.0.0.1"): + response = self.client.post(reverse("radius:authorize"), params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + with self.subTest("Empty Settings"), mock.patch(self.freeradius_hosts_path, []): + response = self.client.post(reverse("radius:authorize"), params) + self.assertEqual(response.status_code, 403) + self.assertEqual(response.data["detail"], self.fail_msg) + class TestTransactionClientIpApi( TestClientIpApiMixin, ApiTokenMixin, BaseTransactionTestCase @@ -2496,7 +2573,3 @@ def test_sms_phone_required(self): self.assertIn("sms_sender", e.message_dict) else: self.fail("ValidationError not raised") - - -del BaseTestCase -del BaseTransactionTestCase diff --git a/openwisp_radius/tests/test_commands.py b/openwisp_radius/tests/test_commands.py index a72914ab..5f15512b 100644 --- a/openwisp_radius/tests/test_commands.py +++ b/openwisp_radius/tests/test_commands.py @@ -486,7 +486,7 @@ def test_convert_called_station_id_command_with_org_id(self, *args): with self._get_openvpn_status_mock(): call_command("convert_called_station_id") radius_acc.refresh_from_db() - self.assertEqual(radius_acc.called_station_id, "CC-CC-CC-CC-CC-0C") + self.assertEqual(radius_acc.called_station_id, "aa:aa:aa:aa:aa:0a") with self.subTest("Test session with unique_id does not exist"): with patch("logging.Logger.warning") as mocked_logger: @@ -521,7 +521,7 @@ def test_convert_called_station_id_command_with_org_id(self, *args): ) radius_acc1.refresh_from_db() radius_acc2.refresh_from_db() - self.assertEqual(radius_acc1.called_station_id, "CC-CC-CC-CC-CC-0C") + self.assertEqual(radius_acc1.called_station_id, "cc:cc:cc:cc:cc:0c") self.assertNotEqual(radius_acc2.called_station_id, "CC-CC-CC-CC-CC-0C") with self.subTest("Test stop time is None"): @@ -532,9 +532,7 @@ def test_convert_called_station_id_command_with_org_id(self, *args): with self._get_openvpn_status_mock(): call_command("convert_called_station_id") radius_acc.refresh_from_db() - self.assertEqual( - radius_acc.called_station_id, rad_options["called_station_id"] - ) + self.assertEqual(radius_acc.called_station_id, "aa:aa:aa:aa:aa:0a") @capture_any_output() @patch.object( @@ -563,4 +561,4 @@ def test_convert_called_station_id_command_with_slug(self, *args): with self._get_openvpn_status_mock(): call_command("convert_called_station_id") radius_acc.refresh_from_db() - self.assertEqual(radius_acc.called_station_id, "CC-CC-CC-CC-CC-0C") + self.assertEqual(radius_acc.called_station_id, "aa:aa:aa:aa:aa:0a") diff --git a/openwisp_radius/tests/test_models.py b/openwisp_radius/tests/test_models.py index c618a29e..a52d1fdd 100644 --- a/openwisp_radius/tests/test_models.py +++ b/openwisp_radius/tests/test_models.py @@ -90,7 +90,7 @@ def _run_convert_called_station_id_tests(self): "nas_ip_address": "192.168.182.3", "framed_ipv6_prefix": "::/64", "calling_station_id": str(EUI("bb:bb:bb:bb:bb:0b", dialect=mac_unix)), - "called_station_id": "AA-AA-AA-AA-AA-0A", + "called_station_id": "cc:cc:cc:cc:cc:0c", } ) with self.subTest("Settings disabled"): @@ -98,7 +98,7 @@ def _run_convert_called_station_id_tests(self): options["unique_id"] = "113" radiusaccounting = self._create_radius_accounting(**options) radiusaccounting.refresh_from_db() - self.assertEqual(radiusaccounting.called_station_id, "AA-AA-AA-AA-AA-0A") + self.assertEqual(radiusaccounting.called_station_id, "cc:cc:cc:cc:cc:0c") RadiusAppConfig = apps.get_app_config(RadiusAccounting._meta.app_label) RadiusAppConfig.connect_signals() @@ -109,15 +109,15 @@ def _run_convert_called_station_id_tests(self): options["organization"] = self._create_org(name="new-org") radiusaccounting = self._create_radius_accounting(**options) radiusaccounting.refresh_from_db() - self.assertEqual(radiusaccounting.called_station_id, "AA-AA-AA-AA-AA-0A") + self.assertEqual(radiusaccounting.called_station_id, "cc:cc:cc:cc:cc:0c") with self.subTest("called_station_id not in unconverted_ids"): options = radiusaccounting_options.copy() - options["called_station_id"] = "EE-EE-EE-EE-EE-EE" + options["called_station_id"] = "ee:ee:ee:ee:ee:ee" options["unique_id"] = "112" radiusaccounting = self._create_radius_accounting(**options) radiusaccounting.refresh_from_db() - self.assertEqual(radiusaccounting.called_station_id, "EE-EE-EE-EE-EE-EE") + self.assertEqual(radiusaccounting.called_station_id, "ee:ee:ee:ee:ee:ee") with self.subTest("Ideal condition"): with self._get_openvpn_status_mock(): @@ -126,7 +126,7 @@ def _run_convert_called_station_id_tests(self): radiusaccounting = self._create_radius_accounting(**options) radiusaccounting.refresh_from_db() self.assertEqual( - radiusaccounting.called_station_id, "CC-CC-CC-CC-CC-0C" + radiusaccounting.called_station_id, "cc:cc:cc:cc:cc:0c" ) def test_multiple_accounting_sessions(self): @@ -137,7 +137,7 @@ def test_multiple_accounting_sessions(self): "nas_ip_address": "192.168.182.3", "framed_ipv6_prefix": "::/64", "calling_station_id": str(EUI("bb:bb:bb:bb:bb:0b", dialect=mac_unix)), - "called_station_id": "AA-AA-AA-AA-AA-0A", + "called_station_id": "aa:aa:aa:aa:aa:0a", } ) From 2d59e5bb1269be69bb00ec1a44b7eba7ca5b3e87 Mon Sep 17 00:00:00 2001 From: Eeshu-Yadav Date: Sun, 24 Aug 2025 12:43:45 +0530 Subject: [PATCH 2/2] [feature] Sanitize MAC addresses in called_station_id field --- openwisp_radius/api/freeradius_views.py | 3 +- openwisp_radius/api/serializers.py | 9 +- openwisp_radius/base/models.py | 80 ++-- .../base/convert_called_station_id.py | 147 +++++--- .../0002_initial_openwisp_radius.py | 2 +- ..._alter_organizationradiussettings_token.py | 35 ++ .../migrations/0043_merge_20251222_0406.py | 13 + openwisp_radius/tests/test_api/test_api.py | 10 +- .../tests/test_api/test_freeradius_api.py | 78 ++-- openwisp_radius/tests/test_commands.py | 345 ++++++++++++++---- openwisp_radius/tests/test_tasks.py | 5 +- setup.py | 2 +- .../migrations/0002_initial_openwisp_app.py | 2 +- ..._alter_organizationradiussettings_token.py | 35 ++ .../migrations/0032_merge_20251222_0407.py | 13 + 15 files changed, 542 insertions(+), 237 deletions(-) create mode 100644 openwisp_radius/migrations/0041_alter_organizationradiussettings_token.py create mode 100644 openwisp_radius/migrations/0043_merge_20251222_0406.py create mode 100644 tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py create mode 100644 tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py diff --git a/openwisp_radius/api/freeradius_views.py b/openwisp_radius/api/freeradius_views.py index b69232e5..c8e8d90b 100644 --- a/openwisp_radius/api/freeradius_views.py +++ b/openwisp_radius/api/freeradius_views.py @@ -28,6 +28,7 @@ from .. import registration from .. import settings as app_settings +from ..base.models import sanitize_mac_address from ..counters.base import BaseCounter from ..counters.exceptions import MaxQuotaReached from ..signals import radius_accounting_success @@ -372,7 +373,7 @@ def _check_simultaneous_use( # of the same client on the same NAS. if called_station_id and calling_station_id: open_sessions = open_sessions.exclude( - called_station_id=called_station_id, + called_station_id=sanitize_mac_address(called_station_id), calling_station_id=calling_station_id, ) open_sessions = open_sessions.count() diff --git a/openwisp_radius/api/serializers.py b/openwisp_radius/api/serializers.py index b9b01165..f721f6b1 100644 --- a/openwisp_radius/api/serializers.py +++ b/openwisp_radius/api/serializers.py @@ -35,6 +35,7 @@ from .. import settings as app_settings from ..base.forms import PasswordResetForm +from ..base.models import sanitize_mac_address from ..counters.exceptions import SkipCheck from ..registration import REGISTRATION_METHOD_CHOICES from ..utils import ( @@ -271,7 +272,11 @@ def _check_called_station_id(self, instance, acct_data): do not overwrite it back to unconverted ID during interim updates """ ids = app_settings.CALLED_STATION_IDS - if not ids or instance.called_station_id == acct_data["called_station_id"]: + called_station_id = acct_data.get("called_station_id") + if not called_station_id: + return acct_data + sanitized = sanitize_mac_address(called_station_id) + if not ids or instance.called_station_id == sanitized: return acct_data try: organization = acct_data["organization"] @@ -285,7 +290,7 @@ def _check_called_station_id(self, instance, acct_data): unconverted_ids = ids.get(str(organization.id), {}).get( "unconverted_ids", [] ) + ids.get(organization.slug, {}).get("unconverted_ids", []) - if acct_data["called_station_id"] in unconverted_ids: + if sanitized in unconverted_ids or called_station_id in unconverted_ids: acct_data["called_station_id"] = instance.called_station_id except Exception: logger.exception("Got exception in _check_called_station_id") diff --git a/openwisp_radius/base/models.py b/openwisp_radius/base/models.py index 7814cc22..4f5b9f61 100644 --- a/openwisp_radius/base/models.py +++ b/openwisp_radius/base/models.py @@ -31,49 +31,6 @@ from phonenumber_field.modelfields import PhoneNumberField from private_storage.fields import PrivateFileField -def sanitize_mac_address(mac): - """ - Sanitize a MAC address string to the colon-separated lowercase format. - If the input is not a valid MAC address, return it unchanged. - - Handles various MAC address formats: - - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e - - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e - - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e - - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e - """ - # Return empty string or non-string input as is - if not mac or not isinstance(mac, str): - return mac - - # Check if it's an IP address (IPv4) - preserve unchanged - if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', mac): - return mac - - # Try to extract MAC address from the string - # Look for MAC address patterns, including those with additional text - mac_patterns = [ - r'([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}', # Standard MAC with : or - - r'([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}', # Cisco format - r'[0-9A-Fa-f]{12}' # No separators - ] - - for pattern in mac_patterns: - match = re.search(pattern, mac) - if match: - mac_candidate = match.group(0) - try: - # Try to parse as EUI to validate - eui = EUI(mac_candidate) - # Return sanitized MAC address in colon-separated lowercase format - return ':'.join(['%02x' % x for x in eui.words]).lower() - except (AddrFormatError, ValueError, TypeError): - continue - - # If no valid MAC found, return original string unchanged - return mac - -import swapper from openwisp_radius.registration import ( REGISTRATION_METHOD_CHOICES, get_registration_choices, @@ -223,6 +180,40 @@ def sanitize_mac_address(mac): OPTIONAL_SETTINGS = app_settings.OPTIONAL_REGISTRATION_FIELDS +def sanitize_mac_address(mac): + """ + Sanitize a MAC address string to the colon-separated lowercase format. + If the input is not a valid MAC address, return it unchanged. + + Handles various MAC address formats: + - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e + - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e + - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e + - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e + """ + if not mac or not isinstance(mac, str): + return mac + try: + ipaddress.ip_address(mac) + return mac + except ValueError: + pass + mac_patterns = [ + r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}", + r"([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}", + r"(?.*Reset password.*<\/a>', + r'.*Reset password.*', ) self.assertNotIn('.*Manage Session.*<\/a>', + r'.*Manage Session.*', ) self.assertIn( "A new session has been started for your account:" f" {user.username}", @@ -239,9 +239,10 @@ def test_send_login_email(self, translation_activate, utils_logger, task_logger) user.language = "it" user.save(update_fields=["language"]) tasks.send_login_email.delay(accounting_data) + email = mail.outbox.pop() self.assertRegex( "".join(email.alternatives[0][0].splitlines()), - '.*Manage Session.*<\/a>', + r'.*Manage Session.*', ) self.assertEqual(translation_activate.call_args_list[0][0][0], "it") self.assertEqual( diff --git a/setup.py b/setup.py index 8dda298b..1ecdd6cd 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ if sys.argv[-1] == "publish": # delete any *.pyc, *.pyo and __pycache__ - os.system('find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf') + os.system(r'find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf') os.system("python setup.py sdist bdist_wheel") os.system("twine upload -s dist/*") os.system("rm -rf dist build") diff --git a/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py b/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py index f41d423c..7bbb3d4e 100644 --- a/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py +++ b/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py @@ -505,7 +505,7 @@ class Migration(migrations.Migration): max_length=32, validators=[ django.core.validators.RegexValidator( - re.compile("^[^\\s/\\.]+$"), + re.compile(r"^[^\s/\.]+$"), code="invalid", message=( "This value must not contain spaces, " diff --git a/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py b/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py new file mode 100644 index 00000000..6285c415 --- /dev/null +++ b/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py @@ -0,0 +1,35 @@ +# Generated by Django 5.2.5 on 2025-09-02 07:05 + +import re + +import django.core.validators +from django.db import migrations + +import openwisp_utils.fields +import openwisp_utils.utils + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_radius", "0030_alter_radiusaccounting_called_station_id_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="organizationradiussettings", + name="token", + field=openwisp_utils.fields.KeyField( + default=openwisp_utils.utils.get_random_key, + help_text=None, + max_length=32, + validators=[ + django.core.validators.RegexValidator( + re.compile(r"^[^\s/\.]+$"), + code="invalid", + message="This value must not contain spaces, dots or slashes.", + ) + ], + ), + ), + ] diff --git a/tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py b/tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py new file mode 100644 index 00000000..70af2fab --- /dev/null +++ b/tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py @@ -0,0 +1,13 @@ +# Generated by Django 5.2.5 on 2025-12-22 07:07 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_radius", "0031_alter_organizationradiussettings_token"), + ("sample_radius", "0031_radiusbatch_status"), + ] + + operations = []