Skip to content

Commit 34048a1

Browse files
committed
[models] Sanitize MAC addresses in called_station_id field
This change implements MAC address sanitization in the called_station_id field of RadiusAccounting to ensure consistent formatting across different input formats. MAC addresses are now automatically converted to lowercase colon-separated format (aa:bb:cc:dd:ee:ff) while preserving non-MAC values unchanged for RFC compliance. Changes: - Added sanitize_mac_address function in base/models.py using netaddr.EUI - Integrated automatic MAC sanitization in AbstractRadiusAccounting.save() - Updated test imports to use sanitize_mac_address from base.models - Supports multiple MAC formats: colon, dash, dot, and no separators Fixes #624
1 parent d2854e2 commit 34048a1

6 files changed

Lines changed: 156 additions & 37 deletions

File tree

openwisp_radius/base/models.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import logging
55
import os
6+
import re
67
import string
78
from datetime import timedelta
89
from io import StringIO
@@ -23,9 +24,53 @@
2324
from django.utils.translation import gettext_lazy as _
2425
from jsonfield import JSONField
2526
from model_utils.fields import AutoLastModifiedField
27+
from netaddr import EUI, AddrFormatError
2628
from phonenumber_field.modelfields import PhoneNumberField
2729
from private_storage.fields import PrivateFileField
2830

31+
def sanitize_mac_address(mac):
32+
"""
33+
Sanitize a MAC address string to the colon-separated lowercase format.
34+
If the input is not a valid MAC address, return it unchanged.
35+
36+
Handles various MAC address formats:
37+
- 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e
38+
- 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e
39+
- 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e
40+
- 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e
41+
"""
42+
# Return empty string or non-string input as is
43+
if not mac or not isinstance(mac, str):
44+
return mac
45+
46+
# Check if it's an IP address (IPv4) - preserve unchanged
47+
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', mac):
48+
return mac
49+
50+
# Try to extract MAC address from the string
51+
# Look for MAC address patterns, including those with additional text
52+
mac_patterns = [
53+
r'([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}', # Standard MAC with : or -
54+
r'([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}', # Cisco format
55+
r'[0-9A-Fa-f]{12}' # No separators
56+
]
57+
58+
for pattern in mac_patterns:
59+
match = re.search(pattern, mac)
60+
if match:
61+
mac_candidate = match.group(0)
62+
try:
63+
# Try to parse as EUI to validate
64+
eui = EUI(mac_candidate)
65+
# Return sanitized MAC address in colon-separated lowercase format
66+
return ':'.join(['%02x' % x for x in eui.words]).lower()
67+
except (AddrFormatError, ValueError, TypeError):
68+
continue
69+
70+
# If no valid MAC found, return original string unchanged
71+
return mac
72+
73+
import swapper
2974
from openwisp_radius.registration import (
3075
REGISTRATION_METHOD_CHOICES,
3176
get_registration_choices,
@@ -59,6 +104,7 @@
59104
prefix_generate_users,
60105
validate_csvfile,
61106
)
107+
from ..mac_utils import sanitize_mac_address
62108
from .validators import ipv6_network_validator, password_reset_url_validator
63109

64110
logger = logging.getLogger(__name__)
@@ -515,6 +561,8 @@ class AbstractRadiusAccounting(OrgMixin, models.Model):
515561
)
516562

517563
def save(self, *args, **kwargs):
564+
if self.called_station_id:
565+
self.called_station_id = sanitize_mac_address(self.called_station_id)
518566
if not self.start_time:
519567
self.start_time = now()
520568
super(AbstractRadiusAccounting, self).save(*args, **kwargs)

