Skip to content

Commit a2a3ed9

Browse files
committed
feat(ble): Add BLE RSSI proximity detector example with OLED display.
1 parent bcc237a commit a2a3ed9

1 file changed

Lines changed: 219 additions & 0 deletions

File tree

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
"""BLE RSSI proximity detector example using aioble and SSD1327 OLED.
2+
3+
Two STeaMi boards run this same file:
4+
- Hold button UP during the 10-second startup window → BEACON mode
5+
- Release button UP → SCANNER mode (default)
6+
7+
The scanner reads the RSSI of the beacon and maps it to a proximity level
8+
displayed as a gauge on the round OLED. RSSI is averaged over N samples
9+
to reduce noise while keeping fast response.
10+
11+
Hardware:
12+
- 2 x STM32WB55 BLE radio
13+
- SSD1327 128x128 OLED display (round)
14+
- MCP23009E D-PAD (button UP used for mode selection)
15+
16+
Learning goals:
17+
- BLE scanning with aioble
18+
- RSSI as a proxy for distance
19+
- Signal averaging to reduce noise
20+
"""
21+
22+
import sys
23+
24+
sys.path.insert(0, "/remote")
25+
26+
import bluetooth
27+
import uasyncio as asyncio
28+
from time import sleep_ms
29+
30+
import aioble
31+
import ssd1327
32+
from machine import I2C, SPI, Pin
33+
from mcp23009e import MCP23009E
34+
from mcp23009e.const import (
35+
MCP23009_BTN_UP,
36+
MCP23009_DIR_INPUT,
37+
MCP23009_I2C_ADDR,
38+
MCP23009_LOGIC_LOW,
39+
MCP23009_PULLUP,
40+
)
41+
from steami_screen import GRAY, LIGHT, RED, Screen, SSD1327Display
42+
43+
# === BLE setup ===
44+
ble = bluetooth.BLE()
45+
ble.active(True)
46+
47+
mac_bytes = ble.config("mac")[1]
48+
mac_suffix = "".join(f"{b:02X}" for b in mac_bytes[-2:])
49+
DEVICE_NAME = f"STeaMi-{mac_suffix}"
50+
BEACON_NAME = "STeaMi-BEACON"
51+
52+
# === Display ===
53+
spi = SPI(1)
54+
dc = Pin("DATA_COMMAND_DISPLAY")
55+
res = Pin("RST_DISPLAY")
56+
cs = Pin("CS_DISPLAY")
57+
display = SSD1327Display(ssd1327.WS_OLED_128X128_SPI(spi, dc, res, cs))
58+
screen = Screen(display)
59+
60+
# === Buttons ===
61+
i2c = I2C(1)
62+
reset_expander = Pin("RST_EXPANDER", Pin.OUT)
63+
mcp = MCP23009E(i2c, address=MCP23009_I2C_ADDR, reset_pin=reset_expander)
64+
mcp.setup(MCP23009_BTN_UP, MCP23009_DIR_INPUT, pullup=MCP23009_PULLUP)
65+
66+
# === BLE parameters ===
67+
ADV_INTERVAL_US = 50_000 # 50ms beacon interval for fast detection
68+
ADV_TIMEOUT_MS = 100 # Short advertising window
69+
SCAN_DURATION_MS = 500 # Very short scan bursts for fast response
70+
RSSI_SAMPLES = 2 # Few samples for fast reaction
71+
RSSI_MIN = -90 # dBm considered far
72+
RSSI_MAX = -30 # dBm considered very close
73+
74+
# === Mode selection window ===
75+
MODE_SELECTION_S = 10
76+
77+
# === Shared state ===
78+
rssi_samples = []
79+
current_rssi = RSSI_MIN
80+
81+
82+
def build_adv_payload(name):
83+
"""Build a minimal BLE advertising payload with device name."""
84+
payload = bytearray()
85+
name_bytes = name.encode()
86+
payload += bytes((len(name_bytes) + 1, 0x09)) + name_bytes
87+
return payload
88+
89+
90+
def rssi_to_proximity(rssi):
91+
"""Map RSSI value to a 0-100 proximity percentage."""
92+
clamped = max(RSSI_MIN, min(RSSI_MAX, rssi))
93+
return int((clamped - RSSI_MIN) / (RSSI_MAX - RSSI_MIN) * 100)
94+
95+
96+
def rssi_to_color(proximity):
97+
"""Return display color based on proximity level."""
98+
if proximity > 70:
99+
return RED
100+
elif proximity > 40:
101+
return LIGHT
102+
else:
103+
return GRAY
104+
105+
106+
def select_mode():
107+
"""Show a 10-second countdown and detect button UP to select mode."""
108+
print("Hold UP button for BEACON mode, release for SCANNER mode.")
109+
for remaining in range(MODE_SELECTION_S, 0, -1):
110+
screen.clear()
111+
screen.title("SELECT MODE")
112+
screen.subtitle(
113+
"Hold UP: BEACON",
114+
"Release: SCANNER",
115+
f"Starting in {remaining}s...",
116+
)
117+
screen.show()
118+
sleep_ms(1000)
119+
if mcp.get_level(MCP23009_BTN_UP) == MCP23009_LOGIC_LOW:
120+
print("Button UP held -> BEACON mode")
121+
return True
122+
print("Button UP released -> SCANNER mode")
123+
return False
124+
125+
126+
# =============================================================================
127+
# === BEACON MODE =============================================================
128+
# =============================================================================
129+
130+
131+
async def beacon_ble_task():
132+
"""Advertise as STeaMi-BEACON continuously."""
133+
adv_payload = build_adv_payload(BEACON_NAME)
134+
print(f"Beacon mode: advertising as {BEACON_NAME}")
135+
while True:
136+
try:
137+
await aioble.advertise(
138+
interval_us=ADV_INTERVAL_US,
139+
adv_data=adv_payload,
140+
connectable=False,
141+
timeout_ms=ADV_TIMEOUT_MS,
142+
)
143+
except asyncio.TimeoutError:
144+
pass
145+
146+
147+
async def beacon_display_task():
148+
"""Show beacon status on OLED."""
149+
while True:
150+
screen.clear()
151+
screen.title("BEACON")
152+
screen.subtitle(BEACON_NAME, "Broadcasting...")
153+
screen.show()
154+
await asyncio.sleep_ms(1000)
155+
156+
157+
async def run_beacon():
158+
await asyncio.gather(
159+
beacon_ble_task(),
160+
beacon_display_task(),
161+
)
162+
163+
164+
# =============================================================================
165+
# === SCANNER MODE ============================================================
166+
# =============================================================================
167+
168+
169+
async def scanner_ble_task():
170+
global current_rssi
171+
display_counter = 0
172+
while True:
173+
async with aioble.scan(
174+
SCAN_DURATION_MS,
175+
interval_us=10000,
176+
window_us=10000,
177+
active=True,
178+
) as scanner:
179+
async for result in scanner:
180+
if result.name() == BEACON_NAME:
181+
current_rssi = result.rssi # Valeur brute directe
182+
print(f"RSSI: {current_rssi} dBm")
183+
display_counter += 1
184+
if display_counter >= 3:
185+
display_counter = 0
186+
proximity = rssi_to_proximity(current_rssi)
187+
color = rssi_to_color(proximity)
188+
screen.clear()
189+
screen.title("PROXIMITY")
190+
screen.gauge(proximity, min_val=0, max_val=100, color=color)
191+
screen.value(str(current_rssi), unit="dBm")
192+
screen.subtitle(f"{proximity}%")
193+
screen.show()
194+
195+
196+
async def run_scanner():
197+
await scanner_ble_task()
198+
199+
200+
# =============================================================================
201+
# === ENTRY POINT =============================================================
202+
# =============================================================================
203+
204+
is_beacon = select_mode()
205+
206+
if is_beacon:
207+
screen.clear()
208+
screen.title("BEACON MODE")
209+
screen.subtitle("Starting...")
210+
screen.show()
211+
sleep_ms(1000)
212+
asyncio.run(run_beacon())
213+
else:
214+
screen.clear()
215+
screen.title("SCANNER MODE")
216+
screen.subtitle("Starting...")
217+
screen.show()
218+
sleep_ms(1000)
219+
asyncio.run(run_scanner())

0 commit comments

Comments
 (0)