Skip to content

Commit 6893e13

Browse files
authored
Fully configure frame processors when they are used directly on an audio stream (#679)
1 parent 9c4e3c9 commit 6893e13

7 files changed

Lines changed: 862 additions & 6 deletions

File tree

livekit-rtc/livekit/rtc/audio_stream.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def __init__(
6565
num_channels: int = 1,
6666
frame_size_ms: int | None = None,
6767
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
68+
auto_close_noise_cancellation: bool = True,
6869
**kwargs: Any,
6970
) -> None:
7071
"""Initialize an `AudioStream` instance.
@@ -81,6 +82,9 @@ def __init__(
8182
noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
8283
If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance
8384
created by the noise cancellation module.
85+
auto_close_noise_cancellation (bool):
86+
When the audio stream closes, should the FrameProcessor's close method be run? If
87+
False, then the frame processor can be reused with another AudioStream.
8488
8589
Example:
8690
```python
@@ -113,11 +117,13 @@ def __init__(
113117
self._audio_filter_module: str | None = None
114118
self._audio_filter_options: dict[str, Any] | None = None
115119
self._processor: FrameProcessor[AudioFrame] | None = None
120+
self._processor_auto_close = True
116121
if isinstance(noise_cancellation, NoiseCancellationOptions):
117122
self._audio_filter_module = noise_cancellation.module_id
118123
self._audio_filter_options = noise_cancellation.options
119124
elif isinstance(noise_cancellation, FrameProcessor):
120125
self._processor = noise_cancellation
126+
self._processor_auto_close = auto_close_noise_cancellation
121127

122128
self._task = self._loop.create_task(self._run())
123129
self._task.add_done_callback(task_done_logger)
@@ -132,6 +138,9 @@ def __init__(
132138
self._ffi_handle = FfiHandle(stream.handle.id)
133139
self._info = stream.info
134140

141+
if self._track is not None:
142+
self._track._register_audio_stream(self)
143+
135144
@classmethod
136145
def from_participant(
137146
cls,
@@ -144,6 +153,7 @@ def from_participant(
144153
num_channels: int = 1,
145154
frame_size_ms: int | None = None,
146155
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
156+
auto_close_noise_cancellation: bool = True,
147157
) -> AudioStream:
148158
"""Create an `AudioStream` from a participant's audio track.
149159
@@ -179,8 +189,9 @@ def from_participant(
179189
track=None, # type: ignore
180190
sample_rate=sample_rate,
181191
num_channels=num_channels,
182-
noise_cancellation=noise_cancellation,
183192
frame_size_ms=frame_size_ms,
193+
noise_cancellation=noise_cancellation,
194+
auto_close_noise_cancellation=auto_close_noise_cancellation,
184195
)
185196

186197
@classmethod
@@ -194,6 +205,7 @@ def from_track(
194205
num_channels: int = 1,
195206
frame_size_ms: int | None = None,
196207
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
208+
auto_close_noise_cancellation: bool = False,
197209
) -> AudioStream:
198210
"""Create an `AudioStream` from an existing audio track.
199211
@@ -203,9 +215,12 @@ def from_track(
203215
capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
204216
sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
205217
num_channels (int, optional): The number of audio channels. Defaults to 1.
206-
noise_cancellation (Optional[NoiseCancellationOptions], optional):
207-
If noise cancellation is used, pass a `NoiseCancellationOptions` instance
218+
noise_cancellation (Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]], optional):
219+
If noise cancellation is used, pass a `NoiseCancellationOptions` or `FrameProcessor[AudioFrame]` instance
208220
created by the noise cancellation module.
221+
auto_close_noise_cancellation (bool):
222+
When the audio stream closes, leaves the FrameProcessor in an unclosed state so it
223+
can be used with another AudioStream.
209224
210225
Returns:
211226
AudioStream: An instance of `AudioStream` that can be used to receive audio frames.
@@ -225,8 +240,9 @@ def from_track(
225240
capacity=capacity,
226241
sample_rate=sample_rate,
227242
num_channels=num_channels,
228-
noise_cancellation=noise_cancellation,
229243
frame_size_ms=frame_size_ms,
244+
noise_cancellation=noise_cancellation,
245+
auto_close_noise_cancellation=auto_close_noise_cancellation,
230246
)
231247

232248
def __del__(self) -> None:
@@ -303,8 +319,12 @@ async def aclose(self) -> None:
303319
This method cleans up resources associated with the audio stream and waits for
304320
any pending operations to complete.
305321
"""
322+
if self._track is not None:
323+
self._track._unregister_audio_stream(self)
306324
self._ffi_handle.dispose()
307325
await self._task
326+
if self._processor is not None and self._processor_auto_close:
327+
self._processor._close()
308328

309329
def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
310330
return e.audio_stream_event.stream_handle == self._ffi_handle.handle

livekit-rtc/livekit/rtc/frame_processor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ def _on_stream_info_updated(
2424
publication_sid: str,
2525
) -> None: ...
2626

27+
def _on_stream_info_cleared(self) -> None: ...
28+
2729
def _on_credentials_updated(self, *, token: str, url: str) -> None: ...
2830

31+
def _on_credentials_cleared(self) -> None: ...
32+
2933
@abstractmethod
3034
def _process(self, frame: T) -> T: ...
3135

livekit-rtc/livekit/rtc/participant.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,15 @@ async def unpublish_track(self, track_sid: str) -> None:
852852
# so when it processed local_track_unpublished first.
853853
self._track_publications.pop(track_sid, None)
854854
if publication is not None:
855+
# Clear the processor's room context here too: this path races
856+
# the local_track_unpublished room event, and whichever loses
857+
# the race finds the publication already gone and skips its own
858+
# _set_room(None). Calling it from both paths guarantees the
859+
# processor is cleared (and the token_refreshed listener
860+
# detached) exactly once it matters; _set_room(None) is
861+
# idempotent, so a double-clear when this path wins is safe.
862+
if publication._track is not None:
863+
publication._track._set_room(None)
855864
publication._track = None
856865
queue.task_done()
857866
finally:

livekit-rtc/livekit/rtc/room.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,8 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None:
754754
sid = event.local_track_published.track_sid
755755
lpublication = self.local_participant.track_publications[sid]
756756
ltrack = lpublication.track
757+
if ltrack is not None:
758+
ltrack._set_room(self)
757759
self.emit("local_track_published", lpublication, ltrack)
758760
elif which == "local_track_unpublished":
759761
# During teardown the publication may already have been removed
@@ -767,7 +769,21 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None:
767769
unpublished = self.local_participant._track_publications.get(sid)
768770
if unpublished is not None:
769771
del self.local_participant._track_publications[sid]
772+
track = unpublished.track
773+
if track is not None:
774+
track._set_room(None)
775+
# Emit while `publication.track` is still set, preserving the
776+
# pre-existing payload for callbacks. This handler is synchronous
777+
# and emit() invokes listeners synchronously, so nulling the track
778+
# right after still completes before any other coroutine (e.g.
779+
# unpublish_track) can interleave.
770780
self.emit("local_track_unpublished", unpublished)
781+
# Mirror track_unsubscribed: drop the publication's track
782+
# reference. This also makes unpublish_track's own _set_room(None)
783+
# a no-op when it loses the race (its `publication._track is not
784+
# None` guard short-circuits), avoiding a redundant clear.
785+
if track is not None:
786+
unpublished._track = None
771787
else:
772788
logging.debug("local_track_unpublished for untracked publication sid %s", sid)
773789
elif which == "local_track_republished":
@@ -784,6 +800,15 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None:
784800
del self.local_participant._track_publications[previous_sid]
785801
republished._info = event.local_track_republished.info
786802
self.local_participant._track_publications[republished.sid] = republished
803+
if republished.track is not None:
804+
# Keep the local-track invariant (track.sid == publication.sid,
805+
# set at publish_track) intact across republish, then re-push
806+
# metadata so any attached FrameProcessor learns the new
807+
# publication SID / credentials. _set_room with the same room
808+
# is a no-op for the token_refreshed listener but re-fans the
809+
# metadata to every registered AudioStream.
810+
republished.track._info.sid = republished.sid
811+
republished.track._set_room(self)
787812
self.emit("local_track_republished", republished, previous_sid)
788813
elif which == "local_track_subscribed":
789814
sid = event.local_track_subscribed.track_sid
@@ -809,17 +834,21 @@ def _on_room_event(self, event: proto_room.RoomEvent) -> None:
809834
rpublication._subscribed = True
810835
if track_info.kind == TrackKind.KIND_VIDEO:
811836
remote_video_track = RemoteVideoTrack(owned_track_info)
837+
remote_video_track._set_room(self)
812838
rpublication._track = remote_video_track
813839
self.emit("track_subscribed", remote_video_track, rpublication, rparticipant)
814840
elif track_info.kind == TrackKind.KIND_AUDIO:
815841
remote_audio_track = RemoteAudioTrack(owned_track_info)
842+
remote_audio_track._set_room(self)
816843
rpublication._track = remote_audio_track
817844
self.emit("track_subscribed", remote_audio_track, rpublication, rparticipant)
818845
elif which == "track_unsubscribed":
819846
identity = event.track_unsubscribed.participant_identity
820847
rparticipant = self._remote_participants[identity]
821848
rpublication = rparticipant.track_publications[event.track_unsubscribed.track_sid]
822849
rtrack = rpublication.track
850+
if rtrack is not None:
851+
rtrack._set_room(None)
823852
rpublication._track = None
824853
rpublication._subscribed = False
825854
self.emit("track_unsubscribed", rtrack, rpublication, rparticipant)

livekit-rtc/livekit/rtc/track.py

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,19 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import TYPE_CHECKING, List, Union
15+
from __future__ import annotations
16+
17+
import weakref
18+
from typing import TYPE_CHECKING, List, Optional, Union
1619
from ._ffi_client import FfiHandle, FfiClient
1720
from ._proto import ffi_pb2 as proto_ffi
1821
from ._proto import track_pb2 as proto_track
1922
from ._proto import stats_pb2 as proto_stats
2023

2124
if TYPE_CHECKING:
2225
from .audio_source import AudioSource
26+
from .audio_stream import AudioStream
27+
from .room import Room
2328
from .video_source import VideoSource
2429
from .platform_audio import PlatformAudioSource
2530

@@ -28,6 +33,83 @@ class Track:
2833
def __init__(self, owned_info: proto_track.OwnedTrack):
2934
self._info = owned_info.info
3035
self._ffi_handle = FfiHandle(owned_info.handle.id)
36+
self._room_ref: Optional[weakref.ref[Room]] = None
37+
self._audio_streams: weakref.WeakSet[AudioStream] = weakref.WeakSet()
38+
39+
def _resolve_room(self) -> Optional[Room]:
40+
return self._room_ref() if self._room_ref is not None else None
41+
42+
def _set_room(self, room: Optional[Room]) -> None:
43+
old_room = self._resolve_room()
44+
if old_room is None and room is None:
45+
# Already roomless — nothing to detach and nothing to re-clear.
46+
# Without this guard a second _set_room(None) (e.g. the unpublish /
47+
# local_track_unpublished race calling it from both paths) would
48+
# re-fire _on_*_cleared on every registered processor.
49+
return
50+
if old_room is not room:
51+
if old_room is not None:
52+
old_room.off("token_refreshed", self._on_room_token_refreshed)
53+
if room is not None:
54+
room.on("token_refreshed", self._on_room_token_refreshed)
55+
56+
self._room_ref = weakref.ref(room) if room is not None else None
57+
58+
for stream in self._audio_streams:
59+
self._push_processor_metadata_to_stream(stream, room)
60+
61+
def _on_room_token_refreshed(self) -> None:
62+
room = self._resolve_room()
63+
if room is None or room._token is None or room._server_url is None:
64+
return
65+
for stream in self._audio_streams:
66+
if not stream._processor:
67+
continue
68+
stream._processor._on_credentials_updated(token=room._token, url=room._server_url)
69+
70+
def _push_processor_metadata_to_stream(self, stream: AudioStream, room: Optional[Room]) -> None:
71+
if not stream._processor:
72+
return
73+
74+
if room is None:
75+
# track left a room - clear processor's room context
76+
stream._processor._on_stream_info_cleared()
77+
stream._processor._on_credentials_cleared()
78+
return
79+
80+
identity = ""
81+
pub_sid = ""
82+
track_sid = self.sid
83+
if track_sid:
84+
for participant in room.remote_participants.values():
85+
publication = participant.track_publications.get(track_sid)
86+
if publication is not None:
87+
identity, pub_sid = participant.identity, publication.sid
88+
break
89+
else:
90+
local = room._local_participant
91+
if local is not None:
92+
for local_publication in local.track_publications.values():
93+
if local_publication.sid == track_sid:
94+
identity, pub_sid = local.identity, local_publication.sid
95+
break
96+
97+
stream._processor._on_stream_info_updated(
98+
room_name=room.name,
99+
participant_identity=identity,
100+
publication_sid=pub_sid,
101+
)
102+
if room._token is not None and room._server_url is not None:
103+
stream._processor._on_credentials_updated(token=room._token, url=room._server_url)
104+
105+
def _register_audio_stream(self, stream: AudioStream) -> None:
106+
self._audio_streams.add(stream)
107+
room = self._resolve_room()
108+
if room is not None:
109+
self._push_processor_metadata_to_stream(stream, room)
110+
111+
def _unregister_audio_stream(self, stream: AudioStream) -> None:
112+
self._audio_streams.discard(stream)
31113

32114
@property
33115
def sid(self) -> str:

0 commit comments

Comments
 (0)