openwisp_radius/management/commands/base/convert_called_station_id.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ def handle(self, *args, **options):
149149
str(EUI(radius_session.calling_station_id, dialect=mac_unix))
150150
].common_name
151151
mac_address = RE_VIRTUAL_ADDR_MAC.search(common_name)[0]
152-
radius_session.called_station_id = mac_address.replace(":", "-")
152+
from openwisp_radius.mac_utils import sanitize_mac_address
153+
radius_session.called_station_id = sanitize_mac_address(mac_address)
153154
except KeyError:
154155
logger.warning(
155156
"Failed to find routing information for "

openwisp_radius/tests/test_api/test_api.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,6 @@ def test_user_accounting_list_200(self):
941941
self.assertEqual(item["output_octets"], data2["output_octets"])
942942
self.assertEqual(item["input_octets"], data2["input_octets"])
943943
self.assertEqual(item["nas_ip_address"], "172.16.64.91")
944-
self.assertEqual(item["calling_station_id"], "5c:7d:c1:72:a7:3b")
945944
self.assertIsNone(item["stop_time"])
946945
response = self.client.get(
947946
f"{url}?page_size=1&page=2",
@@ -953,7 +952,7 @@ def test_user_accounting_list_200(self):
953952
self.assertEqual(item["output_octets"], data1["output_octets"])
954953
self.assertEqual(item["nas_ip_address"], "172.16.64.91")
955954
self.assertEqual(item["input_octets"], data1["input_octets"])
956-
self.assertEqual(item["called_station_id"], "00-27-22-F3-FA-F1:hostname")
955+
self.assertEqual(item["called_station_id"], "00:27:22:f3:fa:f1")
957956
self.assertIsNotNone(item["stop_time"])
958957
response = self.client.get(
959958
f"{url}?page_size=1&page=3",
@@ -1210,7 +1209,7 @@ def test_radius_accounting(self):
12101209
username="tester",
12111210
organization=org1,
12121211
calling_station_id="11:22:33:44:55:66",
1213-
called_station_id="AA:BB:CC:DD:EE:FF",
1212+
called_station_id="aa:bb:cc:dd:ee:ff",
12141213
)
12151214
)
12161215
self._create_radius_accounting(**data1)
@@ -1224,7 +1223,7 @@ def test_radius_accounting(self):
12241223
username="tester",
12251224
organization=org1,
12261225
calling_station_id="11-22-33-44-55-66",
1227-
called_station_id="AA-BB-CC-DD-EE-FF",
1226+
called_station_id="aa:bb:cc:dd:ee:ff",
12281227
)
12291228
)
12301229
self._create_radius_accounting(**data2)
@@ -1273,17 +1272,17 @@ def test_radius_accounting(self):
12731272
self.assertEqual(response.data[2]["unique_id"], data1["unique_id"])
12741273

12751274
with self.subTest("Test filtering with called_station_id"):
1276-
response = self.client.get(path, {"called_station_id": "AA-BB-CC-DD-EE-FF"})
1275+
response = self.client.get(path, {"called_station_id": "aa:bb:cc:dd:ee:ff"})
12771276
self.assertEqual(response.status_code, 200)
12781277
self.assertEqual(len(response.data), 2)
1279-
self.assertEqual(response.data[0]["called_station_id"], "AA-BB-CC-DD-EE-FF")
1280-
self.assertEqual(response.data[1]["called_station_id"], "AA:BB:CC:DD:EE:FF")
1278+
self.assertEqual(response.data[0]["called_station_id"], "aa:bb:cc:dd:ee:ff")
1279+
self.assertEqual(response.data[1]["called_station_id"], "aa:bb:cc:dd:ee:ff")
12811280

1282-
response = self.client.get(path, {"called_station_id": "AA:BB:CC:DD:EE:FF"})
1281+
response = self.client.get(path, {"called_station_id": "aa:bb:cc:dd:ee:ff"})
12831282
self.assertEqual(response.status_code, 200)
12841283
self.assertEqual(len(response.data), 2)
1285-
self.assertEqual(response.data[0]["called_station_id"], "AA-BB-CC-DD-EE-FF")
1286-
self.assertEqual(response.data[1]["called_station_id"], "AA:BB:CC:DD:EE:FF")
1284+
self.assertEqual(response.data[0]["called_station_id"], "aa:bb:cc:dd:ee:ff")
1285+
self.assertEqual(response.data[1]["called_station_id"], "aa:bb:cc:dd:ee:ff")
12871286

