Skip to content

Commit dce1cfb

Browse files
authored
DataChannel fixes and logging (#897)
* Properly cancel jobs awaiting on DataChannel low buffer instead of completing on dispose * Add some logging around when a publisher couldn't connect in ensurePublisherConnected * fill in missing changeset
1 parent abbf5f1 commit dce1cfb

8 files changed

Lines changed: 229 additions & 26 deletions

File tree

.changeset/warm-buttons-switch.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Properly cancel jobs awaiting on DataChannel low buffer instead of completing on dispose

.changeset/wise-spoons-occur.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Fix exception not being caught when using LocalParticipant.publishData

gradle.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ org.gradle.jvmargs=-Xmx4g
1010
# When configured, Gradle will run in incubating parallel mode.
1111
# This option should only be used with decoupled projects. More details, visit
1212
# http://www.gradle.org/docs/current/userguide/multi_project_builds.html#sec:decoupled_projects
13-
# org.gradle.parallel=true
13+
org.gradle.parallel=true
1414
# AndroidX package structure to make it clearer which packages are bundled with the
1515
# Android operating system, and which are packaged with your app's APK
1616
# https://developer.android.com/topic/libraries/support-library/androidx-rn
@@ -20,6 +20,7 @@ android.enableJetifier=false
2020
# Kotlin code style for this project: "official" or "obsolete":
2121
kotlin.code.style=official
2222

23+
org.gradle.caching=true
2324
###############################################################
2425

2526
GROUP=io.livekit

livekit-android-sdk/src/main/java/io/livekit/android/coroutines/FlowExt.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2025 LiveKit, Inc.
2+
* Copyright 2025-2026 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@ package io.livekit.android.coroutines
1818

1919
import kotlinx.coroutines.cancel
2020
import kotlinx.coroutines.coroutineScope
21-
import kotlinx.coroutines.currentCoroutineContext
2221
import kotlinx.coroutines.flow.Flow
2322
import kotlinx.coroutines.flow.collect
2423
import kotlinx.coroutines.flow.flow
@@ -47,12 +46,12 @@ fun <T> Flow<T>.cancelOnSignal(signal: Flow<Unit?>): Flow<T> = flow {
4746
coroutineScope {
4847
launch {
4948
signal.takeWhile { it == null }.collect()
50-
currentCoroutineContext().cancel()
49+
this@coroutineScope.cancel()
5150
}
5251

5352
collect {
5453
emit(it)
5554
}
56-
currentCoroutineContext().cancel()
55+
this@coroutineScope.cancel()
5756
}
5857
}

livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -804,17 +804,7 @@ internal constructor(
804804
} catch (e: Exception) {
805805
return
806806
}
807-
val manager = when (kind) {
808-
LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager
809-
LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager
810-
LivekitModels.DataPacket.Kind.UNRECOGNIZED -> {
811-
throw IllegalArgumentException()
812-
}
813-
}
814-
815-
if (manager == null) {
816-
return
817-
}
807+
val manager = dataChannelManagerForKind(kind) ?: return
818808
manager.waitForBufferedAmountLow(DATA_CHANNEL_LOW_THRESHOLD.toLong())
819809
}
820810

@@ -825,7 +815,7 @@ internal constructor(
825815
}
826816

827817
if (publisher == null) {
828-
throw RoomException.ConnectException("Publisher isn't setup yet! Is room not connected?!")
818+
throw RoomException.ConnectException("Publisher isn't setup yet! Is the room connected?")
829819
}
830820

