diff --git a/openwisp_radius/api/freeradius_views.py b/openwisp_radius/api/freeradius_views.py
index b69232e5..c8e8d90b 100644
--- a/openwisp_radius/api/freeradius_views.py
+++ b/openwisp_radius/api/freeradius_views.py
@@ -28,6 +28,7 @@
from .. import registration
from .. import settings as app_settings
+from ..base.models import sanitize_mac_address
from ..counters.base import BaseCounter
from ..counters.exceptions import MaxQuotaReached
from ..signals import radius_accounting_success
@@ -372,7 +373,7 @@ def _check_simultaneous_use(
# of the same client on the same NAS.
if called_station_id and calling_station_id:
open_sessions = open_sessions.exclude(
- called_station_id=called_station_id,
+ called_station_id=sanitize_mac_address(called_station_id),
calling_station_id=calling_station_id,
)
open_sessions = open_sessions.count()
diff --git a/openwisp_radius/api/serializers.py b/openwisp_radius/api/serializers.py
index b9b01165..f721f6b1 100644
--- a/openwisp_radius/api/serializers.py
+++ b/openwisp_radius/api/serializers.py
@@ -35,6 +35,7 @@
from .. import settings as app_settings
from ..base.forms import PasswordResetForm
+from ..base.models import sanitize_mac_address
from ..counters.exceptions import SkipCheck
from ..registration import REGISTRATION_METHOD_CHOICES
from ..utils import (
@@ -271,7 +272,11 @@ def _check_called_station_id(self, instance, acct_data):
do not overwrite it back to unconverted ID during interim updates
"""
ids = app_settings.CALLED_STATION_IDS
- if not ids or instance.called_station_id == acct_data["called_station_id"]:
+ called_station_id = acct_data.get("called_station_id")
+ if not called_station_id:
+ return acct_data
+ sanitized = sanitize_mac_address(called_station_id)
+ if not ids or instance.called_station_id == sanitized:
return acct_data
try:
organization = acct_data["organization"]
@@ -285,7 +290,7 @@ def _check_called_station_id(self, instance, acct_data):
unconverted_ids = ids.get(str(organization.id), {}).get(
"unconverted_ids", []
) + ids.get(organization.slug, {}).get("unconverted_ids", [])
- if acct_data["called_station_id"] in unconverted_ids:
+ if sanitized in unconverted_ids or called_station_id in unconverted_ids:
acct_data["called_station_id"] = instance.called_station_id
except Exception:
logger.exception("Got exception in _check_called_station_id")
diff --git a/openwisp_radius/base/models.py b/openwisp_radius/base/models.py
index 808b8640..4f5b9f61 100644
--- a/openwisp_radius/base/models.py
+++ b/openwisp_radius/base/models.py
@@ -3,6 +3,7 @@
import json
import logging
import os
+import re
import string
from datetime import timedelta
from io import StringIO
@@ -25,6 +26,7 @@
from django.utils.translation import gettext_lazy as _
from jsonfield import JSONField
from model_utils.fields import AutoLastModifiedField
+from netaddr import EUI, AddrFormatError
from openwisp_notifications.signals import notify
from phonenumber_field.modelfields import PhoneNumberField
from private_storage.fields import PrivateFileField
@@ -178,6 +180,40 @@
OPTIONAL_SETTINGS = app_settings.OPTIONAL_REGISTRATION_FIELDS
+def sanitize_mac_address(mac):
+ """
+ Sanitize a MAC address string to the colon-separated lowercase format.
+ If the input is not a valid MAC address, return it unchanged.
+
+ Handles various MAC address formats:
+ - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e
+ - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e
+ - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e
+ - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e
+ """
+ if not mac or not isinstance(mac, str):
+ return mac
+ try:
+ ipaddress.ip_address(mac)
+ return mac
+ except ValueError:
+ pass
+ mac_patterns = [
+ r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}",
+ r"([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}",
+ r"(?.*Reset password.*<\/a>',
+ r'.*Reset password.*',
)
self.assertNotIn('
.*Manage Session.*<\/a>',
+ r'.*Manage Session.*',
)
self.assertIn(
"A new session has been started for your account:" f" {user.username}",
@@ -239,9 +239,10 @@ def test_send_login_email(self, translation_activate, utils_logger, task_logger)
user.language = "it"
user.save(update_fields=["language"])
tasks.send_login_email.delay(accounting_data)
+ email = mail.outbox.pop()
self.assertRegex(
"".join(email.alternatives[0][0].splitlines()),
- '.*Manage Session.*<\/a>',
+ r'.*Manage Session.*',
)
self.assertEqual(translation_activate.call_args_list[0][0][0], "it")
self.assertEqual(
diff --git a/setup.py b/setup.py
index 8dda298b..1ecdd6cd 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@
if sys.argv[-1] == "publish":
# delete any *.pyc, *.pyo and __pycache__
- os.system('find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf')
+ os.system(r'find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf')
os.system("python setup.py sdist bdist_wheel")
os.system("twine upload -s dist/*")
os.system("rm -rf dist build")
diff --git a/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py b/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py
index f41d423c..7bbb3d4e 100644
--- a/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py
+++ b/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py
@@ -505,7 +505,7 @@ class Migration(migrations.Migration):
max_length=32,
validators=[
django.core.validators.RegexValidator(
- re.compile("^[^\\s/\\.]+$"),
+ re.compile(r"^[^\s/\.]+$"),
code="invalid",
message=(
"This value must not contain spaces, "
diff --git a/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py b/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py
new file mode 100644
index 00000000..6285c415
--- /dev/null
+++ b/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py
@@ -0,0 +1,35 @@
+# Generated by Django 5.2.5 on 2025-09-02 07:05
+
+import re
+
+import django.core.validators
+from django.db import migrations
+
+import openwisp_utils.fields
+import openwisp_utils.utils
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("sample_radius", "0030_alter_radiusaccounting_called_station_id_and_more"),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name="organizationradiussettings",
+ name="token",
+ field=openwisp_utils.fields.KeyField(
+ default=openwisp_utils.utils.get_random_key,
+ help_text=None,
+ max_length=32,
+ validators=[
+ django.core.validators.RegexValidator(
+ re.compile(r"^[^\s/\.]+$"),
+ code="invalid",
+ message="This value must not contain spaces, dots or slashes.",
+ )
+ ],
+ ),
+ ),
+ ]
diff --git a/tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py b/tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py
new file mode 100644
index 00000000..70af2fab
--- /dev/null
+++ b/tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py
@@ -0,0 +1,13 @@
+# Generated by Django 5.2.5 on 2025-12-22 07:07
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("sample_radius", "0031_alter_organizationradiussettings_token"),
+ ("sample_radius", "0031_radiusbatch_status"),
+ ]
+
+ operations = []