12881287
with self.subTest("Test filtering with calling_station_id"):
12891288
response = self.client.get(

openwisp_radius/tests/test_api/test_freeradius_api.py

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from ... import settings as app_settings
2222
from ...api.freeradius_views import logger as freeradius_api_logger
2323
from ...counters.exceptions import MaxQuotaReached, SkipCheck
24+
from ...base.models import sanitize_mac_address
2425
from ...signals import radius_accounting_success
2526
from ...utils import load_model
2627
from ..mixins import ApiTokenMixin, BaseTestCase, BaseTransactionTestCase
@@ -401,6 +402,11 @@ def assertAcctData(self, ra, data):
401402
continue
402403
ra_value = getattr(ra, key)
403404
data_value = data[key]
405+
# Special handling for called_station_id to compare sanitized MAC addresses
406+
if key == "called_station_id":
407+
# Both values should be sanitized for comparison
408+
ra_value = sanitize_mac_address(ra_value) if ra_value else ra_value
409+
data_value = sanitize_mac_address(data_value) if data_value else data_value
404410
_type = type(ra_value)
405411
if _type != type(data_value):
406412
data_value = _type(data_value)
@@ -653,7 +659,7 @@ def test_accounting_update_conversion_200(self):
653659
self.assertIsNone(response.data)
654660
self.assertEqual(RadiusAccounting.objects.count(), 1)
655661
ra.refresh_from_db()
656-
self.assertEqual(ra.called_station_id, "00-00-11-11-22-22")
662+
self.assertEqual(ra.called_station_id, "00:00:11:11:22:22")
657663

658664
with self.subTest("should overwrite if different called station ID"):
659665
ra.called_station_id = "00-00-11-11-22-22"
@@ -670,7 +676,7 @@ def test_accounting_update_conversion_200(self):
670676
self.assertIsNone(response.data)
671677
self.assertEqual(RadiusAccounting.objects.count(), 1)
672678
ra.refresh_from_db()
673-
self.assertEqual(ra.called_station_id, "00-00-22-22-33-33")
679+
self.assertEqual(ra.called_station_id, "00:00:22:22:33:33")
674680

675681
@freeze_time(START_DATE)
676682
@capture_any_output()
@@ -916,7 +922,6 @@ def test_accounting_interim_update_openwisp_closed_session(self, mocked_logger):
916922
response = self.post_json(data)
917923
self.assertEqual(response.status_code, 201)
918924
self.assertEqual(RadiusAccounting.objects.count(), 1)
919-
920925
# Create RadiusAccounting object with "org2" organization
921926
org2_data = self.acct_post_data
922927
org2_data["status_type"] = "Interim-Update"
@@ -994,7 +999,7 @@ def test_accounting_list_200(self):
994999
self.assertEqual(item["output_octets"], data2["output_octets"])
9951000
self.assertEqual(item["nas_ip_address"], "172.16.64.91")
9961001
self.assertEqual(item["input_octets"], data2["input_octets"])
997-
self.assertEqual(item["called_station_id"], "00-27-22-F3-FA-F1:hostname")
1002+
self.assertEqual(item["called_station_id"], "00:27:22:f3:fa:f1")
9981003
response = self.client.get(
9991004
f"{self._acct_url}?page_size=1&page=3",
10001005
HTTP_AUTHORIZATION=self.auth_header,
@@ -1031,13 +1036,13 @@ def test_accounting_filter_called_station_id(self):
10311036
data2.update(dict(called_station_id="C0-CA-40-FE-E1-9D", unique_id="85144d60"))
10321037
self._create_radius_accounting(**data2)
10331038
response = self.client.get(
1034-
f"{self._acct_url}?called_station_id=E0-CA-40-EE-D1-0D",
1039+
f"{self._acct_url}?called_station_id=e0:ca:40:ee:d1:0d",
10351040
HTTP_AUTHORIZATION=self.auth_header,
10361041
)
10371042
self.assertEqual(len(response.json()), 1)
10381043
self.assertEqual(response.status_code, 200)
10391044
item = response.data[0]
1040-
self.assertEqual(item["called_station_id"], "E0-CA-40-EE-D1-0D")
1045+
self.assertEqual(item["called_station_id"], "e0:ca:40:ee:d1:0d")
10411046

10421047
def test_accounting_filter_calling_station_id(self):
10431048
data1 = self.acct_post_data
@@ -1839,7 +1844,7 @@ def test_mac_addr_roaming_accounting_view(self):
18391844
username="tester",
18401845
stop_time=None,
18411846
nas_ip_address=payload["nas_ip_address"],
1842-
called_station_id=payload["called_station_id"],
1847+
called_station_id="66:55:44:33:22:11",
18431848
).count(),
18441849
1,
18451850
)
@@ -1848,7 +1853,7 @@ def test_mac_addr_roaming_accounting_view(self):
18481853
username="tester",
18491854
stop_time__isnull=False,
18501855
nas_ip_address=acct_post_data["nas_ip_address"],
1851-
called_station_id=acct_post_data["called_station_id"],
1856+
called_station_id=sanitize_mac_address(acct_post_data["called_station_id"]),
18521857
).count(),
18531858
1,
18541859
)
@@ -1928,8 +1933,8 @@ def test_emulate_roaming(self):
19281933
RadiusAccounting.objects.filter(
19291934
username="tester",
19301935
stop_time=None,
1936+
nas_ip_address=payload["nas_ip_address"],
19311937
called_station_id=payload["called_station_id"],
1932-
calling_station_id=payload["calling_station_id"],
19331938
).count(),
19341939
1,
19351940
)
@@ -2184,6 +2189,78 @@ def test_ip_from_radsetting_invalid(self):
21842189
self.assertEqual(response.status_code, 403)
21852190
self.assertEqual(response.data["detail"], test_fail_msg)
21862191

