forked from micropython/micropython-lib
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtemp_sensor.py
More file actions
69 lines (51 loc) · 1.79 KB
/
temp_sensor.py
File metadata and controls
69 lines (51 loc) · 1.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import sys
# ruff: noqa: E402
sys.path.append("")
from micropython import const
import asyncio
import aioble
import bluetooth
import random
import struct
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
# How frequently to send advertising beacons.
_ADV_INTERVAL_US = 250_000
# Register GATT server.
temp_service = aioble.Service(_ENV_SENSE_UUID)
temp_characteristic = aioble.Characteristic(
temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True
)
aioble.register_services(temp_service)
# Helper to encode the temperature characteristic encoding (sint16, hundredths of a degree).
def _encode_temperature(temp_deg_c):
return struct.pack("<h", int(temp_deg_c * 100))
# This would be periodically polling a hardware sensor.
async def sensor_task():
t = 24.5
while True:
temp_characteristic.write(_encode_temperature(t), send_update=True)
t += random.uniform(-0.5, 0.5)
await asyncio.sleep_ms(1000)
# Serially wait for connections. Don't advertise while a central is
# connected.
async def peripheral_task():
while True:
async with await aioble.advertise(
_ADV_INTERVAL_US,
name="mpy-temp",
services=[_ENV_SENSE_UUID],
appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
) as connection:
print("Connection from", connection.device)
await connection.disconnected(timeout_ms=None)
# Run both tasks.
async def main():
t1 = asyncio.create_task(sensor_task())
t2 = asyncio.create_task(peripheral_task())
await asyncio.gather(t1, t2)
asyncio.run(main())