Skip to content

Commit ae9471a

Browse files
committed
Trying to optimise graph_myopod.py
1 parent 4d9bd43 commit ae9471a

1 file changed

Lines changed: 93 additions & 42 deletions

File tree

examples/graph_myopod.py

Lines changed: 93 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ def __init__(self):
9191
self.plot.setLabel('left', "EMG Reading")
9292
self.plot.setLabel('bottom', "Time (s)")
9393
self.plot.setXRange(-PLOT_DURATION_S, 0)
94+
# Enable performance optimisations
95+
self.plot.setClipToView(True) # Clip data to visible range
96+
self.plot.setDownsampling(mode='peak') # Enable downsampling
9497
# Curves will be managed dynamically later
9598

9699
# Add right y-axis for converted values (Keep for now, needs rework)
@@ -107,12 +110,12 @@ def __init__(self):
107110

108111
# --- Data Structures (Revised) ---
109112
self.discovered_devices = {} # address -> (BLEDevice, parsed_ad, rssi, last_seen)
110-
self.connected_devices = {} # address -> dict {client, myopod, data_deque, timestamp_deque, curve, colour, native_rate, conv_factor, streaming}
113+
self.connected_devices = {} # address -> dict {client, myopod, data_deque, timestamp_deque, curve, colour, native_rate, conv_factor, streaming, notification_queue}
111114

112115
# --- Timers (Keep relevant ones) ---
113116
self.plot_timer = QtCore.QTimer()
114117
self.plot_timer.timeout.connect(self.update_plot)
115-
self.plot_timer.start(100) # Increase interval to 100ms (10Hz update rate)
118+
self.plot_timer.start(100) # update frequency (100Hz)
116119

117120
self.scan_timer = QtCore.QTimer()
118121
self.scan_timer.timeout.connect(lambda: asyncio.create_task(self.background_scan()))
@@ -131,21 +134,46 @@ def log(self, msg):
131134
print(msg)
132135

133136
def update_plot(self):
134-
"""Updates all active plot curves with new data."""
135-
now = time.perf_counter() # Use a common 'now' for alignment if needed
136-
137-
for address, device_info in self.connected_devices.items():
138-
timestamps = device_info['timestamp_deque']
139-
data = device_info['data_deque']
140-
curve = device_info['curve']
141-
142-
if timestamps and curve:
143-
# Use the latest timestamp from *this* device for its relative x-axis
144-
last_ts = timestamps[-1]
145-
x_data = np.array([t - last_ts for t in timestamps])
146-
y_data = np.array(data)
137+
"""Updates all active plot curves with new data from the queues."""
138+
# self.log("[DEBUG] update_plot called") # Optional high-frequency log
139+
for address, device_info in list(self.connected_devices.items()):
140+
notification_queue = device_info.get('notification_queue')
141+
data_deque = device_info.get('data_deque')
142+
timestamp_deque = device_info.get('timestamp_deque')
143+
curve = device_info.get('curve')
144+
145+
if not notification_queue or not data_deque or not timestamp_deque or not curve:
146+
continue
147+
148+
# Process all pending notifications from the queue for this device
149+
packets_processed = 0
150+
# self.log(f"[DEBUG] Checking queue for {address}") # Optional high-frequency log
151+
while not notification_queue.empty():
152+
try:
153+
packet: StreamDataPacket = notification_queue.get_nowait()
154+
if packet and packet.data_points:
155+
current_time = time.perf_counter()
156+
num_points = len(packet.data_points)
157+
data_deque.extend(packet.data_points)
158+
timestamp_deque.extend([current_time] * num_points)
159+
packets_processed += 1
160+
except asyncio.QueueEmpty:
161+
break
162+
except Exception as e:
163+
self.log(f"[ERROR] Error processing packet from queue for {address}: {e}")
164+
break
165+
166+
if packets_processed > 0:
167+
self.log(f"[DEBUG] Processed {packets_processed} packets from queue for {address}")
168+
169+
# Update plot only if data exists
170+
# self.log(f"[DEBUG] Deque size for {address} before plot: {len(data_deque)}") # Optional high-frequency log
171+
if data_deque:
172+
last_ts = timestamp_deque[-1]
173+
x_data = [t - last_ts for t in timestamp_deque]
174+
y_data = list(data_deque)
147175
curve.setData(x_data, y_data)
148-
elif curve: # Clear curve if no data but curve exists
176+
elif curve:
149177
curve.setData([], [])
150178

151179
async def background_scan(self):
@@ -343,7 +371,7 @@ async def on_connect_selected_btn(self):
343371
self.update_device_list()
344372

345373
async def connect_device(self, address):
346-
"""Connects to a single device, sets up stream and plot curve."""
374+
"""Connects to a single device, sets up stream, plot curve, and notification queue."""
347375
if address not in self.discovered_devices:
348376
self.log(f"Error connecting to {address}: Not found in discovered devices.")
349377
return
@@ -357,11 +385,20 @@ async def connect_device(self, address):
357385
ble_device, _, _, _ = self.discovered_devices[address]
358386
self.log(f"Connecting to {address}...")
359387

