diff --git a/plugins/getstream/tests/test_track_resolver.py b/plugins/getstream/tests/test_track_resolver.py new file mode 100644 index 000000000..56422ffa0 --- /dev/null +++ b/plugins/getstream/tests/test_track_resolver.py @@ -0,0 +1,269 @@ +import asyncio + +import pytest +from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import ( + TrackType as StreamTrackType, +) +from vision_agents.plugins.getstream._track_resolver import TrackResolver + + +@pytest.fixture +def resolver(): + return TrackResolver(poll_interval=0.005) + + +class TestTrackResolver: + async def test_known_track_reuse(self, resolver): + resolver.register( + track_id="t1", + user_id="u1", + session_id="s1", + webrtc_kind="audio", + ) + first = await resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_AUDIO, + ) + + resolver.unpublish( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_AUDIO, + ) + + second = await resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_AUDIO, + ) + + assert first == "t1" + assert second == "t1" + + async def test_session_migration(self, resolver): + resolver.register( + track_id="t1", + user_id="u1", + session_id="s_old", + webrtc_kind="audio", + ) + await resolver.resolve( + user_id="u1", + session_id="s_old", + stream_track_type=StreamTrackType.TRACK_TYPE_AUDIO, + ) + + # New session arrives without a fresh register() — same WebRTC track is reused. + migrated = await resolver.resolve( + user_id="u1", + session_id="s_new", + stream_track_type=StreamTrackType.TRACK_TYPE_AUDIO, + timeout=0.1, + ) + assert migrated == "t1" + + # The old session entry is gone; the new one owns the track now. + old_unpublish = resolver.unpublish( + user_id="u1", + session_id="s_old", + stream_track_type=StreamTrackType.TRACK_TYPE_AUDIO, + ) + new_unpublish = resolver.unpublish( + user_id="u1", + session_id="s_new", + stream_track_type=StreamTrackType.TRACK_TYPE_AUDIO, + ) + assert old_unpublish is None + assert new_unpublish == "t1" + + async def test_pending_arrives_first(self, resolver): + resolver.register( + track_id="t1", + user_id="u1", + session_id="s1", + webrtc_kind="video", + ) + track_id = await resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_VIDEO, + ) + assert track_id == "t1" + + async def test_track_published_arrives_first(self, resolver): + resolve_task = asyncio.create_task( + resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_VIDEO, + timeout=1.0, + ) + ) + await asyncio.sleep(0.02) + resolver.register( + track_id="t1", + user_id="u1", + session_id="s1", + webrtc_kind="video", + ) + track_id = await resolve_task + assert track_id == "t1" + + async def test_anonymous_fallback_success(self, resolver): + resolver.register( + track_id="t_anon", + user_id=None, + session_id=None, + webrtc_kind="video", + ) + track_id = await resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_VIDEO, + ) + assert track_id == "t_anon" + + async def test_anonymous_fallback_ambiguous(self, resolver): + resolver.register( + track_id="t_anon_a", + user_id=None, + session_id=None, + webrtc_kind="video", + ) + resolver.register( + track_id="t_anon_b", + user_id=None, + session_id=None, + webrtc_kind="video", + ) + with pytest.raises(TimeoutError): + await resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_VIDEO, + timeout=0.05, + ) + + async def test_timeout_no_pending(self, resolver): + with pytest.raises(TimeoutError): + await resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_VIDEO, + timeout=0.05, + ) + + async def test_cancel_during_resolve(self, resolver): + resolve_task = asyncio.create_task( + resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_VIDEO, + timeout=10.0, + ) + ) + await asyncio.sleep(0.02) + + resolver.cancel(user_id="u1", session_id="s1") + + track_id = await asyncio.wait_for(resolve_task, timeout=0.5) + assert track_id is None + + async def test_stale_pending_is_evicted(self): + # Short TTL so we can verify the eviction without long sleeps. + resolver = TrackResolver(poll_interval=0.005, pending_ttl=0.05) + + # Stale anonymous video registered first; would normally make the + # fallback ambiguous when a second anonymous video arrives. + resolver.register( + track_id="t_stale", + user_id=None, + session_id=None, + webrtc_kind="video", + ) + await asyncio.sleep(0.08) + + resolver.register( + track_id="t_fresh", + user_id=None, + session_id=None, + webrtc_kind="video", + ) + track_id = await resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_VIDEO, + timeout=0.5, + ) + assert track_id == "t_fresh" + + async def test_concurrent_resolves_serialized(self, resolver): + # Duplicate TrackPublishedEvent (e.g. from republish_tracks) starts two + # resolves for the same key. Both must succeed with the same track_id. + task_a = asyncio.create_task( + resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_AUDIO, + timeout=0.5, + ) + ) + task_b = asyncio.create_task( + resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_AUDIO, + timeout=0.5, + ) + ) + await asyncio.sleep(0.02) + resolver.register( + track_id="t1", + user_id="u1", + session_id="s1", + webrtc_kind="audio", + ) + results = await asyncio.gather(task_a, task_b) + assert results == ["t1", "t1"] + + async def test_cancel_drops_named_pending(self, resolver): + # Named pending was registered (track_added fired), participant leaves + # before TrackPublishedEvent. cancel() should drop the orphan. + resolver.register( + track_id="t_orphan", + user_id="u1", + session_id="s1", + webrtc_kind="video", + ) + resolver.cancel(user_id="u1", session_id="s1") + + # If the orphan were still around, the next anonymous video would be + # ambiguous against an exact-tuple lookup attempt — but here we just + # verify a fresh resolve for the same key times out (no stale match). + with pytest.raises(TimeoutError): + await resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_VIDEO, + timeout=0.05, + ) + + async def test_cancel_before_resolve_is_noop(self, resolver): + # Cancel arrives first; nothing in flight, no-op. + resolver.cancel(user_id="u1", session_id="s1") + + # Subsequent resolve runs normally and finds the matching pending. + resolver.register( + track_id="t1", + user_id="u1", + session_id="s1", + webrtc_kind="video", + ) + track_id = await resolver.resolve( + user_id="u1", + session_id="s1", + stream_track_type=StreamTrackType.TRACK_TYPE_VIDEO, + timeout=0.5, + ) + assert track_id == "t1" diff --git a/plugins/getstream/vision_agents/plugins/getstream/_track_resolver.py b/plugins/getstream/vision_agents/plugins/getstream/_track_resolver.py new file mode 100644 index 000000000..62c34679c --- /dev/null +++ b/plugins/getstream/vision_agents/plugins/getstream/_track_resolver.py @@ -0,0 +1,261 @@ +import asyncio +import logging +import time +from dataclasses import dataclass + +from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import ( + TrackType as StreamTrackType, +) + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class _TrackKey: + user_id: str + session_id: str + stream_track_type: StreamTrackType.ValueType + + +@dataclass +class _TrackEntry: + track_id: str + published: bool + + +@dataclass +class _PendingTrack: + user_id: str | None + session_id: str | None + webrtc_kind: str + registered_at: float + + +def _get_webrtc_kind(stream_track_type: StreamTrackType.ValueType) -> str: + """Get the expected WebRTC kind (audio/video) for an SFU track type.""" + if stream_track_type in ( + StreamTrackType.TRACK_TYPE_AUDIO, + StreamTrackType.TRACK_TYPE_SCREEN_SHARE_AUDIO, + ): + return "audio" + elif stream_track_type in ( + StreamTrackType.TRACK_TYPE_VIDEO, + StreamTrackType.TRACK_TYPE_SCREEN_SHARE, + ): + return "video" + else: + return "video" + + +class TrackResolver: + """Correlate SFU TrackPublished events with WebRTC track_added callbacks.""" + + def __init__(self, poll_interval: float = 0.01, pending_ttl: float = 60.0) -> None: + self._poll_interval = poll_interval + self._pending_ttl = pending_ttl + self._track_map: dict[_TrackKey, _TrackEntry] = {} + self._pending: dict[str, _PendingTrack] = {} + # in-flight resolves; event set when participant leaves to abort the poll + self._resolving: dict[_TrackKey, asyncio.Event] = {} + # serialize resolves per key so duplicate TrackPublishedEvents don't race + self._resolve_locks: dict[_TrackKey, asyncio.Lock] = {} + + def register( + self, + *, + track_id: str, + user_id: str | None, + session_id: str | None, + webrtc_kind: str, + ) -> None: + """Store a WebRTC track_added until the SFU confirms the semantic type.""" + self._pending[track_id] = _PendingTrack( + user_id=user_id, + session_id=session_id, + webrtc_kind=webrtc_kind, + registered_at=time.monotonic(), + ) + + async def resolve( + self, + *, + user_id: str, + session_id: str, + stream_track_type: StreamTrackType.ValueType, + timeout: float = 10.0, + ) -> str | None: + """Resolve track_id for an SFU TrackPublishedEvent. + + Handles known-track reuse, session migration, and the await-pending poll. + Returns None if the participant left mid-resolve (via cancel()). + Raises TimeoutError if the WebRTC track_added never arrives within `timeout`. + """ + track_key = _TrackKey( + user_id=user_id, + session_id=session_id, + stream_track_type=stream_track_type, + ) + + lock = self._resolve_locks.setdefault(track_key, asyncio.Lock()) + async with lock: + track_id = self._reuse_known(track_key) + if track_id is None: + track_id = self._migrate_session(track_key=track_key) + if track_id is None: + track_id = await self._await_pending( + track_key=track_key, timeout=timeout + ) + if track_id is None: + return None + + self._track_map[track_key] = _TrackEntry(track_id=track_id, published=True) + return track_id + + def unpublish( + self, + *, + user_id: str, + session_id: str, + stream_track_type: StreamTrackType.ValueType, + ) -> str | None: + """Flip the track to unpublished. Returns track_id if known, else None.""" + track_key = _TrackKey( + user_id=user_id, + session_id=session_id, + stream_track_type=stream_track_type, + ) + entry = self._track_map.get(track_key) + if entry is None: + return None + entry.published = False + return entry.track_id + + def cancel(self, *, user_id: str, session_id: str) -> None: + """Abort any in-flight resolve for this participant and drop their pending entries.""" + for key, event in self._resolving.items(): + if key.user_id == user_id and key.session_id == session_id: + event.set() + + orphaned = [ + tid + for tid, pending in self._pending.items() + if pending.user_id == user_id and pending.session_id == session_id + ] + for tid in orphaned: + del self._pending[tid] + + def _reuse_known(self, track_key: _TrackKey) -> str | None: + entry = self._track_map.get(track_key) + if entry is None: + return None + entry.published = True + return entry.track_id + + def _migrate_session(self, *, track_key: _TrackKey) -> str | None: + # User reconnected with a new session — the WebRTC media track is reused + # so track_added won't fire again. Migrate the stale session entry. + for old_key, old_entry in list(self._track_map.items()): + if ( + old_key.user_id == track_key.user_id + and old_key.stream_track_type == track_key.stream_track_type + and old_key.session_id != track_key.session_id + ): + del self._track_map[old_key] + logger.debug( + f"Migrated track for {track_key.user_id} from session " + f"{old_key.session_id} to {track_key.session_id}" + ) + return old_entry.track_id + return None + + async def _await_pending( + self, *, track_key: _TrackKey, timeout: float + ) -> str | None: + # SFU might send TrackPublishedEvent before WebRTC processes track_added. + webrtc_kind = _get_webrtc_kind(track_key.stream_track_type) + cancelled = asyncio.Event() + self._resolving[track_key] = cancelled + try: + elapsed = 0.0 + while elapsed < timeout and not cancelled.is_set(): + track_id = self._match_pending( + user_id=track_key.user_id, + session_id=track_key.session_id, + webrtc_kind=webrtc_kind, + ) + if track_id is not None: + return track_id + await asyncio.sleep(self._poll_interval) + elapsed += self._poll_interval + + if cancelled.is_set(): + logger.debug( + f"Resolve cancelled: user={track_key.user_id} " + f"session={track_key.session_id} left during resolve" + ) + return None + raise TimeoutError( + f"No track_added for user={track_key.user_id} session={track_key.session_id} " + f"type={StreamTrackType.Name(track_key.stream_track_type)} after {timeout}s " + f"(pending={self._pending}, map={self._track_map})" + ) + finally: + self._resolving.pop(track_key, None) + + def _match_pending( + self, + *, + user_id: str, + session_id: str, + webrtc_kind: str, + ) -> str | None: + self._clear_stale_pending() + + for tid, pending in list(self._pending.items()): + if ( + pending.user_id == user_id + and pending.session_id == session_id + and pending.webrtc_kind == webrtc_kind + ): + del self._pending[tid] + return tid + + # Fallback: some video track_added callbacks can arrive with user=None. + # In that case we can still match by WebRTC kind, but only if there + # is exactly one anonymous candidate — multiple anonymous entries + # with the same kind would be ambiguous and could misbind. + anonymous_candidates = [ + tid + for tid, pending in self._pending.items() + if pending.user_id is None + and pending.session_id is None + and pending.webrtc_kind == webrtc_kind + ] + if len(anonymous_candidates) == 1: + tid = anonymous_candidates[0] + del self._pending[tid] + return tid + return None + + def _clear_stale_pending(self) -> None: + now = time.monotonic() + stale = [ + tid + for tid, pending in self._pending.items() + if pending.registered_at <= now - self._pending_ttl + ] + for tid in stale: + evicted = self._pending.pop(tid) + logger.debug( + f"Evicting stale pending track: id={tid} user={evicted.user_id} " + f"session={evicted.session_id} kind={evicted.webrtc_kind} " + f"age={now - evicted.registered_at:.1f}s" + ) + + # Drop resolve locks with no holder and no waiters; safe because + # setdefault will create a fresh one if a future resolve needs it. + free_keys = [ + key for key, lock in self._resolve_locks.items() if not lock.locked() + ] + for key in free_keys: + del self._resolve_locks[key] diff --git a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py index 21da768ae..fca86e407 100644 --- a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py +++ b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py @@ -33,6 +33,7 @@ from vision_agents.plugins.getstream.stream_conversation import StreamConversation from . import sfu_events +from ._track_resolver import TrackResolver if TYPE_CHECKING: from vision_agents.core.agents.agents import Agent @@ -163,12 +164,7 @@ def __init__(self, **kwargs): self.conversation: Optional[StreamConversation] = None self.channel_type = "messaging" self._agent_user_id: str | None = None - # Track mapping: (user_id, session_id, track_type_int) -> {"track_id": str, "published": bool} - # track_type_int is from TrackType enum (e.g., TrackType.TRACK_TYPE_AUDIO) - self._track_map: dict = {} - # Temporary storage for tracks before SFU confirms their type - # track_id -> (user_id|None, session_id|None, webrtc_type_string) - self._pending_tracks: dict = {} + self._track_resolver = TrackResolver() self._real_connection: Optional[ConnectionManager] = None self._call: Optional[StreamCall] = None @@ -184,170 +180,45 @@ def _connection(self) -> ConnectionManager: raise ValueError("Edge connection is not set") return self._real_connection - def _get_webrtc_kind(self, track_type_int: int) -> str: - """Get the expected WebRTC kind (audio/video) for an SFU track type.""" - # Map SFU track types to WebRTC kinds - if track_type_int in ( - StreamTrackType.TRACK_TYPE_AUDIO, - StreamTrackType.TRACK_TYPE_SCREEN_SHARE_AUDIO, - ): - return "audio" - elif track_type_int in ( - StreamTrackType.TRACK_TYPE_VIDEO, - StreamTrackType.TRACK_TYPE_SCREEN_SHARE, - ): - return "video" - else: - # Default to video for unknown types - return "video" - async def _on_track_published(self, event: sfu_events.TrackPublishedEvent): """Handle track published events from SFU - spawn TrackAddedEvent with correct type.""" if not event.payload: return if event.participant and event.participant.user_id: - session_id = event.participant.session_id user_id = event.participant.user_id + session_id = event.participant.session_id else: user_id = event.payload.user_id session_id = event.payload.session_id - # Convert Stream track type to the Vision agents track type - track_type_int = event.payload.type # TrackType enum int from SFU - track_type = _to_core_track_type(track_type_int) - webrtc_track_kind = self._get_webrtc_kind(track_type_int) - - # Skip processing the agent's own tracks - we don't subscribe to them - is_agent_track = user_id == self._agent_user_id - if is_agent_track: - logger.debug( - f'Skipping agent\'s own track: "{track_type.name}" from {user_id}' - ) + if not user_id or not session_id or user_id == self._agent_user_id: return - # First check if track already exists in map (e.g., from previous unpublish/republish) - track_key = (user_id, session_id, track_type_int) - if track_key in self._track_map: - self._track_map[track_key]["published"] = True - track_id = self._track_map[track_key]["track_id"] - - # Emit TrackAddedEvent so agent can switch to this track - self.events.send( - events.TrackAddedEvent( - plugin_name="getstream", - track_id=track_id, - track_type=track_type, - participant=_to_core_participant(event.participant), - ) - ) + stream_track_type = event.payload.type + track_id = await self._track_resolver.resolve( + user_id=user_id, + session_id=session_id, + stream_track_type=stream_track_type, + ) + if track_id is None: + # Participant left mid-resolve; nothing to wire up. return - # User reconnected with a new session — the WebRTC media track is reused - # so track_added won't fire again. Migrate the stale session entry. - for old_key, old_info in list(self._track_map.items()): - old_user, old_session, old_type = old_key - if ( - old_user == user_id - and old_type == track_type_int - and old_session != session_id - ): - del self._track_map[old_key] - self._track_map[track_key] = { - "track_id": old_info["track_id"], - "published": True, - } - logger.info( - f"Migrated track for {user_id} from session {old_session} to {session_id}" - ) - self.events.send( - events.TrackAddedEvent( - plugin_name="getstream", - track_id=old_info["track_id"], - track_type=track_type, - participant=_to_core_participant(event.participant), - ) - ) - return - - # Wait for pending track to be populated (with 10 second timeout) - # SFU might send TrackPublishedEvent before WebRTC processes track_added - track_id = None - timeout = 10.0 - poll_interval = 0.01 - elapsed = 0.0 - - while elapsed < timeout: - # Find pending track for this user/session with matching kind - for tid, (pending_user, pending_session, pending_kind) in list( - self._pending_tracks.items() - ): - if ( - pending_user == user_id - and pending_session == session_id - and pending_kind == webrtc_track_kind - ): - track_id = tid - del self._pending_tracks[tid] - break - - # Fallback: some video track_added callbacks can arrive with user=None. - # In that case we can still match by WebRTC kind, but only if there - # is exactly one anonymous candidate — multiple anonymous entries - # with the same kind would be ambiguous and could misbind. - if track_id is None: - anonymous_candidates = [ - tid - for tid, ( - pending_user, - pending_session, - pending_kind, - ) in self._pending_tracks.items() - if pending_user is None - and pending_session is None - and pending_kind == webrtc_track_kind - ] - if len(anonymous_candidates) == 1: - track_id = anonymous_candidates[0] - del self._pending_tracks[track_id] - - if track_id: - break - - # Wait a bit before checking again - await asyncio.sleep(poll_interval) - elapsed += poll_interval - - if track_id: - # Store with correct type from SFU - self._track_map[track_key] = {"track_id": track_id, "published": True} - - # Only emit TrackAddedEvent for remote participants, not for agent's own tracks - if not is_agent_track: - # NOW spawn TrackAddedEvent with correct type - self.events.send( - events.TrackAddedEvent( - plugin_name="getstream", - track_id=track_id, - track_type=track_type, - participant=_to_core_participant(event.participant), - ) - ) - - else: - raise TimeoutError( - f"Timeout waiting for pending track: {track_type.name} from user {user_id}, " - f"session {session_id}. Waited {timeout}s but WebRTC track_added with matching kind was never received." - f"Pending tracks: {self._pending_tracks}\n" - f"Key: {track_key}\n" - f"Track map: {self._track_map}\n" + self.events.send( + events.TrackAddedEvent( + plugin_name="getstream", + track_id=track_id, + track_type=_to_core_track_type(stream_track_type), + participant=_to_core_participant(event.participant), ) + ) async def _on_track_removed( self, event: sfu_events.ParticipantLeftEvent | sfu_events.TrackUnpublishedEvent ): """Handle track unpublished and participant left events.""" - if not event.payload: # NOTE: mypy typecheck + if not event.payload: return participant = event.participant @@ -358,41 +229,52 @@ async def _on_track_removed( user_id = event.payload.user_id session_id = event.payload.session_id + if not user_id or not session_id: + return + + # Cancel on full leave. + if isinstance(event, sfu_events.ParticipantLeftEvent): + self._track_resolver.cancel(user_id=user_id, session_id=session_id) + # Determine which tracks to remove - if hasattr(event.payload, "type") and event.payload is not None: - # TrackUnpublishedEvent - single track + if isinstance(event, sfu_events.TrackUnpublishedEvent): + # Single track tracks_to_remove = [event.payload.type] event_desc = "Track unpublished" else: # ParticipantLeftEvent - all published tracks - tracks_to_remove = ( - event.participant.published_tracks if event.participant else None - ) or [] + tracks_to_remove = cast( + list[StreamTrackType.ValueType], + list( + (event.participant.published_tracks if event.participant else None) + or [] + ), + ) event_desc = "Participant left" track_names = [StreamTrackType.Name(t) for t in tracks_to_remove] logger.info(f"{event_desc}: {user_id}, tracks: {track_names}") - # Mark each track as unpublished and send TrackRemovedEvent - for track_type_int in tracks_to_remove: - track_type = _to_core_track_type(track_type_int) - track_key = (user_id, session_id, track_type_int) - track_info = self._track_map.get(track_key) - - if track_info: - track_id = track_info["track_id"] - self.events.send( - events.TrackRemovedEvent( - plugin_name="getstream", - track_id=track_id, - track_type=track_type, - participant=_to_core_participant(participant), - ) + for stream_track_type in tracks_to_remove: + track_id = self._track_resolver.unpublish( + user_id=user_id, + session_id=session_id, + stream_track_type=stream_track_type, + ) + if track_id is None: + logger.debug( + f"Track not found: user={user_id} session={session_id} " + f"type={StreamTrackType.Name(stream_track_type)}" ) - # Mark as unpublished instead of removing - self._track_map[track_key]["published"] = False - else: - logger.warning(f"Track not found in map: {track_key}") + continue + self.events.send( + events.TrackRemovedEvent( + plugin_name="getstream", + track_id=track_id, + track_type=_to_core_track_type(stream_track_type), + participant=_to_core_participant(participant), + ) + ) async def _on_call_ended(self, event: sfu_events.CallEndedEvent): self.events.send( @@ -472,13 +354,12 @@ async def join( @connection.on("track_added") async def on_track(track_id, track_type, user): - # Store track in pending map - wait for SFU to confirm type before spawning TrackAddedEvent - pending_user_id = user.user_id if user else None - pending_session_id = user.session_id if user else None - self._pending_tracks[track_id] = ( - pending_user_id, - pending_session_id, - track_type, + # Wait for SFU to confirm type before spawning TrackAddedEvent + self._track_resolver.register( + track_id=track_id, + user_id=user.user_id if user else None, + session_id=user.session_id if user else None, + webrtc_kind=track_type, ) self.events.silent(events.AudioReceivedEvent)