Skip to content

Commit 3e8034a

Browse files
Fixed silent reliable data send loss (#921)
1 parent c0a805a commit 3e8034a

5 files changed

Lines changed: 129 additions & 14 deletions

File tree

.changeset/quiet-ducks-deliver.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Fixed silent loss of reliable data in two cases: when `DataChannel.send` returned false, and when buffered items were replayed across multiple resumes.

livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,12 @@ internal constructor(
678678
connectionState = ConnectionState.CONNECTED
679679
}
680680
if (lastMessageSeq != null) {
681-
resendReliableMessagesForResume(lastMessageSeq)
681+
resendReliableMessagesForResume(lastMessageSeq).onFailure { e ->
682+
LKLog.w(e) {
683+
"Reliable data replay did not complete on resume; " +
684+
"buffered items remain queued for the next resume."
685+
}
686+
}
682687
}
683688
// Is connected, notify and return.
684689
regionUrlProvider?.clearAttemptedRegions()
@@ -757,29 +762,39 @@ internal constructor(
757762
}
758763
}
759764

760-
if (dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE) {
765+
val isReliable = dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE
766+
if (isReliable) {
761767
dataPacket = dataPacket.toBuilder()
762768
.setSequence(reliableDataSequence)
763769
.build()
764-
reliableDataSequence++
765770
}
766771

767-
val byteBuffer = ByteBuffer.wrap(dataPacket.toByteArray())
772+
val packetBytes = dataPacket.toByteArray()
768773

769-
if (dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE) {
770-
reliableMessageBuffer.queue(DataPacketItem(byteBuffer, dataPacket.sequence))
771-
if (this.connectionState == ConnectionState.RECONNECTING) {
772-
return Result.success(Unit)
773-
}
774+
if (isReliable && this.connectionState == ConnectionState.RECONNECTING) {
775+
reliableMessageBuffer.queue(DataPacketItem(ByteBuffer.wrap(packetBytes), dataPacket.sequence))
776+
reliableDataSequence++
777+
return Result.success(Unit)
774778
}
775779
val buf = DataChannel.Buffer(
776-
byteBuffer,
780+
ByteBuffer.wrap(packetBytes),
777781
true,
778782
)
779783
val channel = dataChannelForKind(dataPacket.kind)
780784
?: throw RoomException.ConnectException("channel not established for ${dataPacket.kind.name}")
781785

782-
channel.send(buf)
786+
if (!channel.send(buf)) {
787+
return Result.failure(
788+
RoomException.ConnectException("failed to send data packet for ${dataPacket.kind.name}"),
789+
)
790+
}
791+
792+
if (isReliable) {
793+
// Wrap a fresh ByteBuffer so the queued item's position stays at 0; the
794+
// ByteBuffer just sent has been drained by DataChannel.send.
795+
reliableMessageBuffer.queue(DataPacketItem(ByteBuffer.wrap(packetBytes), dataPacket.sequence))
796+
reliableDataSequence++
797+
}
783798
} catch (e: Exception) {
784799
e.rethrowIfCancellationSignal()
785800
return Result.failure(e)
@@ -808,8 +823,17 @@ internal constructor(
808823

809824
synchronized(reliableStateLock) {
810825
reliableMessageBuffer.popToSequence(lastMessageSeq)
811-
reliableMessageBuffer.getAll().forEach { item ->
812-
channel.send(DataChannel.Buffer(item.data, true))
826+
for (item in reliableMessageBuffer.getAll()) {
827+
// Send a duplicate so the underlying buffer keeps position=0 and survives
828+
// multiple resume attempts. DataChannel.send advances the buffer's position to
829+
// its limit; without duplicate(), the second replay would send empty bytes.
830+
if (!channel.send(DataChannel.Buffer(item.data.duplicate(), true))) {
831+
return Result.failure(
832+
RoomException.ConnectException(
833+
"failed to replay reliable data packet at sequence ${item.sequence}",
834+
),
835+
)
836+
}
813837
}
814838
}
815839

livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {
2323
var observer: Observer? = null
2424
var sentBuffers = mutableListOf<Buffer>()
2525

26+
/** Snapshot of the bytes visible at send time, captured via the Buffer's current position/limit. */
27+
var sentPayloads = mutableListOf<ByteArray>()
28+
var sendResult = true
29+
30+
/**
31+
* When true, [send] advances the buffer's position to its limit, mirroring
32+
* the real WebRTC wrapper which drains the buffer via `ByteBuffer.get(byte[])`.
33+
* Off by default to keep existing tests that read from `sentBuffers` working.
34+
*/
35+
var consumeSentBuffer = false
36+
2637
private var stateBacking: State = State.OPEN
2738
var state: State
2839
get() {
@@ -57,6 +68,7 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {
5768

5869
fun clearSentBuffers() {
5970
sentBuffers.clear()
71+
sentPayloads.clear()
6072
}
6173

6274
override fun registerObserver(observer: Observer?) {
@@ -92,7 +104,15 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {
92104
override fun send(buffer: Buffer): Boolean {
93105
ensureNotDisposed()
94106
sentBuffers.add(buffer)
95-
return true
107+
// Capture what native WebRTC would receive at this moment (position..limit).
108+
val savedPos = buffer.data.position()
109+
val payload = ByteArray(buffer.data.remaining())
110+
buffer.data.get(payload)
111+
sentPayloads.add(payload)
112+
if (!consumeSentBuffer) {
113+
buffer.data.position(savedPos)
114+
}
115+
return sendResult
96116
}
97117

98118
override fun close() {

livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import livekit.LivekitModels.DataPacket
4242
import livekit.LivekitRtc
4343
import livekit.org.webrtc.PeerConnection
4444
import org.junit.Assert
45+
import org.junit.Assert.assertArrayEquals
4546
import org.junit.Assert.assertEquals
4647
import org.junit.Assert.assertFalse
4748
import org.junit.Assert.assertTrue
@@ -412,6 +413,49 @@ class RTCEngineMockE2ETest : MockE2ETest() {
412413
assertEquals(TestData.JOIN.join.participant.sid, sid)
413414
}
414415

416+
@Test
417+
fun resendReliableMessagesReplaysFullPayloadAcrossMultipleResumes() = runTest {
418+
connect()
419+
val pubPeerConnection = getPublisherPeerConnection()
420+
val pubDataChannel = pubPeerConnection
421+
.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel
422+
pubDataChannel.consumeSentBuffer = true
423+
424+
val payload = "hello-resume".toByteArray()
425+
val publishResult = room.localParticipant.publishData(payload)
426+
assertTrue(publishResult.isSuccess)
427+
428+
// Two consecutive resumes without server progress (lastMessageSeq=0 pops nothing).
429+
rtcEngine.resendReliableMessagesForResume(0)
430+
rtcEngine.resendReliableMessagesForResume(0)
431+
432+
// 1 initial publish + 2 replays.
433+
assertEquals(3, pubDataChannel.sentPayloads.size)
434+
435+
// Both replays must carry the original payload, not an empty buffer.
436+
val replay1 = DataPacket.parseFrom(pubDataChannel.sentPayloads[1])
437+
val replay2 = DataPacket.parseFrom(pubDataChannel.sentPayloads[2])
438+
assertArrayEquals(payload, replay1.user.payload.toByteArray())
439+
assertArrayEquals(payload, replay2.user.payload.toByteArray())
440+
}
441+
442+
@Test
443+
fun resendReliableMessagesReturnsFailureWhenReplaySendFails() = runTest {
444+
connect()
445+
val pubPeerConnection = getPublisherPeerConnection()
446+
val pubDataChannel = pubPeerConnection
447+
.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel
448+
449+
val publishResult = room.localParticipant.publishData("queued".toByteArray())
450+
assertTrue(publishResult.isSuccess)
451+
452+
pubDataChannel.sendResult = false
453+
454+
val resendResult = rtcEngine.resendReliableMessagesForResume(0)
455+
assertTrue(resendResult.isFailure)
456+
assertTrue(resendResult.exceptionOrNull() is RoomException.ConnectException)
457+
}
458+
415459
/**
416460
* Regression: an AddTrack timeout used to leave the cid in pendingTrackResolvers,
417461
* poisoning every subsequent publish of the same track with DuplicateTrackException

livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,4 +899,26 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
899899
assertTrue(result.isFailure)
900900
assertTrue(result.exceptionOrNull() is RoomException.ConnectException)
901901
}
902+
903+
@Test
904+
fun publishDataReturnsFailureWhenDataChannelSendFails() = runTest {
905+
connect()
906+
val pubPeerConnection = getPublisherPeerConnection()
907+
val pubDataChannel = pubPeerConnection.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel
908+
pubDataChannel.sendResult = false
909+
910+
val result = room.localParticipant.publishData("hello".toByteArray())
911+
912+
assertTrue(result.isFailure)
913+
assertTrue(result.exceptionOrNull() is RoomException.ConnectException)
914+
915+
pubDataChannel.sendResult = true
916+
val retryResult = room.localParticipant.publishData("hello".toByteArray())
917+
918+
assertTrue(retryResult.isSuccess)
919+
assertEquals(2, pubDataChannel.sentBuffers.size)
920+
921+
val retriedPacket = DataPacket.parseFrom(ByteString.copyFrom(pubDataChannel.sentBuffers[1].data))
922+
assertEquals(1, retriedPacket.sequence)
923+
}
902924
}

0 commit comments

Comments
 (0)