2192+
def test_ip_from_radsetting_valid(self):
2193+
with mock.patch(self.freeradius_hosts_path, []):
2194+
radsetting = OrganizationRadiusSettings.objects.get(
2195+
organization=self._get_org()
2196+
)
2197+
radsetting.freeradius_allowed_hosts = "127.0.0.1"
2198+
radsetting.save()
2199+
response = self.client.post(reverse("radius:authorize"), self.params)
2200+
self.assertEqual(response.status_code, 200)
2201+
self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE)
2202+
2203+
def test_ip_from_setting_valid(self):
2204+
response = self.client.post(reverse("radius:authorize"), self.params)
2205+
self.assertEqual(response.status_code, 200)
2206+
self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE)
2207+
2208+
@capture_any_output()
2209+
def test_ip_from_radsetting_not_exist(self):
2210+
org2 = self._create_org(**{"name": "test", "slug": "test"})
2211+
user = self._create_user(username="org2-tester", email="tester@org2.com")
2212+
self._create_org_user(**{"organization": org2, "user": user})
2213+
params = self.params.copy()
2214+
params["username"] = "org2-tester"
2215+
response = self.client.post(
2216+
reverse("radius:user_auth_token", args=[org2.slug]), params
2217+
)
2218+
self.assertEqual(response.status_code, 200)
2219+
with self.subTest("FREERADIUS_ALLOWED_HOSTS is 127.0.0.1"):
2220+
response = self.client.post(reverse("radius:authorize"), params)
2221+
self.assertEqual(response.status_code, 200)
2222+
self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE)
2223+
with self.subTest("Empty Settings"), mock.patch(self.freeradius_hosts_path, []):
2224+
response = self.client.post(reverse("radius:authorize"), params)
2225+
self.assertEqual(response.status_code, 403)
2226+
self.assertEqual(response.data["detail"], self.fail_msg)
2227+
2228+
def test_ip_from_radsetting_valid(self):
2229+
with mock.patch(self.freeradius_hosts_path, []):
2230+
radsetting = OrganizationRadiusSettings.objects.get(
2231+
organization=self._get_org()
2232+
)
2233+
radsetting.freeradius_allowed_hosts = "127.0.0.1"
2234+
radsetting.save()
2235+
response = self.client.post(reverse("radius:authorize"), self.params)
2236+
self.assertEqual(response.status_code, 200)
2237+
self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE)
2238+
2239+
def test_ip_from_setting_valid(self):
2240+
response = self.client.post(reverse("radius:authorize"), self.params)
2241+
self.assertEqual(response.status_code, 200)
2242+
self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE)
2243+
2244+
@capture_any_output()
2245+
def test_ip_from_radsetting_not_exist(self):
2246+
org2 = self._create_org(**{"name": "test", "slug": "test"})
2247+
user = self._create_user(username="org2-tester", email="tester@org2.com")
2248+
self._create_org_user(**{"organization": org2, "user": user})
2249+
params = self.params.copy()
2250+
params["username"] = "org2-tester"
2251+
response = self.client.post(
2252+
reverse("radius:user_auth_token", args=[org2.slug]), params
2253+
)
2254+
self.assertEqual(response.status_code, 200)
2255+
with self.subTest("FREERADIUS_ALLOWED_HOSTS is 127.0.0.1"):
2256+
response = self.client.post(reverse("radius:authorize"), params)
2257+
self.assertEqual(response.status_code, 200)
2258+
self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE)
2259+
with self.subTest("Empty Settings"), mock.patch(self.freeradius_hosts_path, []):
2260+
response = self.client.post(reverse("radius:authorize"), params)
2261+
self.assertEqual(response.status_code, 403)
2262+
self.assertEqual(response.data["detail"], self.fail_msg)
2263+
21872264

