Skip to content

Commit f40656f

Browse files
authored
Add Redis heartbeat liveness probe so pubsub subscribers self-recover after car power-cycles (#75)
* Add heartbeat key and staleness threshold to config
* Add async heartbeat writer module
* Add heartbeat writer unit tests
* Tighten heartbeat tests and align writer docstring with behavior
* Add _pump_pubsub_with_heartbeat helper to TelemetryNode
* Use heartbeat-aware pump in data.py uplink-relay pubsub
* Declare uplink_seq nonlocal in data.py _relay; warn in plan
* Use heartbeat-aware pump in websocket_bridge pubsub
* Register heartbeat writer as an async task
* Add end-to-end recovery test for stale-heartbeat reconnect
* Tighten pump-recovery test: caplog assertion, more headroom
* Measure pubsub liveness via a heartbeat channel and self-heal dead subscriptions

Publish a heartbeat on the telemetry_heartbeat pubsub channel every second and have each subscriber measure liveness on the pubsub connection itself: if no message arrives for HEARTBEAT_STALE_S, tear down and re-subscribe. An out-of-band key check can't see a half-dead pubsub connection because regular commands use a different pool connection that redis-py silently reconnects.

Wrap the uplink relay in a reconnect loop so the pump returning actually re-subscribes instead of exiting, and pass shutdown_event.is_set as should_stop so SIGTERM stops the bridge cleanly. Add the heartbeat tests to CI.
1 parent d33b0e6 commit f40656f

6 files changed

Lines changed: 316 additions & 55 deletions

File tree

.github/workflows/telemetry-ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ jobs:
166166
tests/test_page_lock.py \
167167
tests/test_stats_publisher.py \
168168
tests/test_status_server.py \
169+
tests/test_heartbeat.py \
169170
-v
170171
171172
# ── Integration tests: full docker-compose stack ─────────────────────────

universal-telemetry-software/src/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
REDIS_STATS_CHANNEL = "system_stats"
2323
REDIS_DIAG_CHANNEL = "link_diagnostics"
2424
REDIS_WS_CLIENTS_KEY = "websocket_bridge:clients"
25+
REDIS_HEARTBEAT_CHANNEL = "telemetry_heartbeat"
26+
HEARTBEAT_STALE_S = 5.0 # subscribers reconnect if no pubsub message arrives for this long
2527

2628
# ── Feature flags ─────────────────────────────────────────────────────────────
2729
ENABLE_UPLINK = os.getenv("ENABLE_UPLINK", "false").lower() == "true"

universal-telemetry-software/src/data.py

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import contextlib
23
import socket
34
import struct
45
import time
@@ -14,9 +15,10 @@
1415
from src.config import (
1516
REMOTE_IP, UDP_PORT, TCP_PORT,
1617
REDIS_URL, REDIS_CAN_CHANNEL, REDIS_UPLINK_CHANNEL, ENABLE_UPLINK,
17-
REDIS_WS_CLIENTS_KEY,
18+
REDIS_WS_CLIENTS_KEY, REDIS_HEARTBEAT_CHANNEL,
1819
)
1920
from src import redis_utils, utils
21+
from src.heartbeat import pump_pubsub_with_heartbeat, run_heartbeat_writer
2022
from src.version import get_git_hash
2123

2224
BATCH_SIZE = 20
@@ -863,51 +865,63 @@ async def uplink_relay():
863865
uplink_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
864866
uplink_seq = 0
865867

866-
try:
867-
r = aioredis.from_url(REDIS_URL)
868-
pubsub = r.pubsub()
869-
await pubsub.subscribe(REDIS_UPLINK_CHANNEL)
870-
logger.info(f"Subscribed to Redis channel: {REDIS_UPLINK_CHANNEL}")
871-
872-
async for message in pubsub.listen():
873-
if message['type'] != 'message':
874-
continue
868+
async def _relay(msg):
869+
nonlocal uplink_seq
870+
if msg['type'] != 'message':
871+
return
872+
try:
873+
data = redis_utils.decode_message(msg['data'])
874+
uplink_msg = json.loads(data)
875875

876-
try:
877-
data = redis_utils.decode_message(message['data'])
878-
uplink_msg = json.loads(data)
876+
can_id = uplink_msg.get("canId")
877+
can_data = uplink_msg.get("data", [])
878+
ref = uplink_msg.get("ref", "unknown")
879879

880-
can_id = uplink_msg.get("canId")
881-
can_data = uplink_msg.get("data", [])
882-
ref = uplink_msg.get("ref", "unknown")
880+
if can_id is None or not isinstance(can_id, int) or can_id < 0:
881+
logger.warning(f"Uplink relay: invalid canId in ref={ref}")
882+
return
883+
if not isinstance(can_data, list) or len(can_data) < 1 or len(can_data) > 8:
884+
logger.warning(f"Uplink relay: invalid data in ref={ref}")
885+
return
883886

884-
if can_id is None or not isinstance(can_id, int) or can_id < 0:
885-
logger.warning(f"Uplink relay: invalid canId in ref={ref}")
886-
continue
887-
if not isinstance(can_data, list) or len(can_data) < 1 or len(can_data) > 8:
888-
logger.warning(f"Uplink relay: invalid data in ref={ref}")
889-
continue
887+
# Pack as uplink UDP packet: 0xCAFE + seq + count(1) + CAN message
888+
uplink_seq += 1
889+
data_bytes = bytes(can_data) + b'\x00' * (8 - len(can_data))
890+
can_msg = CANMessage(time.time(), can_id, data_bytes)
890891

891-
# Pack as uplink UDP packet: 0xCAFE + seq + count(1) + CAN message
892-
uplink_seq += 1
893-
data_bytes = bytes(can_data) + b'\x00' * (8 - len(can_data))
894-
can_msg = CANMessage(time.time(), can_id, data_bytes)
892+
payload = UPLINK_MAGIC
893+
payload += struct.pack("!QH", uplink_seq, 1)
894+
payload += can_msg.pack()
895895

896-
payload = UPLINK_MAGIC
897-
payload += struct.pack("!QH", uplink_seq, 1)
898-
payload += can_msg.pack()
896+
try:
897+
uplink_sock.sendto(payload, (REMOTE_IP, UDP_PORT))
898+
logger.info(f"Uplink relayed to car: canId={can_id} ref={ref} seq={uplink_seq}")
899+
except (PermissionError, OSError) as e:
900+
logger.error(f"Uplink UDP send failed: {e}")
899901

900-
try:
901-
uplink_sock.sendto(payload, (REMOTE_IP, UDP_PORT))
902-
logger.info(f"Uplink relayed to car: canId={can_id} ref={ref} seq={uplink_seq}")
903-
except (PermissionError, OSError) as e:
904-
logger.error(f"Uplink UDP send failed: {e}")
902+
except Exception as e:
903+
logger.error(f"Uplink relay error: {e}")
905904

905+
# Reconnect loop: the pump returns whenever the pubsub connection
906+
# goes silent past HEARTBEAT_STALE_S, and we re-subscribe here.
907+
try:
908+
while True:
909+
r = None
910+
try:
911+
r = aioredis.from_url(REDIS_URL)
912+
pubsub = r.pubsub()
913+
await pubsub.subscribe(REDIS_UPLINK_CHANNEL, REDIS_HEARTBEAT_CHANNEL)
914+
logger.info(f"Subscribed to Redis channels: {REDIS_UPLINK_CHANNEL}, {REDIS_HEARTBEAT_CHANNEL}")
915+
await pump_pubsub_with_heartbeat(pubsub, _relay, log=logger)
916+
except asyncio.CancelledError:
917+
raise
906918
except Exception as e:
907-
logger.error(f"Uplink relay error: {e}")
908-
909-
except Exception as e:
910-
logger.error(f"Uplink relay Redis error: {e}")
919+
logger.error(f"Uplink relay Redis error: {e}")
920+
await asyncio.sleep(1.0)
921+
finally:
922+
if r is not None:
923+
with contextlib.suppress(Exception):
924+
await r.aclose()
911925
finally:
912926
uplink_sock.close()
913927

@@ -983,7 +997,11 @@ async def version_checker():
983997
logger.debug(f"Version check error: {e}")
984998
await asyncio.sleep(30.0)
985999

986-
tasks = [udp_receiver(), missing_reporter(), stats_publisher(), raw_csv_logger(), car_time_injector(), version_checker(), utils.heartbeat_coro(self.telemetry_event)]
1000+
# Base mode has a real Redis server; create an async client for the
1001+
# heartbeat writer. The writer publishes on the heartbeat channel every
1002+
# 1s so pubsub subscribers can detect a dead subscription and reconnect.
1003+
_async_redis = aioredis.from_url(REDIS_URL)
1004+
tasks = [udp_receiver(), missing_reporter(), stats_publisher(), raw_csv_logger(), car_time_injector(), version_checker(), utils.heartbeat_coro(self.telemetry_event), run_heartbeat_writer(_async_redis)]
9871005
if ENABLE_UPLINK:
9881006
tasks.append(uplink_relay())
9891007
await asyncio.gather(*tasks)
universal-telemetry-software/src/heartbeat.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
"""
2+
Pubsub liveness probe — the producer publishes a heartbeat message on a Redis
3+
pubsub channel every second, and every subscriber also subscribes to that
4+
channel. Liveness is measured on the pubsub connection itself: if no message
5+
of any kind (heartbeat or data) arrives for HEARTBEAT_STALE_S, the connection
6+
is presumed half-dead and the subscriber tears down and re-subscribes.
7+
8+
An out-of-band check (e.g. GET on a heartbeat key) cannot detect this state:
9+
regular commands use a different pool connection that redis-py transparently
10+
reconnects, so the key would look fresh while the pubsub connection is dark.
11+
See docs/superpowers/plans/2026-06-11-stack-resilience.md.
12+
"""
13+
import asyncio
14+
import json
15+
import logging
16+
import time
17+
18+
from .config import HEARTBEAT_STALE_S, REDIS_HEARTBEAT_CHANNEL
19+
20+
logger = logging.getLogger(__name__)
21+
22+
HEARTBEAT_INTERVAL_S = 1.0
23+
24+
25+
async def run_heartbeat_writer(redis_client=None) -> None:
    """Emit a liveness beacon on REDIS_HEARTBEAT_CHANNEL until cancelled.

    One JSON message is published per loop pass, followed by a sleep of
    HEARTBEAT_INTERVAL_S. The message carries two fields:

    * ``uptime_s`` -- monotonic seconds elapsed since this coroutine started.
    * ``wall_ts``  -- wall-clock timestamp from ``time.time()``.

    Args:
        redis_client: object exposing an awaitable ``publish(channel, payload)``.
            When it is ``None`` the coroutine returns immediately without
            publishing anything.

    Raises:
        asyncio.CancelledError: propagated unchanged so the task can be stopped.

    Any other exception raised while building or publishing the message is
    logged as a warning and the loop carries on with the next interval, so a
    transient publish failure does not end the writer.
    """
    if redis_client is None:
        return

    started_at = time.monotonic()
    while True:
        try:
            beat = {
                "uptime_s": time.monotonic() - started_at,
                "wall_ts": time.time(),
            }
            await redis_client.publish(REDIS_HEARTBEAT_CHANNEL, json.dumps(beat))
        except asyncio.CancelledError:
            # Cancellation must never be swallowed by the broad handler below.
            raise
        except Exception as exc:
            logger.warning(f"Heartbeat publish failed: {exc}")
        # Sleep sits outside the try so a failed publish is still paced.
        await asyncio.sleep(HEARTBEAT_INTERVAL_S)
46+
47+
48+
async def pump_pubsub_with_heartbeat(pubsub, on_message, *,
                                     stale_s: float = HEARTBEAT_STALE_S,
                                     should_stop=None,
                                     log=None):
    """Poll a pubsub object and dispatch messages until it looks dead.

    The pump polls ``pubsub.get_message`` with a one-second timeout instead of
    iterating ``pubsub.listen()``, so it can notice when nothing at all has
    arrived for longer than ``stale_s`` and hand control back to the caller.
    Callers are expected to have subscribed ``pubsub`` to
    REDIS_HEARTBEAT_CHANNEL alongside their data channels; messages on that
    channel only reset the liveness clock and are never passed on.

    Args:
        pubsub: object exposing an awaitable
            ``get_message(timeout=..., ignore_subscribe_messages=...)``.
        on_message: async callable invoked with the raw message dict for
            every message that is not on the heartbeat channel. Exceptions
            it raises are logged and do not stop the pump.
        stale_s: seconds of total silence after which the pump returns.
        should_stop: optional zero-argument callable checked before every
            poll; a truthy result makes the pump return.
        log: logger to use; falls back to this module's ``logger``.

    Returns:
        None -- on ``should_stop``, on a ``get_message`` error (after a 0.5 s
        pause), or once the silence exceeds ``stale_s``. The caller is
        expected to re-subscribe and call the pump again.

    Raises:
        asyncio.CancelledError: propagated unchanged from ``get_message``.
    """
    out = log or logger
    last_seen = time.monotonic()  # liveness clock starts when the pump starts

    while should_stop is None or not should_stop():
        try:
            received = await pubsub.get_message(timeout=1.0, ignore_subscribe_messages=True)
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            out.warning(f"pubsub.get_message error, reconnecting: {exc}")
            await asyncio.sleep(0.5)
            return  # hand back to the caller so it can re-subscribe

        if received is None:
            # Poll timed out with nothing to read: check how long it has been quiet.
            if time.monotonic() - last_seen > stale_s:
                out.warning("heartbeat stale, forcing pubsub reconnect")
                return
            continue

        # Any message at all, heartbeat or data, proves the connection is alive.
        last_seen = time.monotonic()

        source = received.get("channel")
        if isinstance(source, bytes):
            source = source.decode("utf-8", errors="replace")
        if source == REDIS_HEARTBEAT_CHANNEL:
            continue  # liveness signal only, never forwarded

        try:
            await on_message(received)
        except Exception as exc:
            out.error(f"subscriber handler error: {exc}")

universal-telemetry-software/src/websocket_bridge.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
REDIS_UPLINK_CHANNEL,
1616
REDIS_DIAG_CHANNEL,
1717
REDIS_WS_CLIENTS_KEY,
18+
REDIS_HEARTBEAT_CHANNEL,
1819
ENABLE_UPLINK,
1920
)
2021
from src import redis_utils, utils
22+
from src.heartbeat import pump_pubsub_with_heartbeat
2123

