Skip to content

Commit a81381f

Browse files
committed
fix(rtc): auto-reconnect signaling WebSocket on unexpected loss
`WebSocketClient` had no reconnect logic of its own — `_on_close`, `_on_error`, and the health-check timeout in `_ping_loop` only logged and stopped the connection. When the signaling WS dropped (transient TCP reset, missed health check, server-side close without an SFU error event), the session hung until the frontend's own timeout fired a DELETE. Under concurrent load this is the dominant remaining cause of "the agent disconnected randomly" reports: even with the late-offer fix in place, a single brief WS blip is fatal because nothing triggers reconnection.

- `signaling.py`: emit a `connection_lost` event with a reason string on (a) unexpected `_on_close`, (b) `_on_error` after the initial handshake completed, and (c) `_ping_loop` health-check timeout (notify *before* `self.close()` so the user-initiated guard does not suppress it). Idempotent via `_connection_lost_sent`.
- `connection_manager.py`: subscribe to `connection_lost` on the WS client and route it into the existing `ReconnectionManager` (`ReconnectionStrategy.FAST`). The manager already handles strategy escalation, locking, and the disconnection-timeout deadline.
1 parent 8c880c5 commit a81381f

2 files changed

Lines changed: 65 additions & 1 deletion

File tree

getstream/video/rtc/connection_manager.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from getstream.video.rtc.recording import RecordingManager
3737
from getstream.video.rtc.participants import ParticipantsState
3838
from getstream.video.rtc.tracks import SubscriptionConfig, SubscriptionManager
39-
from getstream.video.rtc.reconnection import ReconnectionManager
39+
from getstream.video.rtc.reconnection import ReconnectionManager, ReconnectionStrategy
4040
from getstream.video.rtc.peer_connection import PeerConnectionManager
4141
from getstream.video.rtc.models import JoinCallResponse
4242
from getstream.video.rtc.tracer import Tracer
@@ -277,6 +277,25 @@ async def _on_subscriber_offer(self, event: events_pb2.SubscriberOffer):
277277
finally:
278278
self.subscriber_negotiation_lock.release()
279279

280+
async def _on_signaling_connection_lost(self, reason):
    """Start a FAST reconnect after the signaling WebSocket is lost.

    Does nothing when ``self.running`` is falsy. Otherwise logs a warning
    and awaits ``self._reconnector.reconnect`` with
    ``ReconnectionStrategy.FAST``. Any ``Exception`` raised while
    reconnecting is logged and swallowed, so it never propagates to the
    caller of this handler.

    Args:
        reason: Description of why the connection was lost; interpolated
            into the log line and into the reason passed to the
            reconnector.
    """
    if self.running:
        logger.warning(f"Signaling WS lost; triggering reconnect: {reason}")
        try:
            reconnect_reason = f"signaling ws lost: {reason}"
            await self._reconnector.reconnect(
                strategy=ReconnectionStrategy.FAST,
                reason=reconnect_reason,
            )
        except Exception:
            # Boundary handler: record the failure instead of raising.
            logger.exception("Reconnect after signaling WS loss failed")
