Skip to content

Commit bef5a00

Browse files
authored
Fix duplicate track publish (#1009)
Use a SerialRunner pattern similar to Swift SDK. Resolves #419, #501
1 parent 31ec6b0 commit bef5a00

5 files changed

Lines changed: 284 additions & 63 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
patch type="fixed" "Duplicate tracks published when setCameraEnabled called rapidly"

lib/src/participant/local.dart

Lines changed: 85 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import '../proto/livekit_models.pb.dart' as lk_models;
4545
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
4646
import '../publication/local.dart';
4747
import '../support/platform.dart';
48+
import '../support/serial_runner.dart';
4849
import '../track/local/audio.dart';
4950
import '../track/local/local.dart';
5051
import '../track/local/video.dart';
@@ -70,6 +71,9 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
7071
// Pending signal request responses (keyed by requestId)
7172
final Map<int, Completer<void>> _pendingSignalRequests = {};
7273

74+
// Serializes publish operations to prevent duplicate tracks from concurrent calls
75+
final _publishRunner = SerialRunner<LocalTrackPublication?>();
76+
7377
LocalParticipant._({
7478
required Room room,
7579
required String sid,
@@ -156,6 +160,14 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
156160
Future<LocalTrackPublication<LocalAudioTrack>> publishAudioTrack(
157161
LocalAudioTrack track, {
158162
AudioPublishOptions? publishOptions,
163+
}) async {
164+
final result = await _publishRunner.run(() => _publishAudioTrack(track, publishOptions: publishOptions));
165+
return result! as LocalTrackPublication<LocalAudioTrack>;
166+
}
167+
168+
Future<LocalTrackPublication<LocalAudioTrack>?> _publishAudioTrack(
169+
LocalAudioTrack track, {
170+
AudioPublishOptions? publishOptions,
159171
}) async {
160172
if (audioTrackPublications.any((e) => e.track?.mediaStreamTrack.id == track.mediaStreamTrack.id)) {
161173
throw TrackPublishException('track already exists');
@@ -250,6 +262,14 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
250262
Future<LocalTrackPublication<LocalVideoTrack>> publishVideoTrack(
251263
LocalVideoTrack track, {
252264
VideoPublishOptions? publishOptions,
265+
}) async {
266+
final result = await _publishRunner.run(() => _publishVideoTrack(track, publishOptions: publishOptions));
267+
return result! as LocalTrackPublication<LocalVideoTrack>;
268+
}
269+
270+
Future<LocalTrackPublication<LocalVideoTrack>?> _publishVideoTrack(
271+
LocalVideoTrack track, {
272+
VideoPublishOptions? publishOptions,
253273
}) async {
254274
if (videoTrackPublications.any((e) => e.track?.mediaStreamTrack.id == track.mediaStreamTrack.id)) {
255275
throw TrackPublishException('track already exists');
@@ -758,77 +778,79 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
758778
{bool? captureScreenAudio,
759779
AudioCaptureOptions? audioCaptureOptions,
760780
CameraCaptureOptions? cameraCaptureOptions,
761-
ScreenShareCaptureOptions? screenShareCaptureOptions}) async {
762-
logger.fine('setSourceEnabled(source: $source, enabled: $enabled)');
763-
764-
if (TrackSource.screenShareVideo == source && lkPlatformIsWebMobile()) {
765-
throw TrackCreateException('Screen sharing is not supported on mobile devices');
766-
}
781+
ScreenShareCaptureOptions? screenShareCaptureOptions}) {
782+
return _publishRunner.run(() async {
783+
if (TrackSource.screenShareVideo == source && lkPlatformIsWebMobile()) {
784+
throw TrackCreateException('Screen sharing is not supported on mobile devices');
785+
}
767786

768-
final publication = getTrackPublicationBySource(source);
769-
if (publication != null) {
770-
final stopOnMute = switch (publication.source) {
771-
TrackSource.camera => cameraCaptureOptions?.stopCameraCaptureOnMute ?? true,
772-
TrackSource.microphone => audioCaptureOptions?.stopAudioCaptureOnMute ?? true,
773-
_ => true,
774-
};
775-
if (enabled) {
776-
await publication.unmute(stopOnMute: stopOnMute);
777-
} else {
778-
if (source == TrackSource.screenShareVideo) {
779-
await removePublishedTrack(publication.sid);
780-
final screenAudio = getTrackPublicationBySource(TrackSource.screenShareAudio);
781-
if (screenAudio != null) {
782-
await removePublishedTrack(screenAudio.sid);
783-
}
787+
logger.fine('setSourceEnabled(source: $source, enabled: $enabled)');
788+
789+
final publication = getTrackPublicationBySource(source);
790+
if (publication != null) {
791+
final stopOnMute = switch (publication.source) {
792+
TrackSource.camera => cameraCaptureOptions?.stopCameraCaptureOnMute ?? true,
793+
TrackSource.microphone => audioCaptureOptions?.stopAudioCaptureOnMute ?? true,
794+
_ => true,
795+
};
796+
if (enabled) {
797+
await publication.unmute(stopOnMute: stopOnMute);
784798
} else {
785-
await publication.mute(stopOnMute: stopOnMute);
786-
}
787-
}
788-
return publication;
789-
} else if (enabled) {
790-
if (source == TrackSource.camera) {
791-
final CameraCaptureOptions captureOptions =
792-
cameraCaptureOptions ?? room.roomOptions.defaultCameraCaptureOptions;
793-
final track = await LocalVideoTrack.createCameraTrack(captureOptions);
794-
return await publishVideoTrack(track);
795-
} else if (source == TrackSource.microphone) {
796-
final AudioCaptureOptions captureOptions = audioCaptureOptions ?? room.roomOptions.defaultAudioCaptureOptions;
797-
final track = await LocalAudioTrack.create(captureOptions);
798-
return await publishAudioTrack(track);
799-
} else if (source == TrackSource.screenShareVideo) {
800-
ScreenShareCaptureOptions captureOptions =
801-
screenShareCaptureOptions ?? room.roomOptions.defaultScreenShareCaptureOptions;
802-
803-
if (lkPlatformIs(PlatformType.iOS) && !BroadcastManager().isBroadcasting) {
804-
// Wait until broadcasting to publish track
805-
await BroadcastManager().requestActivation();
806-
return null;
799+
if (source == TrackSource.screenShareVideo) {
800+
await removePublishedTrack(publication.sid);
801+
final screenAudio = getTrackPublicationBySource(TrackSource.screenShareAudio);
802+
if (screenAudio != null) {
803+
await removePublishedTrack(screenAudio.sid);
804+
}
805+
} else {
806+
await publication.mute(stopOnMute: stopOnMute);
807+
}
807808
}
809+
return publication;
810+
} else if (enabled) {
811+
if (source == TrackSource.camera) {
812+
final CameraCaptureOptions captureOptions =
813+
cameraCaptureOptions ?? room.roomOptions.defaultCameraCaptureOptions;
814+
final track = await LocalVideoTrack.createCameraTrack(captureOptions);
815+
return await _publishVideoTrack(track);
816+
} else if (source == TrackSource.microphone) {
817+
final AudioCaptureOptions captureOptions = audioCaptureOptions ?? room.roomOptions.defaultAudioCaptureOptions;
818+
final track = await LocalAudioTrack.create(captureOptions);
819+
return await _publishAudioTrack(track);
820+
} else if (source == TrackSource.screenShareVideo) {
821+
ScreenShareCaptureOptions captureOptions =
822+
screenShareCaptureOptions ?? room.roomOptions.defaultScreenShareCaptureOptions;
823+
824+
if (lkPlatformIs(PlatformType.iOS) && !BroadcastManager().isBroadcasting) {
825+
// Wait until broadcasting to publish track
826+
await BroadcastManager().requestActivation();
827+
return null;
828+
}
808829

809-
/// When capturing chrome table audio, we can't capture audio/video
810-
/// track separately, it has to be returned once in getDisplayMedia,
811-
/// so we publish it twice here, but only return videoTrack to user.
812-
if (captureScreenAudio ?? false) {
813-
captureOptions = captureOptions.copyWith(captureScreenAudio: true);
814-
final tracks = await LocalVideoTrack.createScreenShareTracksWithAudio(captureOptions);
815-
LocalTrackPublication<LocalVideoTrack>? publication;
816-
for (final track in tracks) {
817-
if (track is LocalVideoTrack) {
818-
publication = await publishVideoTrack(track);
819-
} else if (track is LocalAudioTrack) {
820-
await publishAudioTrack(track);
830+
/// When capturing chrome table audio, we can't capture audio/video
831+
/// track separately, it has to be returned once in getDisplayMedia,
832+
/// so we publish it twice here, but only return videoTrack to user.
833+
if (captureScreenAudio ?? false) {
834+
captureOptions = captureOptions.copyWith(captureScreenAudio: true);
835+
final tracks = await LocalVideoTrack.createScreenShareTracksWithAudio(captureOptions);
836+
LocalTrackPublication<LocalVideoTrack>? publication;
837+
for (final track in tracks) {
838+
if (track is LocalVideoTrack) {
839+
publication = await _publishVideoTrack(track);
840+
} else if (track is LocalAudioTrack) {
841+
await _publishAudioTrack(track);
842+
}
821843
}
822-
}
823844

824-
/// just return the video track publication
825-
return publication;
845+
/// just return the video track publication
846+
return publication;
847+
}
848+
final track = await LocalVideoTrack.createScreenShareTrack(captureOptions);
849+
return await _publishVideoTrack(track);
826850
}
827-
final track = await LocalVideoTrack.createScreenShareTrack(captureOptions);
828-
return await publishVideoTrack(track);
829851
}
830-
}
831-
return null;
852+
return null;
853+
});
832854
}
833855

834856
bool _allParticipantsAllowed = true;

lib/src/support/serial_runner.dart

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2025 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import 'dart:async';
16+
17+
/// Serializes async operations so concurrent calls execute sequentially.
18+
///
19+
/// When [run] is called while a previous operation is still in progress,
20+
/// the new call waits for the previous one to complete before executing.
21+
/// This prevents race conditions from concurrent calls to the same
22+
/// async operation.
23+
///
24+
/// Equivalent to the Swift SDK's `SerialRunnerActor`.
25+
class SerialRunner<T> {
26+
Future<void>? _pending;
27+
28+
/// Whether an operation is currently in progress.
29+
bool get isRunning => _pending != null;
30+
31+
/// Runs [block] after any pending operation completes.
32+
///
33+
/// If no operation is pending, [block] executes immediately.
34+
/// If an operation is pending, waits for it to finish first.
35+
/// Errors from [block] propagate to the caller only, not to
36+
/// subsequent waiters.
37+
Future<T> run(Future<T> Function() block) async {
38+
while (_pending != null) {
39+
await _pending;
40+
}
41+
42+
final completer = Completer<void>();
43+
_pending = completer.future;
44+
try {
45+
return await block();
46+
} finally {
47+
completer.complete();
48+
_pending = null;
49+
}
50+
}
51+
}

test/core/room_e2e_test.dart

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import '../mock/websocket_mock.dart';
2929
import 'signal_client_test.dart';
3030

3131
void main() {
32+
TestWidgetsFlutterBinding.ensureInitialized();
3233
late E2EContainer container;
3334
late Room room;
3435
late MockWebSocketConnector ws;
@@ -213,6 +214,28 @@ void main() {
213214
expect(trackSubscribed.publication.track, isNotNull);
214215
});
215216
});
217+
218+
group('publish guards', () {
219+
test('concurrent setSourceEnabled serializes calls', () async {
220+
final lp = room.localParticipant!;
221+
222+
// Both calls will fail (no camera hardware in test), but the
223+
// SerialRunner should serialize them — the second waits for the
224+
// first to complete before executing.
225+
final future1 = lp.setSourceEnabled(TrackSource.camera, true);
226+
final future2 = lp.setSourceEnabled(TrackSource.camera, true);
227+
228+
// Both should fail with the same error (no camera), not with
229+
// duplicate track errors or unhandled exceptions.
230+
final results = await Future.wait([
231+
future1.catchError((_) => null),
232+
future2.catchError((_) => null),
233+
]);
234+
235+
// Both calls completed (didn't hang or deadlock).
236+
expect(results.length, 2);
237+
});
238+
});
216239
}
217240

218241
class _FakeMediaStream extends rtc.MediaStream {

0 commit comments

Comments
 (0)