From c9c9d044a0fd5d22c5ada433f39fbbd9bd5c6703 Mon Sep 17 00:00:00 2001 From: Danilo D Date: Thu, 2 Apr 2026 09:59:25 -0500 Subject: [PATCH 1/2] lora/lora-sx127x-pycom: Add unified SX1272/SX1276 LoRa driver. Add a LoRa radio driver that handles both SX1272 and SX1276 transceivers. The existing lora-sx127x package only supports SX1276 (version register 0x12); this driver also handles SX1272 (version 0x22) with its different register layouts for bandwidth, coding rate, CRC, and RSSI calculation. Used by Pycom LoPy (SX1272) and LoPy4 (SX1276) boards as defined in micropython/micropython#19026. Signed-off-by: Danilo D --- .../lora-sx127x-pycom/lora/sx127x_pycom.py | 466 ++++++++++++++++++ .../lora/lora-sx127x-pycom/manifest.py | 2 + 2 files changed, 468 insertions(+) create mode 100644 micropython/lora/lora-sx127x-pycom/lora/sx127x_pycom.py create mode 100644 micropython/lora/lora-sx127x-pycom/manifest.py diff --git a/micropython/lora/lora-sx127x-pycom/lora/sx127x_pycom.py b/micropython/lora/lora-sx127x-pycom/lora/sx127x_pycom.py new file mode 100644 index 000000000..3336f5e43 --- /dev/null +++ b/micropython/lora/lora-sx127x-pycom/lora/sx127x_pycom.py @@ -0,0 +1,466 @@ +""" +SX1272/SX1276 LoRa radio driver for MicroPython. + +Unified driver handling register-level differences between SX1272 and SX1276. +Based on Semtech datasheets DS_SX1272/73_V4 and DS_SX1276-7-8-9_W_APP_V7. +""" + +import time +from machine import Pin +from micropython import const + +# Common registers +_REG_FIFO = const(0x00) +_REG_OP_MODE = const(0x01) +_REG_FRF_MSB = const(0x06) +_REG_FRF_MID = const(0x07) +_REG_FRF_LSB = const(0x08) +_REG_PA_CONFIG = const(0x09) +_REG_OCP = const(0x0B) +_REG_LNA = const(0x0C) +_REG_FIFO_ADDR_PTR = const(0x0D) +_REG_FIFO_TX_BASE_ADDR = const(0x0E) +_REG_FIFO_RX_BASE_ADDR = const(0x0F) +_REG_FIFO_RX_CURRENT_ADDR = const(0x10) +_REG_IRQ_FLAGS_MASK = const(0x11) +_REG_IRQ_FLAGS = const(0x12) +_REG_RX_NB_BYTES = const(0x13) +_REG_PKT_SNR_VALUE = const(0x19) +_REG_PKT_RSSI_VALUE = const(0x1A) +_REG_MODEM_CONFIG_1 = const(0x1D) +_REG_MODEM_CONFIG_2 = const(0x1E) +_REG_SYMB_TIMEOUT_LSB = const(0x1F) +_REG_PREAMBLE_MSB = const(0x20) +_REG_PREAMBLE_LSB = const(0x21) +_REG_PAYLOAD_LENGTH = const(0x22) +_REG_MAX_PAYLOAD_LENGTH = const(0x23) +_REG_MODEM_CONFIG_3 = const(0x26) +_REG_DETECTION_OPTIMIZE = const(0x31) +_REG_DETECTION_THRESHOLD = const(0x37) +_REG_SYNC_WORD = const(0x39) +_REG_DIO_MAPPING_1 = const(0x40) +_REG_VERSION = const(0x42) +_REG_PA_DAC = const(0x4D) + +# Modes +_MODE_LONG_RANGE = const(0x80) +_MODE_SLEEP = const(0x00) +_MODE_STDBY = const(0x01) +_MODE_TX = const(0x03) +_MODE_RX_CONTINUOUS = const(0x05) +_MODE_RX_SINGLE = const(0x06) + +# IRQ flags +_IRQ_TX_DONE = const(0x08) +_IRQ_RX_DONE = const(0x40) +_IRQ_RX_TIMEOUT = const(0x80) +_IRQ_PAYLOAD_CRC_ERROR = const(0x20) + +# PA config +_PA_BOOST = const(0x80) + +# Expected chip version register values +_VERSION_SX1272 = const(0x22) +_VERSION_SX1276 = const(0x12) + +# Bandwidth encoding tables +_BW_TABLE_SX1276 = { + 7800: 0, + 10400: 1, + 15600: 2, + 20800: 3, + 31250: 4, + 41700: 5, + 62500: 6, + 125000: 7, + 250000: 8, + 500000: 9, +} +_BW_TABLE_SX1272 = { + 125000: 0, + 250000: 1, + 500000: 2, +} + +# FXOSC / 2^19 +_FSTEP = 61.03515625 # Hz per step + + +class SX127x: + """Low-level driver for SX1272 and SX1276 LoRa transceivers.""" + + def __init__(self, spi, cs_pin, dio0_pin, reset_pin=-1, chip=None): + """ + Args: + spi: machine.SPI instance, already initialized + cs_pin: machine.Pin for chip select (active low) + dio0_pin: machine.Pin for DIO0 interrupt + reset_pin: machine.Pin for hardware reset, or -1 if not available + chip: 1272 or 1276; if None, auto-detect from version register + """ + self._spi = spi + self._cs = cs_pin + self._cs.init(Pin.OUT, value=1) + self._dio0 = dio0_pin + self._dio0.init(Pin.IN) + self._reset_pin = reset_pin + + self._frequency = 0 + self._tx_power = 14 + self._irq_callback = None + + # Hardware reset if available + self.reset() + + # Detect chip variant + version = self._read_reg(_REG_VERSION) + if chip is not None: + self._chip = chip + elif version == _VERSION_SX1272: + self._chip = 1272 + elif version == _VERSION_SX1276: + self._chip = 1276 + else: + raise RuntimeError("Unknown SX127x chip version: 0x{:02x}".format(version)) + + # Put into sleep mode (LoRa) + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_SLEEP) + time.sleep_ms(10) + + # Set FIFO base addresses + self._write_reg(_REG_FIFO_TX_BASE_ADDR, 0) + self._write_reg(_REG_FIFO_RX_BASE_ADDR, 0) + + # Set LNA boost + self._write_reg(_REG_LNA, self._read_reg(_REG_LNA) | 0x03) + + # Set auto AGC + if self._chip == 1276: + self._write_reg(_REG_MODEM_CONFIG_3, 0x04) # AGC auto on + else: + # SX1272: AGC auto on is bit 2 of ModemConfig2 (register 0x1E) + val = self._read_reg(_REG_MODEM_CONFIG_2) + self._write_reg(_REG_MODEM_CONFIG_2, val | 0x04) + + # Standby mode + self.standby() + + def reset(self): + """Perform hardware or soft reset.""" + if self._reset_pin != -1 and self._reset_pin is not None: + pin = self._reset_pin + if not isinstance(pin, Pin): + pin = Pin(pin, Pin.OUT) + pin.value(0) + time.sleep_ms(10) + pin.value(1) + time.sleep_ms(10) + else: + # Soft reset: write mode register to reset state + # Switch to FSK sleep then back to LoRa sleep + self._write_reg(_REG_OP_MODE, 0x00) # FSK sleep + time.sleep_ms(10) + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_SLEEP) + time.sleep_ms(10) + + def standby(self): + """Enter standby mode.""" + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_STDBY) + + def sleep(self): + """Enter sleep mode.""" + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_SLEEP) + + def set_frequency(self, freq_hz): + """Set carrier frequency in Hz.""" + self._frequency = int(freq_hz) + frf = int(freq_hz / _FSTEP) + self._write_reg(_REG_FRF_MSB, (frf >> 16) & 0xFF) + self._write_reg(_REG_FRF_MID, (frf >> 8) & 0xFF) + self._write_reg(_REG_FRF_LSB, frf & 0xFF) + + def set_spreading_factor(self, sf): + """Set spreading factor (6-12).""" + if sf < 6 or sf > 12: + raise ValueError("SF must be 6-12") + + # Detection optimize and threshold for SF6 + if sf == 6: + self._write_reg(_REG_DETECTION_OPTIMIZE, 0xC5) + self._write_reg(_REG_DETECTION_THRESHOLD, 0x0C) + else: + self._write_reg(_REG_DETECTION_OPTIMIZE, 0xC3) + self._write_reg(_REG_DETECTION_THRESHOLD, 0x0A) + + val = self._read_reg(_REG_MODEM_CONFIG_2) + self._write_reg(_REG_MODEM_CONFIG_2, (val & 0x0F) | (sf << 4)) + + def set_bandwidth(self, bw_hz): + """Set signal bandwidth in Hz.""" + if self._chip == 1276: + if bw_hz not in _BW_TABLE_SX1276: + raise ValueError("Unsupported BW for SX1276") + bw_val = _BW_TABLE_SX1276[bw_hz] + val = self._read_reg(_REG_MODEM_CONFIG_1) + self._write_reg(_REG_MODEM_CONFIG_1, (val & 0x0F) | (bw_val << 4)) + else: + if bw_hz not in _BW_TABLE_SX1272: + raise ValueError("Unsupported BW for SX1272") + bw_val = _BW_TABLE_SX1272[bw_hz] + val = self._read_reg(_REG_MODEM_CONFIG_1) + self._write_reg(_REG_MODEM_CONFIG_1, (val & 0x3F) | (bw_val << 6)) + + def set_coding_rate(self, cr): + """Set coding rate denominator (5-8, i.e. 4/5 to 4/8).""" + if cr < 5 or cr > 8: + raise ValueError("CR must be 5-8") + cr_val = cr - 4 + val = self._read_reg(_REG_MODEM_CONFIG_1) + if self._chip == 1276: + self._write_reg(_REG_MODEM_CONFIG_1, (val & 0xF1) | (cr_val << 1)) + else: + self._write_reg(_REG_MODEM_CONFIG_1, (val & 0xC7) | (cr_val << 3)) + + def set_preamble_length(self, length): + """Set preamble length in symbols.""" + self._write_reg(_REG_PREAMBLE_MSB, (length >> 8) & 0xFF) + self._write_reg(_REG_PREAMBLE_LSB, length & 0xFF) + + def set_sync_word(self, sw): + """Set sync word (0x12 = private, 0x34 = LoRaWAN public).""" + self._write_reg(_REG_SYNC_WORD, sw) + + def set_tx_power(self, level, use_pa_boost=True): + """Set transmit power in dBm.""" + self._tx_power = level + if use_pa_boost: + if level > 17: + # Enable +20dBm on PA_BOOST + level = min(level, 20) + self._write_reg(_REG_PA_DAC, 0x87) + self.set_ocp(140) + self._write_reg(_REG_PA_CONFIG, _PA_BOOST | (level - 5)) + else: + self._write_reg(_REG_PA_DAC, 0x84) # default + self.set_ocp(100) + level = max(2, min(level, 17)) + self._write_reg(_REG_PA_CONFIG, _PA_BOOST | (level - 2)) + else: + # RFO pin + level = max(0, min(level, 14)) + self._write_reg(_REG_PA_CONFIG, 0x70 | level) + + def set_ocp(self, ma): + """Set over-current protection trim in mA.""" + if ma <= 120: + ocp_trim = int((ma - 45) / 5) + elif ma <= 240: + ocp_trim = int((ma + 30) / 10) + else: + ocp_trim = 27 + self._write_reg(_REG_OCP, 0x20 | (ocp_trim & 0x1F)) + + def set_implicit_header(self, implicit): + """Enable or disable implicit header mode.""" + val = self._read_reg(_REG_MODEM_CONFIG_1) + if self._chip == 1276: + if implicit: + self._write_reg(_REG_MODEM_CONFIG_1, val | 0x01) + else: + self._write_reg(_REG_MODEM_CONFIG_1, val & 0xFE) + else: + if implicit: + self._write_reg(_REG_MODEM_CONFIG_1, val | 0x04) + else: + self._write_reg(_REG_MODEM_CONFIG_1, val & 0xFB) + + def set_crc(self, enable): + """Enable or disable CRC checking.""" + if self._chip == 1276: + val = self._read_reg(_REG_MODEM_CONFIG_2) + if enable: + self._write_reg(_REG_MODEM_CONFIG_2, val | 0x04) + else: + self._write_reg(_REG_MODEM_CONFIG_2, val & 0xFB) + else: + val = self._read_reg(_REG_MODEM_CONFIG_1) + if enable: + self._write_reg(_REG_MODEM_CONFIG_1, val | 0x02) + else: + self._write_reg(_REG_MODEM_CONFIG_1, val & 0xFD) + + def send(self, data): + """Transmit data (bytes). Blocks until TX_DONE.""" + self.standby() + + # Set FIFO pointer to TX base + self._write_reg(_REG_FIFO_ADDR_PTR, 0) + # Write payload + self._write_buf(_REG_FIFO, data) + # Set payload length + self._write_reg(_REG_PAYLOAD_LENGTH, len(data)) + + # Clear IRQ flags + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + # Map DIO0 to TX_DONE + self._write_reg(_REG_DIO_MAPPING_1, 0x40) + + # Start TX + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_TX) + + # Wait for TX_DONE + while not (self._read_reg(_REG_IRQ_FLAGS) & _IRQ_TX_DONE): + time.sleep_ms(1) + + # Clear IRQ flags + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + self.standby() + + def recv(self, timeout_ms=0): + """ + Receive a packet. + + Args: + timeout_ms: 0 = single RX with register timeout, >0 = poll for ms + + Returns: + bytes or None if timeout/CRC error + """ + self.standby() + + # Clear IRQ flags + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + # Map DIO0 to RX_DONE + self._write_reg(_REG_DIO_MAPPING_1, 0x00) + + if timeout_ms > 0: + # Continuous RX, poll with Python timeout + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_RX_CONTINUOUS) + start = time.ticks_ms() + while time.ticks_diff(time.ticks_ms(), start) < timeout_ms: + irq = self._read_reg(_REG_IRQ_FLAGS) + if irq & _IRQ_RX_DONE: + break + time.sleep_ms(1) + else: + self.standby() + return None + else: + # Single RX with hardware timeout + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_RX_SINGLE) + while True: + irq = self._read_reg(_REG_IRQ_FLAGS) + if irq & (_IRQ_RX_DONE | _IRQ_RX_TIMEOUT): + break + time.sleep_ms(1) + if irq & _IRQ_RX_TIMEOUT: + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + self.standby() + return None + + # Check CRC + irq = self._read_reg(_REG_IRQ_FLAGS) + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + if irq & _IRQ_PAYLOAD_CRC_ERROR: + self.standby() + return None + + # Read payload + nb_bytes = self._read_reg(_REG_RX_NB_BYTES) + rx_addr = self._read_reg(_REG_FIFO_RX_CURRENT_ADDR) + self._write_reg(_REG_FIFO_ADDR_PTR, rx_addr) + payload = self._read_buf(_REG_FIFO, nb_bytes) + + self.standby() + return bytes(payload) + + def on_recv(self, callback): + """ + Set up asynchronous receive via DIO0 interrupt. + + callback(payload_bytes) is called when a packet is received. + Pass None to disable. + """ + if callback is None: + self._dio0.irq(handler=None) + self._irq_callback = None + self.standby() + return + + self._irq_callback = callback + self.standby() + + # Clear IRQ flags + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + # Map DIO0 to RX_DONE + self._write_reg(_REG_DIO_MAPPING_1, 0x00) + + # Set up IRQ + self._dio0.irq(trigger=Pin.IRQ_RISING, handler=self._handle_irq) + + # Start continuous RX + self._write_reg(_REG_OP_MODE, _MODE_LONG_RANGE | _MODE_RX_CONTINUOUS) + + def _handle_irq(self, pin): + """DIO0 interrupt handler — reads IRQ flags to disambiguate.""" + irq = self._read_reg(_REG_IRQ_FLAGS) + self._write_reg(_REG_IRQ_FLAGS, 0xFF) + + if irq & _IRQ_RX_DONE and not (irq & _IRQ_PAYLOAD_CRC_ERROR): + nb_bytes = self._read_reg(_REG_RX_NB_BYTES) + rx_addr = self._read_reg(_REG_FIFO_RX_CURRENT_ADDR) + self._write_reg(_REG_FIFO_ADDR_PTR, rx_addr) + payload = self._read_buf(_REG_FIFO, nb_bytes) + if self._irq_callback: + self._irq_callback(bytes(payload)) + + @property + def rssi(self): + """Last packet RSSI in dBm.""" + raw = self._read_reg(_REG_PKT_RSSI_VALUE) + if self._chip == 1272: + return -139 + raw + else: + # SX1276 HF port (>862 MHz) + if self._frequency >= 862000000: + return -157 + raw + else: + return -164 + raw + + @property + def snr(self): + """Last packet SNR in dB.""" + raw = self._read_reg(_REG_PKT_SNR_VALUE) + if raw > 127: + raw -= 256 + return raw * 0.25 + + # --- SPI register access --- + + def _read_reg(self, addr): + self._cs.value(0) + self._spi.write(bytearray([addr & 0x7F])) + result = self._spi.read(1) + self._cs.value(1) + return result[0] + + def _write_reg(self, addr, value): + self._cs.value(0) + self._spi.write(bytearray([addr | 0x80, value])) + self._cs.value(1) + + def _read_buf(self, addr, length): + self._cs.value(0) + self._spi.write(bytearray([addr & 0x7F])) + result = self._spi.read(length) + self._cs.value(1) + return result + + def _write_buf(self, addr, data): + self._cs.value(0) + self._spi.write(bytearray([addr | 0x80]) + data) + self._cs.value(1) diff --git a/micropython/lora/lora-sx127x-pycom/manifest.py b/micropython/lora/lora-sx127x-pycom/manifest.py new file mode 100644 index 000000000..f1f2331ba --- /dev/null +++ b/micropython/lora/lora-sx127x-pycom/manifest.py @@ -0,0 +1,2 @@ +metadata(description="SX1272/SX1276 unified LoRa driver for Pycom boards.", version="0.1.0") +package("lora") From cf877ea0da6865607502a0d79ac8dc0320700a87 Mon Sep 17 00:00:00 2001 From: Danilo D Date: Thu, 2 Apr 2026 09:59:37 -0500 Subject: [PATCH 2/2] lora/lora-lorawan: Add LoRaWAN 1.0.x MAC layer. Add LoRaWAN Class A device operation with OTAA and ABP activation. Includes AES-128-CMAC/CTR crypto primitives built on ucryptolib for MIC calculation, payload encryption, and session key derivation per the LoRaWAN 1.0.x specification. Used by Pycom LoPy/LoPy4 boards as defined in micropython/micropython#19026. Signed-off-by: Danilo D --- micropython/lora/lora-lorawan/lora/lorawan.py | 397 ++++++++++++++++++ .../lora/lora-lorawan/lora/lorawan_crypto.py | 153 +++++++ micropython/lora/lora-lorawan/manifest.py | 2 + 3 files changed, 552 insertions(+) create mode 100644 micropython/lora/lora-lorawan/lora/lorawan.py create mode 100644 micropython/lora/lora-lorawan/lora/lorawan_crypto.py create mode 100644 micropython/lora/lora-lorawan/manifest.py diff --git a/micropython/lora/lora-lorawan/lora/lorawan.py b/micropython/lora/lora-lorawan/lora/lorawan.py new file mode 100644 index 000000000..1b5f085bf --- /dev/null +++ b/micropython/lora/lora-lorawan/lora/lorawan.py @@ -0,0 +1,397 @@ +""" +LoRaWAN 1.0.x MAC layer for MicroPython. + +Implements Class A device operation with OTAA and ABP activation. +Runs on top of the SX127x raw LoRa driver. + +Usage (OTAA): + from lora import LoRa + from lorawan import LoRaWAN + + radio = LoRa(frequency=868100000, sf=7, bw=125000) + wan = LoRaWAN(radio, mode=LoRaWAN.OTAA, + dev_eui=bytes.fromhex('...'), + app_eui=bytes.fromhex('...'), + app_key=bytes.fromhex('...')) + wan.join() + wan.send(1, b'Hello') + +Usage (ABP): + wan = LoRaWAN(radio, mode=LoRaWAN.ABP, + dev_addr=0x01234567, + nwk_skey=bytes.fromhex('...'), + app_skey=bytes.fromhex('...')) + wan.send(1, b'Hello') +""" + +from micropython import const +import struct +import time +import os +from lora.lorawan_crypto import ( + aes_cmac, + aes_ctr_encrypt, + aes_ctr_decrypt, + compute_mic, + derive_session_keys, +) + +# LoRaWAN MHDR message types +_MTYPE_JOIN_REQUEST = const(0x00) +_MTYPE_JOIN_ACCEPT = const(0x20) +_MTYPE_UNCONFIRMED_UP = const(0x40) +_MTYPE_UNCONFIRMED_DOWN = const(0x60) +_MTYPE_CONFIRMED_UP = const(0x80) +_MTYPE_CONFIRMED_DOWN = const(0xA0) + +# Direction constants +_DIR_UP = const(0) +_DIR_DOWN = const(1) + +# LoRaWAN data rate table for EU868 +_DR_TABLE_EU868 = { + 0: (12, 125000), + 1: (11, 125000), + 2: (10, 125000), + 3: (9, 125000), + 4: (8, 125000), + 5: (7, 125000), +} + +# Default RX2 parameters (EU868) +_RX2_FREQ = 869525000 +_RX2_DR = 0 # SF12/125kHz + +# Class A timing +_JOIN_ACCEPT_DELAY1 = 5000 # ms +_JOIN_ACCEPT_DELAY2 = 6000 # ms +_RECEIVE_DELAY1 = 1000 # ms +_RECEIVE_DELAY2 = 2000 # ms + + +class LoRaWAN: + """LoRaWAN 1.0.x Class A device.""" + + OTAA = 0 + ABP = 1 + + def __init__( + self, + radio, + mode=0, + dev_eui=None, + app_eui=None, + app_key=None, + dev_addr=None, + nwk_skey=None, + app_skey=None, + ): + """ + Initialize LoRaWAN MAC layer. + + Args: + radio: LoRa radio instance (from lora module) + mode: LoRaWAN.OTAA or LoRaWAN.ABP + dev_eui: 8-byte Device EUI (OTAA) + app_eui: 8-byte Application EUI / JoinEUI (OTAA) + app_key: 16-byte Application Key (OTAA) + dev_addr: 4-byte Device Address as int (ABP) + nwk_skey: 16-byte Network Session Key (ABP) + app_skey: 16-byte Application Session Key (ABP) + """ + self._radio = radio + self._mode = mode + self._joined = False + + # Frame counters + self._fcnt_up = 0 + self._fcnt_down = 0 + + # RX1 delay and data rate offset + self._rx1_delay = _RECEIVE_DELAY1 + self._rx1_dr_offset = 0 + + if mode == self.OTAA: + if not all([dev_eui, app_eui, app_key]): + raise ValueError("OTAA requires dev_eui, app_eui, app_key") + self._dev_eui = dev_eui + self._app_eui = app_eui + self._app_key = app_key + self._dev_addr = None + self._nwk_skey = None + self._app_skey = None + elif mode == self.ABP: + if not all([dev_addr is not None, nwk_skey, app_skey]): + raise ValueError("ABP requires dev_addr, nwk_skey, app_skey") + self._dev_addr = dev_addr + self._nwk_skey = nwk_skey + self._app_skey = app_skey + self._joined = True + else: + raise ValueError("mode must be OTAA or ABP") + + @property + def joined(self): + """Whether the device has joined the network.""" + return self._joined + + @property + def dev_addr(self): + """Device address (available after join).""" + return self._dev_addr + + def join(self, timeout=30000): + """ + Perform OTAA join procedure. + + Args: + timeout: Total timeout in ms for join attempts (default 30s) + + Raises: + RuntimeError: If join fails after timeout + """ + if self._mode != self.OTAA: + raise RuntimeError("join() only for OTAA mode") + + # Configure radio for join + self._radio._radio.set_sync_word(0x34) # LoRaWAN public sync word + + start = time.ticks_ms() + while time.ticks_diff(time.ticks_ms(), start) < timeout: + dev_nonce = os.urandom(2) + join_req = self._build_join_request(dev_nonce) + + # Send join request + self._radio._radio.send(join_req) + tx_end = time.ticks_ms() + + # RX1 window: JoinAcceptDelay1 + accept = self._rx_window(tx_end, _JOIN_ACCEPT_DELAY1) + if accept: + if self._process_join_accept(accept, dev_nonce): + self._joined = True + return True + + # RX2 window: JoinAcceptDelay2 at RX2 frequency/DR + sf, bw = _DR_TABLE_EU868[_RX2_DR] + saved_freq = self._radio._radio._frequency + self._radio._radio.set_frequency(_RX2_FREQ) + self._radio._radio.set_spreading_factor(sf) + self._radio._radio.set_bandwidth(bw) + + accept = self._rx_window(tx_end, _JOIN_ACCEPT_DELAY2) + + # Restore original settings + self._radio._radio.set_frequency(saved_freq) + self._radio._radio.set_spreading_factor(7) + self._radio._radio.set_bandwidth(125000) + + if accept: + if self._process_join_accept(accept, dev_nonce): + self._joined = True + return True + + # Backoff before retry + time.sleep_ms(1000) + + raise RuntimeError("Join failed: timeout") + + def send(self, port, payload, confirmed=False): + """ + Send an uplink frame. + + Args: + port: FPort (1-223) + payload: Data bytes to send + confirmed: If True, send confirmed uplink + + Returns: + Downlink data bytes or None + """ + if not self._joined: + raise RuntimeError("Not joined") + + if port < 1 or port > 223: + raise ValueError("FPort must be 1-223") + + mtype = _MTYPE_CONFIRMED_UP if confirmed else _MTYPE_UNCONFIRMED_UP + frame = self._build_data_frame(mtype, port, payload) + + # Send frame + self._radio._radio.set_sync_word(0x34) + self._radio._radio.send(frame) + tx_end = time.ticks_ms() + + self._fcnt_up += 1 + + # Class A: open RX1, then RX2 + # RX1 + rx_data = self._rx_window(tx_end, self._rx1_delay) + if rx_data: + return self._process_downlink(rx_data) + + # RX2 + sf, bw = _DR_TABLE_EU868[_RX2_DR] + saved_freq = self._radio._radio._frequency + self._radio._radio.set_frequency(_RX2_FREQ) + self._radio._radio.set_spreading_factor(sf) + self._radio._radio.set_bandwidth(bw) + + rx_data = self._rx_window(tx_end, _RECEIVE_DELAY2) + + self._radio._radio.set_frequency(saved_freq) + self._radio._radio.set_spreading_factor(7) + self._radio._radio.set_bandwidth(125000) + + if rx_data: + return self._process_downlink(rx_data) + + return None + + def _rx_window(self, tx_end, delay_ms): + """Open a receive window at tx_end + delay_ms.""" + now = time.ticks_ms() + wait = time.ticks_diff(tx_end + delay_ms, now) + if wait > 0: + time.sleep_ms(wait) + return self._radio._radio.recv(timeout_ms=1000) + + def _build_join_request(self, dev_nonce): + """Build LoRaWAN join-request message.""" + mhdr = bytes([_MTYPE_JOIN_REQUEST | 0x00]) # Major=0 + # AppEUI and DevEUI in little-endian + body = mhdr + self._app_eui[::-1] + self._dev_eui[::-1] + dev_nonce + mic = aes_cmac(self._app_key, body)[:4] + return body + mic + + def _process_join_accept(self, data, dev_nonce): + """Process and validate a join-accept message.""" + if len(data) < 17: + return False + + # Decrypt join-accept (the entire payload after MHDR is encrypted) + mhdr = data[0] + if (mhdr & 0xE0) != _MTYPE_JOIN_ACCEPT: + return False + + from ucryptolib import aes + + encrypted = data[1:] + decrypted = bytearray() + for i in range(0, len(encrypted), 16): + block = encrypted[i : i + 16] + if len(block) < 16: + block = block + b"\x00" * (16 - len(block)) + decrypted += aes(self._app_key, 1).encrypt(block) + + # Parse: AppNonce(3) | NetID(3) | DevAddr(4) | DLSettings(1) | RxDelay(1) [| CFList] + app_nonce = bytes(decrypted[0:3]) + net_id = bytes(decrypted[3:6]) + self._dev_addr = struct.unpack("> 4) & 0x07 + + # Parse RxDelay + if rx_delay == 0: + self._rx1_delay = _RECEIVE_DELAY1 + else: + self._rx1_delay = rx_delay * 1000 + + # Derive session keys + self._nwk_skey, self._app_skey = derive_session_keys( + self._app_key, app_nonce, net_id, dev_nonce + ) + + # Reset frame counters + self._fcnt_up = 0 + self._fcnt_down = 0 + + return True + + def _build_data_frame(self, mtype, port, payload): + """Build a LoRaWAN data frame (uplink).""" + mhdr = bytes([mtype | 0x00]) + + # FHDR: DevAddr(4) | FCtrl(1) | FCnt(2) + fctrl = 0x00 # No ADR, no ACK, no FOptsLen + fhdr = struct.pack(" 0 and len(msg) % _BLK == 0 + + X = b"\x00" * _BLK + for i in range(n - 1): + block = msg[i * _BLK : (i + 1) * _BLK] + X = _aes_ecb(key, _xor(X, block)) + + # Last block + if last_complete: + block = msg[(n - 1) * _BLK : n * _BLK] + block = _xor(block, K1) + else: + block = msg[(n - 1) * _BLK :] + block = block + b"\x80" + b"\x00" * (_BLK - 1 - len(block)) + block = _xor(block, K2) + + return _aes_ecb(key, _xor(X, block)) + + +def aes_ctr_encrypt(key, payload, dev_addr, fcnt, direction): + """ + LoRaWAN payload encryption using AES-128 in CTR mode. + + Per LoRaWAN 1.0.x spec section 4.3.3. + + Args: + key: 16-byte AppSKey (for FRMPayload) or NwkSKey (for FOpts) + payload: plaintext bytes + dev_addr: 4-byte device address (little-endian int) + fcnt: frame counter (uint32) + direction: 0 = uplink, 1 = downlink + + Returns: + encrypted/decrypted bytes (symmetric) + """ + k = (len(payload) + _BLK - 1) // _BLK + S = bytearray() + for i in range(1, k + 1): + A_i = struct.pack("