|
| 1 | +"""End-to-end Test for data-channel scenarios. |
| 2 | +
|
| 3 | +Covers one-to-one delivery, broadcasting to all, topic filtering, and targeted |
| 4 | +delivery via `destination_identities`. |
| 5 | +
|
| 6 | +Requires the following environment variables to run: |
| 7 | + LIVEKIT_URL |
| 8 | + LIVEKIT_API_KEY |
| 9 | + LIVEKIT_API_SECRET |
| 10 | +""" |
| 11 | + |
| 12 | +from __future__ import annotations |
| 13 | + |
| 14 | +import asyncio |
| 15 | +import os |
| 16 | +import uuid |
| 17 | +from typing import Callable, Optional |
| 18 | + |
| 19 | +import pytest |
| 20 | + |
| 21 | +from livekit import api, rtc |
| 22 | +from livekit.rtc.room import EventTypes |
| 23 | + |
| 24 | + |
| 25 | +WAIT_TIMEOUT = 20.0 |
| 26 | +WAIT_INTERVAL = 0.1 |
| 27 | + |
| 28 | + |
| 29 | +def skip_if_no_credentials(): |
| 30 | + required_vars = ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET"] |
| 31 | + missing = [var for var in required_vars if not os.getenv(var)] |
| 32 | + return pytest.mark.skipif( |
| 33 | + bool(missing), reason=f"Missing environment variables: {', '.join(missing)}" |
| 34 | + ) |
| 35 | + |
| 36 | + |
| 37 | +def create_token(identity: str, room_name: str) -> str: |
| 38 | + return ( |
| 39 | + api.AccessToken() |
| 40 | + .with_identity(identity) |
| 41 | + .with_name(identity) |
| 42 | + .with_grants( |
| 43 | + api.VideoGrants( |
| 44 | + room_join=True, |
| 45 | + room=room_name, |
| 46 | + ) |
| 47 | + ) |
| 48 | + .to_jwt() |
| 49 | + ) |
| 50 | + |
| 51 | + |
| 52 | +def unique_room_name(base: str) -> str: |
| 53 | + return f"{base}-{uuid.uuid4().hex[:8]}" |
| 54 | + |
| 55 | + |
| 56 | +async def _wait_until( |
| 57 | + predicate: Callable[[], bool], |
| 58 | + *, |
| 59 | + timeout: float = WAIT_TIMEOUT, |
| 60 | + interval: float = WAIT_INTERVAL, |
| 61 | + message: str = "condition not met", |
| 62 | +) -> None: |
| 63 | + loop = asyncio.get_event_loop() |
| 64 | + deadline = loop.time() + timeout |
| 65 | + while loop.time() < deadline: |
| 66 | + if predicate(): |
| 67 | + return |
| 68 | + await asyncio.sleep(interval) |
| 69 | + raise AssertionError(f"timeout waiting: {message}") |
| 70 | + |
| 71 | + |
| 72 | +async def _connect(room: rtc.Room, identity: str, room_name: str) -> str: |
| 73 | + token = create_token(identity, room_name) |
| 74 | + url = os.environ["LIVEKIT_URL"] |
| 75 | + await room.connect(url, token) |
| 76 | + return token |
| 77 | + |
| 78 | + |
| 79 | +async def _ensure_all_connected(rooms: list[rtc.Room]) -> None: |
| 80 | + await _wait_until( |
| 81 | + lambda: all(r.connection_state == rtc.ConnectionState.CONN_CONNECTED for r in rooms), |
| 82 | + message="not all rooms reached CONN_CONNECTED", |
| 83 | + ) |
| 84 | + |
| 85 | + |
| 86 | +async def _ensure_visible(observer: rtc.Room, identities: list[str]) -> None: |
| 87 | + """Wait until `observer` sees every identity in `identities` as a remote participant. |
| 88 | +
|
| 89 | + Targeted publishes resolve identities at publish time, so we must let the |
| 90 | + sender's room state catch up before sending.""" |
| 91 | + |
| 92 | + def _all_visible() -> bool: |
| 93 | + seen = {p.identity for p in observer.remote_participants.values()} |
| 94 | + return all(ident in seen for ident in identities) |
| 95 | + |
| 96 | + await _wait_until( |
| 97 | + _all_visible, |
| 98 | + message=f"not all identities visible to {observer.local_participant.identity}: {identities}", |
| 99 | + ) |
| 100 | + |
| 101 | + |
| 102 | +def _expect_event( |
| 103 | + room: rtc.Room, |
| 104 | + event: EventTypes, |
| 105 | + predicate: Optional[Callable[..., bool]] = None, |
| 106 | +) -> asyncio.Future: |
| 107 | + loop = asyncio.get_event_loop() |
| 108 | + fut: asyncio.Future = loop.create_future() |
| 109 | + |
| 110 | + def _on_event(*args, **kwargs) -> None: |
| 111 | + if fut.done(): |
| 112 | + return |
| 113 | + if predicate is None or predicate(*args, **kwargs): |
| 114 | + fut.set_result(args) |
| 115 | + |
| 116 | + room.on(event, _on_event) |
| 117 | + return fut |
| 118 | + |
| 119 | + |
| 120 | +async def _await_event(fut: asyncio.Future, timeout: float = WAIT_TIMEOUT) -> None: |
| 121 | + try: |
| 122 | + await asyncio.wait_for(fut, timeout=timeout) |
| 123 | + except asyncio.TimeoutError as e: |
| 124 | + raise AssertionError("timed out waiting for event") from e |
| 125 | + |
| 126 | + |
| 127 | +class _DataCollector: |
| 128 | + """Collects `data_received` packets matching `sender_identity` (when set).""" |
| 129 | + |
| 130 | + def __init__(self, room: rtc.Room, sender_identity: Optional[str] = None) -> None: |
| 131 | + self.packets: list[rtc.DataPacket] = [] |
| 132 | + self._sender_identity = sender_identity |
| 133 | + |
| 134 | + def _on_data(packet: rtc.DataPacket) -> None: |
| 135 | + if self._sender_identity is not None and ( |
| 136 | + packet.participant is None or packet.participant.identity != self._sender_identity |
| 137 | + ): |
| 138 | + return |
| 139 | + self.packets.append(packet) |
| 140 | + |
| 141 | + room.on("data_received", _on_data) |
| 142 | + |
| 143 | + def payloads(self) -> list[bytes]: |
| 144 | + return [p.data for p in self.packets] |
| 145 | + |
| 146 | + def topics(self) -> list[str | None]: |
| 147 | + return [p.topic for p in self.packets] |
| 148 | + |
| 149 | + |
| 150 | +async def _assert_no_data( |
| 151 | + room: rtc.Room, collector: _DataCollector, *, settle: float = 1.0 |
| 152 | +) -> None: |
| 153 | + """Give the server time to deliver, then assert nothing arrived.""" |
| 154 | + await asyncio.sleep(settle) |
| 155 | + assert collector.packets == [], ( |
| 156 | + f"{room.local_participant.identity} unexpectedly received " |
| 157 | + f"{len(collector.packets)} packet(s): {collector.payloads()}" |
| 158 | + ) |
| 159 | + |
| 160 | + |
| 161 | +@skip_if_no_credentials() |
| 162 | +@pytest.mark.asyncio |
| 163 | +async def test_data_one_to_one() -> None: |
| 164 | + """sender targets a single identity; only that identity receives.""" |
| 165 | + room_name = unique_room_name("py-dc-1to1") |
| 166 | + |
| 167 | + sender = rtc.Room() |
| 168 | + receiver = rtc.Room() |
| 169 | + bystander = rtc.Room() |
| 170 | + |
| 171 | + await _connect(sender, "sender", room_name) |
| 172 | + await _connect(receiver, "receiver", room_name) |
| 173 | + await _connect(bystander, "bystander", room_name) |
| 174 | + await _ensure_all_connected([sender, receiver, bystander]) |
| 175 | + await _ensure_visible(sender, ["receiver", "bystander"]) |
| 176 | + |
| 177 | + receiver_collector = _DataCollector(receiver, sender_identity="sender") |
| 178 | + bystander_collector = _DataCollector(bystander, sender_identity="sender") |
| 179 | + |
| 180 | + receiver_got = _expect_event( |
| 181 | + receiver, |
| 182 | + "data_received", |
| 183 | + predicate=lambda packet: ( |
| 184 | + packet.participant is not None and packet.participant.identity == "sender" |
| 185 | + ), |
| 186 | + ) |
| 187 | + |
| 188 | + payload = b"hello receiver" |
| 189 | + await sender.local_participant.publish_data(payload, destination_identities=["receiver"]) |
| 190 | + |
| 191 | + await _await_event(receiver_got) |
| 192 | + assert receiver_collector.payloads() == [payload] |
| 193 | + await _assert_no_data(bystander, bystander_collector) |
| 194 | + |
| 195 | + await asyncio.gather(sender.disconnect(), receiver.disconnect(), bystander.disconnect()) |
| 196 | + |
| 197 | + |
| 198 | +@skip_if_no_credentials() |
| 199 | +@pytest.mark.asyncio |
| 200 | +async def test_data_one_to_many_targeted() -> None: |
| 201 | + """sender targets a subset of identities; only that subset receives.""" |
| 202 | + room_name = unique_room_name("py-dc-1tomany") |
| 203 | + |
| 204 | + sender = rtc.Room() |
| 205 | + r1 = rtc.Room() |
| 206 | + r2 = rtc.Room() |
| 207 | + excluded = rtc.Room() |
| 208 | + |
| 209 | + await _connect(sender, "sender", room_name) |
| 210 | + await _connect(r1, "r1", room_name) |
| 211 | + await _connect(r2, "r2", room_name) |
| 212 | + await _connect(excluded, "excluded", room_name) |
| 213 | + await _ensure_all_connected([sender, r1, r2, excluded]) |
| 214 | + await _ensure_visible(sender, ["r1", "r2", "excluded"]) |
| 215 | + |
| 216 | + r1_collector = _DataCollector(r1, sender_identity="sender") |
| 217 | + r2_collector = _DataCollector(r2, sender_identity="sender") |
| 218 | + excluded_collector = _DataCollector(excluded, sender_identity="sender") |
| 219 | + |
| 220 | + r1_got = _expect_event( |
| 221 | + r1, |
| 222 | + "data_received", |
| 223 | + predicate=lambda packet: ( |
| 224 | + packet.participant is not None and packet.participant.identity == "sender" |
| 225 | + ), |
| 226 | + ) |
| 227 | + r2_got = _expect_event( |
| 228 | + r2, |
| 229 | + "data_received", |
| 230 | + predicate=lambda packet: ( |
| 231 | + packet.participant is not None and packet.participant.identity == "sender" |
| 232 | + ), |
| 233 | + ) |
| 234 | + |
| 235 | + payload = b"hello selected" |
| 236 | + await sender.local_participant.publish_data(payload, destination_identities=["r1", "r2"]) |
| 237 | + |
| 238 | + await asyncio.gather(_await_event(r1_got), _await_event(r2_got)) |
| 239 | + assert r1_collector.payloads() == [payload] |
| 240 | + assert r2_collector.payloads() == [payload] |
| 241 | + await _assert_no_data(excluded, excluded_collector) |
| 242 | + |
| 243 | + await asyncio.gather( |
| 244 | + sender.disconnect(), r1.disconnect(), r2.disconnect(), excluded.disconnect() |
| 245 | + ) |
| 246 | + |
| 247 | + |
| 248 | +@skip_if_no_credentials() |
| 249 | +@pytest.mark.asyncio |
| 250 | +async def test_data_broadcast() -> None: |
| 251 | + """Empty `destination_identities` broadcasts to every other participant.""" |
| 252 | + room_name = unique_room_name("py-dc-broadcast") |
| 253 | + |
| 254 | + sender = rtc.Room() |
| 255 | + receivers = [rtc.Room() for _ in range(3)] |
| 256 | + receiver_idents = [f"r{i}" for i in range(len(receivers))] |
| 257 | + |
| 258 | + await _connect(sender, "sender", room_name) |
| 259 | + for room, ident in zip(receivers, receiver_idents): |
| 260 | + await _connect(room, ident, room_name) |
| 261 | + await _ensure_all_connected([sender, *receivers]) |
| 262 | + await _ensure_visible(sender, receiver_idents) |
| 263 | + |
| 264 | + collectors = [_DataCollector(room, sender_identity="sender") for room in receivers] |
| 265 | + received_futures = [ |
| 266 | + _expect_event( |
| 267 | + room, |
| 268 | + "data_received", |
| 269 | + predicate=lambda packet: ( |
| 270 | + packet.participant is not None and packet.participant.identity == "sender" |
| 271 | + ), |
| 272 | + ) |
| 273 | + for room in receivers |
| 274 | + ] |
| 275 | + |
| 276 | + payload = b"hello everyone" |
| 277 | + await sender.local_participant.publish_data(payload) |
| 278 | + |
| 279 | + await asyncio.gather(*(_await_event(f) for f in received_futures)) |
| 280 | + for ident, collector in zip(receiver_idents, collectors): |
| 281 | + assert collector.payloads() == [payload], f"{ident} payloads mismatch" |
| 282 | + |
| 283 | + await asyncio.gather(sender.disconnect(), *(r.disconnect() for r in receivers)) |
| 284 | + |
| 285 | + |
| 286 | +@skip_if_no_credentials() |
| 287 | +@pytest.mark.asyncio |
| 288 | +async def test_data_topic_passthrough() -> None: |
| 289 | + """Topic field is preserved end-to-end and observable by every receiver.""" |
| 290 | + room_name = unique_room_name("py-dc-topic") |
| 291 | + |
| 292 | + sender = rtc.Room() |
| 293 | + r1 = rtc.Room() |
| 294 | + r2 = rtc.Room() |
| 295 | + |
| 296 | + await _connect(sender, "sender", room_name) |
| 297 | + await _connect(r1, "r1", room_name) |
| 298 | + await _connect(r2, "r2", room_name) |
| 299 | + await _ensure_all_connected([sender, r1, r2]) |
| 300 | + await _ensure_visible(sender, ["r1", "r2"]) |
| 301 | + |
| 302 | + r1_collector = _DataCollector(r1, sender_identity="sender") |
| 303 | + r2_collector = _DataCollector(r2, sender_identity="sender") |
| 304 | + |
| 305 | + # Send three messages: two on "chat", one on "telemetry". |
| 306 | + messages = [ |
| 307 | + (b"chat-1", "chat"), |
| 308 | + (b"telemetry-1", "telemetry"), |
| 309 | + (b"chat-2", "chat"), |
| 310 | + ] |
| 311 | + |
| 312 | + def _all_received(collector: _DataCollector) -> bool: |
| 313 | + return len(collector.packets) >= len(messages) |
| 314 | + |
| 315 | + for payload, topic in messages: |
| 316 | + await sender.local_participant.publish_data(payload, topic=topic) |
| 317 | + |
| 318 | + await _wait_until( |
| 319 | + lambda: _all_received(r1_collector) and _all_received(r2_collector), |
| 320 | + message="receivers did not get all topic messages", |
| 321 | + ) |
| 322 | + |
| 323 | + expected_pairs = [(payload, topic) for payload, topic in messages] |
| 324 | + for collector, ident in [(r1_collector, "r1"), (r2_collector, "r2")]: |
| 325 | + got = list(zip(collector.payloads(), collector.topics())) |
| 326 | + assert got == expected_pairs, f"{ident} mismatch: expected {expected_pairs}, got {got}" |
| 327 | + |
| 328 | + # Also verify `chat`-only filtering at the consumer side works as expected. |
| 329 | + chat_only_r1 = [p for p in r1_collector.packets if p.topic == "chat"] |
| 330 | + assert [p.data for p in chat_only_r1] == [b"chat-1", b"chat-2"] |
| 331 | + |
| 332 | + await asyncio.gather(sender.disconnect(), r1.disconnect(), r2.disconnect()) |
| 333 | + |
| 334 | + |
| 335 | +@skip_if_no_credentials() |
| 336 | +@pytest.mark.asyncio |
| 337 | +async def test_data_targeted_with_topic() -> None: |
| 338 | + """Targeted send carries the topic; non-targets receive nothing.""" |
| 339 | + room_name = unique_room_name("py-dc-targeted-topic") |
| 340 | + |
| 341 | + sender = rtc.Room() |
| 342 | + target = rtc.Room() |
| 343 | + other = rtc.Room() |
| 344 | + |
| 345 | + await _connect(sender, "sender", room_name) |
| 346 | + await _connect(target, "target", room_name) |
| 347 | + await _connect(other, "other", room_name) |
| 348 | + await _ensure_all_connected([sender, target, other]) |
| 349 | + await _ensure_visible(sender, ["target", "other"]) |
| 350 | + |
| 351 | + target_collector = _DataCollector(target, sender_identity="sender") |
| 352 | + other_collector = _DataCollector(other, sender_identity="sender") |
| 353 | + |
| 354 | + target_got = _expect_event( |
| 355 | + target, |
| 356 | + "data_received", |
| 357 | + predicate=lambda packet: ( |
| 358 | + packet.participant is not None and packet.participant.identity == "sender" |
| 359 | + ), |
| 360 | + ) |
| 361 | + |
| 362 | + payload = b"private ping" |
| 363 | + topic = "private" |
| 364 | + await sender.local_participant.publish_data( |
| 365 | + payload, destination_identities=["target"], topic=topic |
| 366 | + ) |
| 367 | + |
| 368 | + await _await_event(target_got) |
| 369 | + assert target_collector.payloads() == [payload] |
| 370 | + assert target_collector.topics() == [topic] |
| 371 | + await _assert_no_data(other, other_collector) |
| 372 | + |
| 373 | + await asyncio.gather(sender.disconnect(), target.disconnect(), other.disconnect()) |
0 commit comments