2224
logger = logging.getLogger("WebSocketBridge")
2325

@@ -297,24 +299,26 @@ async def redis_listener():
297299
try:
298300
r = redis.from_url(REDIS_URL, health_check_interval=30)
299301
pubsub = r.pubsub()
300-
await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL)
301-
logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}")
302+
await pubsub.subscribe(REDIS_CHANNEL, REDIS_STATS_CHANNEL, REDIS_DIAG_CHANNEL,
303+
REDIS_HEARTBEAT_CHANNEL)
304+
logger.info(f"Subscribed to Redis channels: {REDIS_CHANNEL}, {REDIS_STATS_CHANNEL}, {REDIS_DIAG_CHANNEL}, {REDIS_HEARTBEAT_CHANNEL}")
302305
delay = backoff_min # reset backoff once a subscribe succeeds
303306

304-
async for message in pubsub.listen():
305-
if shutdown_event.is_set():
306-
break
307-
308-
if message['type'] == 'message':
309-
data = redis_utils.decode_message(message['data'])
310-
311-
# Broadcast to all connected clients
312-
if connected_clients:
313-
# Create tasks for sending to each client to avoid blocking
314-
await asyncio.gather(
315-
*[client.send(data) for client in connected_clients],
316-
return_exceptions=True
317-
)
307+
async def _handler(msg):
308+
if msg['type'] != 'message':
309+
return
310+
data = redis_utils.decode_message(msg['data'])
311+
312+
# Broadcast to all connected clients
313+
if connected_clients:
314+
# Create tasks for sending to each client to avoid blocking
315+
await asyncio.gather(
316+
*[client.send(data) for client in connected_clients],
317+
return_exceptions=True
318+
)
319+
320+
await pump_pubsub_with_heartbeat(pubsub, _handler,
321+
should_stop=shutdown_event.is_set, log=logger)
318322
except asyncio.CancelledError:
319323
raise
320324
except Exception as e:

0 commit comments

Comments
 (0)