@@ -40,7 +40,6 @@ import kotlinx.coroutines.delay
4040import kotlinx.coroutines.flow.MutableSharedFlow
4141import kotlinx.coroutines.launch
4242import kotlinx.coroutines.suspendCancellableCoroutine
43- import kotlinx.coroutines.withTimeout
4443import kotlinx.serialization.decodeFromString
4544import kotlinx.serialization.encodeToString
4645import kotlinx.serialization.json.Json
@@ -63,7 +62,6 @@ import java.util.Date
6362import javax.inject.Inject
6463import javax.inject.Named
6564import javax.inject.Singleton
66- import kotlin.time.Duration.Companion.seconds
6765
6866/* *
6967 * SignalClient to LiveKit WS servers
@@ -99,6 +97,7 @@ constructor(
9997
10098 // join will always return a JoinResponse.
10199 // reconnect will return a ReconnectResponse or a Unit if a different response was received.
100+ @Volatile
102101 private var joinContinuation: CancellableContinuation <
103102 Either <
104103 JoinResponse ,
@@ -186,19 +185,17 @@ constructor(
186185 .addHeader(" Authorization" , " Bearer $token " )
187186 .build()
188187
189- return withTimeout(5 .seconds) {
190- suspendCancellableCoroutine { cont ->
191- // Wait for join response through WebSocketListener
192- joinContinuation = cont
193- // When a coroutine is canceled, WebSocket must be interrupted.
194- cont.invokeOnCancellation {
195- LKLog .w { " connect cancelled, abort websocket" }
196- currentWs?.cancel()
197- currentWs = null
198- joinContinuation = null
199- }
200- currentWs = websocketFactory.newWebSocket(request, this @SignalClient)
201- }
188+ return suspendCancellableCoroutine { cont ->
189+ // Wait for join response through WebSocketListener
190+ joinContinuation = cont
191+ cont.invokeOnCancellation {
192+ // If the coroutine is cancelled, websocket needs to be cancelled.
193+ // onFailure will handle cleanup.
194+ LKLog .v { " connect cancelled, abort websocket" }
195+ currentWs?.cancel()
196+ joinContinuation = null
197+ }
198+ currentWs = websocketFactory.newWebSocket(request, this @SignalClient)
202199 }
203200 }
204201
@@ -362,6 +359,7 @@ constructor(
362359 listener?.onError(t)
363360 joinContinuation?.cancel(t)
364361 }
362+ joinContinuation = null
365363
366364 val wasConnected = isConnected
367365
@@ -671,6 +669,7 @@ constructor(
671669 version = serverVersion
672670 )
673671 joinContinuation?.resumeWith(Result .success(Either .Left (response.join)))
672+ joinContinuation = null
674673 } else if (response.hasLeave()) {
675674 // Some reconnects may immediately send leave back without a join response first.
676675 handleSignalResponseImpl(ws, response)
@@ -685,8 +684,10 @@ constructor(
685684
686685 if (response.hasReconnect()) {
687686 joinContinuation?.resumeWith(Result .success(Either .Right (Either .Left (response.reconnect))))
687+ joinContinuation = null
688688 } else {
689689 joinContinuation?.resumeWith(Result .success(Either .Right (Either .Right (Unit ))))
690+ joinContinuation = null
690691 // Non-reconnect response, handle normally
691692 shouldProcessMessage = true
692693 }
@@ -879,7 +880,7 @@ constructor(
879880 pingJob = null
880881 pongJob?.cancel()
881882 pongJob = null
882- currentWs?.cancel( )
883+ currentWs?.close(code, reason )
883884 currentWs = null
884885 joinContinuation?.cancel()
885886 joinContinuation = null
0 commit comments