Skip to content

Commit c68730c

Browse files
committed
test: add integration test for custom event round-trip via coordinator_ws
Verifies the full pub/sub cycle: send a custom event via REST (call.send_call_event), receive it on ConnectionManager.coordinator_ws through ws.on("custom") listener. Uses asyncio.Event for reliable timing instead of fixed sleep.
1 parent 2b3ba0b commit c68730c

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""
2+
Integration tests for custom event pub/sub via coordinator WebSocket.
3+
4+
Tests the full round-trip: send a custom event via REST, receive it on the
5+
coordinator WS through ConnectionManager.coordinator_ws property.
6+
7+
Requires Stream API credentials (STREAM_API_KEY, STREAM_API_SECRET).
8+
"""
9+
10+
import asyncio
11+
import uuid
12+
13+
import pytest
14+
import pytest_asyncio
15+
16+
from getstream import AsyncStream
17+
from getstream.models import CallRequest, UserRequest
18+
from getstream.video import rtc
19+
from getstream.video.rtc.connection_utils import ConnectionState
20+
from tests.conftest import skip_on_rate_limit
21+
22+
23+
@pytest_asyncio.fixture()
24+
async def test_user(async_client: AsyncStream):
25+
user_id = f"test-user-{uuid.uuid4()}"
26+
await async_client.upsert_users(UserRequest(id=user_id))
27+
yield user_id
28+
try:
29+
await async_client.delete_users(
30+
user_ids=[user_id], user="hard", conversations="hard", messages="hard"
31+
)
32+
except Exception:
33+
pass
34+
35+
36+
@pytest.mark.asyncio
37+
@pytest.mark.integration
38+
@skip_on_rate_limit
39+
async def test_custom_event_round_trip(async_client: AsyncStream, test_user: str):
40+
"""Send a custom event via REST and verify it arrives on coordinator_ws."""
41+
call = async_client.video.call("default", str(uuid.uuid4()))
42+
await call.get_or_create(data=CallRequest(created_by_id=test_user))
43+
44+
async with await rtc.join(call, test_user) as connection:
45+
assert connection.connection_state == ConnectionState.JOINED
46+
47+
ws = connection.coordinator_ws
48+
assert ws is not None
49+
50+
received_event = None
51+
event_received = asyncio.Event()
52+
53+
@ws.on("custom")
54+
def on_custom(event):
55+
nonlocal received_event
56+
received_event = event
57+
event_received.set()
58+
59+
await call.send_call_event(
60+
user_id=test_user,
61+
custom={"type": "test_event", "payload": "hello from test"},
62+
)
63+
64+
await asyncio.wait_for(event_received.wait(), timeout=10.0)
65+
66+
assert received_event is not None
67+
custom_data = received_event.get("custom", {})
68+
assert custom_data.get("type") == "test_event"
69+
assert custom_data.get("payload") == "hello from test"

0 commit comments

Comments
 (0)