Skip to content

Commit 44442be

Browse files
committed
fix(rtc): auto-reconnect signaling WebSocket on unexpected loss
`WebSocketClient` had no reconnect logic of its own — `_on_close`, `_on_error`, and the health-check timeout in `_ping_loop` only logged and stopped the connection. When the signaling WS dropped (transient TCP reset, missed health check, server-side close without an SFU error event), the session sat hanging until the frontend's own timeout fired a DELETE. Under concurrent load this is the dominant remaining cause of "the agent disconnected randomly" reports: even with the late-offer fix in place, a single brief WS blip is fatal because nothing kicks reconnection. - `signaling.py`: emit a `connection_lost` event with a reason string on (a) unexpected `_on_close`, (b) `_on_error` after the initial handshake completed, and (c) `_ping_loop` health-check timeout (notify *before* `self.close()` so the user-initiated guard does not suppress it). Idempotent via `_connection_lost_sent`. - `connection_manager.py`: subscribe to `connection_lost` on the WS client and route it into the existing `ReconnectionManager` (`ReconnectionStrategy.FAST`). The manager already handles strategy escalation, locking, and the disconnection-timeout deadline.
1 parent c71b551 commit 44442be

4 files changed

Lines changed: 139 additions & 1 deletion

File tree

getstream/video/rtc/connection_manager.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from getstream.video.rtc.recording import RecordingManager
3737
from getstream.video.rtc.participants import ParticipantsState
3838
from getstream.video.rtc.tracks import SubscriptionConfig, SubscriptionManager
39-
from getstream.video.rtc.reconnection import ReconnectionManager
39+
from getstream.video.rtc.reconnection import ReconnectionManager, ReconnectionStrategy
4040
from getstream.video.rtc.peer_connection import PeerConnectionManager
4141
from getstream.video.rtc.models import JoinCallResponse
4242
from getstream.video.rtc.tracer import Tracer
@@ -277,6 +277,25 @@ async def _on_subscriber_offer(self, event: events_pb2.SubscriberOffer):
277277
finally:
278278
self.subscriber_negotiation_lock.release()
279279

