Skip to content

Commit cb2e12a

Browse files
Fixed RTCEngine.addTrack leaking pendingTrackResolvers on cancellation (#920)
When the AddTrack request timed out via withDeadline (20s) or its caller coroutine was cancelled, the cid stayed in pendingTrackResolvers because suspendCancellableCoroutine had no invokeOnCancellation cleanup. A retry of the same track instance (same cid) hit the duplicate guard and threw DuplicateTrackException, blocking republish until the connection closed, the signal reconnected, or the server eventually replied for that cid. Added cont.invokeOnCancellation that removes the entry under the existing synchronized monitor, with an identity check so a stale handler cannot evict a freshly-registered resolver bound to the same cid. Added two regression tests in RTCEngineMockE2ETest covering the timeout and caller-cancellation paths. Co-authored-by: davidliu <davidliu@deviange.net>
1 parent 1d1995f commit cb2e12a

3 files changed

Lines changed: 135 additions & 0 deletions

File tree

.changeset/sturdy-foxes-publish.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Fixed RTCEngine.addTrack leaking pendingTrackResolvers entries on timeout or caller cancellation, which previously caused subsequent publishes of the same track to fail with DuplicateTrackException until the connection was torn down.

livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,13 @@ internal constructor(
403403
synchronized(pendingTrackResolvers) {
404404
pendingTrackResolvers[cid] = cont
405405
}
406+
cont.invokeOnCancellation {
407+
synchronized(pendingTrackResolvers) {
408+
if (pendingTrackResolvers[cid] === cont) {
409+
pendingTrackResolvers.remove(cid)
410+
}
411+
}
412+
}
406413
client.sendAddTrack(
407414
cid = cid,
408415
name = name,

livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,31 @@
1717
package io.livekit.android.room
1818

1919
import com.google.protobuf.ByteString
20+
import io.livekit.android.room.track.TrackException
2021
import io.livekit.android.test.MockE2ETest
2122
import io.livekit.android.test.events.FlowCollector
2223
import io.livekit.android.test.mock.MockDataChannel
2324
import io.livekit.android.test.mock.MockPeerConnection
2425
import io.livekit.android.test.mock.SignalRequestHandler
2526
import io.livekit.android.test.mock.TestData
2627
import io.livekit.android.test.util.toPBByteString
28+
import io.livekit.android.util.TimeoutException
2729
import io.livekit.android.util.flow
2830
import io.livekit.android.util.toOkioByteString
31+
import kotlinx.coroutines.CoroutineScope
2932
import kotlinx.coroutines.ExperimentalCoroutinesApi
33+
import kotlinx.coroutines.SupervisorJob
34+
import kotlinx.coroutines.async
35+
import kotlinx.coroutines.cancel
3036
import kotlinx.coroutines.test.advanceUntilIdle
37+
import kotlinx.coroutines.test.runCurrent
3138
import livekit.LivekitModels
3239
import livekit.LivekitModels.DataPacket
3340
import livekit.LivekitRtc
3441
import livekit.org.webrtc.PeerConnection
3542
import org.junit.Assert
3643
import org.junit.Assert.assertEquals
44+
import org.junit.Assert.assertFalse
3745
import org.junit.Assert.assertTrue
3846
import org.junit.Before
3947
import org.junit.Test
@@ -370,6 +378,121 @@ class RTCEngineMockE2ETest : MockE2ETest() {
370378
assertEquals(TestData.JOIN.join.participant.sid, sid)
371379
}
372380

381+
/**
382+
* Regression: an AddTrack timeout used to leave the cid in pendingTrackResolvers,
383+
* poisoning every subsequent publish of the same track with DuplicateTrackException
384+
* until the connection was torn down (or the server eventually responded.
385+
*/
386+
@Test
387+
fun addTrackTimeoutDoesNotPoisonRetry() = runTest {
388+
connect()
389+
// The default mock handler auto-replies to ADD_TRACK; remove it so we can
390+
// simulate the "server never responds" case that triggers the timeout.
391+
wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)
392+
393+
// Use a SupervisorJob so timeout/cancellation in addTrack does not cancel the test scope.
394+
val supervisor = CoroutineScope(coroutineRule.dispatcher + SupervisorJob())
395+
try {
396+
val cid = TestData.LOCAL_TRACK_PUBLISHED.trackPublished.cid
397+
398+
val firstPublish = supervisor.async {
399+
rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null)
400+
}
401+
runCurrent()
402+
403+
// Push past the 20s AddTrack deadline without the server replying.
404+
testScheduler.advanceTimeBy(21_000)
405+
runCurrent()
406+
407+
assertTrue("firstPublish should be completed", firstPublish.isCompleted)
408+
val firstFailure = firstPublish.getCompletionExceptionOrNull()
409+
assertTrue(
410+
"Expected TimeoutException, got $firstFailure",
411+
firstFailure is TimeoutException,
412+
)
413+
414+
// Retry with the same cid — must not be rejected by the duplicate guard.
415+
val secondPublish = supervisor.async {
416+
rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null)
417+
}
418+
// runCurrent (not advanceUntilIdle): otherwise the new 20s deadline fires
419+
// before the simulated server response is delivered.
420+
runCurrent()
421+
422+
if (secondPublish.isCompleted) {
423+
val syncFailure = secondPublish.getCompletionExceptionOrNull()
424+
assertFalse(
425+
"Retry must not fail synchronously with DuplicateTrackException, got $syncFailure",
426+
syncFailure is TrackException.DuplicateTrackException,
427+
)
428+
}
429+
430+
// Server now replies for the retry; the second publish should resolve cleanly.
431+
simulateMessageFromServer(TestData.LOCAL_TRACK_PUBLISHED)
432+
runCurrent()
433+
434+
assertTrue("secondPublish should be completed", secondPublish.isCompleted)
435+
val secondFailure = secondPublish.getCompletionExceptionOrNull()
436+
assertTrue("Retry should have succeeded, got $secondFailure", secondFailure == null)
437+
assertEquals(
438+
TestData.LOCAL_TRACK_PUBLISHED.trackPublished.track.sid,
439+
secondPublish.getCompleted().sid,
440+
)
441+
} finally {
442+
supervisor.cancel()
443+
}
444+
}
445+
446+
/**
447+
* Regression: caller cancellation of an in-flight addTrack must also clean up
448+
* the pendingTrackResolvers entry so the same cid can be retried.
449+
*/
450+
@Test
451+
fun addTrackCallerCancellationDoesNotPoisonRetry() = runTest {
452+
connect()
453+
wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)
454+
455+
val supervisor = CoroutineScope(coroutineRule.dispatcher + SupervisorJob())
456+
try {
457+
val cid = TestData.LOCAL_TRACK_PUBLISHED.trackPublished.cid
458+
459+
val firstPublish = supervisor.async {
460+
rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null)
461+
}
462+
runCurrent()
463+
assertFalse("firstPublish should still be in-flight", firstPublish.isCompleted)
464+
465+
firstPublish.cancel()
466+
runCurrent()
467+
assertTrue("firstPublish should be cancelled", firstPublish.isCancelled)
468+
469+
val secondPublish = supervisor.async {
470+
rtcEngine.addTrack(cid, "audio", LivekitModels.TrackType.AUDIO, stream = null)
471+
}
472+
runCurrent()
473+
474+
if (secondPublish.isCompleted) {
475+
val syncFailure = secondPublish.getCompletionExceptionOrNull()
476+
assertFalse(
477+
"Retry must not fail synchronously with DuplicateTrackException, got $syncFailure",
478+
syncFailure is TrackException.DuplicateTrackException,
479+
)
480+
}
481+
482+
simulateMessageFromServer(TestData.LOCAL_TRACK_PUBLISHED)
483+
runCurrent()
484+
485+
assertTrue("secondPublish should be completed", secondPublish.isCompleted)
486+
val secondFailure = secondPublish.getCompletionExceptionOrNull()
487+
assertTrue(
488+
"Retry after cancellation should have succeeded, got $secondFailure",
489+
secondFailure == null,
490+
)
491+
} finally {
492+
supervisor.cancel()
493+
}
494+
}
495+
373496
/**
374497
* After a soft reconnect, the server reports [LivekitRtc.ReconnectResponse.lastMessageSeq]. The engine
375498
* drops buffered reliable payloads up to that sequence (inclusive) and re-sends the remainder on the

0 commit comments

Comments
 (0)