diff --git a/openwisp_radius/api/freeradius_views.py b/openwisp_radius/api/freeradius_views.py index b69232e5..c8e8d90b 100644 --- a/openwisp_radius/api/freeradius_views.py +++ b/openwisp_radius/api/freeradius_views.py @@ -28,6 +28,7 @@ from .. import registration from .. import settings as app_settings +from ..base.models import sanitize_mac_address from ..counters.base import BaseCounter from ..counters.exceptions import MaxQuotaReached from ..signals import radius_accounting_success @@ -372,7 +373,7 @@ def _check_simultaneous_use( # of the same client on the same NAS. if called_station_id and calling_station_id: open_sessions = open_sessions.exclude( - called_station_id=called_station_id, + called_station_id=sanitize_mac_address(called_station_id), calling_station_id=calling_station_id, ) open_sessions = open_sessions.count() diff --git a/openwisp_radius/api/serializers.py b/openwisp_radius/api/serializers.py index b9b01165..f721f6b1 100644 --- a/openwisp_radius/api/serializers.py +++ b/openwisp_radius/api/serializers.py @@ -35,6 +35,7 @@ from .. import settings as app_settings from ..base.forms import PasswordResetForm +from ..base.models import sanitize_mac_address from ..counters.exceptions import SkipCheck from ..registration import REGISTRATION_METHOD_CHOICES from ..utils import ( @@ -271,7 +272,11 @@ def _check_called_station_id(self, instance, acct_data): do not overwrite it back to unconverted ID during interim updates """ ids = app_settings.CALLED_STATION_IDS - if not ids or instance.called_station_id == acct_data["called_station_id"]: + called_station_id = acct_data.get("called_station_id") + if not called_station_id: + return acct_data + sanitized = sanitize_mac_address(called_station_id) + if not ids or instance.called_station_id == sanitized: return acct_data try: organization = acct_data["organization"] @@ -285,7 +290,7 @@ def _check_called_station_id(self, instance, acct_data): unconverted_ids = ids.get(str(organization.id), {}).get( "unconverted_ids", [] ) + ids.get(organization.slug, {}).get("unconverted_ids", []) - if acct_data["called_station_id"] in unconverted_ids: + if sanitized in unconverted_ids or called_station_id in unconverted_ids: acct_data["called_station_id"] = instance.called_station_id except Exception: logger.exception("Got exception in _check_called_station_id") diff --git a/openwisp_radius/base/models.py b/openwisp_radius/base/models.py index 808b8640..4f5b9f61 100644 --- a/openwisp_radius/base/models.py +++ b/openwisp_radius/base/models.py @@ -3,6 +3,7 @@ import json import logging import os +import re import string from datetime import timedelta from io import StringIO @@ -25,6 +26,7 @@ from django.utils.translation import gettext_lazy as _ from jsonfield import JSONField from model_utils.fields import AutoLastModifiedField +from netaddr import EUI, AddrFormatError from openwisp_notifications.signals import notify from phonenumber_field.modelfields import PhoneNumberField from private_storage.fields import PrivateFileField @@ -178,6 +180,40 @@ OPTIONAL_SETTINGS = app_settings.OPTIONAL_REGISTRATION_FIELDS +def sanitize_mac_address(mac): + """ + Sanitize a MAC address string to the colon-separated lowercase format. + If the input is not a valid MAC address, return it unchanged. + + Handles various MAC address formats: + - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e + - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e + - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e + - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e + """ + if not mac or not isinstance(mac, str): + return mac + try: + ipaddress.ip_address(mac) + return mac + except ValueError: + pass + mac_patterns = [ + r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}", + r"([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}", + r"(?.*Reset password.*<\/a>', + r'.*Reset password.*', ) self.assertNotIn('.*Manage Session.*<\/a>', + r'.*Manage Session.*', ) self.assertIn( "A new session has been started for your account:" f" {user.username}", @@ -239,9 +239,10 @@ def test_send_login_email(self, translation_activate, utils_logger, task_logger) user.language = "it" user.save(update_fields=["language"]) tasks.send_login_email.delay(accounting_data) + email = mail.outbox.pop() self.assertRegex( "".join(email.alternatives[0][0].splitlines()), - '.*Manage Session.*<\/a>', + r'.*Manage Session.*', ) self.assertEqual(translation_activate.call_args_list[0][0][0], "it") self.assertEqual( diff --git a/setup.py b/setup.py index 8dda298b..1ecdd6cd 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ if sys.argv[-1] == "publish": # delete any *.pyc, *.pyo and __pycache__ - os.system('find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf') + os.system(r'find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf') os.system("python setup.py sdist bdist_wheel") os.system("twine upload -s dist/*") os.system("rm -rf dist build") diff --git a/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py b/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py index f41d423c..7bbb3d4e 100644 --- a/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py +++ b/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py @@ -505,7 +505,7 @@ class Migration(migrations.Migration): max_length=32, validators=[ django.core.validators.RegexValidator( - re.compile("^[^\\s/\\.]+$"), + re.compile(r"^[^\s/\.]+$"), code="invalid", message=( "This value must not contain spaces, " diff --git a/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py b/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py new file mode 100644 index 00000000..6285c415 --- /dev/null +++ b/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py @@ -0,0 +1,35 @@ +# Generated by Django 5.2.5 on 2025-09-02 07:05 + +import re + +import django.core.validators +from django.db import migrations + +import openwisp_utils.fields +import openwisp_utils.utils + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_radius", "0030_alter_radiusaccounting_called_station_id_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="organizationradiussettings", + name="token", + field=openwisp_utils.fields.KeyField( + default=openwisp_utils.utils.get_random_key, + help_text=None, + max_length=32, + validators=[ + django.core.validators.RegexValidator( + re.compile(r"^[^\s/\.]+$"), + code="invalid", + message="This value must not contain spaces, dots or slashes.", + ) + ], + ), + ), + ] diff --git a/tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py b/tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py new file mode 100644 index 00000000..70af2fab --- /dev/null +++ b/tests/openwisp2/sample_radius/migrations/0032_merge_20251222_0407.py @@ -0,0 +1,13 @@ +# Generated by Django 5.2.5 on 2025-12-22 07:07 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_radius", "0031_alter_organizationradiussettings_token"), + ("sample_radius", "0031_radiusbatch_status"), + ] + + operations = []