1313import logging
1414import struct
1515import time
16+ import unittest
1617from typing import TYPE_CHECKING , Callable
1718
1819from openlifu .io .LIFUConfig import OW_ERROR , OW_I2C_PASSTHRU
@@ -152,6 +153,122 @@ def parse_signed_package(pkg: bytes) -> dict:
152153 }
153154
154155
156+ # ---------------------------------------------------------------------------
157+ # Internal tests for DFU package parsing / CRC (deterministic, no hardware)
158+ # ---------------------------------------------------------------------------
159+
160+ def _build_synthetic_signed_package (
161+ fw : bytes ,
162+ meta : bytes ,
163+ fw_address : int = 0x08000000 ,
164+ meta_address : int = 0x08008000 ,
165+ ) -> bytes :
166+ """Construct a minimal, self-consistent signed DFU package for testing.
167+
168+ This uses the module's own header layout/CRC implementation so that tests
169+ validate :func:`stm32_crc32` and :func:`parse_signed_package` end to end.
170+ """
171+ hdr_size = struct .calcsize (_PKG_HDR_FULL )
172+ payload = fw + meta
173+ fw_len = len (fw )
174+ meta_len = len (meta )
175+
176+ payload_crc = stm32_crc32 (payload )
177+
178+ # First pack with a placeholder header CRC so we can compute the real one.
179+ header_crc_placeholder = 0
180+ header = struct .pack (
181+ _PKG_HDR_FULL ,
182+ _PKG_MAGIC ,
183+ _PKG_VERSION ,
184+ hdr_size ,
185+ fw_address ,
186+ fw_len ,
187+ meta_address ,
188+ meta_len ,
189+ payload_crc ,
190+ header_crc_placeholder ,
191+ )
192+
193+ header_crc = stm32_crc32 (header [:- 4 ])
194+ header = struct .pack (
195+ _PKG_HDR_FULL ,
196+ _PKG_MAGIC ,
197+ _PKG_VERSION ,
198+ hdr_size ,
199+ fw_address ,
200+ fw_len ,
201+ meta_address ,
202+ meta_len ,
203+ payload_crc ,
204+ header_crc ,
205+ )
206+
207+ return header + payload
208+
209+
210+ class TestSignedPackage (unittest .TestCase ):
211+ """Unit tests for :func:`stm32_crc32` and :func:`parse_signed_package`.
212+
213+ These tests are deterministic and require no hardware; they can be run by
214+ any standard Python test runner to guard against regressions that might
215+ otherwise risk bricking devices during DFU.
216+ """
217+
218+ def test_parse_signed_package_valid (self ) -> None :
219+ fw = b"\x01 \x02 \x03 \x04 "
220+ meta = b"\xAA \xBB "
221+ fw_addr = 0x08001000
222+ meta_addr = 0x08009000
223+
224+ pkg = _build_synthetic_signed_package (
225+ fw = fw ,
226+ meta = meta ,
227+ fw_address = fw_addr ,
228+ meta_address = meta_addr ,
229+ )
230+
231+ parsed = parse_signed_package (pkg )
232+
233+ assert parsed ["fw_address" ] == fw_addr
234+ assert parsed ["meta_address" ] == meta_addr
235+ assert parsed ["fw" ] == fw
236+ assert parsed ["meta" ] == meta
237+
238+ def test_parse_signed_package_header_crc_mismatch (self ) -> None :
239+ """Corrupt the header so header CRC verification fails."""
240+ fw = b"\x10 \x20 "
241+ meta = b"\x30 "
242+ pkg = _build_synthetic_signed_package (fw = fw , meta = meta )
243+
244+ # Flip a bit inside the header (but keep magic/version/size plausible).
245+ pkg_bytes = bytearray (pkg )
246+ if len (pkg_bytes ) < 8 :
247+ self .skipTest ("synthetic package unexpectedly small" )
248+ pkg_bytes [4 ] ^= 0x01
249+ corrupted = bytes (pkg_bytes )
250+
251+ with self .assertRaisesRegex (ValueError , "header CRC mismatch" ):
252+ parse_signed_package (corrupted )
253+
254+ def test_parse_signed_package_payload_crc_mismatch (self ) -> None :
255+ """Corrupt the payload so payload CRC verification fails."""
256+ fw = b"\xDE \xAD \xBE \xEF "
257+ meta = b"\x00 \x01 "
258+ pkg = _build_synthetic_signed_package (fw = fw , meta = meta )
259+
260+ hdr_size = struct .calcsize (_PKG_HDR_FULL )
261+ pkg_bytes = bytearray (pkg )
262+ # Flip a bit in the first payload byte (after the header).
263+ if len (pkg_bytes ) <= hdr_size :
264+ self .skipTest ("synthetic package unexpectedly small" )
265+ pkg_bytes [hdr_size ] ^= 0x01
266+ corrupted = bytes (pkg_bytes )
267+
268+ with self .assertRaisesRegex (ValueError , "payload CRC mismatch" ):
269+ parse_signed_package (corrupted )
270+
271+
155272# ---------------------------------------------------------------------------
156273# USB DFU client (module 0)
157274# ---------------------------------------------------------------------------
@@ -414,7 +531,6 @@ def __init__(self, uart: LIFUUart,
414531 write_read_delay_s : float = 0.005 ):
415532 self ._uart = uart
416533 self ._addr = i2c_addr
417- self ._wr_delay = write_read_delay_s
418534
419535 # --- low-level transport primitives ---
420536
@@ -437,11 +553,14 @@ def _write(self, payload: bytes) -> None:
437553
438554 def _exchange (self , payload : bytes , read_len : int ,
439555 pre_read_delay_s : float | None = None ) -> bytes :
440- """Write *payload* to the I2C slave, wait, then read *read_len* bytes back.
441-
442- The firmware inserts a fixed 5 ms gap between write and read.
443- An optional extra host-side delay can be added via *pre_read_delay_s*
444- (not usually needed).
556+ """Write *payload* to the I2C slave and read *read_len* bytes back.
557+
558+ The firmware executes a combined write+read transaction and inserts a
559+ fixed 5 ms gap between the write and read phases internally.
560+ The optional *pre_read_delay_s* parameter adds an extra host-side delay
561+ **before** issuing the passthrough transaction (i.e. before the
562+ firmware performs the write+read). This does *not* change the internal
563+ 5 ms gap handled by the firmware and is rarely needed.
445564 """
446565 if pre_read_delay_s and pre_read_delay_s > 0 :
447566 time .sleep (pre_read_delay_s )
@@ -784,16 +903,35 @@ def update_module(self,
784903 "Verifying I2C DFU entry (module %d, addr=0x%02X via master)..." ,
785904 module , i2c_addr ,
786905 )
787- try :
788- bl_version = self .get_bootloader_version_i2c (i2c_addr = i2c_addr )
789- except (RuntimeError , TimeoutError ) as e :
790- raise RuntimeError (
791- f"Module { module } did not enter I2C DFU mode at "
792- f"0x{ i2c_addr :02X} : { e } "
793- ) from e
906+ start_time = time .time ()
907+ bl_version = None
908+ last_error : Exception | None = None
909+ while True :
910+ elapsed = time .time () - start_time
911+ if elapsed >= dfu_enum_timeout_s :
912+ break
913+ try :
914+ candidate = self .get_bootloader_version_i2c (i2c_addr = i2c_addr )
915+ if candidate :
916+ bl_version = candidate
917+ break
918+ # Treat empty version string as a failure worth retrying.
919+ last_error = RuntimeError (
920+ "I2C DFU bootloader returned an empty version string"
921+ )
922+ except (RuntimeError , TimeoutError ) as e :
923+ last_error = e
924+ # Small delay before retrying to avoid busy-waiting.
925+ time .sleep (0.2 )
794926 if not bl_version :
927+ if last_error is not None :
928+ raise RuntimeError (
929+ f"Module { module } did not enter I2C DFU mode at "
930+ f"0x{ i2c_addr :02X} within { dfu_enum_timeout_s } s: { last_error } "
931+ ) from last_error
795932 raise RuntimeError (
796- f"Module { module } I2C DFU bootloader returned an empty version string"
933+ f"Module { module } did not enter I2C DFU mode at "
934+ f"0x{ i2c_addr :02X} within { dfu_enum_timeout_s } s"
797935 )
798936 logger .info ("I2C DFU bootloader version: %s" , bl_version )
799937 self .program_i2c (
0 commit comments