388+
# --- Stop scanning timer if this is the first connection ---
389+
if not self.connected_devices: # Check before adding the new device
390+
self.log("First device connecting, stopping background scan.")
391+
self.scan_timer.stop()
392+
360393
try:
361394
client = BleakClient(ble_device)
362395
await client.connect()
363396
if not client.is_connected:
364397
self.log(f"Failed to connect to {address}")
398+
# Restart scanning if connection failed and no other devices are connected
399+
if not self.connected_devices:
400+
self.log("Connection failed, restarting background scan.")
401+
self.scan_timer.start(300)
365402
return
366403

367404
myopod = MyoPod(client)
@@ -372,7 +409,10 @@ async def connect_device(self, address):
372409
curve = self.plot.plot(pen=assigned_colour)
373410
self.log(f"Assigned colour {assigned_colour} to {address}")
374411

375-
# --- Store Connection Info ---
412+
# --- Create Notification Queue ---
413+
notification_queue = asyncio.Queue()
414+
415+
# --- Store Connection Info (including queue) ---
376416
device_info = {
377417
'client': client,
378418
'myopod': myopod,
@@ -382,7 +422,8 @@ async def connect_device(self, address):
382422
'colour': assigned_colour,
383423
'native_rate': SAMPLE_RATE_HZ, # Placeholder, updated later
384424
'conv_factor': None,
385-
'streaming': False
425+
'streaming': False,
426+
'notification_queue': notification_queue # Add the queue
386427
}
387428
self.connected_devices[address] = device_info
388429

@@ -400,7 +441,7 @@ async def connect_device(self, address):
400441
)
401442

402443
# --- Start Stream Notifications ---
403-
# Use partial to include address in handler callback
444+
# Handler now just puts packet onto the queue
404445
bound_handler = functools.partial(self.notification_handler, address)
405446
self.log(f"[{address}] Starting stream subscription...")
406447
await myopod.start_stream(bound_handler)
@@ -419,14 +460,25 @@ async def connect_device(self, address):
419460

420461
except BleakError as e:
421462
self.log(f"Connection to {address} failed (BleakError): {e}")
422-
# Cleanup potentially partially connected state
423-
await self.disconnect_device(address)
463+
# Ensure cleanup and potentially restart scanning
464+
await self.handle_connection_failure(address)
424465
except Exception as e:
425466
self.log(f"Error during connection to {address}: {e}")
426-
await self.disconnect_device(address)
467+
# Ensure cleanup and potentially restart scanning
468+
await self.handle_connection_failure(address)
427469
finally:
428-
# Update UI list regardless of success/failure
429-
self.update_device_list() # Update bolding/checkmark
470+
self.update_device_list()
471+
472+
async def handle_connection_failure(self, address):
473+
"""Handles cleanup after a connection attempt fails."""
474+
# Device might have been partially added to connected_devices if error occurred late
475+
if address in self.connected_devices:
476+
await self.disconnect_device(address) # Use existing disconnect logic for cleanup
477+
else:
478+
# If it wasn't even added, just check if scanning needs restarting
479+
if not self.connected_devices and not self.scan_timer.isActive():
480+
self.log("Connection failed, restarting background scan.")
481+
self.scan_timer.start(300)
430482

431483
async def disconnect_device(self, address):
432484
"""Disconnects a single device and cleans up resources."""
@@ -467,27 +519,26 @@ async def disconnect_device(self, address):
467519
client = None
468520

469521
self.log(f"Disconnect of {address} complete.")
522+
# Restart scanning timer if this was the last connected device
523+
if not self.connected_devices and not self.scan_timer.isActive():
524+
self.log("Last device disconnected, restarting background scan.")
525+
self.scan_timer.start(300)
526+
470527
# Update UI list after disconnect is fully processed
471528
self.update_device_list()
472529

473530
def notification_handler(self, address, packet: StreamDataPacket):
474-
"""Handles incoming parsed packets and routes data to the correct deques."""
475-
if address in self.connected_devices:
476-
device_info = self.connected_devices[address]
477-
data_deque = device_info['data_deque']
478-
timestamp_deque = device_info['timestamp_deque']
479-
480-
if packet.data_points:
481-
# For simplicity, assume packets arrive roughly in order
482-
# and use perf_counter for local timing.
483-
# A more robust solution might use packet timestamps if available/reliable.
484-
current_time = time.perf_counter()
485-
for value in packet.data_points:
486-
data_deque.append(value)
487-
timestamp_deque.append(current_time)
488-
# Note: If multiple points are in one packet, they get the same timestamp here.
489-
else:
490-
self.log(f"[WARN] Received notification for unknown or disconnected device: {address}")
531+
"""Minimal handler: Puts received packet onto the device's queue."""
532+
try:
533+
if address in self.connected_devices:
534+
queue = self.connected_devices[address].get('notification_queue')
535+
if queue:
536+
queue.put_nowait(packet)
537+
# self.log(f"[DEBUG] Queued packet for {address} (Points: {len(packet.data_points) if packet else 0})") # Optional high-frequency log
538+
except asyncio.QueueFull:
539+
self.log(f"[WARN] Notification queue full for {address}. Discarding packet.")
540+
except Exception as e:
541+
self.log(f"[ERROR] Exception in notification_handler for {address}: {e}")
491542

492543
async def apply_global_stream_config(self):
493544
"""Applies the global stream configuration to all connected devices sequentially."""

0 commit comments

Comments
 (0)