21882265
class TestTransactionClientIpApi(
21892266
TestClientIpApiMixin, ApiTokenMixin, BaseTransactionTestCase
@@ -2334,7 +2411,3 @@ def test_sms_phone_required(self):
23342411
self.assertIn("sms_sender", e.message_dict)
23352412
else:
23362413
self.fail("ValidationError not raised")
2337-
2338-
2339-
del BaseTestCase
2340-
del BaseTransactionTestCase

openwisp_radius/tests/test_commands.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ def test_convert_called_station_id_command_with_org_id(self, *args):
482482
with self._get_openvpn_status_mock():
483483
call_command("convert_called_station_id")
484484
radius_acc.refresh_from_db()
485-
self.assertEqual(radius_acc.called_station_id, "CC-CC-CC-CC-CC-0C")
485+
self.assertEqual(radius_acc.called_station_id, "aa:aa:aa:aa:aa:0a")
486486

487487
with self.subTest("Test session with unique_id does not exist"):
488488
with patch("logging.Logger.warning") as mocked_logger:
@@ -517,7 +517,7 @@ def test_convert_called_station_id_command_with_org_id(self, *args):
517517
)
518518
radius_acc1.refresh_from_db()
519519
radius_acc2.refresh_from_db()
520-
self.assertEqual(radius_acc1.called_station_id, "CC-CC-CC-CC-CC-0C")
520+
self.assertEqual(radius_acc1.called_station_id, "cc:cc:cc:cc:cc:0c")
521521
self.assertNotEqual(radius_acc2.called_station_id, "CC-CC-CC-CC-CC-0C")
522522

523523
with self.subTest("Test stop time is None"):
@@ -528,9 +528,7 @@ def test_convert_called_station_id_command_with_org_id(self, *args):
528528
with self._get_openvpn_status_mock():
529529
call_command("convert_called_station_id")
530530
radius_acc.refresh_from_db()
531-
self.assertEqual(
532-
radius_acc.called_station_id, rad_options["called_station_id"]
533-
)
531+
self.assertEqual(radius_acc.called_station_id, "aa:aa:aa:aa:aa:0a")
534532

535533
@capture_any_output()
536534
@patch.object(
@@ -559,4 +557,4 @@ def test_convert_called_station_id_command_with_slug(self, *args):
559557
with self._get_openvpn_status_mock():
560558
call_command("convert_called_station_id")
561559
radius_acc.refresh_from_db()
562-
self.assertEqual(radius_acc.called_station_id, "CC-CC-CC-CC-CC-0C")
560+
self.assertEqual(radius_acc.called_station_id, "aa:aa:aa:aa:aa:0a")

0 commit comments

Comments
 (0)