From fd4cc9f4fee67716d45194f1833ae7cda392cfb6 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 14 Apr 2026 00:02:48 +0400 Subject: [PATCH 1/8] feat: enable chat events on coordinator WebSocket The coordinator WS now subscribes to both video and chat products, allowing chat events (message.new, reactions, typing, etc.) to arrive on the same connection as video call events. No second WS needed. - Add "chat" to products in StreamAPIWS auth payload - Add watch_channels() in connection_utils (same pattern as watch_call) - ConnectionManager subscribes to messaging channel after watch_call - Expose coordinator_ws property for event listener registration --- getstream/video/rtc/connection_manager.py | 16 ++++++++++++++++ getstream/video/rtc/connection_utils.py | 14 ++++++++++++++ getstream/video/rtc/coordinator/ws.py | 2 +- 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index b8d992f1..91b942a3 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -24,6 +24,7 @@ connect_websocket, join_call, watch_call, + watch_channels, ) from getstream.video.rtc.coordinator.backoff import exp_backoff from getstream.video.rtc.track_util import ( @@ -294,6 +295,17 @@ async def _connect_coordinator_ws(self): self.call, self.user_id, self._coordinator_ws_client._client_id ) + with telemetry.start_as_current_span( + "watch-channels", + ): + if self.call.id is not None: + await watch_channels( + self.call, + self.user_id, + self._coordinator_ws_client._client_id, + [("messaging", self.call.id)], + ) + async def _connect_internal( self, region: Optional[str] = None, @@ -620,6 +632,10 @@ def ws_client(self): def ws_client(self, value): self._ws_client = value + @property + def coordinator_ws(self): + return self._coordinator_ws_client + # Publisher / Subscriber peer-connection shortcuts @property def publisher_pc(self): diff --git a/getstream/video/rtc/connection_utils.py b/getstream/video/rtc/connection_utils.py index 75036a14..d6d19a38 100644 --- a/getstream/video/rtc/connection_utils.py +++ b/getstream/video/rtc/connection_utils.py @@ -136,6 +136,20 @@ async def watch_call(call: Call, user_id: str, connection_id: str): ) +async def watch_channels(call: Call, user_id: str, connection_id: str, channels): + """Subscribe to chat channel events via the coordinator WS connection.""" + client = user_client(call, user_id) + cids = [f"{ch_type}:{ch_id}" for ch_type, ch_id in channels] + return await client.post( + "/api/v2/chat/channels", + json={ + "filter_conditions": {"cid": {"$in": cids}}, + "watch": True, + "connection_id": connection_id, + }, + ) + + async def join_call( call: Call, user_id: str, diff --git a/getstream/video/rtc/coordinator/ws.py b/getstream/video/rtc/coordinator/ws.py index 01b82099..00189943 100644 --- a/getstream/video/rtc/coordinator/ws.py +++ b/getstream/video/rtc/coordinator/ws.py @@ -117,7 +117,7 @@ async def _build_auth_payload(self) -> dict: ) payload = { "token": self.user_token, - "products": ["video"], + "products": ["video", "chat"], } # Include user_details if available (both for initial connection and reconnections) From c9a147122d0542f57840a8981cba096bda0318ce Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 14 Apr 2026 11:01:11 +0400 Subject: [PATCH 2/8] revert: remove watch_channels and chat products for now Chat event subscription needs discussion with the team -- JS SDK uses two separate WS connections for chat and video, and mixing them on one connection may affect MAU billing. Keep coordinator_ws property for custom events which already work via watch_call. --- getstream/video/rtc/connection_manager.py | 12 ------------ getstream/video/rtc/connection_utils.py | 14 -------------- getstream/video/rtc/coordinator/ws.py | 2 +- 3 files changed, 1 insertion(+), 27 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 91b942a3..13e6fd28 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -24,7 +24,6 @@ connect_websocket, join_call, watch_call, - watch_channels, ) from getstream.video.rtc.coordinator.backoff import exp_backoff from getstream.video.rtc.track_util import ( @@ -295,17 +294,6 @@ async def _connect_coordinator_ws(self): self.call, self.user_id, self._coordinator_ws_client._client_id ) - with telemetry.start_as_current_span( - "watch-channels", - ): - if self.call.id is not None: - await watch_channels( - self.call, - self.user_id, - self._coordinator_ws_client._client_id, - [("messaging", self.call.id)], - ) - async def _connect_internal( self, region: Optional[str] = None, diff --git a/getstream/video/rtc/connection_utils.py b/getstream/video/rtc/connection_utils.py index d6d19a38..75036a14 100644 --- a/getstream/video/rtc/connection_utils.py +++ b/getstream/video/rtc/connection_utils.py @@ -136,20 +136,6 @@ async def watch_call(call: Call, user_id: str, connection_id: str): ) -async def watch_channels(call: Call, user_id: str, connection_id: str, channels): - """Subscribe to chat channel events via the coordinator WS connection.""" - client = user_client(call, user_id) - cids = [f"{ch_type}:{ch_id}" for ch_type, ch_id in channels] - return await client.post( - "/api/v2/chat/channels", - json={ - "filter_conditions": {"cid": {"$in": cids}}, - "watch": True, - "connection_id": connection_id, - }, - ) - - async def join_call( call: Call, user_id: str, diff --git a/getstream/video/rtc/coordinator/ws.py b/getstream/video/rtc/coordinator/ws.py index 00189943..01b82099 100644 --- a/getstream/video/rtc/coordinator/ws.py +++ b/getstream/video/rtc/coordinator/ws.py @@ -117,7 +117,7 @@ async def _build_auth_payload(self) -> dict: ) payload = { "token": self.user_token, - "products": ["video", "chat"], + "products": ["video"], } # Include user_details if available (both for initial connection and reconnections) From aa1be05f5143b2a2daad499421327897669aed54 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 14 Apr 2026 11:22:54 +0400 Subject: [PATCH 3/8] docs: add docstring to coordinator_ws with send/receive examples --- getstream/video/rtc/connection_manager.py | 25 +++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 13e6fd28..19e4e6be 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -622,6 +622,31 @@ def ws_client(self, value): @property def coordinator_ws(self): + """The coordinator WebSocket client for real-time event pub/sub. + + After joining a call, the coordinator WS receives call-scoped events + via the ``watch_call`` subscription that happens automatically on connect. + + **Receiving events** -- use ``ws.on(event_type, handler)`` for specific + event types, or ``ws.on_wildcard(pattern, handler)`` for pattern matching:: + + ws = connection_manager.coordinator_ws + + @ws.on("custom") + def on_custom(event): + print(event["custom"]) + + **Sending custom events** -- use ``call.send_call_event()`` to broadcast + custom events to all participants watching the call:: + + await call.send_call_event( + user_id="agent", + custom={"type": "status_update", "status": "processing"}, + ) + + Returns: + StreamAPIWS instance, or None if not connected. + """ return self._coordinator_ws_client # Publisher / Subscriber peer-connection shortcuts From 810998f91ca9237c2a84c101d44d308b7dcec3aa Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 14 Apr 2026 11:26:47 +0400 Subject: [PATCH 4/8] docs: shrink coordinator_ws docstring, move examples to commit The docstring was too verbose for a property. Keeping it concise. Usage examples for reference: Receiving events: ws = connection_manager.coordinator_ws @ws.on("custom") def on_custom(event): print(event["custom"]) ws.on_wildcard("call.**", lambda event_type, event: print(event_type)) Sending custom events: await call.send_call_event( user_id="agent", custom={"type": "status_update", "status": "processing"}, ) Available event types (verified): - custom: user-defined events (turn detection, agent heartbeat, etc.) - call.session_participant_count_updated: participant count changes - health.check: keep-alive pings --- getstream/video/rtc/connection_manager.py | 26 ++++------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 19e4e6be..439b3d39 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -622,30 +622,12 @@ def ws_client(self, value): @property def coordinator_ws(self): - """The coordinator WebSocket client for real-time event pub/sub. + """The coordinator WebSocket receiving call-scoped events. - After joining a call, the coordinator WS receives call-scoped events - via the ``watch_call`` subscription that happens automatically on connect. + Available after connecting. Use ``ws.on()`` or ``ws.on_wildcard()`` + to subscribe to events. Send custom events via ``call.send_call_event()``. - **Receiving events** -- use ``ws.on(event_type, handler)`` for specific - event types, or ``ws.on_wildcard(pattern, handler)`` for pattern matching:: - - ws = connection_manager.coordinator_ws - - @ws.on("custom") - def on_custom(event): - print(event["custom"]) - - **Sending custom events** -- use ``call.send_call_event()`` to broadcast - custom events to all participants watching the call:: - - await call.send_call_event( - user_id="agent", - custom={"type": "status_update", "status": "processing"}, - ) - - Returns: - StreamAPIWS instance, or None if not connected. + Returns None if not connected. """ return self._coordinator_ws_client From 2b3ba0bdce591517a118c11f48d2bd4023bebabc Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 14 Apr 2026 12:07:22 +0400 Subject: [PATCH 5/8] feat: add return type and connection check to coordinator_ws Return Optional[StreamAPIWS] and check _connected before returning, so callers get None instead of a disconnected client. --- getstream/video/rtc/connection_manager.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 439b3d39..72caed91 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -621,7 +621,7 @@ def ws_client(self, value): self._ws_client = value @property - def coordinator_ws(self): + def coordinator_ws(self) -> Optional[StreamAPIWS]: """The coordinator WebSocket receiving call-scoped events. Available after connecting. Use ``ws.on()`` or ``ws.on_wildcard()`` @@ -629,7 +629,9 @@ def coordinator_ws(self): Returns None if not connected. """ - return self._coordinator_ws_client + if self._coordinator_ws_client and self._coordinator_ws_client._connected: + return self._coordinator_ws_client + return None # Publisher / Subscriber peer-connection shortcuts @property From c68730c76b8cf2caccb8a02e468a85a4c37ce8ed Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Thu, 16 Apr 2026 15:11:08 +0300 Subject: [PATCH 6/8] test: add integration test for custom event round-trip via coordinator_ws Verifies the full pub/sub cycle: send a custom event via REST (call.send_call_event), receive it on ConnectionManager.coordinator_ws through ws.on("custom") listener. Uses asyncio.Event for reliable timing instead of fixed sleep. --- tests/rtc/coordinator/test_custom_events.py | 69 +++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 tests/rtc/coordinator/test_custom_events.py diff --git a/tests/rtc/coordinator/test_custom_events.py b/tests/rtc/coordinator/test_custom_events.py new file mode 100644 index 00000000..b7247267 --- /dev/null +++ b/tests/rtc/coordinator/test_custom_events.py @@ -0,0 +1,69 @@ +""" +Integration tests for custom event pub/sub via coordinator WebSocket. + +Tests the full round-trip: send a custom event via REST, receive it on the +coordinator WS through ConnectionManager.coordinator_ws property. + +Requires Stream API credentials (STREAM_API_KEY, STREAM_API_SECRET). +""" + +import asyncio +import uuid + +import pytest +import pytest_asyncio + +from getstream import AsyncStream +from getstream.models import CallRequest, UserRequest +from getstream.video import rtc +from getstream.video.rtc.connection_utils import ConnectionState +from tests.conftest import skip_on_rate_limit + + +@pytest_asyncio.fixture() +async def test_user(async_client: AsyncStream): + user_id = f"test-user-{uuid.uuid4()}" + await async_client.upsert_users(UserRequest(id=user_id)) + yield user_id + try: + await async_client.delete_users( + user_ids=[user_id], user="hard", conversations="hard", messages="hard" + ) + except Exception: + pass + + +@pytest.mark.asyncio +@pytest.mark.integration +@skip_on_rate_limit +async def test_custom_event_round_trip(async_client: AsyncStream, test_user: str): + """Send a custom event via REST and verify it arrives on coordinator_ws.""" + call = async_client.video.call("default", str(uuid.uuid4())) + await call.get_or_create(data=CallRequest(created_by_id=test_user)) + + async with await rtc.join(call, test_user) as connection: + assert connection.connection_state == ConnectionState.JOINED + + ws = connection.coordinator_ws + assert ws is not None + + received_event = None + event_received = asyncio.Event() + + @ws.on("custom") + def on_custom(event): + nonlocal received_event + received_event = event + event_received.set() + + await call.send_call_event( + user_id=test_user, + custom={"type": "test_event", "payload": "hello from test"}, + ) + + await asyncio.wait_for(event_received.wait(), timeout=10.0) + + assert received_event is not None + custom_data = received_event.get("custom", {}) + assert custom_data.get("type") == "test_event" + assert custom_data.get("payload") == "hello from test" From f3f36d132c48413d51052eea3d99bcba1980b75f Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Thu, 16 Apr 2026 15:16:30 +0300 Subject: [PATCH 7/8] fix: log cleanup failures in test fixture instead of silently swallowing --- tests/rtc/coordinator/test_custom_events.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/rtc/coordinator/test_custom_events.py b/tests/rtc/coordinator/test_custom_events.py index b7247267..8557595d 100644 --- a/tests/rtc/coordinator/test_custom_events.py +++ b/tests/rtc/coordinator/test_custom_events.py @@ -8,6 +8,7 @@ """ import asyncio +import logging import uuid import pytest @@ -19,6 +20,8 @@ from getstream.video.rtc.connection_utils import ConnectionState from tests.conftest import skip_on_rate_limit +logger = logging.getLogger(__name__) + @pytest_asyncio.fixture() async def test_user(async_client: AsyncStream): @@ -30,7 +33,7 @@ async def test_user(async_client: AsyncStream): user_ids=[user_id], user="hard", conversations="hard", messages="hard" ) except Exception: - pass + logger.warning("Failed to clean up test user %s", user_id, exc_info=True) @pytest.mark.asyncio From b151b75a134ab1980bac21831b6aadc2d7717053 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 17 Apr 2026 11:09:42 +0300 Subject: [PATCH 8/8] test: use two participants in custom event test Sender creates the call and sends the event via REST, receiver joins via rtc.join and verifies the event arrives on coordinator_ws. Confirms cross-participant event delivery. --- tests/rtc/coordinator/test_custom_events.py | 27 ++++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/tests/rtc/coordinator/test_custom_events.py b/tests/rtc/coordinator/test_custom_events.py index 8557595d..2dc02fdb 100644 --- a/tests/rtc/coordinator/test_custom_events.py +++ b/tests/rtc/coordinator/test_custom_events.py @@ -24,27 +24,29 @@ @pytest_asyncio.fixture() -async def test_user(async_client: AsyncStream): - user_id = f"test-user-{uuid.uuid4()}" - await async_client.upsert_users(UserRequest(id=user_id)) - yield user_id +async def test_users(async_client: AsyncStream): + user_ids = [f"test-user-{uuid.uuid4()}" for _ in range(2)] + await async_client.upsert_users(*[UserRequest(id=uid) for uid in user_ids]) + yield user_ids try: await async_client.delete_users( - user_ids=[user_id], user="hard", conversations="hard", messages="hard" + user_ids=user_ids, user="hard", conversations="hard", messages="hard" ) except Exception: - logger.warning("Failed to clean up test user %s", user_id, exc_info=True) + logger.warning("Failed to clean up test users %s", user_ids, exc_info=True) @pytest.mark.asyncio @pytest.mark.integration @skip_on_rate_limit -async def test_custom_event_round_trip(async_client: AsyncStream, test_user: str): +async def test_custom_event_round_trip(async_client: AsyncStream, test_users: list): """Send a custom event via REST and verify it arrives on coordinator_ws.""" + sender, receiver = test_users + call = async_client.video.call("default", str(uuid.uuid4())) - await call.get_or_create(data=CallRequest(created_by_id=test_user)) + await call.get_or_create(data=CallRequest(created_by_id=sender)) - async with await rtc.join(call, test_user) as connection: + async with await rtc.join(call, receiver) as connection: assert connection.connection_state == ConnectionState.JOINED ws = connection.coordinator_ws @@ -60,8 +62,8 @@ def on_custom(event): event_received.set() await call.send_call_event( - user_id=test_user, - custom={"type": "test_event", "payload": "hello from test"}, + user_id=sender, + custom={"type": "test_event", "payload": "hello from sender"}, ) await asyncio.wait_for(event_received.wait(), timeout=10.0) @@ -69,4 +71,5 @@ def on_custom(event): assert received_event is not None custom_data = received_event.get("custom", {}) assert custom_data.get("type") == "test_event" - assert custom_data.get("payload") == "hello from test" + assert custom_data.get("payload") == "hello from sender" + assert received_event.get("user", {}).get("id") == sender