Skip to content

Commit 9ed732b

Browse files
authored
Scope rtc thread requests to their associated PeerConnectionFactory lifecycle (#756)
* Scope rtc thread requests to their associated PeerConnectionFactory lifecycle * fix tests * spotless
1 parent f42ec3a commit 9ed732b

32 files changed

Lines changed: 540 additions & 88 deletions

.changeset/green-mails-thank.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Fix race condition when releasing Room object

.changeset/yellow-timers-pump.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Ensure room is disconnected before releasing resources

livekit-android-sdk/src/main/java/io/livekit/android/dagger/RTCModule.kt

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import android.media.MediaRecorder
2323
import android.os.Build
2424
import dagger.Module
2525
import dagger.Provides
26+
import dagger.Reusable
2627
import io.livekit.android.LiveKit
2728
import io.livekit.android.audio.AudioBufferCallbackDispatcher
2829
import io.livekit.android.audio.AudioProcessingController
@@ -38,6 +39,9 @@ import io.livekit.android.util.LoggingLevel
3839
import io.livekit.android.webrtc.CustomAudioProcessingFactory
3940
import io.livekit.android.webrtc.CustomVideoDecoderFactory
4041
import io.livekit.android.webrtc.CustomVideoEncoderFactory
42+
import io.livekit.android.webrtc.PeerConnectionFactoryManager
43+
import io.livekit.android.webrtc.peerconnection.RTCThreadToken
44+
import io.livekit.android.webrtc.peerconnection.RTCThreadTokenImpl
4145
import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread
4246
import io.livekit.android.webrtc.peerconnection.executeOnRTCThread
4347
import livekit.org.webrtc.AudioProcessingFactory
@@ -96,7 +100,7 @@ internal object RTCModule {
96100
@Named(InjectionNames.LIB_WEBRTC_INITIALIZATION)
97101
fun libWebrtcInitialization(appContext: Context): LibWebrtcInitialization {
98102
if (!hasInitializedWebrtc) {
99-
executeBlockingOnRTCThread {
103+
executeBlockingOnRTCThread(LibWebrtcInitializationThreadToken) {
100104
if (!hasInitializedWebrtc) {
101105
hasInitializedWebrtc = true
102106
PeerConnectionFactory.initialize(
@@ -334,7 +338,7 @@ internal object RTCModule {
334338

335339
@Provides
336340
@Singleton
337-
fun peerConnectionFactory(
341+
fun peerConnectionFactoryManager(
338342
@Suppress("UNUSED_PARAMETER")
339343
@Named(InjectionNames.LIB_WEBRTC_INITIALIZATION)
340344
webrtcInitialization: LibWebrtcInitialization,
@@ -345,9 +349,9 @@ internal object RTCModule {
345349
peerConnectionFactoryOptions: PeerConnectionFactory.Options?,
346350
memoryManager: CloseableManager,
347351
audioProcessingFactory: AudioProcessingFactory,
348-
): PeerConnectionFactory {
349-
return executeBlockingOnRTCThread {
350-
PeerConnectionFactory.builder()
352+
): PeerConnectionFactoryManager {
353+
return executeBlockingOnRTCThread(LibWebrtcInitializationThreadToken) {
354+
val peerConnectionFactory = PeerConnectionFactory.builder()
351355
.setAudioDeviceModule(audioDeviceModule)
352356
.setAudioProcessingFactory(audioProcessingFactory)
353357
.setVideoEncoderFactory(videoEncoderFactory)
@@ -358,14 +362,31 @@ internal object RTCModule {
358362
}
359363
}
360364
.createPeerConnectionFactory()
365+
return@executeBlockingOnRTCThread PeerConnectionFactoryManager(peerConnectionFactory)
361366
.apply {
362367
memoryManager.registerClosable {
363-
executeOnRTCThread {
368+
executeOnRTCThread(RTCThreadTokenImpl(this)) {
364369
dispose()
365370
}
366371
}
367372
}
368-
}
373+
}!!
374+
}
375+
376+
@Provides
377+
@Singleton
378+
fun peerConnectionFactory(
379+
peerConnectionFactoryManager: PeerConnectionFactoryManager,
380+
): PeerConnectionFactory {
381+
return peerConnectionFactoryManager.peerConnectionFactory
382+
}
383+
384+
@Provides
385+
@Reusable
386+
fun rtcThreadToken(
387+
peerConnectionFactoryManager: PeerConnectionFactoryManager,
388+
): RTCThreadToken {
389+
return RTCThreadTokenImpl(peerConnectionFactoryManager)
369390
}
370391

371392
@Provides
@@ -385,6 +406,17 @@ internal object RTCModule {
385406
}
386407

387408
/**
409+
* Used to ensure [RTCModule.libWebrtcInitialization] is called prior to other methods.
410+
*
388411
* @suppress
389412
*/
390413
object LibWebrtcInitialization
414+
415+
/**
416+
* To be used **only** for initialization and creation of PeerConnectionFactories,
417+
* as those don't need the native threads and cannot deadlock.
418+
*/
419+
private object LibWebrtcInitializationThreadToken : RTCThreadToken {
420+
override val isDisposed: Boolean
421+
get() = false
422+
}

livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import io.livekit.android.webrtc.getFmtps
3838
import io.livekit.android.webrtc.getMsid
3939
import io.livekit.android.webrtc.getRtps
4040
import io.livekit.android.webrtc.isConnected
41+
import io.livekit.android.webrtc.peerconnection.RTCThreadToken
4142
import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread
4243
import io.livekit.android.webrtc.peerconnection.launchBlockingOnRTCThread
4344
import kotlinx.coroutines.CoroutineDispatcher
@@ -72,16 +73,17 @@ constructor(
7273
private val ioDispatcher: CoroutineDispatcher,
7374
connectionFactory: PeerConnectionFactory,
7475
private val sdpFactory: SdpFactory,
76+
private val rtcThreadToken: RTCThreadToken,
7577
) {
7678
private val coroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
7779

7880
@VisibleForTesting
79-
internal val peerConnection: PeerConnection = executeBlockingOnRTCThread {
81+
internal val peerConnection: PeerConnection = executeBlockingOnRTCThread(rtcThreadToken) {
8082
connectionFactory.createPeerConnection(
8183
config,
8284
pcObserver,
8385
) ?: throw IllegalStateException("peer connection creation failed?")
84-
}
86+
}!!
8587
private val pendingCandidates = mutableListOf<IceCandidate>()
8688
private var restartingIce: Boolean = false
8789

@@ -329,7 +331,7 @@ constructor(
329331
if (isClosed()) {
330332
return null
331333
}
332-
return launchBlockingOnRTCThread {
334+
return launchBlockingOnRTCThread(rtcThreadToken) {
333335
return@launchBlockingOnRTCThread if (isClosed()) {
334336
null
335337
} else {
@@ -344,7 +346,7 @@ constructor(
344346
if (isClosed()) {
345347
return null
346348
}
347-
return executeBlockingOnRTCThread {
349+
return executeBlockingOnRTCThread(rtcThreadToken) {
348350
return@executeBlockingOnRTCThread if (isClosed()) {
349351
null
350352
} else {

livekit-android-sdk/src/main/java/io/livekit/android/room/PublisherTransportObserver.kt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 LiveKit, Inc.
2+
* Copyright 2023-2025 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import io.livekit.android.room.util.PeerConnectionStateObservable
2020
import io.livekit.android.util.FlowObservable
2121
import io.livekit.android.util.LKLog
2222
import io.livekit.android.util.flowDelegate
23+
import io.livekit.android.webrtc.peerconnection.RTCThreadToken
2324
import io.livekit.android.webrtc.peerconnection.executeOnRTCThread
2425
import livekit.LivekitRtc
2526
import livekit.org.webrtc.CandidatePairChangeEvent
@@ -34,6 +35,7 @@ import livekit.org.webrtc.SessionDescription
3435
internal class PublisherTransportObserver(
3536
private val engine: RTCEngine,
3637
private val client: SignalClient,
38+
private val rtcThreadToken: RTCThreadToken,
3739
) : PeerConnection.Observer, PeerConnectionTransport.Listener, PeerConnectionStateObservable {
3840

3941
var connectionChangeListener: PeerConnectionStateListener? = null
@@ -44,15 +46,15 @@ internal class PublisherTransportObserver(
4446
private set
4547

4648
override fun onIceCandidate(iceCandidate: IceCandidate?) {
47-
executeOnRTCThread {
49+
executeOnRTCThread(rtcThreadToken) {
4850
val candidate = iceCandidate ?: return@executeOnRTCThread
4951
LKLog.v { "onIceCandidate: $candidate" }
5052
client.sendCandidate(candidate, target = LivekitRtc.SignalTarget.PUBLISHER)
5153
}
5254
}
5355

5456
override fun onRenegotiationNeeded() {
55-
executeOnRTCThread {
57+
executeOnRTCThread(rtcThreadToken) {
5658
engine.negotiatePublisher()
5759
}
5860
}
@@ -62,7 +64,7 @@ internal class PublisherTransportObserver(
6264
}
6365

6466
override fun onOffer(sd: SessionDescription) {
65-
executeOnRTCThread {
67+
executeOnRTCThread(rtcThreadToken) {
6668
client.sendOffer(sd)
6769
}
6870
}
@@ -71,7 +73,7 @@ internal class PublisherTransportObserver(
7173
}
7274

7375
override fun onConnectionChange(newState: PeerConnection.PeerConnectionState) {
74-
executeOnRTCThread {
76+
executeOnRTCThread(rtcThreadToken) {
7577
LKLog.v { "onConnection new state: $newState" }
7678
connectionChangeListener?.invoke(newState)
7779
connectionState = newState

livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import io.livekit.android.webrtc.RTCStatsGetter
4848
import io.livekit.android.webrtc.copy
4949
import io.livekit.android.webrtc.isConnected
5050
import io.livekit.android.webrtc.isDisconnected
51+
import io.livekit.android.webrtc.peerconnection.RTCThreadToken
5152
import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread
5253
import io.livekit.android.webrtc.peerconnection.launchBlockingOnRTCThread
5354
import io.livekit.android.webrtc.toProtoSessionDescription
@@ -107,6 +108,7 @@ internal constructor(
107108
private val pctFactory: PeerConnectionTransport.Factory,
108109
@Named(InjectionNames.DISPATCHER_IO)
109110
private val ioDispatcher: CoroutineDispatcher,
111+
private val rtcThreadToken: RTCThreadToken,
110112
) : SignalClient.Listener {
111113
internal var listener: Listener? = null
112114

@@ -160,8 +162,8 @@ internal constructor(
160162
internal val serverVersion: Semver?
161163
get() = client.serverVersion
162164

163-
private val publisherObserver = PublisherTransportObserver(this, client)
164-
private val subscriberObserver = SubscriberTransportObserver(this, client)
165+
private val publisherObserver = PublisherTransportObserver(this, client, rtcThreadToken)
166+
private val subscriberObserver = SubscriberTransportObserver(this, client, rtcThreadToken)
165167

166168
internal var publisher: PeerConnectionTransport? = null
167169
private var subscriber: PeerConnectionTransport? = null
@@ -243,7 +245,7 @@ internal constructor(
243245
}
244246

245247
private suspend fun configure(joinResponse: JoinResponse, connectOptions: ConnectOptions) {
246-
launchBlockingOnRTCThread {
248+
launchBlockingOnRTCThread(rtcThreadToken) {
247249
configurationLock.withCheckLock(
248250
{
249251
ensureActive()
@@ -316,7 +318,7 @@ internal constructor(
316318
reliableInit,
317319
).also { dataChannel ->
318320

319-
val dataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel))
321+
val dataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel), rtcThreadToken)
320322
reliableDataChannelManager = dataChannelManager
321323
dataChannel.registerObserver(dataChannelManager)
322324
reliableBufferedAmountJob?.cancel()
@@ -339,7 +341,7 @@ internal constructor(
339341
LOSSY_DATA_CHANNEL_LABEL,
340342
lossyInit,
341343
).also { dataChannel ->
342-
lossyDataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel))
344+
lossyDataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel), rtcThreadToken)
343345
dataChannel.registerObserver(lossyDataChannelManager)
344346
}
345347
}
@@ -432,7 +434,7 @@ internal constructor(
432434
}
433435

434436
private fun closeResources(reason: String) {
435-
executeBlockingOnRTCThread {
437+
executeBlockingOnRTCThread(rtcThreadToken) {
436438
runBlocking {
437439
configurationLock.withLock {
438440
publisherObserver.connectionChangeListener = null

livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,33 @@ import io.livekit.android.audio.CommunicationWorkaround
3939
import io.livekit.android.dagger.InjectionNames
4040
import io.livekit.android.e2ee.E2EEManager
4141
import io.livekit.android.e2ee.E2EEOptions
42-
import io.livekit.android.events.*
42+
import io.livekit.android.events.BroadcastEventBus
43+
import io.livekit.android.events.DisconnectReason
44+
import io.livekit.android.events.ParticipantEvent
45+
import io.livekit.android.events.RoomEvent
46+
import io.livekit.android.events.collect
4347
import io.livekit.android.memory.CloseableManager
4448
import io.livekit.android.renderer.TextureViewRenderer
4549
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManager
4650
import io.livekit.android.room.metrics.collectMetrics
4751
import io.livekit.android.room.network.NetworkCallbackManagerFactory
48-
import io.livekit.android.room.participant.*
52+
import io.livekit.android.room.participant.AudioTrackPublishDefaults
53+
import io.livekit.android.room.participant.ConnectionQuality
54+
import io.livekit.android.room.participant.LocalParticipant
55+
import io.livekit.android.room.participant.Participant
56+
import io.livekit.android.room.participant.ParticipantListener
57+
import io.livekit.android.room.participant.RemoteParticipant
58+
import io.livekit.android.room.participant.RpcHandler
59+
import io.livekit.android.room.participant.VideoTrackPublishDefaults
60+
import io.livekit.android.room.participant.publishTracksInfo
4961
import io.livekit.android.room.provisions.LKObjects
5062
import io.livekit.android.room.rpc.RpcManager
51-
import io.livekit.android.room.track.*
63+
import io.livekit.android.room.track.LocalAudioTrackOptions
64+
import io.livekit.android.room.track.LocalTrackPublication
65+
import io.livekit.android.room.track.LocalVideoTrackOptions
66+
import io.livekit.android.room.track.RemoteTrackPublication
67+
import io.livekit.android.room.track.Track
68+
import io.livekit.android.room.track.TrackPublication
5269
import io.livekit.android.room.types.toSDKType
5370
import io.livekit.android.room.util.ConnectionWarmer
5471
import io.livekit.android.util.FlowObservable
@@ -57,15 +74,31 @@ import io.livekit.android.util.flow
5774
import io.livekit.android.util.flowDelegate
5875
import io.livekit.android.util.invoke
5976
import io.livekit.android.webrtc.getFilteredStats
60-
import kotlinx.coroutines.*
77+
import kotlinx.coroutines.CancellationException
78+
import kotlinx.coroutines.CoroutineDispatcher
79+
import kotlinx.coroutines.CoroutineExceptionHandler
80+
import kotlinx.coroutines.CoroutineScope
81+
import kotlinx.coroutines.SupervisorJob
82+
import kotlinx.coroutines.cancel
83+
import kotlinx.coroutines.coroutineScope
84+
import kotlinx.coroutines.ensureActive
6185
import kotlinx.coroutines.flow.filterNotNull
6286
import kotlinx.coroutines.flow.first
87+
import kotlinx.coroutines.job
88+
import kotlinx.coroutines.launch
89+
import kotlinx.coroutines.runBlocking
6390
import kotlinx.coroutines.sync.Mutex
6491
import kotlinx.coroutines.sync.withLock
6592
import kotlinx.serialization.Serializable
6693
import livekit.LivekitModels
6794
import livekit.LivekitRtc
68-
import livekit.org.webrtc.*
95+
import livekit.org.webrtc.EglBase
96+
import livekit.org.webrtc.MediaStream
97+
import livekit.org.webrtc.MediaStreamTrack
98+
import livekit.org.webrtc.RTCStatsCollectorCallback
99+
import livekit.org.webrtc.RendererCommon
100+
import livekit.org.webrtc.RtpReceiver
101+
import livekit.org.webrtc.SurfaceViewRenderer
69102
import livekit.org.webrtc.audio.AudioDeviceModule
70103
import java.net.URI
71104
import java.util.Date
@@ -110,6 +143,7 @@ constructor(
110143
private val connectionWarmer: ConnectionWarmer,
111144
private val audioRecordPrewarmer: AudioRecordPrewarmer,
112145
private val incomingDataStreamManager: IncomingDataStreamManager,
146+
private val remoteParticipantFactory: RemoteParticipant.Factory,
113147
) : RTCEngine.Listener, ParticipantListener, RpcManager, IncomingDataStreamManager by incomingDataStreamManager {
114148

115149
private lateinit var coroutineScope: CoroutineScope
@@ -528,6 +562,9 @@ constructor(
528562
* Disconnect from the room.
529563
*/
530564
fun disconnect() {
565+
if (state == State.DISCONNECTED) {
566+
return
567+
}
531568
engine.client.sendLeave()
532569
handleDisconnect(DisconnectReason.CLIENT_INITIATED)
533570
}
@@ -571,6 +608,7 @@ constructor(
571608
* must be created.
572609
*/
573610
fun release() {
611+
disconnect()
574612
closeableManager.close()
575613
}
576614

@@ -744,7 +782,7 @@ constructor(
744782
return participant
745783
}
746784

747-
participant = RemoteParticipant(info, engine.client, ioDispatcher, defaultDispatcher)
785+
participant = remoteParticipantFactory.create(info)
748786
participant.internalListener = this
749787

750788
coroutineScope.launch {

0 commit comments

Comments
 (0)