diff --git a/README.md b/README.md index 10012b8f..549b3406 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ micropython-steami-lib/ │ │ ├── manifest.py # Manifest file for firmware inclusion │ │ ├── bq27441/ # Driver source code │ │ └── examples/ # Usage examples -│ ├── w2564jv/ # SPI flash memory W2564JV-DTR +│ ├── daplink_flash/ # DAPLink Flash bridge (I2C to W25Q64JV) │ ├── ssd1327/ # OLED display controller SSD1327ZB │ ├── mcp23009/ # I2C I/O expander MCP23009 │ ├── vl53l1cx/ # Distance sensor VL53L1CX @@ -90,31 +90,33 @@ bq.capacity_full() # Full capacity in mAh bq.capacity_remaining() # Remaining capacity in mAh ``` -### W2564JV-DTR (SPI Flash Memory) +### DAPLink Flash (I2C Flash Bridge) -The W2564JV-DTR is a 64 Mbit SPI flash memory. +The STeaMi board has a W25Q64JV 64 Mbit SPI flash memory connected to the STM32F103 (DAPLink). The STM32WB55 accesses it via I2C through the DAPLink bridge. #### Mounting and Running an Example ```bash # Mount the driver and run the example -mpremote mount lib/w2564jv run lib/w2564jv/examples/flash_read_write.py +mpremote mount lib/daplink_flash run lib/daplink_flash/examples/write_csv.py ``` #### Basic API ```python -from w2564jv import W2564JV +from daplink_flash import DaplinkFlash -# Initialization with SPI -spi = machine.SPI(0) -cs = machine.Pin(5, machine.Pin.OUT) -flash = W2564JV(spi, cs) +# Initialization +i2c = machine.I2C(1) +flash = DaplinkFlash(i2c) + +# Write a CSV file +flash.set_filename("DATA", "CSV") +flash.write_line("temperature;humidity") +flash.write_line("25.3;48.2") -# Reading and writing -data = b'Hello, STeaMi!' -flash.write(0, data) -read_data = flash.read(0, len(data)) +# Erase flash +flash.clear_flash() ``` ### SSD1327ZB (OLED Display Controller) diff --git a/lib/w2564jv/.gitkeep b/lib/daplink_flash/.gitkeep similarity index 100% rename from lib/w2564jv/.gitkeep rename to lib/daplink_flash/.gitkeep diff --git a/lib/daplink_flash/daplink_flash/__init__.py b/lib/daplink_flash/daplink_flash/__init__.py new file mode 100644 index 00000000..477204f0 --- /dev/null +++ b/lib/daplink_flash/daplink_flash/__init__.py @@ -0,0 +1,3 @@ +from .device import DaplinkFlash + +__all__ = ["DaplinkFlash"] diff --git a/lib/daplink_flash/daplink_flash/const.py b/lib/daplink_flash/daplink_flash/const.py new file mode 100644 index 00000000..368ee507 --- /dev/null +++ b/lib/daplink_flash/daplink_flash/const.py @@ -0,0 +1,35 @@ +from micropython import const + +# I2C address (7-bit) — 0x76 in 8-bit (CODAL convention) +DAPLINK_FLASH_DEFAULT_ADDR = const(0x3B) + +# WHO_AM_I expected value +DAPLINK_FLASH_WHO_AM_I_VAL = const(0x4C) + +# Commands +CMD_WHO_AM_I = const(0x01) +CMD_SET_FILENAME = const(0x03) +CMD_GET_FILENAME = const(0x04) +CMD_CLEAR_FLASH = const(0x10) +CMD_WRITE_DATA = const(0x11) +CMD_READ_SECTOR = const(0x20) + +# Registers +REG_STATUS = const(0x80) +REG_ERROR = const(0x81) + +# Status register bits +STATUS_BUSY = const(0x80) + +# Error register bits +ERROR_BAD_PARAM = const(0x01) +ERROR_FILE_FULL = const(0x20) +ERROR_BAD_FILENAME = const(0x40) +ERROR_CMD_FAILED = const(0x80) + +# Protocol limits +MAX_WRITE_CHUNK = const(30) +SECTOR_SIZE = const(256) +MAX_SECTORS = const(32768) +FILENAME_LEN = const(8) +EXT_LEN = const(3) diff --git a/lib/daplink_flash/daplink_flash/device.py b/lib/daplink_flash/daplink_flash/device.py new file mode 100644 index 00000000..3c59f975 --- /dev/null +++ b/lib/daplink_flash/daplink_flash/device.py @@ -0,0 +1,173 @@ +from time import sleep_ms + +from daplink_flash.const import * + + +class DaplinkFlash(object): + """MicroPython driver for the DAPLink Flash bridge (STM32F103 → W25Q64JV).""" + + def __init__(self, i2c, address=DAPLINK_FLASH_DEFAULT_ADDR): + self.i2c = i2c + self.address = address + self._buffer_1 = bytearray(1) + + # -------------------------------------------------- + # Low level I2C + # -------------------------------------------------- + + def _read_reg(self, reg, n=1): + """Read n bytes from register.""" + if n == 1: + self.i2c.readfrom_mem_into(self.address, reg, self._buffer_1) + return self._buffer_1[0] + return self.i2c.readfrom_mem(self.address, reg, n) + + def _write_reg(self, reg, data): + """Write data bytes to register.""" + self.i2c.writeto_mem(self.address, reg, data) + + def _write_cmd(self, cmd): + """Write a single command byte (no payload).""" + self._buffer_1[0] = cmd + self.i2c.writeto(self.address, self._buffer_1) + + # -------------------------------------------------- + # Device identification + # -------------------------------------------------- + + def device_id(self): + """Read WHO_AM_I register. Expected: 0x4C.""" + return self._read_reg(CMD_WHO_AM_I) + + # -------------------------------------------------- + # Status and error registers + # -------------------------------------------------- + + def _status(self): + """Read raw status register.""" + return self._read_reg(REG_STATUS) + + def _error(self): + """Read raw error register.""" + return self._read_reg(REG_ERROR) + + def busy(self): + """Return True if flash is busy.""" + return bool(self._status() & STATUS_BUSY) + + def _wait_busy(self, timeout_ms=30000): + """Poll busy bit until clear. Raises OSError on timeout.""" + elapsed = 0 + while self.busy(): + sleep_ms(5) + elapsed += 5 + if elapsed >= timeout_ms: + raise OSError("DAPLink Flash busy timeout") + + # -------------------------------------------------- + # Filename management + # -------------------------------------------------- + + def set_filename(self, name, ext): + """Set 8.3 filename. name: max 8 chars, ext: max 3 chars.""" + self._wait_busy() + n = name.upper().encode("ascii")[:FILENAME_LEN] + e = ext.upper().encode("ascii")[:EXT_LEN] + padded = n + b" " * (FILENAME_LEN - len(n)) + e + b" " * (EXT_LEN - len(e)) + self._write_reg(CMD_SET_FILENAME, padded) + + def get_filename(self): + """Read current filename. Returns (name, ext) tuple, stripped.""" + self._wait_busy() + raw = self._read_reg(CMD_GET_FILENAME, FILENAME_LEN + EXT_LEN) + name = bytes(raw[:FILENAME_LEN]).decode().rstrip() + ext = bytes(raw[FILENAME_LEN:]).decode().rstrip() + return (name, ext) + + # -------------------------------------------------- + # Flash operations + # -------------------------------------------------- + + def clear_flash(self): + """Erase entire flash memory.""" + self._wait_busy() + self._write_cmd(CMD_CLEAR_FLASH) + + def write(self, data): + """Append data to current file. data: bytes or str. + + Returns the number of bytes written. + """ + if isinstance(data, str): + data = data.encode() + offset = 0 + length = len(data) + buf = bytearray(MAX_WRITE_CHUNK + 2) + buf[0] = CMD_WRITE_DATA + while offset < length: + self._wait_busy() + chunk_len = min(MAX_WRITE_CHUNK, length - offset) + buf[1] = chunk_len + buf[2 : 2 + chunk_len] = data[offset : offset + chunk_len] + # Zero-pad remainder + for i in range(2 + chunk_len, len(buf)): + buf[i] = 0 + self.i2c.writeto(self.address, buf) + offset += chunk_len + self._wait_busy() + err = self._error() + if err: + raise OSError("DAPLink Flash write error: 0x{:02X}".format(err)) + return length + + def write_line(self, text): + """Append text + newline to current file.""" + return self.write(text + "\n") + + # -------------------------------------------------- + # Read operations + # -------------------------------------------------- + + def read_sector(self, sector): + """Read a 256-byte sector from flash. + + Args: + sector: sector number (0-32767). + + Returns: + bytes: 256 bytes of data. + """ + self._wait_busy() + self._write_reg(CMD_READ_SECTOR, bytes([sector >> 8, sector & 0xFF])) + # F103 processes the command in its 30ms hook, then sets up DMA. + # After DMA setup, the F103 is no longer in listen mode — only + # a plain readfrom() will work (no register-based status poll). + sleep_ms(200) + return self.i2c.readfrom(self.address, SECTOR_SIZE) + + def read(self, length=None): + """Read file content from flash. + + Args: + length: max bytes to read. If None, reads until first 0xFF. + + Returns: + bytes: file content. + """ + if length is not None and length <= 0: + return b"" + result = bytearray() + sector = 0 + while sector < MAX_SECTORS: + data = self.read_sector(sector) + for i in range(SECTOR_SIZE): + if length is not None: + result.append(data[i]) + if len(result) >= length: + return bytes(result) + else: + if data[i] == 0xFF: + return bytes(result) + result.append(data[i]) + sector += 1 + return bytes(result) diff --git a/lib/daplink_flash/examples/erase_flash.py b/lib/daplink_flash/examples/erase_flash.py new file mode 100644 index 00000000..f40512b9 --- /dev/null +++ b/lib/daplink_flash/examples/erase_flash.py @@ -0,0 +1,18 @@ +"""Erase all data from the flash memory.""" + +from machine import I2C +from time import sleep_ms +from daplink_flash import DaplinkFlash + +i2c = I2C(1) +flash = DaplinkFlash(i2c) + +name, ext = flash.get_filename() +print("Current file: {}.{}".format(name, ext)) + +print("Erasing flash...") +flash.clear_flash() +sleep_ms(1000) + +print("Done. Flash is empty.") +print("ERROR: 0x{:02X}".format(flash._error())) diff --git a/lib/daplink_flash/examples/flash_info.py b/lib/daplink_flash/examples/flash_info.py new file mode 100644 index 00000000..8af238ed --- /dev/null +++ b/lib/daplink_flash/examples/flash_info.py @@ -0,0 +1,16 @@ +"""Display DAPLink Flash bridge status and filename.""" + +from machine import I2C +from daplink_flash import DaplinkFlash + +i2c = I2C(1) +flash = DaplinkFlash(i2c) + +print("=== DAPLink Flash Info ===") +print("WHO_AM_I: 0x{:02X}".format(flash.device_id())) +print("STATUS: 0x{:02X}".format(flash._status())) +print("ERROR: 0x{:02X}".format(flash._error())) +print("Busy: ", flash.busy()) + +name, ext = flash.get_filename() +print("Filename: {}.{}".format(name, ext)) diff --git a/lib/daplink_flash/examples/read_file.py b/lib/daplink_flash/examples/read_file.py new file mode 100644 index 00000000..50c438ce --- /dev/null +++ b/lib/daplink_flash/examples/read_file.py @@ -0,0 +1,19 @@ +"""Read and display the current file stored on flash.""" + +from machine import I2C +from daplink_flash import DaplinkFlash + +i2c = I2C(1) +flash = DaplinkFlash(i2c) + +name, ext = flash.get_filename() +print("Reading file: {}.{}".format(name, ext)) +print() + +content = flash.read() +if len(content) == 0: + print("(empty)") +else: + print(content.decode()) + print("---") + print("{} bytes".format(len(content))) diff --git a/lib/daplink_flash/examples/sensor_log.py b/lib/daplink_flash/examples/sensor_log.py new file mode 100644 index 00000000..6a29f0fe --- /dev/null +++ b/lib/daplink_flash/examples/sensor_log.py @@ -0,0 +1,97 @@ +"""Log sensor data to flash and compute statistics. + +Writes 200 rows of simulated sensor data (temperature, humidity, pressure) +to a CSV file on flash, then reads it back and computes min/max/average +for each column. +""" + +from machine import I2C +from time import sleep_ms +from daplink_flash import DaplinkFlash + +# --- Configuration --- +NUM_ROWS = 200 +FILENAME = "SENSORS" +EXT = "CSV" + +# --- Init --- +i2c = I2C(1) +flash = DaplinkFlash(i2c) +print("DAPLink Flash WHO_AM_I: 0x{:02X}".format(flash.device_id())) + +# --- Generate and write data --- +print("Writing {} rows to {}.{} ...".format(NUM_ROWS, FILENAME, EXT)) +flash.set_filename(FILENAME, EXT) +flash.clear_flash() +sleep_ms(500) + +flash.write_line("row;temperature;humidity;pressure") + +# Simple pseudo-random generator (linear congruential) +seed = 12345 +for row in range(NUM_ROWS): + seed = (seed * 1103515245 + 12345) & 0x7FFFFFFF + temp = 20.0 + (seed % 1000) / 100.0 + seed = (seed * 1103515245 + 12345) & 0x7FFFFFFF + hum = 30.0 + (seed % 4000) / 100.0 + seed = (seed * 1103515245 + 12345) & 0x7FFFFFFF + pres = 990.0 + (seed % 5000) / 100.0 + + line = "{};{:.1f};{:.1f};{:.1f}".format(row, temp, hum, pres) + flash.write(line + "\n") + +print("Write complete.") +sleep_ms(100) + +# --- Read back and compute statistics --- +print("Reading back...") +content = flash.read() +lines = content.decode().split("\n") + +# Skip header and empty lines +data_lines = [l for l in lines[1:] if l] +print("Read {} data rows.".format(len(data_lines))) + +temp_sum = 0.0 +hum_sum = 0.0 +pres_sum = 0.0 +temp_min = 999.0 +temp_max = -999.0 +hum_min = 999.0 +hum_max = -999.0 +pres_min = 9999.0 +pres_max = -9999.0 +count = 0 + +for line in data_lines: + parts = line.split(";") + if len(parts) != 4: + continue + t = float(parts[1]) + h = float(parts[2]) + p = float(parts[3]) + + temp_sum += t + hum_sum += h + pres_sum += p + + temp_min = min(temp_min, t) + temp_max = max(temp_max, t) + hum_min = min(hum_min, h) + hum_max = max(hum_max, h) + pres_min = min(pres_min, p) + pres_max = max(pres_max, p) + + count += 1 + +print() +print("=== Statistics ({} rows) ===".format(count)) +print(" Min Avg Max") +print("Temp (C): {:7.1f} {:7.1f} {:7.1f}".format( + temp_min, temp_sum / count, temp_max)) +print("Hum (%): {:7.1f} {:7.1f} {:7.1f}".format( + hum_min, hum_sum / count, hum_max)) +print("Pres (hPa):{:7.1f} {:7.1f} {:7.1f}".format( + pres_min, pres_sum / count, pres_max)) +print() +print("File size: {} bytes".format(len(content))) diff --git a/lib/daplink_flash/examples/write_csv.py b/lib/daplink_flash/examples/write_csv.py new file mode 100644 index 00000000..e83b1256 --- /dev/null +++ b/lib/daplink_flash/examples/write_csv.py @@ -0,0 +1,28 @@ +"""Write CSV data to flash via DAPLink bridge.""" + +from machine import I2C +from time import sleep_ms +from daplink_flash import DaplinkFlash + +i2c = I2C(1) +flash = DaplinkFlash(i2c) + +print("WHO_AM_I:", hex(flash.device_id())) + +# Set filename and erase +flash.set_filename("DATA", "CSV") +flash.clear_flash() +sleep_ms(500) +print("Flash erased.") + +# Write CSV header + data +flash.write_line("temperature;humidity;pressure") +flash.write_line("25.3;48.2;1013.5") +flash.write_line("25.5;47.8;1013.4") +flash.write_line("25.4;48.0;1013.6") +print("Data written.") + +# Read back +content = flash.read() +print("File content:") +print(content.decode()) diff --git a/lib/daplink_flash/manifest.py b/lib/daplink_flash/manifest.py new file mode 100644 index 00000000..82c69d4d --- /dev/null +++ b/lib/daplink_flash/manifest.py @@ -0,0 +1,6 @@ +metadata( + description="Driver for the DAPLink Flash bridge (I2C to W25Q64JV SPI flash).", + version="0.0.1", +) + +package("daplink_flash") diff --git a/tests/fake_machine/i2c.py b/tests/fake_machine/i2c.py index f9917961..93517964 100644 --- a/tests/fake_machine/i2c.py +++ b/tests/fake_machine/i2c.py @@ -40,6 +40,10 @@ def writeto_mem(self, addr, reg, buf, *, addrsize=8): self._registers[reg] = bytes(buf) self._write_log.append((reg, bytes(buf))) + def readfrom(self, addr, nbytes, stop=True): + self._check_address(addr) + return b"\x00" * nbytes + def writeto(self, addr, buf, stop=True): self._check_address(addr) self._write_log.append((None, bytes(buf))) diff --git a/tests/scenarios/daplink_flash.yaml b/tests/scenarios/daplink_flash.yaml new file mode 100644 index 00000000..8581e9d6 --- /dev/null +++ b/tests/scenarios/daplink_flash.yaml @@ -0,0 +1,263 @@ +driver: daplink_flash +driver_class: DaplinkFlash +i2c_address: 0x3B + +# I2C config for hardware tests (STeaMi board - STM32WB55) +i2c: + id: 1 + +# Register values for mock tests +# These simulate the STM32F103 DAPLink flash bridge +mock_registers: + # WHO_AM_I (expected 0x4C) + 0x01: 0x4C + # STATUS_REG (not busy) + 0x80: 0x00 + # ERROR_REG (no error) + 0x81: 0x00 + # GET_FILENAME (11 bytes: "TEST CSV") + 0x04: [0x54, 0x45, 0x53, 0x54, 0x20, 0x20, 0x20, 0x20, 0x43, 0x53, 0x56] + +tests: + - name: "Verify WHO_AM_I register" + action: read_register + register: 0x01 + expect: 0x4C + mode: [mock, hardware] + + - name: "Read WHO_AM_I via method" + action: call + method: device_id + expect: 0x4C + mode: [mock, hardware] + + - name: "Read status register" + action: call + method: _status + expect: 0x00 + mode: [mock] + + - name: "Read error register" + action: call + method: _error + expect: 0x00 + mode: [mock] + + - name: "Not busy when status is 0" + action: script + script: | + result = dev.busy() == False + expect_true: true + mode: [mock] + + - name: "Get filename returns tuple" + action: script + script: | + name, ext = dev.get_filename() + result = name == "TEST" and ext == "CSV" + expect_true: true + mode: [mock] + + - name: "Set filename writes to register" + action: script + script: | + i2c.clear_write_log() + dev.set_filename("DATA", "CSV") + log = i2c.get_write_log() + wrote_filename = any(reg == 0x03 for reg, data in log) + result = wrote_filename + expect_true: true + mode: [mock] + + - name: "Clear flash sends command" + action: script + script: | + i2c.clear_write_log() + dev.clear_flash() + log = i2c.get_write_log() + sent_clear = any(reg is None and 0x10 in data for reg, data in log) + result = sent_clear + expect_true: true + mode: [mock] + + - name: "Write sends chunked data" + action: script + script: | + i2c.clear_write_log() + dev.write(b"Hello") + log = i2c.get_write_log() + sent_write = any(reg is None and data[0] == 0x11 for reg, data in log) + result = sent_write + expect_true: true + mode: [mock] + + - name: "Write returns byte count" + action: script + script: | + n = dev.write(b"Test data") + result = n == 9 + expect_true: true + mode: [mock] + + - name: "Write string converts to bytes" + action: script + script: | + n = dev.write("Hello") + result = n == 5 + expect_true: true + mode: [mock] + + - name: "Write line appends newline" + action: script + script: | + n = dev.write_line("row") + result = n == 4 + expect_true: true + mode: [mock] + + - name: "Read sector sends command" + action: script + script: | + i2c.clear_write_log() + dev.read_sector(0) + log = i2c.get_write_log() + sent_read = any(reg == 0x20 for reg, data in log) + result = sent_read + expect_true: true + mode: [mock] + + # ----- Hardware ----- + + - name: "Status register readable" + action: call + method: _status + expect_not_none: true + mode: [hardware] + + - name: "Error register readable" + action: call + method: _error + expect_not_none: true + mode: [hardware] + + - name: "Set and get filename round-trip" + action: hardware_script + script: | + dev.set_filename("PYTEST", "TXT") + name, ext = dev.get_filename() + result = name == "PYTEST" and ext == "TXT" + expect_true: true + mode: [hardware] + + - name: "Write and clear flash" + action: hardware_script + script: | + dev.set_filename("PYTEST", "TXT") + dev.clear_flash() + dev.write_line("test data") + result = True + expect_true: true + mode: [hardware] + + - name: "Write and read back file content" + action: hardware_script + script: | + from time import sleep_ms + dev.set_filename("RDBACK", "TXT") + dev.clear_flash() + sleep_ms(500) + dev.write_line("Hello Flash!") + sleep_ms(100) + content = dev.read() + result = content == b"Hello Flash!\n" + expect_true: true + mode: [hardware] + + - name: "No data corruption at page boundaries" + action: hardware_script + script: | + from time import sleep_ms + dev.set_filename("PAGEBND", "TXT") + dev.clear_flash() + sleep_ms(500) + # Write 20 lines of 21 bytes each (420 bytes, crosses 256-byte page) + for i in range(20): + dev.write_line("ROW{:04d};AAAAAAAAAA".format(i)) + sleep_ms(100) + content = dev.read() + lines = content.decode().split("\n") + lines = [l for l in lines if l] + ok = len(lines) == 20 + for i, line in enumerate(lines): + if line != "ROW{:04d};AAAAAAAAAA".format(i): + ok = False + break + result = ok + expect_true: true + mode: [hardware] + + - name: "Multi-sector read returns all written data" + action: hardware_script + script: | + from time import sleep_ms + dev.set_filename("MULTI", "TXT") + dev.clear_flash() + sleep_ms(500) + expected = "" + for i in range(50): + line = "LINE{:04d};DATA\n".format(i) + dev.write(line) + expected += line + sleep_ms(100) + content = dev.read() + result = content.decode() == expected + expect_true: true + mode: [hardware] + + - name: "Read sector returns correct 256-byte block" + action: hardware_script + script: | + from time import sleep_ms + dev.set_filename("SECTOR", "TXT") + dev.clear_flash() + sleep_ms(500) + # Write more than 256 bytes to fill at least 2 sectors + data = "A" * 300 + "\n" + dev.write(data) + sleep_ms(100) + s0 = dev.read_sector(0) + s1 = dev.read_sector(1) + result = len(s0) == 256 and len(s1) == 256 + result = result and s0[0] == ord("A") and s1[0] == ord("A") + expect_true: true + mode: [hardware] + + - name: "Error register set when writing to full flash" + action: hardware_script + script: | + # Verify error register is clean after normal operations + dev.set_filename("ERRTEST", "TXT") + dev.clear_flash() + from time import sleep_ms + sleep_ms(500) + dev.write_line("test") + sleep_ms(100) + result = dev._error() == 0x00 + expect_true: true + mode: [hardware] + + - name: "Erase then read returns empty" + action: hardware_script + script: | + from time import sleep_ms + dev.set_filename("ERASE", "TXT") + dev.clear_flash() + sleep_ms(500) + dev.write_line("data to erase") + sleep_ms(100) + dev.clear_flash() + sleep_ms(500) + content = dev.read() + result = len(content) == 0 + expect_true: true + mode: [hardware]