Skip to content

Commit 777ed87

Browse files
committed
cleanup after track unpublish
1 parent 1c5e3d3 commit 777ed87

2 files changed

Lines changed: 71 additions & 36 deletions

File tree

examples/local_video/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,6 @@ uv run --project examples/local_video python examples/local_video/subscriber.py
6262
```
6363

6464
Use `--participant py-cam` to only subscribe to video from a specific participant identity.
65+
The subscriber keeps running across unpublish/republish cycles and will attach to the next matching video track.
6566

6667
Press `q` in the video window or `Ctrl+C` in the terminal to exit.

examples/local_video/subscriber.py

Lines changed: 70 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import argparse
44
import asyncio
5+
from dataclasses import dataclass
56
from datetime import datetime, timezone
67
import logging
78
import os
@@ -23,6 +24,13 @@
2324
WINDOW_NAME = "livekit_video"
2425

2526

27+
@dataclass(frozen=True)
28+
class SubscribedVideoTrack:
29+
track: rtc.Track
30+
publication: rtc.RemoteTrackPublication
31+
participant: rtc.RemoteParticipant
32+
33+
2634
def parse_args() -> argparse.Namespace:
2735
parser = argparse.ArgumentParser(
2836
description="Subscribe to a LiveKit video track and optionally display packet metadata.",
@@ -181,28 +189,28 @@ def _window_is_open() -> bool:
181189
return False
182190

183191

184-
async def _wait_for_video_track(
185-
track_ready: asyncio.Event,
192+
async def _next_video_track(
193+
track_queue: asyncio.Queue[SubscribedVideoTrack],
186194
stop_event: asyncio.Event,
187-
) -> bool:
195+
) -> SubscribedVideoTrack | None:
188196
while not stop_event.is_set():
189197
try:
190-
await asyncio.wait_for(track_ready.wait(), timeout=0.5)
191-
return True
198+
return await asyncio.wait_for(track_queue.get(), timeout=0.5)
192199
except asyncio.TimeoutError:
193200
continue
194-
return False
201+
return None
195202

196203

197204
async def _render_video(
198205
video_stream: rtc.VideoStream,
199206
args: argparse.Namespace,
200207
stop_event: asyncio.Event,
208+
active_track_gone: asyncio.Event,
201209
) -> None:
202210
cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_AUTOSIZE)
203211

204212
try:
205-
while not stop_event.is_set():
213+
while not stop_event.is_set() and not active_track_gone.is_set():
206214
try:
207215
frame_event = await asyncio.wait_for(video_stream.__anext__(), timeout=0.5)
208216
except asyncio.TimeoutError:
@@ -236,10 +244,9 @@ async def _render_video(
236244
async def run(args: argparse.Namespace, stop_event: asyncio.Event) -> None:
237245
url, api_key, api_secret = _require_connection(args)
238246
room = rtc.Room()
239-
track_ready = asyncio.Event()
240-
selected_track: rtc.Track | None = None
241-
selected_publication: rtc.RemoteTrackPublication | None = None
242-
selected_participant: rtc.RemoteParticipant | None = None
247+
track_queue: asyncio.Queue[SubscribedVideoTrack] = asyncio.Queue()
248+
active_publication_sid: str | None = None
249+
active_track_gone = asyncio.Event()
243250
video_stream: rtc.VideoStream | None = None
244251

245252
@room.on("track_subscribed")
@@ -248,9 +255,6 @@ def on_track_subscribed(
248255
publication: rtc.RemoteTrackPublication,
249256
participant: rtc.RemoteParticipant,
250257
) -> None:
251-
nonlocal selected_track, selected_publication, selected_participant
252-
if selected_track is not None:
253-
return
254258
if track.kind != rtc.TrackKind.KIND_VIDEO:
255259
return
256260
if args.participant and participant.identity != args.participant:
@@ -261,37 +265,67 @@ def on_track_subscribed(
261265
)
262266
return
263267

264-
selected_track = track
265-
selected_publication = publication
266-
selected_participant = participant
267-
track_ready.set()
268+
track_queue.put_nowait(
269+
SubscribedVideoTrack(
270+
track=track,
271+
publication=publication,
272+
participant=participant,
273+
)
274+
)
275+
276+
@room.on("track_unsubscribed")
277+
def on_track_unsubscribed(
278+
track: rtc.Track,
279+
publication: rtc.RemoteTrackPublication,
280+
participant: rtc.RemoteParticipant,
281+
) -> None:
282+
nonlocal active_publication_sid
283+
if publication.sid == active_publication_sid:
284+
logging.info("active video track unsubscribed: %s", publication.sid)
285+
active_track_gone.set()
286+
287+
@room.on("track_unpublished")
288+
def on_track_unpublished(
289+
publication: rtc.RemoteTrackPublication,
290+
participant: rtc.RemoteParticipant,
291+
) -> None:
292+
nonlocal active_publication_sid
293+
if publication.sid == active_publication_sid:
294+
logging.info("active video track unpublished: %s", publication.sid)
295+
active_track_gone.set()
268296

269297
try:
270298
token = _create_token(args, api_key, api_secret)
271299
logging.info("connecting to room %s as %s", args.room_name, args.identity)
272300
await room.connect(url, token)
273301
logging.info("connected to room %s", room.name)
274-
logging.info("waiting for a video track")
275302

276-
if not await _wait_for_video_track(track_ready, stop_event):
277-
return
303+
while not stop_event.is_set():
304+
logging.info("waiting for a video track")
305+
subscribed = await _next_video_track(track_queue, stop_event)
306+
if subscribed is None:
307+
break
278308

279-
assert selected_track is not None
280-
assert selected_publication is not None
281-
assert selected_participant is not None
282-
logging.info(
283-
"subscribed to %s from %s with packet trailer features: %s",
284-
selected_publication.sid,
285-
selected_participant.identity,
286-
_feature_names(list(selected_publication.packet_trailer_features)),
287-
)
309+
active_publication_sid = subscribed.publication.sid
310+
active_track_gone = asyncio.Event()
311+
logging.info(
312+
"subscribed to %s from %s with packet trailer features: %s",
313+
subscribed.publication.sid,
314+
subscribed.participant.identity,
315+
_feature_names(list(subscribed.publication.packet_trailer_features)),
316+
)
288317

289-
video_stream = rtc.VideoStream.from_track(
290-
track=selected_track,
291-
format=rtc.VideoBufferType.RGB24,
292-
capacity=1,
293-
)
294-
await _render_video(video_stream, args, stop_event)
318+
video_stream = rtc.VideoStream.from_track(
319+
track=subscribed.track,
320+
format=rtc.VideoBufferType.RGB24,
321+
capacity=1,
322+
)
323+
try:
324+
await _render_video(video_stream, args, stop_event, active_track_gone)
325+
finally:
326+
await video_stream.aclose()
327+
video_stream = None
328+
active_publication_sid = None
295329
finally:
296330
if video_stream is not None:
297331
await video_stream.aclose()

0 commit comments

Comments
 (0)