298+
280299
async def _connect_coordinator_ws(self):
281300
"""
282301
Connects to the coordinator websocket and subscribes to events.
@@ -414,6 +433,15 @@ async def _connect_internal(
414433
# Connect subscriber offer event to handle SDP negotiation
415434
self._ws_client.on_event("subscriber_offer", self._on_subscriber_offer)
416435

436+
# Drive reconnection when the signaling WS drops outside of an
437+
# SFU-level error event (raw socket close, health-check timeout,
438+
# transport-level exceptions). Without this handler the
439+
# WebSocketClient just logs and stops; the session sits hanging
440+
# until the frontend times out and tears it down.
441+
self._ws_client.on_event(
442+
"connection_lost", self._on_signaling_connection_lost
443+
)
444+
417445
# Re-emit the events so they can be subscribed to on the ConnectionManager
418446
self._ws_client.on_wildcard("*", self.emit)
419447

getstream/video/rtc/signaling.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,43 @@ def _on_error(self, ws, error):
214214
error_event.error.error.message = str(error)
215215
self.first_message = error_event
216216
self.first_message_event.set()
217+
elif not self.closed:
218+
# Established connection failed unexpectedly — signal the owner
219+
# so it can drive reconnection.
220+
self._notify_connection_lost(f"error: {error}")
217221

218222
def _on_close(self, ws, close_status_code, close_msg):
    """Handle WebSocket close event.

    Marks the client as no longer running and, when ``self.closed`` was
    not already set at the time of the callback, reports the close through
    ``self._notify_connection_lost``.

    Args:
        ws: The WebSocket object passed to the callback (unused here).
        close_status_code: Close status code, included in the log line and
            in the reported reason.
        close_msg: Close message, included in the log line and in the
            reported reason.
    """
    logger.debug(f"WebSocket connection closed: {close_status_code} {close_msg}")
    # Read the flag before touching any other state so the decision
    # reflects how things stood when the callback fired.
    already_closed = self.closed
    self.running = False
    if already_closed:
        return
    lost_reason = f"closed by remote (code={close_status_code} msg={close_msg})"
    self._notify_connection_lost(lost_reason)
231+
232+
def _notify_connection_lost(self, reason: str) -> None:
    """Schedule a single ``connection_lost`` emit on the main event loop.

    Called from the worker thread (websocket-client callbacks run there,
    as does ``_ping_loop``), so the emit is handed to ``self.main_loop``
    via ``asyncio.run_coroutine_threadsafe`` instead of being run inline.
    Owners that wire ``on_event("connection_lost", ...)`` get a single
    notification per unexpected disconnect and can drive reconnection.

    Idempotent via ``self._connection_lost_sent``, so a sequence like
    ``_on_error`` → ``_on_close`` only fires once. The flag is set before
    scheduling and is left set even if scheduling fails.

    Args:
        reason: Human-readable description of why the connection was
            lost; passed through unchanged as the event payload.
    """
    # ``getattr`` with a default tolerates instances on which the flag was
    # never initialised.
    # NOTE(review): this check-then-set is not atomic; if two threads can
    # really get here at the same time, a duplicate emit is possible —
    # confirm whether a lock is warranted.
    if getattr(self, "_connection_lost_sent", False):
        return
    self._connection_lost_sent = True

    def _log_emit_failure(future) -> None:
        # Runs once the scheduled emit finishes. Without this, an
        # exception raised while emitting would stay buried in a future
        # nobody ever inspects.
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("connection_lost emit failed", exc_info=exc)

    emit_coro = None
    try:
        emit_coro = self._emit_connection_lost(reason)
        future = asyncio.run_coroutine_threadsafe(emit_coro, self.main_loop)
    except Exception:
        # Scheduling can fail (e.g. the loop is already closed). Close the
        # coroutine so it is not left dangling as "never awaited".
        if asyncio.iscoroutine(emit_coro):
            emit_coro.close()
        logger.exception("Failed to schedule connection_lost emit")
        return
    future.add_done_callback(_log_emit_failure)
251+
252+
async def _emit_connection_lost(self, reason: str) -> None:
    """Emit the ``connection_lost`` event with ``reason`` as its payload.

    Coroutine wrapper around ``self.emit`` so the emit can be scheduled
    onto an event loop.

    Args:
        reason: Description of why the connection was lost.
    """
    event_name = "connection_lost"
    self.emit(event_name, reason)
222254

223255
def _start_ping_handler(self):
224256
"""Start the ping mechanism in a background thread."""
@@ -242,6 +274,10 @@ def _ping_loop(self):
242274
current_time = time.time()
243275
if current_time - self.last_health_check_time > self.ping_interval * 2:
244276
logger.warning("Health check failed, closing connection")
277+
# Notify before close() so the owner can reconnect; close()
278+
# itself sets `self.closed=True` and would suppress the
279+
# notification in `_on_close`.
280+
self._notify_connection_lost("health check timeout")
245281
self.close()
246282
return
247283

0 commit comments

Comments
 (0)