|
1 | 1 | import asyncio |
| 2 | +import contextlib |
2 | 3 | import redis.asyncio as redis |
3 | 4 | import websockets |
4 | 5 | import os |
@@ -277,31 +278,57 @@ async def direct_queue_listener(queue: asyncio.Queue): |
277 | 278 |
|
278 | 279 |
|
279 | 280 | async def redis_listener(): |
280 | | - """Listens to Redis and broadcasts to all WS clients.""" |
281 | | - try: |
282 | | - r = redis.from_url(REDIS_URL) |
283 | | - pubsub = r.pubsub() |
284 | | - await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL) |
285 | | - logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}") |
| 281 | + """Listens to Redis and broadcasts to all WS clients. |
| 282 | +
|
| 283 | + Wrapped in a reconnect loop. A dropped Redis pub/sub connection — an idle |
| 284 | + timeout, a transient blip on the Docker bridge network, or a Redis restart — |
| 285 | + must not silently kill the data feed. Without this loop the coroutine would |
| 286 | + exit on the first ConnectionError while the WebSocket server kept running: |
| 287 | + PECAN stays connected but never receives another frame, so the dashboard |
| 288 | + goes dead with no error visible anywhere. `health_check_interval` lets |
| 289 | + redis-py detect a half-open connection instead of blocking forever in |
| 290 | + listen(). |
| 291 | + """ |
| 292 | + backoff_min, backoff_max = 0.5, 10.0 |
| 293 | + delay = backoff_min |
286 | 294 |
|
287 | | - async for message in pubsub.listen(): |
| 295 | + while not shutdown_event.is_set(): |
| 296 | + r = None |
| 297 | + try: |
| 298 | + r = redis.from_url(REDIS_URL, health_check_interval=30) |
| 299 | + pubsub = r.pubsub() |
| 300 | + await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL) |
| 301 | + logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}") |
| 302 | + delay = backoff_min # reset backoff once a subscribe succeeds |
| 303 | + |
| 304 | + async for message in pubsub.listen(): |
| 305 | + if shutdown_event.is_set(): |
| 306 | + break |
| 307 | + |
| 308 | + if message['type'] == 'message': |
| 309 | + data = redis_utils.decode_message(message['data']) |
| 310 | + |
| 311 | + # Broadcast to all connected clients |
| 312 | + if connected_clients: |
| 313 | + # Create tasks for sending to each client to avoid blocking |
| 314 | + await asyncio.gather( |
| 315 | + *[client.send(data) for client in connected_clients], |
| 316 | + return_exceptions=True |
| 317 | + ) |
| 318 | + except asyncio.CancelledError: |
| 319 | + raise |
| 320 | + except Exception as e: |
288 | 321 | if shutdown_event.is_set(): |
289 | 322 | break |
290 | | - |
291 | | - if message['type'] == 'message': |
292 | | - data = redis_utils.decode_message(message['data']) |
293 | | - |
294 | | - # Broadcast to all connected clients |
295 | | - if connected_clients: |
296 | | - # Create tasks for sending to each client to avoid blocking |
297 | | - await asyncio.gather( |
298 | | - *[client.send(data) for client in connected_clients], |
299 | | - return_exceptions=True |
300 | | - ) |
301 | | - except Exception as e: |
302 | | - logger.error(f"Redis error: {e}") |
303 | | - finally: |
304 | | - logger.info("Redis listener stopping...") |
| 323 | + logger.error(f"Redis listener error: {e} — reconnecting in {delay:.1f}s") |
| 324 | + await asyncio.sleep(delay) |
| 325 | + delay = min(delay * 2, backoff_max) |
| 326 | + finally: |
| 327 | + if r is not None: |
| 328 | + with contextlib.suppress(Exception): |
| 329 | + await r.aclose() |
| 330 | + |
| 331 | + logger.info("Redis listener stopping...") |
305 | 332 |
|
306 | 333 |
|
307 | 334 | async def _handle_client_message(websocket, raw: str, redis_client): |
|
0 commit comments