Skip to content

Commit 1704332

Browse files
committed
feat: add MVP of propagating room downwards from room -> track -> audio stream
And extracting metadata from that room that can be fed into the frame processor.
1 parent e8df6d0 commit 1704332

3 files changed

Lines changed: 91 additions & 2 deletions

File tree

livekit-rtc/livekit/rtc/audio_stream.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import asyncio
1818
import json
1919
from dataclasses import dataclass
20-
from typing import Any, AsyncIterator, Optional
20+
from typing import TYPE_CHECKING, Any, AsyncIterator, Optional
2121

2222
from ._ffi_client import FfiClient, FfiHandle
2323
from ._proto import audio_frame_pb2 as proto_audio_frame
@@ -30,6 +30,9 @@
3030
from .track import Track
3131
from .frame_processor import FrameProcessor
3232

33+
if TYPE_CHECKING:
34+
from .room import Room
35+
3336

3437
@dataclass
3538
class AudioFrameEvent:
@@ -65,6 +68,7 @@ def __init__(
6568
num_channels: int = 1,
6669
frame_size_ms: int | None = None,
6770
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
71+
room: Optional["Room"] = None,
6872
**kwargs,
6973
) -> None:
7074
"""Initialize an `AudioStream` instance.
@@ -81,6 +85,9 @@ def __init__(
8185
noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
8286
If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance
8387
created by the noise cancellation module.
88+
room (Optional[Room], optional): The room this stream's track belongs to, used to
89+
resolve `room_name`, `participant_identity`, and `publication_sid`. May be `None`
90+
if the track is not (yet) associated with a room.
8491
8592
Example:
8693
```python
@@ -98,6 +105,8 @@ def __init__(
98105
```
99106
"""
100107
self._track: Track | None = track
108+
self._room: Room | None = room
109+
print("ROOM:", room)
101110
self._sample_rate = sample_rate
102111
self._num_channels = num_channels
103112
self._frame_size_ms = frame_size_ms
@@ -132,6 +141,9 @@ def __init__(
132141
self._ffi_handle = FfiHandle(stream.handle.id)
133142
self._info = stream.info
134143

144+
if self._track is not None:
145+
self._track._register_audio_stream(self)
146+
135147
@classmethod
136148
def from_participant(
137149
cls,
@@ -144,6 +156,7 @@ def from_participant(
144156
num_channels: int = 1,
145157
frame_size_ms: int | None = None,
146158
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
159+
room: Optional["Room"] = None,
147160
) -> AudioStream:
148161
"""Create an `AudioStream` from a participant's audio track.
149162
@@ -181,6 +194,7 @@ def from_participant(
181194
num_channels=num_channels,
182195
noise_cancellation=noise_cancellation,
183196
frame_size_ms=frame_size_ms,
197+
room=room,
184198
)
185199

186200
@classmethod
@@ -194,6 +208,7 @@ def from_track(
194208
num_channels: int = 1,
195209
frame_size_ms: int | None = None,
196210
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
211+
room: Optional["Room"] = None,
197212
) -> AudioStream:
198213
"""Create an `AudioStream` from an existing audio track.
199214
@@ -227,8 +242,54 @@ def from_track(
227242
num_channels=num_channels,
228243
noise_cancellation=noise_cancellation,
229244
frame_size_ms=frame_size_ms,
245+
room=room,
230246
)
231247

248+
def _set_room(self, room: Optional["Room"]) -> None:
249+
self._room = room
250+
print("ROOM UPDATE:", room)
251+
252+
@property
253+
def room(self) -> Optional["Room"]:
254+
return self._room
255+
256+
@property
257+
def room_name(self) -> Optional[str]:
258+
return self._room.name if self._room is not None else None
259+
260+
@property
261+
def participant_identity(self) -> Optional[str]:
262+
pub = self._find_publication()
263+
if pub is None:
264+
return None
265+
identity, _ = pub
266+
return identity
267+
268+
@property
269+
def publication_sid(self) -> Optional[str]:
270+
pub = self._find_publication()
271+
if pub is None:
272+
return None
273+
_, sid = pub
274+
return sid
275+
276+
def _find_publication(self) -> Optional[tuple[str, str]]:
277+
if self._room is None or self._track is None:
278+
return None
279+
track_sid = self._track.sid
280+
if not track_sid:
281+
return None
282+
for participant in self._room.remote_participants.values():
283+
publication = participant.track_publications.get(track_sid)
284+
if publication is not None:
285+
return participant.identity, publication.sid
286+
local = self._room._local_participant
287+
if local is not None:
288+
for publication in local.track_publications.values():
289+
if publication.sid == track_sid:
290+
return local.identity, publication.sid
291+
return None
292+
232293
def __del__(self) -> None:
233294
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
234295

@@ -303,6 +364,8 @@ async def aclose(self) -> None:
303364
This method cleans up resources associated with the audio stream and waits for
304365
any pending operations to complete.
305366
"""
367+
if self._track is not None:
368+
self._track._unregister_audio_stream(self)
306369
self._ffi_handle.dispose()
307370
await self._task
308371

livekit-rtc/livekit/rtc/room.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,10 +726,14 @@ def _on_room_event(self, event: proto_room.RoomEvent):
726726
sid = event.local_track_published.track_sid
727727
lpublication = self.local_participant.track_publications[sid]
728728
ltrack = lpublication.track
729+
if ltrack is not None:
730+
ltrack._set_room(self)
729731
self.emit("local_track_published", lpublication, ltrack)
730732
elif which == "local_track_unpublished":
731733
sid = event.local_track_unpublished.publication_sid
732734
lpublication = self.local_participant.track_publications[sid]
735+
if lpublication.track is not None:
736+
lpublication.track._set_room(None)
733737
self.emit("local_track_unpublished", lpublication)
734738
elif which == "local_track_republished":
735739
# The SDK auto-republished a local track during a full
@@ -770,17 +774,21 @@ def _on_room_event(self, event: proto_room.RoomEvent):
770774
rpublication._subscribed = True
771775
if track_info.kind == TrackKind.KIND_VIDEO:
772776
remote_video_track = RemoteVideoTrack(owned_track_info)
777+
remote_video_track._set_room(self)
773778
rpublication._track = remote_video_track
774779
self.emit("track_subscribed", remote_video_track, rpublication, rparticipant)
775780
elif track_info.kind == TrackKind.KIND_AUDIO:
776781
remote_audio_track = RemoteAudioTrack(owned_track_info)
782+
remote_audio_track._set_room(self)
777783
rpublication._track = remote_audio_track
778784
self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant)
779785
elif which == "track_unsubscribed":
780786
identity = event.track_unsubscribed.participant_identity
781787
rparticipant = self._remote_participants[identity]
782788
rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid]
783789
rtrack = rpublication.track
790+
if rtrack is not None:
791+
rtrack._set_room(None)
784792
rpublication._track = None
785793
rpublication._subscribed = False
786794
self.emit("track_unsubscribed", rtrack, rpublication, rparticipant)

livekit-rtc/livekit/rtc/track.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,39 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import TYPE_CHECKING, List, Union
15+
import weakref
16+
from typing import TYPE_CHECKING, List, Optional, Union
1617
from ._ffi_client import FfiHandle, FfiClient
1718
from ._proto import ffi_pb2 as proto_ffi
1819
from ._proto import track_pb2 as proto_track
1920
from ._proto import stats_pb2 as proto_stats
2021

2122
if TYPE_CHECKING:
2223
from .audio_source import AudioSource
24+
from .audio_stream import AudioStream
25+
from .room import Room
2326
from .video_source import VideoSource
2427

2528

2629
class Track:
2730
def __init__(self, owned_info: proto_track.OwnedTrack):
2831
self._info = owned_info.info
2932
self._ffi_handle = FfiHandle(owned_info.handle.id)
33+
self._room: Optional["Room"] = None
34+
self._audio_streams: "weakref.WeakSet[AudioStream]" = weakref.WeakSet()
35+
36+
def _set_room(self, room: Optional["Room"]) -> None:
37+
self._room = room
38+
for stream in self._audio_streams:
39+
stream._set_room(room)
40+
41+
def _register_audio_stream(self, stream: "AudioStream") -> None:
42+
self._audio_streams.add(stream)
43+
if self._room is not None:
44+
stream._set_room(self._room)
45+
46+
def _unregister_audio_stream(self, stream: "AudioStream") -> None:
47+
self._audio_streams.discard(stream)
3048

3149
@property
3250
def sid(self) -> str:

0 commit comments

Comments
 (0)