Skip to content

Commit 7be299a

Browse files
committed
fix(rtc): auto-reconnect signaling WebSocket on unexpected loss
`WebSocketClient` had no reconnect logic of its own — `_on_close`, `_on_error`, and the health-check timeout in `_ping_loop` only logged and stopped the connection. When the signaling WS dropped (transient TCP reset, missed health check, server-side close without an SFU error event), the session sat hanging until the frontend's own timeout fired a DELETE. Under concurrent load this is the dominant remaining cause of "the agent disconnected randomly" reports: even with the late-offer fix in place, a single brief WS blip is fatal because nothing kicks reconnection. - `signaling.py`: emit a `connection_lost` event with a reason string on (a) unexpected `_on_close`, (b) `_on_error` after the initial handshake completed, and (c) `_ping_loop` health-check timeout (notify *before* `self.close()` so the user-initiated guard does not suppress it). Idempotent via `_connection_lost_sent`. - `connection_manager.py`: subscribe to `connection_lost` on the WS client and route it into the existing `ReconnectionManager` (`ReconnectionStrategy.FAST`). The manager already handles strategy escalation, locking, and the disconnection-timeout deadline.
1 parent 2b1e111 commit 7be299a

2 files changed

Lines changed: 65 additions & 1 deletion

File tree

getstream/video/rtc/connection_manager.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from getstream.video.rtc.recording import RecordingManager
3737
from getstream.video.rtc.participants import ParticipantsState
3838
from getstream.video.rtc.tracks import SubscriptionConfig, SubscriptionManager
39-
from getstream.video.rtc.reconnection import ReconnectionManager
39+
from getstream.video.rtc.reconnection import ReconnectionManager, ReconnectionStrategy
4040
from getstream.video.rtc.peer_connection import PeerConnectionManager
4141
from getstream.video.rtc.models import JoinCallResponse
4242
from getstream.video.rtc.tracer import Tracer
@@ -266,6 +266,25 @@ async def _on_subscriber_offer(self, event: events_pb2.SubscriberOffer):
266266
finally:
267267
self.subscriber_negotiation_lock.release()
268268

269+
async def _on_signaling_connection_lost(self, reason):
270+
"""Reconnect when the signaling WebSocket drops unexpectedly.
271+
272+
The WebSocketClient itself only logs the error and stops; it has
273+
no reconnect of its own. This handler bridges that gap by routing
274+
the loss into the existing `ReconnectionManager`, so a transient
275+
TCP reset or a missed health check no longer means a dead session.
276+
"""
277+
if not self.running:
278+
return
279+
logger.warning(f"Signaling WS lost; triggering reconnect: {reason}")
280+
try:
281+
await self._reconnector.reconnect(
282+
strategy=ReconnectionStrategy.FAST,
283+
reason=f"signaling ws lost: {reason}",
284+
)
285+
except Exception:
286+
logger.exception("Reconnect after signaling WS loss failed")
287+
269288
async def _connect_coordinator_ws(self):
270289
"""
271290
Connects to the coordinator websocket and subscribes to events.
@@ -403,6 +422,15 @@ async def _connect_internal(
403422
# Connect subscriber offer event to handle SDP negotiation
404423
self._ws_client.on_event("subscriber_offer", self._on_subscriber_offer)
405424

425+
# Drive reconnection when the signaling WS drops outside of an
426+
# SFU-level error event (raw socket close, health-check timeout,
427+
# transport-level exceptions). Without this handler the
428+
# WebSocketClient just logs and stops; the session sits hanging
429+
# until the frontend times out and tears it down.
430+
self._ws_client.on_event(
431+
"connection_lost", self._on_signaling_connection_lost
432+
)
433+
406434
# Re-emit the events so they can be subscribed to on the ConnectionManager
407435
self._ws_client.on_wildcard("*", self.emit)
408436

getstream/video/rtc/signaling.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,43 @@ def _on_error(self, ws, error):
214214
error_event.error.error.message = str(error)
215215
self.first_message = error_event
216216
self.first_message_event.set()
217+
elif not self.closed:
218+
# Established connection failed unexpectedly — signal the owner
219+
# so it can drive reconnection.
220+
self._notify_connection_lost(f"error: {error}")
217221

218222
def _on_close(self, ws, close_status_code, close_msg):
219223
"""Handle WebSocket close event."""
220224
logger.debug(f"WebSocket connection closed: {close_status_code} {close_msg}")
225+
was_unexpected = not self.closed
221226
self.running = False
227+
if was_unexpected:
228+
self._notify_connection_lost(
229+
f"closed by remote (code={close_status_code} msg={close_msg})"
230+
)
231+
232+
def _notify_connection_lost(self, reason: str) -> None:
233+
"""Schedule a ``connection_lost`` emit on the main event loop.
234+
235+
Called from the worker thread (websocket-client callbacks run there,
236+
as does ``_ping_loop``). Owners that wire ``on_event("connection_lost",
237+
...)`` get a single notification per unexpected disconnect and can
238+
drive reconnection. Idempotent guard via ``self._connection_lost_sent``
239+
so a sequence like ``_on_error`` → ``_on_close`` only fires once.
240+
"""
241+
if getattr(self, "_connection_lost_sent", False):
242+
return
243+
self._connection_lost_sent = True
244+
try:
245+
asyncio.run_coroutine_threadsafe(
246+
self._emit_connection_lost(reason),
247+
self.main_loop,
248+
)
249+
except Exception:
250+
logger.exception("Failed to schedule connection_lost emit")
251+
252+
async def _emit_connection_lost(self, reason: str) -> None:
253+
self.emit("connection_lost", reason)
222254

223255
def _start_ping_handler(self):
224256
"""Start the ping mechanism in a background thread."""
@@ -242,6 +274,10 @@ def _ping_loop(self):
242274
current_time = time.time()
243275
if current_time - self.last_health_check_time > self.ping_interval * 2:
244276
logger.warning("Health check failed, closing connection")
277+
# Notify before close() so the owner can reconnect; close()
278+
# itself sets `self.closed=True` and would suppress the
279+
# notification in `_on_close`.
280+
self._notify_connection_lost("health check timeout")
245281
self.close()
246282
return
247283

0 commit comments

Comments
 (0)