Skip to content

Commit 1d1995f

Browse files
authored
Fix exception when resending data channel messages after a resume (#923)
* Fix exception when resending data channel messages after a resume * test for resendReliableData
1 parent 1d60799 commit 1d1995f

3 files changed

Lines changed: 75 additions & 2 deletions

File tree

.changeset/tiny-buses-hear.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Fix exception when resending data channel messages after a resume

livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,12 @@ internal constructor(
790790
}
791791

792792
internal suspend fun resendReliableMessagesForResume(lastMessageSeq: Int): Result<Unit> {
793-
ensurePublisherConnected(LivekitModels.DataPacket.Kind.RELIABLE)
793+
try {
794+
ensurePublisherConnected(LivekitModels.DataPacket.Kind.RELIABLE)
795+
} catch (e: Exception) {
796+
e.rethrowIfCancellationSignal()
797+
return Result.failure(e)
798+
}
794799
val channel = dataChannelForKind(LivekitModels.DataPacket.Kind.RELIABLE)
795800
?: return Result.failure(NullPointerException("reliable channel not established!"))
796801

livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2025 LiveKit, Inc.
2+
* Copyright 2023-2026 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,8 +16,10 @@
1616

1717
package io.livekit.android.room
1818

19+
import com.google.protobuf.ByteString
1920
import io.livekit.android.test.MockE2ETest
2021
import io.livekit.android.test.events.FlowCollector
22+
import io.livekit.android.test.mock.MockDataChannel
2123
import io.livekit.android.test.mock.MockPeerConnection
2224
import io.livekit.android.test.mock.SignalRequestHandler
2325
import io.livekit.android.test.mock.TestData
@@ -27,6 +29,7 @@ import io.livekit.android.util.toOkioByteString
2729
import kotlinx.coroutines.ExperimentalCoroutinesApi
2830
import kotlinx.coroutines.test.advanceUntilIdle
2931
import livekit.LivekitModels
32+
import livekit.LivekitModels.DataPacket
3033
import livekit.LivekitRtc
3134
import livekit.org.webrtc.PeerConnection
3235
import org.junit.Assert
@@ -366,4 +369,64 @@ class RTCEngineMockE2ETest : MockE2ETest() {
366369
val sid = wsFactory.request.url.queryParameter(SignalClient.CONNECT_QUERY_PARTICIPANT_SID)
367370
assertEquals(TestData.JOIN.join.participant.sid, sid)
368371
}
372+
373+
/**
374+
* After a soft reconnect, the server reports [LivekitRtc.ReconnectResponse.lastMessageSeq]. The engine
375+
* drops buffered reliable payloads up to that sequence (inclusive) and re-sends the remainder on the
376+
* reliable data channel — see [RTCEngine.resendReliableMessagesForResume].
377+
*/
378+
@Test
379+
fun softReconnectResendsBufferedReliableData() = runTest {
380+
room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)
381+
382+
val publisherOfferHandler: SignalRequestHandler = { request ->
383+
if (request.hasOffer()) {
384+
val answer = with(LivekitRtc.SignalResponse.newBuilder()) {
385+
answer = with(LivekitRtc.SessionDescription.newBuilder()) {
386+
sdp = "remote_answer"
387+
type = "answer"
388+
id = request.offer.id
389+
build()
390+
}
391+
build()
392+
}
393+
wsFactory.receiveMessage(answer)
394+
true
395+
} else {
396+
false
397+
}
398+
}
399+
wsFactory.registerSignalRequestHandler(publisherOfferHandler)
400+
connect()
401+
402+
val lastMessageSeq = TestData.RECONNECT.reconnect.lastMessageSeq
403+
val pubDataChannel =
404+
getPublisherPeerConnection().dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel
405+
406+
val payloads = listOf(byteArrayOf(1), byteArrayOf(2), byteArrayOf(3))
407+
for (payload in payloads) {
408+
assertTrue(room.localParticipant.publishData(payload).isSuccess)
409+
}
410+
assertEquals(3, pubDataChannel.sentBuffers.size)
411+
412+
disconnectPeerConnection()
413+
testScheduler.advanceTimeBy(1000)
414+
wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
415+
simulateMessageFromServer(TestData.RECONNECT)
416+
connectPeerConnection()
417+
418+
advanceUntilIdle()
419+
420+
val expectedResentSequences = listOf(1, 2, 3).filter { it > lastMessageSeq }
421+
assertEquals(3 + expectedResentSequences.size, pubDataChannel.sentBuffers.size)
422+
expectedResentSequences.forEachIndexed { index, sequence ->
423+
val packet = DataPacket.parseFrom(
424+
ByteString.copyFrom(pubDataChannel.sentBuffers[3 + index].data),
425+
)
426+
assertEquals(sequence, packet.sequence)
427+
assertTrue(
428+
packet.user.payload.toByteArray().contentEquals(byteArrayOf(sequence.toByte())),
429+
)
430+
}
431+
}
369432
}

0 commit comments

Comments
 (0)