831821
if (publisher?.isConnected() != true &&
@@ -835,23 +825,38 @@ internal constructor(
835825
this.negotiatePublisher()
836826
}
837827

838-
val targetChannel = dataChannelForKind(kind) ?: throw RoomException.ConnectException("Publisher isn't setup yet! Is room not connected?!")
839-
if (targetChannel.state() == DataChannel.State.OPEN) {
828+
// Ensure data channel exists
829+
dataChannelForKind(kind) ?: throw RoomException.ConnectException("Publisher data channel not established for ${kind.name}; is the room connected?")
830+
831+
val channelManager = dataChannelManagerForKind(kind)
832+
?: throw RoomException.ConnectException("Publisher data channel manager not established for ${kind.name}; is the room connected?")
833+
if (channelManager.state == DataChannel.State.OPEN) {
840834
return
841835
}
842836

843837
// wait until publisher ICE connected
844838
val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS
845839
while (SystemClock.elapsedRealtime() < endTime) {
846-
if (publisher?.isConnected() == true && targetChannel.state() == DataChannel.State.OPEN) {
840+
if (publisher?.isConnected() == true &&
841+
channelManager.state == DataChannel.State.OPEN
842+
) {
847843
return
848844
}
849845
delay(50)
850846
}
851847

852-
throw RoomException.ConnectException("could not establish publisher connection")
848+
throw RoomException.ConnectException(
849+
"could not establish publisher connection: publisher state: ${publisherObserver.connectionState}, channel state: ${channelManager.state}",
850+
)
853851
}
854852

853+
private fun dataChannelManagerForKind(kind: LivekitModels.DataPacket.Kind): DataChannelManager? =
854+
when (kind) {
855+
LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager
856+
LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager
857+
LivekitModels.DataPacket.Kind.UNRECOGNIZED -> throw IllegalArgumentException("Unknown data packet kind!")
858+
}
859+
855860
private fun dataChannelForKind(kind: LivekitModels.DataPacket.Kind) =
856861
when (kind) {
857862
LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannel

livekit-android-sdk/src/main/java/io/livekit/android/webrtc/DataChannelManager.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2025 LiveKit, Inc.
2+
* Copyright 2025-2026 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import livekit.org.webrtc.DataChannel
3131
/**
3232
* @suppress
3333
*/
34-
internal class DataChannelManager(
34+
class DataChannelManager(
3535
val dataChannel: DataChannel,
3636
private val dataMessageListener: DataChannel.Observer,
3737
val rtcThreadToken: RTCThreadToken,

livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2025 LiveKit, Inc.
2+
* Copyright 2023-2026 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,44 +23,94 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {
2323
var observer: Observer? = null
2424
var sentBuffers = mutableListOf<Buffer>()
2525

26+
private var stateBacking: State = State.OPEN
27+
var state: State
28+
get() {
29+
ensureNotDisposed()
30+
return stateBacking
31+
}
32+
set(value) {
33+
ensureNotDisposed()
34+
if (value == stateBacking) {
35+
return
36+
}
37+
stateBacking = value
38+
observer?.onStateChange()
39+
}
40+
41+
private var bufferedAmountBacking: Long = 0L
42+
var bufferedAmount: Long
43+
get() {
44+
ensureNotDisposed()
45+
return bufferedAmountBacking
46+
}
47+
set(value) {
48+
if (value == bufferedAmountBacking) {
49+
return
50+
}
51+
val previous = bufferedAmountBacking
52+
bufferedAmountBacking = value
53+
observer?.onBufferedAmountChange(previous)
54+
}
55+
56+
var isDisposed = false
57+
2658
fun clearSentBuffers() {
2759
sentBuffers.clear()
2860
}
2961

3062
override fun registerObserver(observer: Observer?) {
63+
ensureNotDisposed()
3164
this.observer = observer
3265
}
3366

3467
override fun unregisterObserver() {
68+
ensureNotDisposed()
69+
this.observer = null
3570
}
3671

3772
override fun label(): String? {
73+
ensureNotDisposed()
3874
return label
3975
}
4076

4177
override fun id(): Int {
78+
ensureNotDisposed()
4279
return 0
4380
}
4481

4582
override fun state(): State {
46-
return State.OPEN
83+
ensureNotDisposed()
84+
return state
4785
}
4886

4987
override fun bufferedAmount(): Long {
50-
return 0
88+
ensureNotDisposed()
89+
return bufferedAmount
5190
}
5291

5392
override fun send(buffer: Buffer): Boolean {
93+
ensureNotDisposed()
5494
sentBuffers.add(buffer)
5595
return true
5696
}
5797

5898
override fun close() {
99+
ensureNotDisposed()
100+
state = State.CLOSED
59101
}
60102

61103
override fun dispose() {
104+
ensureNotDisposed()
105+
state = State.CLOSED
106+
isDisposed = true
62107
}
63108

109+
fun ensureNotDisposed() {
110+
if (isDisposed) {
111+
throw IllegalStateException("Data channel is closed!")
112+
}
113+
}
64114
fun simulateBufferReceived(buffer: Buffer) {
65115
observer?.onMessage(buffer)
66116
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright 2026 LiveKit, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.livekit.android.webrtc
18+
19+
import io.livekit.android.test.BaseTest
20+
import io.livekit.android.test.mock.MockDataChannel
21+
import io.livekit.android.test.mock.MockRTCThreadToken
22+
import kotlinx.coroutines.CancellationException
23+
import kotlinx.coroutines.ExperimentalCoroutinesApi
24+
import kotlinx.coroutines.async
25+
import kotlinx.coroutines.yield
26+
import livekit.org.webrtc.DataChannel
27+
import org.junit.Assert.assertEquals
28+
import org.junit.Assert.assertSame
29+
import org.junit.Assert.assertTrue
30+
import org.junit.Assert.fail
31+
import org.junit.Test
32+
import java.nio.ByteBuffer
33+
34+
@OptIn(ExperimentalCoroutinesApi::class)
35+
class DataChannelManagerTest : BaseTest() {
36+
37+
companion object {
38+
private val NOOP_OBSERVER = object : DataChannel.Observer {
39+
override fun onBufferedAmountChange(amount: Long) {}
40+
41+
override fun onStateChange() {}
42+
43+
override fun onMessage(buffer: DataChannel.Buffer) {}
44+
}
45+
}
46+
47+
@Test
48+
fun onMessage_forwardsToDelegate() {
49+
val channel = MockDataChannel("dc")
50+
var received: DataChannel.Buffer? = null
51+
val delegate = object : DataChannel.Observer {
52+
override fun onBufferedAmountChange(amount: Long) {}
53+
54+
override fun onStateChange() {}
55+
56+
override fun onMessage(buffer: DataChannel.Buffer) {
57+
received = buffer
58+
}
59+
}
60+
val manager = DataChannelManager(channel, delegate, MockRTCThreadToken())
61+
val payload = ByteBuffer.wrap(byteArrayOf(1, 2, 3))
62+
val buffer = DataChannel.Buffer(payload, true)
63+
manager.onMessage(buffer)
64+
assertSame(buffer, received)
65+
}
66+
67+
@Test
68+
fun onStateChange_updatesStateFromChannel() {
69+
val channel = MockDataChannel("dc")
70+
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
71+
channel.registerObserver(manager)
72+
assertEquals(DataChannel.State.OPEN, manager.state)
73+
channel.state = DataChannel.State.CLOSING
74+
assertEquals(DataChannel.State.CLOSING, manager.state)
75+
}
76+
77+
@Test
78+
fun onBufferedAmountChange_updatesBufferedAmountFromChannel() {
79+
val channel = MockDataChannel("dc")
80+
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
81+
channel.registerObserver(manager)
82+
channel.bufferedAmount = 12_345L
83+
assertEquals(12_345L, manager.bufferedAmount)
84+
}
85+
86+
@Test
87+
fun dispose_closesDataChannelOnce() {
88+
val channel = MockDataChannel("dc")
89+
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
90+
manager.dispose()
91+
assertTrue(manager.disposed)
92+
}
93+
94+
@Test
95+
fun dispose_secondCallIsNoOp() {
96+
val channel = MockDataChannel("dc")
97+
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
98+
manager.dispose()
99+
manager.dispose()
100+
}
101+
102+
@Test
103+
fun waitForBufferedAmountLow_completesWhenAlreadyBelowThreshold() = runTest {
104+
val channel = MockDataChannel("dc")
105+
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
106+
channel.registerObserver(manager)
107+
channel.bufferedAmount = 100L
108+
manager.waitForBufferedAmountLow(15_000L)
109+
}
110+
111+
@Test
112+
fun waitForBufferedAmountLow_completesWhenAmountDrops() = runTest {
113+
val channel = MockDataChannel("dc")
114+
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
115+
channel.registerObserver(manager)
116+
channel.bufferedAmount = 25_000L
117+
val waiter = async { manager.waitForBufferedAmountLow(15_000L) }
118+
yield()
119+
channel.bufferedAmount = 0L
120+
waiter.await()
121+
}
122+
123+
@Test
124+
fun waitForBufferedAmountLow_cancelledWhenDisposedWhileWaiting() = runTest {
125+
val channel = MockDataChannel("dc")
126+
val manager = DataChannelManager(channel, NOOP_OBSERVER, MockRTCThreadToken())
127+
channel.registerObserver(manager)
128+
channel.bufferedAmount = 100_000L
129+
val waiter = async { manager.waitForBufferedAmountLow(15_000L) }
130+
yield()
131+
manager.dispose()
132+
try {
133+
waiter.await()
134+
fail("expected CancellationException")
135+
} catch (_: CancellationException) {
136+
}
137+
}
138+
}

0 commit comments

Comments
 (0)