Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions getstream/video/rtc/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,19 @@ def ws_client(self):
def ws_client(self, value):
self._ws_client = value

@property
def coordinator_ws(self) -> Optional[StreamAPIWS]:
"""The coordinator WebSocket receiving call-scoped events.

Available after connecting. Use ``ws.on()`` or ``ws.on_wildcard()``
to subscribe to events. Send custom events via ``call.send_call_event()``.

Returns None if not connected.
"""
if self._coordinator_ws_client and self._coordinator_ws_client._connected:
return self._coordinator_ws_client
return None

# Publisher / Subscriber peer-connection shortcuts
@property
def publisher_pc(self):
Expand Down
69 changes: 69 additions & 0 deletions tests/rtc/coordinator/test_custom_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Integration tests for custom event pub/sub via coordinator WebSocket.

Tests the full round-trip: send a custom event via REST, receive it on the
coordinator WS through ConnectionManager.coordinator_ws property.

Requires Stream API credentials (STREAM_API_KEY, STREAM_API_SECRET).
"""

import asyncio
import uuid

import pytest
import pytest_asyncio

from getstream import AsyncStream
from getstream.models import CallRequest, UserRequest
from getstream.video import rtc
from getstream.video.rtc.connection_utils import ConnectionState
from tests.conftest import skip_on_rate_limit


@pytest_asyncio.fixture()
async def test_user(async_client: AsyncStream):
user_id = f"test-user-{uuid.uuid4()}"
await async_client.upsert_users(UserRequest(id=user_id))
yield user_id
try:
await async_client.delete_users(
user_ids=[user_id], user="hard", conversations="hard", messages="hard"
)
except Exception:
pass
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated


@pytest.mark.asyncio
@pytest.mark.integration
@skip_on_rate_limit
async def test_custom_event_round_trip(async_client: AsyncStream, test_user: str):
"""Send a custom event via REST and verify it arrives on coordinator_ws."""
call = async_client.video.call("default", str(uuid.uuid4()))
await call.get_or_create(data=CallRequest(created_by_id=test_user))

async with await rtc.join(call, test_user) as connection:
assert connection.connection_state == ConnectionState.JOINED

ws = connection.coordinator_ws
assert ws is not None

received_event = None
event_received = asyncio.Event()

@ws.on("custom")
def on_custom(event):
nonlocal received_event
received_event = event
event_received.set()

await call.send_call_event(
user_id=test_user,
custom={"type": "test_event", "payload": "hello from test"},
)

await asyncio.wait_for(event_received.wait(), timeout=10.0)

assert received_event is not None
custom_data = received_event.get("custom", {})
assert custom_data.get("type") == "test_event"
assert custom_data.get("payload") == "hello from test"
Loading