diff --git a/BLE/BLE_Beacon/BLE_Beacon_temperature.py b/BLE/BLE_Beacon/BLE_Beacon_temperature.py new file mode 100644 index 0000000..87941a5 --- /dev/null +++ b/BLE/BLE_Beacon/BLE_Beacon_temperature.py @@ -0,0 +1,131 @@ +"""BLE beacon advertising example using aioble and SSD1327 OLED. + +Broadcasts a BLE advertisement containing the board name and a live +temperature reading from the WSEN-PADS sensor. The beacon is visible +from any BLE scanner app (nRF Connect, LightBlue, etc.). + +Hardware: + - STM32WB55 BLE radio + - SSD1327 128x128 OLED display (round) + - WSEN-PADS pressure + temperature sensor + +BLE payload: + - Complete Local Name: "STeaMi-XXXX" (last 2 bytes of MAC address) + - Manufacturer Specific Data: temperature as int16 (x100, in 0.01 C) +""" + +import sys + +sys.path.insert(0, "/remote") + +import bluetooth +import struct +import uasyncio as asyncio + +import aioble +import ssd1327 +from machine import I2C, SPI, Pin +from steami_screen import GRAY, GREEN, LIGHT, Screen, SSD1327Display, WHITE +from wsen_pads import WSEN_PADS + +# === BLE setup === +ble = bluetooth.BLE() +ble.active(True) + +mac_bytes = ble.config("mac")[1] +mac_suffix = "".join(f"{b:02X}" for b in mac_bytes[-2:]) +DEVICE_NAME = f"STeaMi-{mac_suffix}" +print("Device name:", DEVICE_NAME) + +# === Display === +spi = SPI(1) +dc = Pin("DATA_COMMAND_DISPLAY") +res = Pin("RST_DISPLAY") +cs = Pin("CS_DISPLAY") +display = SSD1327Display(ssd1327.WS_OLED_128X128_SPI(spi, dc, res, cs)) +screen = Screen(display) + +# === Sensor === +i2c = I2C(1) +pads = WSEN_PADS(i2c) + +# === BLE parameters === +ADV_INTERVAL_US = 200_000 # 200 ms between advertisements +ADV_TIMEOUT_MS = 500 # advertise for 500 ms per cycle +SENSOR_INTERVAL_MS = 1000 # read sensor every 1 second + +# === Shared state === +temperature = 0.0 + + +def build_adv_payload(name, temp_raw): + """Build a BLE advertising payload. + + Contains: + - Complete Local Name (type 0x09) + - Manufacturer Specific Data (type 0xFF): int16 temperature x100 + """ + payload = bytearray() + + # Complete Local Name + name_bytes = name.encode() + payload += bytes((len(name_bytes) + 1, 0x09)) + name_bytes + + # Manufacturer Specific Data: temperature encoded as int16 (x100) + man_data = struct.pack("h", temp_raw) + payload += bytes((len(man_data) + 1, 0xFF)) + man_data + + return payload + + +async def sensor_task(): + """Read temperature from WSEN-PADS every second.""" + global temperature + while True: + try: + temperature = pads.temperature() + except Exception as e: + print("Sensor error:", e) + await asyncio.sleep_ms(SENSOR_INTERVAL_MS) + + +async def ble_task(): + """Advertise BLE beacon with device name and temperature.""" + while True: + # Encode temperature as int16 (multiply by 100 to keep 2 decimals) + temp_raw = int(temperature * 100) + adv_payload = build_adv_payload(DEVICE_NAME, temp_raw) + + try: + await aioble.advertise( + interval_us=ADV_INTERVAL_US, + adv_data=adv_payload, + connectable=False, + timeout_ms=ADV_TIMEOUT_MS, + ) + except asyncio.TimeoutError: + pass # Normal: non-connectable advertisement timeout + + await asyncio.sleep_ms(100) + + +async def display_task(): + """Update OLED display with current beacon state.""" + while True: + screen.clear() + screen.title("BLE BEACON") + screen.value(f"{temperature:.1f}", unit="C") + screen.subtitle(DEVICE_NAME, "Advertising...") + screen.show() + await asyncio.sleep_ms(500) + + +async def main(): + await asyncio.gather( + sensor_task(), + ble_task(), + display_task(), + ) + + +asyncio.run(main()) diff --git a/BLE/BLE_HOT_COLD/main.py b/BLE/BLE_HOT_COLD/main.py new file mode 100644 index 0000000..99aee5a --- /dev/null +++ b/BLE/BLE_HOT_COLD/main.py @@ -0,0 +1,474 @@ +"""BLE Hot/Cold treasure hunt game using aioble and SSD1327 OLED. + +Two STeaMi boards run this same file: + - UP/DOWN to pick TREASURE or SEEKER, RIGHT to confirm + - LEFT at any time to return to mode selection + +The seeker scans for the treasure beacon, maps RSSI to proximity zones, +displays feedback on the OLED and plays buzzer beeps that accelerate +as the player gets closer. + +Proximity zones (0-100%): + COLD ( 0 to 25 ) -> sad face + 1 slow beep every 3s (440 Hz) + WARM ( 25 to 50 ) -> sleeping face + 1 beep every 1.2s (660 Hz) + HOT ( 50 to 75 ) -> happy face + 2 fast beeps every 500ms (880 Hz) + BURNING ( 75 to 100] -> love face + 3 rapid beeps every 150ms (1200 Hz) + +RSSI smoothing uses an Exponential Moving Average (EMA) so the display +reacts immediately to large jumps while staying stable during small +fluctuations. Zone transitions are protected by a hysteresis band to +avoid flickering at zone boundaries. + +Hardware: + - 2 x STM32WB55 BLE radio + - SSD1327 128x128 OLED display (round) + - MCP23009E D-PAD (UP/DOWN/LEFT/RIGHT) + - SPEAKER pin (buzzer) + +Learning goals: + - Practical application of RSSI to distance + - EMA signal smoothing: reactive yet stable + - Hysteresis to avoid zone flickering + - Multi-sensory feedback (screen + sound) + - Asyncio task coordination +""" + +import bluetooth +import uasyncio as asyncio +from time import sleep_us, ticks_ms, ticks_diff + +import aioble +import ssd1327 +from machine import I2C, SPI, Pin +from mcp23009e import MCP23009E +from mcp23009e.const import ( + MCP23009_BTN_DOWN, + MCP23009_BTN_LEFT, + MCP23009_BTN_RIGHT, + MCP23009_BTN_UP, + MCP23009_DIR_INPUT, + MCP23009_I2C_ADDR, + MCP23009_LOGIC_LOW, + MCP23009_PULLUP, +) +from steami_screen import BLUE, RED, Screen, SSD1327Display, YELLOW + +# === BLE setup === +ble = bluetooth.BLE() +ble.active(True) +mac_bytes = ble.config("mac")[1] +mac_suffix = "".join(f"{b:02X}" for b in mac_bytes[-2:]) +DEVICE_NAME = f"STeaMi-{mac_suffix}" +BEACON_NAME = "Beacon_M1" + +# === Display === +spi = SPI(1) +dc = Pin("DATA_COMMAND_DISPLAY") +res = Pin("RST_DISPLAY") +cs = Pin("CS_DISPLAY") +display = SSD1327Display(ssd1327.WS_OLED_128X128_SPI(spi, dc, res, cs)) +screen = Screen(display) + +# === Buzzer === +speaker = Pin("SPEAKER", Pin.OUT) + +# === Buttons === +i2c = I2C(1) +reset_expander = Pin("RST_EXPANDER", Pin.OUT) +mcp = MCP23009E(i2c, address=MCP23009_I2C_ADDR, reset_pin=reset_expander) + +_DPAD = { + MCP23009_BTN_UP: "UP", + MCP23009_BTN_DOWN: "DOWN", + MCP23009_BTN_LEFT: "LEFT", + MCP23009_BTN_RIGHT: "RIGHT", +} + + +def setup_buttons(): + for pin in _DPAD: + mcp.setup(pin, MCP23009_DIR_INPUT, pullup=MCP23009_PULLUP) + + +def _read_button(): + """Return the name of the currently pressed button, or None.""" + for pin, name in _DPAD.items(): + if mcp.get_level(pin) == MCP23009_LOGIC_LOW: + return name + return None + + +# ============================================================================= +# === CALIBRATION ============================================================= +# ============================================================================= +# Adjust these two values to match your environment: +# - Hold boards 1-2 cm apart, note the RSSI -> set RSSI_MAX +# - Hold boards ~5 m apart, note the RSSI -> set RSSI_MIN +RSSI_MIN = -80 # dBm -- considered far (cold) +RSSI_MAX = -30 # dBm -- considered very close (burning) + +# EMA weight: 0.0 = frozen, 1.0 = no smoothing. +# 0.4 reacts quickly to large jumps while dampening noise. +EMA_ALPHA = 0.4 + +# ============================================================================= +# === BLE / TIMING PARAMETERS ================================================= +# ============================================================================= +ADV_INTERVAL_US = 50_000 # beacon advertising interval (50 ms) +ADV_TIMEOUT_MS = 100 # short window per advertise() call +SCAN_DURATION_MS = 200 # scan burst -- shorter = more reactive +SIGNAL_TIMEOUT_MS = 3000 # declare signal lost after this many ms with no packet +DISPLAY_INTERVAL_MS = 150 # OLED refresh rate (decoupled from BLE scan) + +MODE_SELECTION_S = 10 # auto-confirm countdown + +# ============================================================================= +# === PROXIMITY ZONES ========================================================= +# ============================================================================= +ZONE_COLD = 25 +ZONE_WARM = 50 +ZONE_HOT = 75 + +# Dead-band around each threshold: must cross by this % to change zone. +# 8% absorbs typical BLE RSSI noise (~5 dBm on a 50 dBm scale = 10%). +ZONE_HYSTERESIS = 8 + +_ZONE_DATA = [ + ("COLD", BLUE, "sad"), + ("WARM", YELLOW, "sleeping"), + ("HOT", RED, "happy"), + ("BURNING!", RED, "love"), +] + +# Beep patterns per zone: (frequency Hz, single-beep duration ms, beep count, gap between beeps ms) +# Rhythm changes make zones unmistakable even with eyes closed. +_ZONE_BEEP = [ + (440, 80, 1, 0), # COLD -- one low slow beep + (660, 100, 1, 0), # WARM -- one medium beep + (880, 70, 2, 80), # HOT -- da-dum da-dum + (1200, 55, 3, 50), # BURNING -- rapid triple burst +] + +# Time to wait AFTER each beep group before the next one (ms). +_ZONE_INTERVAL = (3000, 1200, 500, 150) + +# ============================================================================= +# === SHARED STATE ============================================================ +# ============================================================================= +current_rssi = float(RSSI_MIN) # EMA-smoothed RSSI +last_beacon_ticks = 0 +beacon_seen = False +current_zone_idx = 0 # 0=COLD, 1=WARM, 2=HOT, 3=BURNING +_exit_requested = False + + +# ============================================================================= +# === BUZZER ================================================================== +# ============================================================================= + + +async def tone_async(freq, duration_ms): + """Play a short blocking tone, then yield to the event loop.""" + if freq == 0: + await asyncio.sleep_ms(duration_ms) + return + period_us = int(1_000_000 / freq) + half = period_us // 2 + cycles = (duration_ms * 1000) // period_us + for _ in range(cycles): + speaker.on() + sleep_us(half) + speaker.off() + sleep_us(half) + await asyncio.sleep_ms(0) + + +async def beep_pattern(zone_idx): + """Play the beep pattern for the given zone.""" + freq, dur, count, gap = _ZONE_BEEP[zone_idx] + for i in range(count): + await tone_async(freq, dur) + if gap and i < count - 1: + await asyncio.sleep_ms(gap) + + +async def _wait_ms(ms): + """Wait for up to ms, waking early on exit, signal loss, or zone change.""" + zone = current_zone_idx + elapsed = 0 + while elapsed < ms: + if _exit_requested or not beacon_seen or current_zone_idx != zone: + return + await asyncio.sleep_ms(50) + elapsed += 50 + + +async def buzzer_task(): + """Play beep patterns that reflect the current proximity zone.""" + while not _exit_requested: + if not beacon_seen: + # Slow searching pulse; wake immediately when beacon appears. + await tone_async(350, 50) + await _wait_ms(1000) + continue + + zone = current_zone_idx + await beep_pattern(zone) + await _wait_ms(_ZONE_INTERVAL[zone]) + + +# ============================================================================= +# === HELPERS ================================================================= +# ============================================================================= + + +def build_adv_payload(name): + """Minimal BLE advertising payload carrying the device name.""" + name_bytes = name.encode() + return bytes((len(name_bytes) + 1, 0x09)) + name_bytes + + +def rssi_to_proximity(rssi): + """Map smoothed RSSI to a 0-100 proximity percentage.""" + clamped = max(RSSI_MIN, min(RSSI_MAX, rssi)) + return int((clamped - RSSI_MIN) / (RSSI_MAX - RSSI_MIN) * 100) + + +def update_zone(proximity): + """Advance or retreat one zone at a time using hysteresis.""" + global current_zone_idx + z = current_zone_idx + thresholds = (ZONE_COLD, ZONE_WARM, ZONE_HOT) + if z < 3 and proximity >= thresholds[z] + ZONE_HYSTERESIS: + z += 1 + elif z > 0 and proximity < thresholds[z - 1] - ZONE_HYSTERESIS: + z -= 1 + current_zone_idx = z + + +async def _sleep_interruptible(ms): + """Sleep for ms, waking up early if _exit_requested is set.""" + while ms > 0 and not _exit_requested: + await asyncio.sleep_ms(min(100, ms)) + ms -= 100 + + +# ============================================================================= +# === MODE SELECTION ========================================================== +# ============================================================================= + +_MODES = ["TREASURE", "SEEKER"] + + +def _draw_mode_select(selected, remaining): + screen.clear() + screen.title("SELECT MODE") + screen.menu(_MODES, selected=selected) + screen.subtitle("U/D:select", "RIGHT:OK", f"{remaining}s") + screen.show() + + +async def select_mode_async(): + """Menu-driven mode selection with D-PAD and auto-confirm countdown.""" + global _exit_requested + _exit_requested = False + + selected = 1 # Default: SEEKER + last_btn = None + remaining = MODE_SELECTION_S + + # Wait for any held button from the previous mode to release + while _read_button() is not None: + await asyncio.sleep_ms(20) + + while remaining > 0: + _draw_mode_select(selected, remaining) + for _ in range(20): # 20 * 50 ms = 1 s per countdown tick + btn = _read_button() + if btn != last_btn: + if btn == "UP": + selected = 0 + _draw_mode_select(selected, remaining) + elif btn == "DOWN": + selected = 1 + _draw_mode_select(selected, remaining) + elif btn == "RIGHT": + _draw_mode_select(selected, 0) + await asyncio.sleep_ms(300) + return selected == 0 + last_btn = btn + await asyncio.sleep_ms(50) + remaining -= 1 + + return selected == 0 # True = TREASURE + + +# ============================================================================= +# === EXIT WATCHER ============================================================ +# ============================================================================= + + +async def exit_watcher_task(): + """Set _exit_requested when LEFT is pressed, then return.""" + global _exit_requested + while _read_button() is not None: + await asyncio.sleep_ms(20) + while True: + if _read_button() == "LEFT": + while _read_button() is not None: + await asyncio.sleep_ms(20) + _exit_requested = True + return + await asyncio.sleep_ms(50) + + +# ============================================================================= +# === TREASURE MODE (BEACON) ================================================== +# ============================================================================= + + +async def treasure_ble_task(): + """Advertise as the treasure beacon until exit is requested.""" + adv_payload = build_adv_payload(BEACON_NAME) + print(f"Treasure mode: advertising as {BEACON_NAME}") + while not _exit_requested: + try: + await aioble.advertise( + interval_us=ADV_INTERVAL_US, + adv_data=adv_payload, + connectable=False, + timeout_ms=ADV_TIMEOUT_MS, + ) + except asyncio.TimeoutError: + pass + + +async def treasure_display_task(): + """Show treasure mode on OLED until exit is requested.""" + while not _exit_requested: + screen.clear() + screen.title("TREASURE") + screen.face("love") + screen.subtitle("Find me!", BEACON_NAME, "LEFT: menu") + screen.show() + await _sleep_interruptible(2000) + + +async def run_treasure(): + screen.clear() + screen.title("TREASURE") + screen.face("love") + screen.subtitle("Starting...") + screen.show() + await asyncio.sleep_ms(800) + await asyncio.gather( + treasure_ble_task(), + treasure_display_task(), + exit_watcher_task(), + ) + + +# ============================================================================= +# === SEEKER MODE (SCANNER) =================================================== +# ============================================================================= + + +async def seeker_ble_task(): + """Scan for the treasure beacon and update shared state with EMA smoothing.""" + global current_rssi, last_beacon_ticks, beacon_seen, current_zone_idx + first_sample = True + while not _exit_requested: + async with aioble.scan( + SCAN_DURATION_MS, + interval_us=10000, + window_us=10000, + active=True, + ) as scanner: + async for result in scanner: + if _exit_requested: + break + if result.name() != BEACON_NAME: + continue + raw = result.rssi + if first_sample: + current_rssi = float(raw) + first_sample = False + else: + current_rssi = EMA_ALPHA * raw + (1 - EMA_ALPHA) * current_rssi + proximity = rssi_to_proximity(current_rssi) + prev_zone = current_zone_idx + update_zone(proximity) + last_beacon_ticks = ticks_ms() + beacon_seen = True + if current_zone_idx != prev_zone: + print( + f"ema={current_rssi:.1f} dBm prox={proximity}%" + f" -> {_ZONE_DATA[current_zone_idx][0]}" + ) + + if _exit_requested: + break + + if beacon_seen and ticks_diff(ticks_ms(), last_beacon_ticks) > SIGNAL_TIMEOUT_MS: + beacon_seen = False + first_sample = True + current_rssi = float(RSSI_MIN) + current_zone_idx = 0 + print("Signal lost") + + +async def seeker_display_task(): + """Refresh OLED at a fixed rate from shared state.""" + while not _exit_requested: + screen.clear() + if beacon_seen: + proximity = rssi_to_proximity(current_rssi) + label, color, face = _ZONE_DATA[current_zone_idx] + screen.gauge(proximity, min_val=0, max_val=100, color=color) + screen.title(label) + screen.face(face, compact=True) + screen.subtitle(f"{int(current_rssi)} dBm", f"{proximity}%") + else: + screen.title("SEARCHING...") + screen.face("surprised") + screen.subtitle("Looking for", BEACON_NAME) + screen.show() + await asyncio.sleep_ms(DISPLAY_INTERVAL_MS) + + +async def run_seeker(): + global beacon_seen, current_rssi, current_zone_idx + beacon_seen = False + current_rssi = float(RSSI_MIN) + current_zone_idx = 0 + + screen.clear() + screen.title("SEEKER") + screen.face("surprised") + screen.subtitle("Starting...", "LEFT: menu") + screen.show() + await asyncio.sleep_ms(800) + await asyncio.gather( + seeker_ble_task(), + seeker_display_task(), + buzzer_task(), + exit_watcher_task(), + ) + + +# ============================================================================= +# === MAIN LOOP =============================================================== +# ============================================================================= + + +async def main(): + setup_buttons() + while True: + is_treasure = await select_mode_async() + if is_treasure: + await run_treasure() + else: + await run_seeker() + + +asyncio.run(main()) diff --git a/BLE/Room_mapper/main.py b/BLE/Room_mapper/main.py new file mode 100644 index 0000000..32e621c --- /dev/null +++ b/BLE/Room_mapper/main.py @@ -0,0 +1,339 @@ +"""BLE RSSI room mapper example using aioble, SSD1327 OLED and DAPLink flash. + +One board acts as scanner while 2-3 other boards act as beacons. +The user moves around the room with the scanner and presses RIGHT to +record RSSI measurements at each position. Data is saved as CSV via +DAPLink flash. + +Roles (single file): + - Navigate menu with UP/DOWN, confirm with RIGHT + - BEACON mode: select beacon ID and advertise continuously, LEFT to exit + - SCANNER mode: scan beacons, record RSSI points, save to CSV + +Beacon naming convention: + Beacon_M1, Beacon_M2, Beacon_M3 + +CSV format: + point_id,beacon_name,rssi + +Hardware: + - 3-4 STeaMi boards (2-3 beacons + 1 scanner) + - SSD1327 OLED + D-PAD + DAPLink flash on scanner board + +Learning goals: + - Experimental data collection methodology + - Understanding RSSI variability + - File I/O via DAPLink flash + - Introduction to radio fingerprinting +""" + +import sys + +sys.path.insert(0, "/remote") + +import bluetooth +import uasyncio as asyncio +from time import sleep_ms + +import aioble +import ssd1327 +from daplink_bridge import DaplinkBridge +from daplink_flash import DaplinkFlash +from machine import I2C, SPI, Pin +from mcp23009e import MCP23009E +from mcp23009e.const import ( + MCP23009_BTN_DOWN, + MCP23009_BTN_LEFT, + MCP23009_BTN_RIGHT, + MCP23009_BTN_UP, + MCP23009_DIR_INPUT, + MCP23009_I2C_ADDR, + MCP23009_LOGIC_LOW, + MCP23009_PULLUP, +) +from steami_screen import LIGHT, GRAY, Screen, SSD1327Display + +# === BLE setup === +ble = bluetooth.BLE() +ble.active(True) + +mac_bytes = ble.config("mac")[1] +mac_suffix = "".join(f"{b:02X}" for b in mac_bytes[-2:]) +DEVICE_NAME = f"STeaMi-{mac_suffix}" + +# === Beacon names === +BEACON_NAMES = ["Beacon_M1", "Beacon_M2", "Beacon_M3"] + +# === RSSI calibration offsets (environment-specific) === +# Adjust these values based on your hardware measurements at contact distance. +RSSI_OFFSET = { + "Beacon_M1": 0, + "Beacon_M2": 28, + "Beacon_M3": 28, +} + +# === Display === +spi = SPI(1) +dc = Pin("DATA_COMMAND_DISPLAY") +res = Pin("RST_DISPLAY") +cs = Pin("CS_DISPLAY") +display = SSD1327Display(ssd1327.WS_OLED_128X128_SPI(spi, dc, res, cs)) +screen = Screen(display) + +# === Buttons === +i2c = I2C(1) +reset_expander = Pin("RST_EXPANDER", Pin.OUT) +mcp = MCP23009E(i2c, address=MCP23009_I2C_ADDR, reset_pin=reset_expander) +for btn in [MCP23009_BTN_UP, MCP23009_BTN_DOWN, MCP23009_BTN_RIGHT, MCP23009_BTN_LEFT]: + mcp.setup(btn, MCP23009_DIR_INPUT, pullup=MCP23009_PULLUP) + +# === DAPLink flash === +bridge = DaplinkBridge(i2c) +flash = DaplinkFlash(bridge) + +# === BLE parameters === +ADV_INTERVAL_US = 50_000 +ADV_TIMEOUT_MS = 100 +SCAN_DURATION_MS = 500 + +# === Shared state === +current_rssi = {} +point_id = 0 +stop_flag = False + + +# ============================================================================= +# === HELPERS ================================================================= +# ============================================================================= + + +def build_adv_payload(name): + """Build a minimal BLE advertising payload with device name.""" + payload = bytearray() + name_bytes = name.encode() + payload += bytes((len(name_bytes) + 1, 0x09)) + name_bytes + return payload + + +def is_pressed(btn): + """Return True if button is currently pressed.""" + return mcp.get_level(btn) == MCP23009_LOGIC_LOW + + +def wait_released(btn): + """Wait until button is released.""" + while is_pressed(btn): + sleep_ms(20) + + +def menu_select(title, items): + """Generic D-PAD menu. Returns selected index.""" + selected = 0 + while True: + screen.clear() + screen.title(title) + screen.menu(items, selected=selected) + screen.subtitle("UP/DOWN: nav", "RIGHT: confirm") + screen.show() + + sleep_ms(150) + if is_pressed(MCP23009_BTN_UP): + wait_released(MCP23009_BTN_UP) + selected = (selected - 1) % len(items) + elif is_pressed(MCP23009_BTN_DOWN): + wait_released(MCP23009_BTN_DOWN) + selected = (selected + 1) % len(items) + elif is_pressed(MCP23009_BTN_RIGHT): + wait_released(MCP23009_BTN_RIGHT) + return selected + + +# ============================================================================= +# === BEACON MODE ============================================================= +# ============================================================================= + + +async def beacon_ble_task(beacon_name): + """Advertise as Beacon_Mx until stop_flag is set.""" + adv_payload = build_adv_payload(beacon_name) + print(f"Beacon mode: advertising as {beacon_name}") + while not stop_flag: + try: + await aioble.advertise( + interval_us=ADV_INTERVAL_US, + adv_data=adv_payload, + connectable=False, + timeout_ms=ADV_TIMEOUT_MS, + ) + except asyncio.TimeoutError: + pass + + +async def beacon_display_task(beacon_name): + """Show beacon status on OLED until stop_flag is set.""" + while not stop_flag: + screen.clear() + screen.title("BEACON") + screen.subtitle(beacon_name, "Broadcasting...", "LEFT: menu") + screen.show() + await asyncio.sleep_ms(1000) + + +async def beacon_button_task(): + """Watch for LEFT button to exit beacon mode.""" + global stop_flag + while not stop_flag: + if is_pressed(MCP23009_BTN_LEFT): + wait_released(MCP23009_BTN_LEFT) + stop_flag = True + return + await asyncio.sleep_ms(100) + + +async def run_beacon(beacon_name): + """Run beacon mode until LEFT is pressed.""" + await asyncio.gather( + beacon_ble_task(beacon_name), + beacon_display_task(beacon_name), + beacon_button_task(), + ) + + +# ============================================================================= +# === SCANNER MODE ============================================================ +# ============================================================================= + + +async def scanner_ble_task(): + """Scan for all beacons and update current RSSI with calibration offset.""" + global current_rssi + while not stop_flag: + async with aioble.scan( + SCAN_DURATION_MS, + interval_us=10000, + window_us=10000, + active=True, + ) as scanner: + async for result in scanner: + name = result.name() + if name in BEACON_NAMES: + current_rssi[name] = result.rssi + RSSI_OFFSET.get(name, 0) + + +async def scanner_display_task(): + """Display beacons RSSI, handle recording and stop.""" + global point_id, stop_flag + + while not stop_flag: + if is_pressed(MCP23009_BTN_RIGHT): + wait_released(MCP23009_BTN_RIGHT) + if current_rssi: + point_id += 1 + for name, rssi in current_rssi.items(): + line = f"{point_id},{name},{rssi}" + flash.write_line(line) + print(line) + + screen.clear() + screen.title(f"Point {point_id} OK") + screen.subtitle( + f"{len(current_rssi)} beacons", + "RIGHT: next", + "LEFT: stop", + ) + screen.show() + await asyncio.sleep_ms(800) + + elif is_pressed(MCP23009_BTN_LEFT): + wait_released(MCP23009_BTN_LEFT) + stop_flag = True + break + + else: + screen.clear() + screen.title("ROOM MAPPER") + + cx, cy = screen.center + y = 38 + for name in BEACON_NAMES: + rssi = current_rssi.get(name, "---") + short = name[-2:] + label = f"{short}: {rssi} dBm" + x = cx - len(label) * 4 + screen._d.text(label, x, y, LIGHT) + y += 14 + + screen.subtitle(f"#{point_id} R:rec L:stop") + screen.show() + + await asyncio.sleep_ms(200) + + +async def run_scanner(): + """Run scanner mode until stop_flag is set.""" + await asyncio.gather( + scanner_ble_task(), + scanner_display_task(), + ) + + +# ============================================================================= +# === MAIN LOOP =============================================================== +# ============================================================================= + +while True: + # Reset state + current_rssi = {} + point_id = 0 + stop_flag = False + + # Main mode selection menu + mode_idx = menu_select("SELECT MODE", ["SCANNER", "BEACON"]) + + if mode_idx == 1: + # Beacon mode — select beacon ID + beacon_idx = menu_select("SELECT BEACON", BEACON_NAMES) + beacon_name = BEACON_NAMES[beacon_idx] + + screen.clear() + screen.title("BEACON") + screen.subtitle(beacon_name, "Starting...") + screen.show() + sleep_ms(500) + + asyncio.run(run_beacon(beacon_name)) + + # Back to menu after LEFT pressed + screen.clear() + screen.title("BEACON STOPPED") + screen.subtitle("Returning to menu...") + screen.show() + sleep_ms(1000) + + else: + # Scanner mode + flash.set_filename("RSSI_MAP", "CSV") + flash.clear_flash() + flash.write_line("point_id,beacon_name,rssi") + + screen.clear() + screen.title("ROOM MAPPER") + screen.subtitle("RIGHT: record", "LEFT: stop") + screen.show() + sleep_ms(500) + + asyncio.run(run_scanner()) + + # Show summary then loop back to menu + screen.clear() + screen.title("SAVED!") + screen.subtitle( + f"{point_id} points", + "RSSI_MAP.CSV", + "RIGHT: menu", + ) + screen.show() + + while not is_pressed(MCP23009_BTN_RIGHT): + sleep_ms(100) + wait_released(MCP23009_BTN_RIGHT) diff --git a/BLE/Trilateration_positioning/main.py b/BLE/Trilateration_positioning/main.py new file mode 100644 index 0000000..bc9e31a --- /dev/null +++ b/BLE/Trilateration_positioning/main.py @@ -0,0 +1,470 @@ +"""BLE trilateration indoor positioning example using aioble and SSD1327 OLED. + +3 fixed beacon boards broadcast their identity. A mobile board scans all 3, +converts RSSI to distance using the log-distance path loss model, then +trilaterates a 2D position displayed as a dot on a live map. + +Roles (single file): + - Navigate menu with UP/DOWN, confirm with RIGHT + - BEACON mode: select beacon ID (M1/M2/M3) and advertise continuously + - MOBILE mode: scan all beacons, trilaterate and display live position + +Path loss model (per beacon): + d = 10 ^ ((RSSI_ref - RSSI) / (10 * n)) + RSSI_ref: measured at 1m with nRF Connect (iPhone 17 Pro Max) + n: computed from 1m and 2m measurements per beacon + +Calibration data: + Beacon RSSI@1m RSSI@2m n + M1 -75 -87 3.99 + M2 -75 -88 4.32 + M3 -80 -88 2.66 + +Beacon coordinates (cm, measured on site): + M1 = ( 0, 0) + M2 = (420, 0) + M3 = (330, 278) + +Hardware: + - 4 STeaMi boards (3 beacons + 1 mobile) + - SSD1327 128x128 OLED on mobile board + - D-PAD for menu navigation +""" + +import bluetooth +import math +import uasyncio as asyncio +from time import sleep_ms + +import aioble +import ssd1327 +from machine import I2C, SPI, Pin +from mcp23009e import MCP23009E +from mcp23009e.const import ( + MCP23009_BTN_DOWN, + MCP23009_BTN_LEFT, + MCP23009_BTN_RIGHT, + MCP23009_BTN_UP, + MCP23009_DIR_INPUT, + MCP23009_I2C_ADDR, + MCP23009_LOGIC_LOW, + MCP23009_PULLUP, +) +from steami_screen import DARK, GRAY, LIGHT, Screen, SSD1327Display, WHITE + +# === BLE setup === +ble = bluetooth.BLE() +ble.active(True) + +mac_bytes = ble.config("mac")[1] +mac_suffix = "".join(f"{b:02X}" for b in mac_bytes[-2:]) +DEVICE_NAME = f"STeaMi-{mac_suffix}" + +# === Beacon names === +BEACON_NAMES = ["Beacon_M1", "Beacon_M2", "Beacon_M3"] + +# === Beacon coordinates in cm (measured on site) === +BEACON_POS = { + "Beacon_M1": (0, 0), + "Beacon_M2": (420, 0), + "Beacon_M3": (330, 278), +} + +# === Geometric centroid of the triangle (cm) === +CENTROID = ( + (0 + 420 + 330) // 3, + (0 + 0 + 278) // 3, +) + +# === Path loss calibration (measured on site) === +# RSSI_ref: RSSI at 1 meter +RSSI_REF = { + "Beacon_M1": -75, + "Beacon_M2": -75, + "Beacon_M3": -80, +} + +# Path loss exponent n computed from 1m and 2m measurements: +# n = (RSSI_ref - RSSI_2m) / (10 * log10(2)) +PATH_LOSS_N = { + "Beacon_M1": 3.99, + "Beacon_M2": 4.32, + "Beacon_M3": 2.66, +} + +# === Distance clamping === +MAX_DIST_CM = 430 # Slightly above the largest real distance (420cm M1-M2) + +# === Trilateration validity === +MAX_VALID_DIST_CM = 550 # Reject solutions too far from centroid + +# === RSSI smoothing === +RSSI_SAMPLES = 8 + +# === Position filtering === +ALPHA = 0.15 # Exponential smoothing: 0.0=max smooth, 1.0=no filter +MIN_MOVE_CM = 15 # Ignore position changes smaller than this + +# === Display === +spi = SPI(1) +dc = Pin("DATA_COMMAND_DISPLAY") +res = Pin("RST_DISPLAY") +cs = Pin("CS_DISPLAY") +display = SSD1327Display(ssd1327.WS_OLED_128X128_SPI(spi, dc, res, cs)) +screen = Screen(display) + +# === Buttons === +i2c = I2C(1) +reset_expander = Pin("RST_EXPANDER", Pin.OUT) +mcp = MCP23009E(i2c, address=MCP23009_I2C_ADDR, reset_pin=reset_expander) +for btn in [MCP23009_BTN_UP, MCP23009_BTN_DOWN, MCP23009_BTN_RIGHT, MCP23009_BTN_LEFT]: + mcp.setup(btn, MCP23009_DIR_INPUT, pullup=MCP23009_PULLUP) + +# === BLE parameters === +ADV_INTERVAL_US = 50_000 +ADV_TIMEOUT_MS = 100 +SCAN_DURATION_MS = 300 + +# === Shared state === +current_rssi = {} +estimated_pos = None +filtered_pos = None +stop_flag = False +rssi_history = {name: [] for name in BEACON_NAMES} + +# === Map display area (pixels) === +MAP_X = 28 +MAP_Y = 14 +MAP_W = 72 +MAP_H = 72 + +# === Real space bounding box (cm) === +SPACE_W = 450 +SPACE_H = 310 + + +# ============================================================================= +# === HELPERS ================================================================= +# ============================================================================= + + +def build_adv_payload(name): + """Build a minimal BLE advertising payload with device name.""" + payload = bytearray() + name_bytes = name.encode() + payload += bytes((len(name_bytes) + 1, 0x09)) + name_bytes + return payload + + +def is_pressed(btn): + """Return True if button is currently pressed.""" + return mcp.get_level(btn) == MCP23009_LOGIC_LOW + + +def wait_released(btn): + """Wait until button is released.""" + while is_pressed(btn): + sleep_ms(20) + + +def menu_select(title, items): + """Generic D-PAD menu. Returns selected index.""" + selected = 0 + while True: + screen.clear() + screen.title(title) + screen.menu(items, selected=selected) + screen.subtitle("UP/DOWN: nav", "RIGHT: confirm") + screen.show() + + sleep_ms(150) + if is_pressed(MCP23009_BTN_UP): + wait_released(MCP23009_BTN_UP) + selected = (selected - 1) % len(items) + elif is_pressed(MCP23009_BTN_DOWN): + wait_released(MCP23009_BTN_DOWN) + selected = (selected + 1) % len(items) + elif is_pressed(MCP23009_BTN_RIGHT): + wait_released(MCP23009_BTN_RIGHT) + return selected + + +def rssi_to_distance(rssi, beacon_name): + """Convert RSSI to distance in cm using per-beacon path loss model. + + Distance is clamped to MAX_DIST_CM to avoid aberrant values. + """ + ref = RSSI_REF.get(beacon_name, -75) + n = PATH_LOSS_N.get(beacon_name, 3.5) + d_m = 10 ** ((ref - rssi) / (10 * n)) + return min(d_m * 100, MAX_DIST_CM) + + +def smooth_rssi(name, new_rssi): + """Apply moving average to RSSI readings.""" + history = rssi_history[name] + history.append(new_rssi) + if len(history) > RSSI_SAMPLES: + history.pop(0) + return sum(history) // len(history) + + +def trilaterate(distances): + """Estimate 2D position from 3 beacon distances using least-squares. + + Rejects solutions too far from the triangle centroid. + + Args: + distances: dict {beacon_name: distance_cm} + + Returns: + (x, y) in cm, or None if invalid. + """ + if len(distances) < 3: + return None + + names = list(distances.keys()) + x1, y1 = BEACON_POS[names[0]] + x2, y2 = BEACON_POS[names[1]] + x3, y3 = BEACON_POS[names[2]] + r1 = distances[names[0]] + r2 = distances[names[1]] + r3 = distances[names[2]] + + A = 2 * (x2 - x1) + B = 2 * (y2 - y1) + C = r1**2 - r2**2 - x1**2 + x2**2 - y1**2 + y2**2 + D = 2 * (x3 - x1) + E = 2 * (y3 - y1) + F = r1**2 - r3**2 - x1**2 + x3**2 - y1**2 + y3**2 + + denom = A * E - B * D + if abs(denom) < 1e-6: + return None + + x = (C * E - F * B) / denom + y = (A * F - D * C) / denom + + # Reject if too far from centroid + dx = x - CENTROID[0] + dy = y - CENTROID[1] + if math.sqrt(dx * dx + dy * dy) > MAX_VALID_DIST_CM: + return None + + return (x, y) + + +def apply_filter(new_pos): + """Apply exponential smoothing with minimum movement threshold.""" + global filtered_pos + + if filtered_pos is None: + filtered_pos = new_pos + return filtered_pos + + fx = ALPHA * new_pos[0] + (1 - ALPHA) * filtered_pos[0] + fy = ALPHA * new_pos[1] + (1 - ALPHA) * filtered_pos[1] + + dx = fx - filtered_pos[0] + dy = fy - filtered_pos[1] + if math.sqrt(dx * dx + dy * dy) < MIN_MOVE_CM: + return filtered_pos + + filtered_pos = (fx, fy) + return filtered_pos + + +def world_to_screen(x, y): + """Convert real-world cm coordinates to OLED map pixel coordinates.""" + px = MAP_X + int(x / SPACE_W * MAP_W) + py = MAP_Y + MAP_H - int(y / SPACE_H * MAP_H) + px = max(MAP_X, min(MAP_X + MAP_W, px)) + py = max(MAP_Y, min(MAP_Y + MAP_H, py)) + return px, py + + +# ============================================================================= +# === BEACON MODE ============================================================= +# ============================================================================= + + +async def beacon_ble_task(beacon_name): + """Advertise as Beacon_Mx until stop_flag is set.""" + adv_payload = build_adv_payload(beacon_name) + print(f"Beacon mode: advertising as {beacon_name}") + while not stop_flag: + try: + await aioble.advertise( + interval_us=ADV_INTERVAL_US, + adv_data=adv_payload, + connectable=False, + timeout_ms=ADV_TIMEOUT_MS, + ) + except asyncio.TimeoutError: + pass + + +async def beacon_display_task(beacon_name): + """Show beacon status on OLED until stop_flag is set.""" + while not stop_flag: + screen.clear() + screen.title("BEACON") + screen.subtitle(beacon_name, "Broadcasting...", "LEFT: menu") + screen.show() + await asyncio.sleep_ms(1000) + + +async def beacon_button_task(): + """Watch for LEFT button to exit beacon mode.""" + global stop_flag + while not stop_flag: + if is_pressed(MCP23009_BTN_LEFT): + wait_released(MCP23009_BTN_LEFT) + stop_flag = True + return + await asyncio.sleep_ms(100) + + +async def run_beacon(beacon_name): + """Run beacon mode until LEFT is pressed.""" + await asyncio.gather( + beacon_ble_task(beacon_name), + beacon_display_task(beacon_name), + beacon_button_task(), + ) + + +# ============================================================================= +# === MOBILE MODE ============================================================= +# ============================================================================= + + +async def mobile_ble_task(): + """Scan for all beacons and update smoothed RSSI.""" + global current_rssi + while not stop_flag: + async with aioble.scan( + SCAN_DURATION_MS, + interval_us=10000, + window_us=10000, + active=True, + ) as scanner: + async for result in scanner: + name = result.name() + if name in BEACON_NAMES: + current_rssi[name] = smooth_rssi(name, result.rssi) + + +async def mobile_display_task(): + """Compute trilateration, filter position and display live map.""" + global estimated_pos, stop_flag + + d = screen._d + + while not stop_flag: + if is_pressed(MCP23009_BTN_LEFT): + wait_released(MCP23009_BTN_LEFT) + stop_flag = True + break + + # Compute distances from smoothed RSSI + distances = { + name: rssi_to_distance(current_rssi[name], name) + for name in BEACON_NAMES + if name in current_rssi + } + + # Trilaterate and filter + if len(distances) == 3: + raw_pos = trilaterate(distances) + if raw_pos: + estimated_pos = apply_filter(raw_pos) + + # Draw map + screen.clear() + screen._rect(MAP_X, MAP_Y, MAP_W, MAP_H, DARK) + + # Draw beacon markers with labels + for name, (cx, cy) in BEACON_POS.items(): + px, py = world_to_screen(cx, cy) + screen._fill_rect(px - 3, py - 3, 6, 6, LIGHT) + short = name[-2:] + lx = px - 8 + ly = py - 14 if py > MAP_Y + 20 else py + 8 + d.text(short, lx, ly, GRAY) + + # Draw filtered position + if estimated_pos: + ex = max(0, min(SPACE_W, estimated_pos[0])) + ey = max(0, min(SPACE_H, estimated_pos[1])) + px, py = world_to_screen(ex, ey) + screen._fill_circle(px, py, 3, WHITE) + + # Distances in subtitle + if distances: + parts = [ + f"{name[-2:]}:{int(distances[name])}cm" + for name in BEACON_NAMES + if name in distances + ] + screen.subtitle(*parts) + else: + screen.subtitle("Scanning...", "LEFT: menu") + + screen.show() + await asyncio.sleep_ms(200) + + +async def run_mobile(): + """Run mobile trilateration mode until LEFT is pressed.""" + await asyncio.gather( + mobile_ble_task(), + mobile_display_task(), + ) + + +# ============================================================================= +# === MAIN LOOP =============================================================== +# ============================================================================= + +while True: + current_rssi = {} + estimated_pos = None + filtered_pos = None + stop_flag = False + rssi_history = {name: [] for name in BEACON_NAMES} + + mode_idx = menu_select("SELECT MODE", ["MOBILE", "BEACON"]) + + if mode_idx == 1: + beacon_idx = menu_select("SELECT BEACON", BEACON_NAMES) + beacon_name = BEACON_NAMES[beacon_idx] + + screen.clear() + screen.title("BEACON") + screen.subtitle(beacon_name, "Starting...") + screen.show() + sleep_ms(500) + + asyncio.run(run_beacon(beacon_name)) + + screen.clear() + screen.title("BEACON STOPPED") + screen.subtitle("Returning to menu...") + screen.show() + sleep_ms(1000) + + else: + screen.clear() + screen.title("TRILATERATION") + screen.subtitle("Scanning beacons...", "LEFT: menu") + screen.show() + sleep_ms(500) + + asyncio.run(run_mobile()) + + screen.clear() + screen.title("STOPPED") + screen.subtitle("Returning to menu...") + screen.show() + sleep_ms(1000)