Skip to content

Commit cf877ea

Browse files
committed
lora/lora-lorawan: Add LoRaWAN 1.0.x MAC layer.
Add LoRaWAN Class A device operation with OTAA and ABP activation. Includes AES-128-CMAC/CTR crypto primitives built on ucryptolib for MIC calculation, payload encryption, and session key derivation per the LoRaWAN 1.0.x specification. Used by Pycom LoPy/LoPy4 boards as defined in micropython/micropython#19026. Signed-off-by: Danilo D <danilodt@gmail.com>
1 parent c9c9d04 commit cf877ea

File tree

3 files changed

+552
-0
lines changed

3 files changed

+552
-0
lines changed
Lines changed: 397 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,397 @@
1+
"""
2+
LoRaWAN 1.0.x MAC layer for MicroPython.
3+
4+
Implements Class A device operation with OTAA and ABP activation.
5+
Runs on top of the SX127x raw LoRa driver.
6+
7+
Usage (OTAA):
8+
from lora import LoRa
9+
from lorawan import LoRaWAN
10+
11+
radio = LoRa(frequency=868100000, sf=7, bw=125000)
12+
wan = LoRaWAN(radio, mode=LoRaWAN.OTAA,
13+
dev_eui=bytes.fromhex('...'),
14+
app_eui=bytes.fromhex('...'),
15+
app_key=bytes.fromhex('...'))
16+
wan.join()
17+
wan.send(1, b'Hello')
18+
19+
Usage (ABP):
20+
wan = LoRaWAN(radio, mode=LoRaWAN.ABP,
21+
dev_addr=0x01234567,
22+
nwk_skey=bytes.fromhex('...'),
23+
app_skey=bytes.fromhex('...'))
24+
wan.send(1, b'Hello')
25+
"""
26+
27+
from micropython import const
28+
import struct
29+
import time
30+
import os
31+
from lora.lorawan_crypto import (
32+
aes_cmac,
33+
aes_ctr_encrypt,
34+
aes_ctr_decrypt,
35+
compute_mic,
36+
derive_session_keys,
37+
)
38+
39+
# LoRaWAN MHDR message types
40+
_MTYPE_JOIN_REQUEST = const(0x00)
41+
_MTYPE_JOIN_ACCEPT = const(0x20)
42+
_MTYPE_UNCONFIRMED_UP = const(0x40)
43+
_MTYPE_UNCONFIRMED_DOWN = const(0x60)
44+
_MTYPE_CONFIRMED_UP = const(0x80)
45+
_MTYPE_CONFIRMED_DOWN = const(0xA0)
46+
47+
# Direction constants
48+
_DIR_UP = const(0)
49+
_DIR_DOWN = const(1)
50+
51+
# LoRaWAN data rate table for EU868
52+
_DR_TABLE_EU868 = {
53+
0: (12, 125000),
54+
1: (11, 125000),
55+
2: (10, 125000),
56+
3: (9, 125000),
57+
4: (8, 125000),
58+
5: (7, 125000),
59+
}
60+
61+
# Default RX2 parameters (EU868)
62+
_RX2_FREQ = 869525000
63+
_RX2_DR = 0 # SF12/125kHz
64+
65+
# Class A timing
66+
_JOIN_ACCEPT_DELAY1 = 5000 # ms
67+
_JOIN_ACCEPT_DELAY2 = 6000 # ms
68+
_RECEIVE_DELAY1 = 1000 # ms
69+
_RECEIVE_DELAY2 = 2000 # ms
70+
71+
72+
class LoRaWAN:
73+
"""LoRaWAN 1.0.x Class A device."""
74+
75+
OTAA = 0
76+
ABP = 1
77+
78+
def __init__(
79+
self,
80+
radio,
81+
mode=0,
82+
dev_eui=None,
83+
app_eui=None,
84+
app_key=None,
85+
dev_addr=None,
86+
nwk_skey=None,
87+
app_skey=None,
88+
):
89+
"""
90+
Initialize LoRaWAN MAC layer.
91+
92+
Args:
93+
radio: LoRa radio instance (from lora module)
94+
mode: LoRaWAN.OTAA or LoRaWAN.ABP
95+
dev_eui: 8-byte Device EUI (OTAA)
96+
app_eui: 8-byte Application EUI / JoinEUI (OTAA)
97+
app_key: 16-byte Application Key (OTAA)
98+
dev_addr: 4-byte Device Address as int (ABP)
99+
nwk_skey: 16-byte Network Session Key (ABP)
100+
app_skey: 16-byte Application Session Key (ABP)
101+
"""
102+
self._radio = radio
103+
self._mode = mode
104+
self._joined = False
105+
106+
# Frame counters
107+
self._fcnt_up = 0
108+
self._fcnt_down = 0
109+
110+
# RX1 delay and data rate offset
111+
self._rx1_delay = _RECEIVE_DELAY1
112+
self._rx1_dr_offset = 0
113+
114+
if mode == self.OTAA:
115+
if not all([dev_eui, app_eui, app_key]):
116+
raise ValueError("OTAA requires dev_eui, app_eui, app_key")
117+
self._dev_eui = dev_eui
118+
self._app_eui = app_eui
119+
self._app_key = app_key
120+
self._dev_addr = None
121+
self._nwk_skey = None
122+
self._app_skey = None
123+
elif mode == self.ABP:
124+
if not all([dev_addr is not None, nwk_skey, app_skey]):
125+
raise ValueError("ABP requires dev_addr, nwk_skey, app_skey")
126+
self._dev_addr = dev_addr
127+
self._nwk_skey = nwk_skey
128+
self._app_skey = app_skey
129+
self._joined = True
130+
else:
131+
raise ValueError("mode must be OTAA or ABP")
132+
133+
@property
134+
def joined(self):
135+
"""Whether the device has joined the network."""
136+
return self._joined
137+
138+
@property
139+
def dev_addr(self):
140+
"""Device address (available after join)."""
141+
return self._dev_addr
142+
143+
def join(self, timeout=30000):
144+
"""
145+
Perform OTAA join procedure.
146+
147+
Args:
148+
timeout: Total timeout in ms for join attempts (default 30s)
149+
150+
Raises:
151+
RuntimeError: If join fails after timeout
152+
"""
153+
if self._mode != self.OTAA:
154+
raise RuntimeError("join() only for OTAA mode")
155+
156+
# Configure radio for join
157+
self._radio._radio.set_sync_word(0x34) # LoRaWAN public sync word
158+
159+
start = time.ticks_ms()
160+
while time.ticks_diff(time.ticks_ms(), start) < timeout:
161+
dev_nonce = os.urandom(2)
162+
join_req = self._build_join_request(dev_nonce)
163+
164+
# Send join request
165+
self._radio._radio.send(join_req)
166+
tx_end = time.ticks_ms()
167+
168+
# RX1 window: JoinAcceptDelay1
169+
accept = self._rx_window(tx_end, _JOIN_ACCEPT_DELAY1)
170+
if accept:
171+
if self._process_join_accept(accept, dev_nonce):
172+
self._joined = True
173+
return True
174+
175+
# RX2 window: JoinAcceptDelay2 at RX2 frequency/DR
176+
sf, bw = _DR_TABLE_EU868[_RX2_DR]
177+
saved_freq = self._radio._radio._frequency
178+
self._radio._radio.set_frequency(_RX2_FREQ)
179+
self._radio._radio.set_spreading_factor(sf)
180+
self._radio._radio.set_bandwidth(bw)
181+
182+
accept = self._rx_window(tx_end, _JOIN_ACCEPT_DELAY2)
183+
184+
# Restore original settings
185+
self._radio._radio.set_frequency(saved_freq)
186+
self._radio._radio.set_spreading_factor(7)
187+
self._radio._radio.set_bandwidth(125000)
188+
189+
if accept:
190+
if self._process_join_accept(accept, dev_nonce):
191+
self._joined = True
192+
return True
193+
194+
# Backoff before retry
195+
time.sleep_ms(1000)
196+
197+
raise RuntimeError("Join failed: timeout")
198+
199+
def send(self, port, payload, confirmed=False):
200+
"""
201+
Send an uplink frame.
202+
203+
Args:
204+
port: FPort (1-223)
205+
payload: Data bytes to send
206+
confirmed: If True, send confirmed uplink
207+
208+
Returns:
209+
Downlink data bytes or None
210+
"""
211+
if not self._joined:
212+
raise RuntimeError("Not joined")
213+
214+
if port < 1 or port > 223:
215+
raise ValueError("FPort must be 1-223")
216+
217+
mtype = _MTYPE_CONFIRMED_UP if confirmed else _MTYPE_UNCONFIRMED_UP
218+
frame = self._build_data_frame(mtype, port, payload)
219+
220+
# Send frame
221+
self._radio._radio.set_sync_word(0x34)
222+
self._radio._radio.send(frame)
223+
tx_end = time.ticks_ms()
224+
225+
self._fcnt_up += 1
226+
227+
# Class A: open RX1, then RX2
228+
# RX1
229+
rx_data = self._rx_window(tx_end, self._rx1_delay)
230+
if rx_data:
231+
return self._process_downlink(rx_data)
232+
233+
# RX2
234+
sf, bw = _DR_TABLE_EU868[_RX2_DR]
235+
saved_freq = self._radio._radio._frequency
236+
self._radio._radio.set_frequency(_RX2_FREQ)
237+
self._radio._radio.set_spreading_factor(sf)
238+
self._radio._radio.set_bandwidth(bw)
239+
240+
rx_data = self._rx_window(tx_end, _RECEIVE_DELAY2)
241+
242+
self._radio._radio.set_frequency(saved_freq)
243+
self._radio._radio.set_spreading_factor(7)
244+
self._radio._radio.set_bandwidth(125000)
245+
246+
if rx_data:
247+
return self._process_downlink(rx_data)
248+
249+
return None
250+
251+
def _rx_window(self, tx_end, delay_ms):
252+
"""Open a receive window at tx_end + delay_ms."""
253+
now = time.ticks_ms()
254+
wait = time.ticks_diff(tx_end + delay_ms, now)
255+
if wait > 0:
256+
time.sleep_ms(wait)
257+
return self._radio._radio.recv(timeout_ms=1000)
258+
259+
def _build_join_request(self, dev_nonce):
260+
"""Build LoRaWAN join-request message."""
261+
mhdr = bytes([_MTYPE_JOIN_REQUEST | 0x00]) # Major=0
262+
# AppEUI and DevEUI in little-endian
263+
body = mhdr + self._app_eui[::-1] + self._dev_eui[::-1] + dev_nonce
264+
mic = aes_cmac(self._app_key, body)[:4]
265+
return body + mic
266+
267+
def _process_join_accept(self, data, dev_nonce):
268+
"""Process and validate a join-accept message."""
269+
if len(data) < 17:
270+
return False
271+
272+
# Decrypt join-accept (the entire payload after MHDR is encrypted)
273+
mhdr = data[0]
274+
if (mhdr & 0xE0) != _MTYPE_JOIN_ACCEPT:
275+
return False
276+
277+
from ucryptolib import aes
278+
279+
encrypted = data[1:]
280+
decrypted = bytearray()
281+
for i in range(0, len(encrypted), 16):
282+
block = encrypted[i : i + 16]
283+
if len(block) < 16:
284+
block = block + b"\x00" * (16 - len(block))
285+
decrypted += aes(self._app_key, 1).encrypt(block)
286+
287+
# Parse: AppNonce(3) | NetID(3) | DevAddr(4) | DLSettings(1) | RxDelay(1) [| CFList]
288+
app_nonce = bytes(decrypted[0:3])
289+
net_id = bytes(decrypted[3:6])
290+
self._dev_addr = struct.unpack("<I", decrypted[6:10])[0]
291+
dl_settings = decrypted[10]
292+
rx_delay = decrypted[11]
293+
294+
# Verify MIC
295+
mic_msg = bytes([mhdr]) + bytes(decrypted[:-4])
296+
mic_calc = aes_cmac(self._app_key, mic_msg)[:4]
297+
mic_recv = bytes(decrypted[-4:])
298+
299+
if mic_calc != mic_recv:
300+
return False
301+
302+
# Parse DLSettings
303+
self._rx1_dr_offset = (dl_settings >> 4) & 0x07
304+
305+
# Parse RxDelay
306+
if rx_delay == 0:
307+
self._rx1_delay = _RECEIVE_DELAY1
308+
else:
309+
self._rx1_delay = rx_delay * 1000
310+
311+
# Derive session keys
312+
self._nwk_skey, self._app_skey = derive_session_keys(
313+
self._app_key, app_nonce, net_id, dev_nonce
314+
)
315+
316+
# Reset frame counters
317+
self._fcnt_up = 0
318+
self._fcnt_down = 0
319+
320+
return True
321+
322+
def _build_data_frame(self, mtype, port, payload):
323+
"""Build a LoRaWAN data frame (uplink)."""
324+
mhdr = bytes([mtype | 0x00])
325+
326+
# FHDR: DevAddr(4) | FCtrl(1) | FCnt(2)
327+
fctrl = 0x00 # No ADR, no ACK, no FOptsLen
328+
fhdr = struct.pack("<IBH", self._dev_addr, fctrl, self._fcnt_up & 0xFFFF)
329+
330+
fport = bytes([port])
331+
332+
# Encrypt payload
333+
enc_payload = aes_ctr_encrypt(
334+
self._app_skey, payload, self._dev_addr, self._fcnt_up, _DIR_UP
335+
)
336+
337+
# Assemble frame (without MIC)
338+
frame = mhdr + fhdr + fport + enc_payload
339+
340+
# Compute and append MIC
341+
mic = compute_mic(
342+
self._nwk_skey, frame, self._dev_addr, self._fcnt_up, _DIR_UP, len(frame)
343+
)
344+
345+
return frame + mic
346+
347+
def _process_downlink(self, data):
348+
"""Process a downlink frame, return decrypted payload or None."""
349+
if len(data) < 12:
350+
return None
351+
352+
mhdr = data[0]
353+
mtype = mhdr & 0xE0
354+
if mtype not in (_MTYPE_UNCONFIRMED_DOWN, _MTYPE_CONFIRMED_DOWN):
355+
return None
356+
357+
# Parse FHDR
358+
dev_addr = struct.unpack("<I", data[1:5])[0]
359+
if dev_addr != self._dev_addr:
360+
return None
361+
362+
fctrl = data[5]
363+
fopts_len = fctrl & 0x0F
364+
fcnt_low = struct.unpack("<H", data[6:8])[0]
365+
366+
# Reconstruct full fcnt (assume no rollover for now)
367+
fcnt = (self._fcnt_down & 0xFFFF0000) | fcnt_low
368+
if fcnt < self._fcnt_down:
369+
return None # Replay protection
370+
371+
fhdr_len = 8 + fopts_len
372+
frame_no_mic = data[:-4]
373+
mic_recv = data[-4:]
374+
375+
# Verify MIC
376+
mic_calc = compute_mic(
377+
self._nwk_skey, frame_no_mic, dev_addr, fcnt, _DIR_DOWN, len(frame_no_mic)
378+
)
379+
if mic_calc != mic_recv:
380+
return None
381+
382+
self._fcnt_down = fcnt + 1
383+
384+
# Extract payload
385+
if len(data) <= fhdr_len + 1 + 4:
386+
return b"" # No payload (MAC commands only or empty)
387+
388+
fport = data[fhdr_len]
389+
enc_payload = data[fhdr_len + 1 : -4]
390+
391+
# Decrypt
392+
if fport == 0:
393+
key = self._nwk_skey
394+
else:
395+
key = self._app_skey
396+
397+
return aes_ctr_decrypt(key, enc_payload, dev_addr, fcnt, _DIR_DOWN)

0 commit comments

Comments
 (0)