280+
async def _on_signaling_connection_lost(self, reason):
281+
"""Reconnect when the signaling WebSocket drops unexpectedly.
282+
283+
The WebSocketClient itself only logs the error and stops; it has
284+
no reconnect of its own. This handler bridges that gap by routing
285+
the loss into the existing `ReconnectionManager`, so a transient
286+
TCP reset or a missed health check no longer means a dead session.
287+
"""
288+
if not self.running:
289+
return
290+
logger.warning(f"Signaling WS lost; triggering reconnect: {reason}")
291+
try:
292+
await self._reconnector.reconnect(
293+
strategy=ReconnectionStrategy.FAST,
294+
reason=f"signaling ws lost: {reason}",
295+
)
296+
except Exception:
297+
logger.exception("Reconnect after signaling WS loss failed")
298+
280299
async def _connect_coordinator_ws(self):
281300
"""
282301
Connects to the coordinator websocket and subscribes to events.
@@ -414,6 +433,15 @@ async def _connect_internal(
414433
# Connect subscriber offer event to handle SDP negotiation
415434
self._ws_client.on_event("subscriber_offer", self._on_subscriber_offer)
416435

436+
# Drive reconnection when the signaling WS drops outside of an
437+
# SFU-level error event (raw socket close, health-check timeout,
438+
# transport-level exceptions). Without this handler the
439+
# WebSocketClient just logs and stops; the session sits hanging
440+
# until the frontend times out and tears it down.
441+
self._ws_client.on_event(
442+
"connection_lost", self._on_signaling_connection_lost
443+
)
444+
417445
# Re-emit the events so they can be subscribed to on the ConnectionManager
418446
self._ws_client.on_wildcard("*", self.emit)
419447

getstream/video/rtc/signaling.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def __init__(
6767
self.thread = None
6868
self.running = False
6969
self.closed = False
70+
self._connection_lost_sent = False
7071

7172
# For ping/health check mechanism
7273
self.ping_thread = None
@@ -214,11 +215,42 @@ def _on_error(self, ws, error):
214215
error_event.error.error.message = str(error)
215216
self.first_message = error_event
216217
self.first_message_event.set()
218+
elif not self.closed:
219+
self._notify_connection_lost(f"error: {error}")
217220

218221
def _on_close(self, ws, close_status_code, close_msg):
219222
"""Handle WebSocket close event."""
220223
logger.debug(f"WebSocket connection closed: {close_status_code} {close_msg}")
224+
was_unexpected = not self.closed
221225
self.running = False
226+
if was_unexpected:
227+
self._notify_connection_lost(
228+
f"closed by remote (code={close_status_code} msg={close_msg})"
229+
)
230+
231+
def _notify_connection_lost(self, reason: str) -> None:
232+
"""Emit ``connection_lost`` once, hopping onto the main loop.
233+
234+
Callers run on the WS worker thread or ``_ping_loop`` thread; pyee
235+
schedules async listeners via ``loop.create_task``, which is not
236+
thread-safe. Same hop pattern as ``_on_message`` for SFU events.
237+
Idempotent so a chained ``_on_error`` → ``_on_close`` only fires
238+
one event.
239+
"""
240+
if self._connection_lost_sent:
241+
return
242+
self._connection_lost_sent = True
243+
try:
244+
asyncio.run_coroutine_threadsafe(
245+
self._emit_connection_lost(reason),
246+
self.main_loop,
247+
)
248+
except Exception:
249+
logger.exception("Failed to schedule connection_lost emit")
250+
251+
async def _emit_connection_lost(self, reason: str) -> None:
252+
with telemetry.attach_span(self.parent_span):
253+
self.emit("connection_lost", reason)
222254

223255
def _start_ping_handler(self):
224256
"""Start the ping mechanism in a background thread."""
@@ -242,6 +274,10 @@ def _ping_loop(self):
242274
current_time = time.time()
243275
if current_time - self.last_health_check_time > self.ping_interval * 2:
244276
logger.warning("Health check failed, closing connection")
277+
# Notify before close() so the owner can reconnect; close()
278+
# itself sets `self.closed=True` and would suppress the
279+
# notification in `_on_close`.
280+
self._notify_connection_lost("health check timeout")
245281
self.close()
246282
return
247283

tests/rtc/test_connection_manager.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
SfuJoinError,
1616
)
1717
from getstream.video.rtc.pb.stream.video.sfu.models import models_pb2
18+
from getstream.video.rtc.reconnection import ReconnectionStrategy
1819

1920
load_dotenv()
2021

@@ -203,3 +204,23 @@ def test_rejects_negative_max_join_retries(self):
203204
pytest.raises(ValueError, match="max_join_retries must be >= 0"),
204205
):
205206
ConnectionManager(call=MagicMock(), user_id="user1", max_join_retries=-1)
207+
208+
@pytest.mark.asyncio
209+
async def test_signaling_connection_lost_triggers_fast_reconnect(
210+
self, connection_manager
211+
):
212+
"""A signaling-WS `connection_lost` event drives a FAST reconnect.
213+
214+
Without this handler the session would sit hanging on a transient
215+
socket drop until the frontend tears it down.
216+
"""
217+
cm = connection_manager
218+
cm.running = True
219+
cm._reconnector.reconnect = AsyncMock()
220+
221+
await cm._on_signaling_connection_lost("health check timeout")
222+
223+
cm._reconnector.reconnect.assert_called_once()
224+
kwargs = cm._reconnector.reconnect.call_args.kwargs
225+
assert kwargs["strategy"] == ReconnectionStrategy.FAST
226+
assert "health check timeout" in kwargs["reason"]

tests/test_signaling.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,59 @@ async def test_thread_usage(self, join_request, mock_websocket):
295295
# Thread should be joined during close
296296
assert not client.running
297297

298+
@pytest.mark.asyncio
299+
async def test_connection_lost_emitted_on_unexpected_close(
300+
self, join_request, mock_websocket
301+
):
302+
"""An unexpected WS close after handshake emits a `connection_lost` event.
303+
304+
The owner (ConnectionManager) relies on this signal to drive
305+
reconnection — without it, a transient socket drop leaves the
306+
session hanging until the frontend times out.
307+
"""
308+
client = WebSocketClient(
309+
"wss://test.url", join_request, asyncio.get_running_loop()
310+
)
311+
312+
received: list[str] = []
313+
314+
async def on_lost(reason):
315+
received.append(reason)
316+
317+
client.on_event("connection_lost", on_lost)
318+
319+
# Complete handshake so we're past the initial connect phase.
320+
join_response = events_pb2.SfuEvent()
321+
join_response.join_response.reconnected = False
322+
323+
connect_task = asyncio.create_task(client.connect())
324+
await asyncio.sleep(0.1)
325+
326+
on_open_callback = mock_websocket.call_args[1]["on_open"]
327+
on_open_callback(mock_websocket.return_value)
328+
329+
on_message_callback = mock_websocket.call_args[1]["on_message"]
330+
on_message_callback(
331+
mock_websocket.return_value, join_response.SerializeToString()
332+
)
333+
await connect_task
334+
335+
# Simulate the remote dropping the connection (not user-initiated).
336+
on_close_callback = mock_websocket.call_args[1]["on_close"]
337+
on_close_callback(mock_websocket.return_value, 1006, "abnormal closure")
338+
339+
# Allow the threadsafe-scheduled emit to run on the loop.
340+
await asyncio.sleep(0.1)
341+
342+
assert len(received) == 1, (
343+
f"expected exactly one connection_lost event, got {received}"
344+
)
345+
assert "1006" in received[0], (
346+
f"reason should mention the close code, got {received[0]!r}"
347+
)
348+
349+
client.close()
350+
298351
@pytest.mark.asyncio
299352
async def test_on_open_traces_ws_open_and_join_request(
300353
self, join_request, mock_websocket

0 commit comments

Comments
 (0)