-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpybricks_virtual_ble.py
More file actions
57 lines (43 loc) · 1.51 KB
/
pybricks_virtual_ble.py
File metadata and controls
57 lines (43 loc) · 1.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
"""
An example showing a "Virtual Hub BLE radio" operating in Python.
"""
import asyncio
import random
from async_timer.pacemaker import TimerPacemaker
from pybricks import _common
from pb_ble import get_virtual_ble
from pb_ble.constants import ScanningMode
async def observe(vble: _common.BLE, observe_channel: int, interval: float = 1.0):
    """
    Coroutine that polls and prints broadcasting data
    on the given interval.

    :param vble: BLE radio object to poll; must provide ``observe`` and
        ``signal_strength``.
    :param observe_channel: Channel whose last observation is read.
    :param interval: Delay between two polls, handed to ``TimerPacemaker``
        as ``delay`` (presumably seconds -- confirm against async_timer).
    """
    async for _ in TimerPacemaker(delay=interval):
        data = vble.observe(observe_channel)
        rssi = vble.signal_strength(observe_channel)
        # Test against None explicitly instead of relying on truthiness:
        # payloads such as 0, False, "" or an empty tuple are valid data
        # and must still be reported. Per the Pybricks BLE docs, None is
        # the marker for "no recent data on this channel".
        if data is not None:
            print(f"Observation: '{data!r}' [{rssi} dBm]")
async def broadcast(vble: _common.BLE, interval: float = 10.0):
    """
    Coroutine that repeatedly broadcasts a freshly drawn random integer.

    On every tick of a ``TimerPacemaker`` created with ``delay=interval``,
    an integer between 0 and 3 (both inclusive) is drawn, sent through
    ``vble.broadcast`` and then echoed to stdout.
    """
    ticker = TimerPacemaker(delay=interval)
    async for _tick in ticker:
        payload = random.randint(0, 3)
        await vble.broadcast(payload)
        print(f"Broadcasting: '{payload}'")
async def main():
    """
    Set up the virtual BLE radio and run the observer and broadcaster
    coroutines side by side until they finish or one of them fails.
    """
    # observe config
    scanning_mode: ScanningMode = "passive"
    observe_channel = 0

    # broadcast config
    broadcast_channel = 1

    virtual_ble = await get_virtual_ble(
        broadcast_channel=broadcast_channel,
        observe_channels=[observe_channel],
        scanning_mode=scanning_mode,
    )
    async with virtual_ble as vble:
        # Schedule the observer first, then the broadcaster, and wait on both.
        workers = [
            asyncio.create_task(observe(vble, observe_channel)),
            asyncio.create_task(broadcast(vble)),
        ]
        await asyncio.gather(*workers)
# Start the event loop only when this file is executed as a script, so that
# importing the module (e.g. to reuse the coroutines above) does not start
# the example.
if __name__ == "__main__":
    asyncio.run(main())