Skip to content

Commit 0d5eb62

Browse files
authored
Update docs for stream receivers and errors (#965)
* Update docs for stream receivers and errors * Fix BaseStreamReceiver.readAll leaking thrown exceptions Also add isClosed and closeException * Revert "Fix BaseStreamReceiver.readAll leaking thrown exceptions" This reverts commit 9ce4625. * Fix `BaseStreamReceiver.readAll` leaking thrown exceptions * spotless * Let readAll throw exceptions to align with other platform behavior * spotless
1 parent 91cd3ed commit 0d5eb62

6 files changed

Lines changed: 120 additions & 9 deletions

File tree

.changeset/new-tomatoes-grow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Clarified documentation regarding data stream receivers and errors

livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/StreamException.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2025 LiveKit, Inc.
2+
* Copyright 2025-2026 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,7 +63,7 @@ sealed class StreamException(message: String? = null) : Exception(message) {
6363
class FileInfoUnavailableException : StreamException()
6464

6565
/**
66-
*
66+
* Encryption of the data chunks did not match the declared encryption type.
6767
*/
6868
class EncryptionTypeMismatch(message: String? = null) : StreamException(message)
6969
}

livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2025 LiveKit, Inc.
2+
* Copyright 2025-2026 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,14 +16,41 @@
1616

1717
package io.livekit.android.room.datastream.incoming
1818

19+
import io.livekit.android.room.datastream.StreamException
1920
import kotlinx.coroutines.channels.Channel
2021
import kotlinx.coroutines.flow.Flow
21-
import kotlinx.coroutines.flow.catch
2222
import kotlinx.coroutines.flow.first
2323
import kotlinx.coroutines.flow.fold
2424

25+
/**
26+
* Base class for reading incoming data streams.
27+
*
28+
* @see [flow]
29+
* @see [readNext]
30+
* @see [readAll]
31+
*/
2532
abstract class BaseStreamReceiver<T>(private val source: Channel<ByteArray>) {
2633

34+
/**
35+
* A [Flow] of stream data as it arrives.
36+
*
37+
* Collect this flow to process incoming data incrementally. The flow completes normally when
38+
* the stream receives all the data without error. If the stream ends abnormally, the
39+
* flow fails with a [StreamException] after all buffered chunks have been emitted.
40+
*
41+
* Example:
42+
* ```
43+
* reader.flow
44+
* .catch { e ->
45+
* if (e is StreamException) {
46+
* handleStreamError(e)
47+
* } else {
48+
* throw e
49+
* }
50+
* }
51+
* .collect { chunk -> process(chunk) }
52+
* ```
53+
*/
2754
abstract val flow: Flow<T>
2855

2956
internal fun close(error: Exception?) {
@@ -34,17 +61,20 @@ abstract class BaseStreamReceiver<T>(private val source: Channel<ByteArray>) {
3461
* Suspends and waits for the next piece of data.
3562
*
3663
* @return the next available piece of data.
37-
* @throws NoSuchElementException when the stream is closed and no more data is available.
64+
* @throws NoSuchElementException when the stream is closed normally and no more data is available.
65+
* @throws StreamException when the stream is closed abnormally.
3866
*/
3967
suspend fun readNext(): T {
4068
return flow.first()
4169
}
4270

4371
/**
4472
* Suspends and waits for all available data until the stream is closed.
73+
*
74+
* @return A list of all data received if the stream is closed normally.
75+
* @throws StreamException when the stream is closed abnormally.
4576
*/
4677
suspend fun readAll(): List<T> {
47-
flow.catch { }
4878
return flow.fold(mutableListOf()) { acc, value ->
4979
acc.add(value)
5080
return@fold acc

livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/ByteStreamReceiver.kt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2025 LiveKit, Inc.
2+
* Copyright 2025-2026 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,21 @@
1616

1717
package io.livekit.android.room.datastream.incoming
1818

19+
import io.livekit.android.room.Room
1920
import io.livekit.android.room.datastream.ByteStreamInfo
2021
import kotlinx.coroutines.channels.Channel
2122
import kotlinx.coroutines.flow.Flow
2223
import kotlinx.coroutines.flow.receiveAsFlow
2324

25+
/**
26+
* Receiver for an incoming byte stream.
27+
*
28+
* Provided to [ByteStreamHandler] callbacks registered through [Room].
29+
*
30+
* @see Room.registerByteStreamHandler
31+
*/
2432
class ByteStreamReceiver(
33+
/** Metadata for this stream. */
2534
val info: ByteStreamInfo,
2635
channel: Channel<ByteArray>,
2736
) : BaseStreamReceiver<ByteArray>(channel) {

livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/TextStreamReceiver.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2025 LiveKit, Inc.
2+
* Copyright 2025-2026 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,13 +16,23 @@
1616

1717
package io.livekit.android.room.datastream.incoming
1818

19+
import io.livekit.android.room.Room
1920
import io.livekit.android.room.datastream.TextStreamInfo
2021
import kotlinx.coroutines.channels.Channel
2122
import kotlinx.coroutines.flow.Flow
2223
import kotlinx.coroutines.flow.map
2324
import kotlinx.coroutines.flow.receiveAsFlow
2425

26+
/**
27+
* Receiver for an incoming text stream.
28+
*
29+
* Chunks are decoded as UTF-8 strings. Provided to [TextStreamHandler] callbacks registered
30+
* through [Room].
31+
*
32+
* @see Room.registerTextStreamHandler
33+
*/
2534
class TextStreamReceiver(
35+
/** Metadata for this stream. */
2636
val info: TextStreamInfo,
2737
source: Channel<ByteArray>,
2838
) : BaseStreamReceiver<String>(source) {

livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2025 LiveKit, Inc.
2+
* Copyright 2025-2026 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -88,6 +88,63 @@ class StreamReaderTest : BaseTest() {
8888
}
8989
}
9090

91+
@Test
92+
fun readAllEmptyStream() = runTest {
93+
val emptyChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver()
94+
emptyChannel.close()
95+
val emptyReader = ByteStreamReceiver(
96+
ByteStreamInfo(
97+
id = "id",
98+
topic = "topic",
99+
timestampMs = 3,
100+
totalSize = null,
101+
attributes = mapOf(),
102+
mimeType = "mime",
103+
name = null,
104+
encryptionType = LivekitModels.Encryption.Type.NONE,
105+
),
106+
emptyChannel,
107+
)
108+
109+
runBlocking {
110+
val data = emptyReader.readAll()
111+
assertEquals(0, data.size)
112+
}
113+
}
114+
115+
@Test
116+
fun readAllThrowsStreamException() = runTest {
117+
val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver()
118+
errorChannel.trySend(ByteArray(1) { 0 })
119+
errorChannel.trySend(ByteArray(1) { 1 })
120+
errorChannel.trySend(ByteArray(1) { 2 })
121+
errorChannel.close(StreamException.AbnormalEndException("reason"))
122+
val errorReader = ByteStreamReceiver(
123+
ByteStreamInfo(
124+
id = "id",
125+
topic = "topic",
126+
timestampMs = 3,
127+
totalSize = null,
128+
attributes = mapOf(),
129+
mimeType = "mime",
130+
name = null,
131+
encryptionType = LivekitModels.Encryption.Type.NONE,
132+
),
133+
errorChannel,
134+
)
135+
136+
var threwOnce = false
137+
runBlocking {
138+
try {
139+
errorReader.readAll()
140+
} catch (e: StreamException.AbnormalEndException) {
141+
threwOnce = true
142+
}
143+
}
144+
145+
assertTrue(threwOnce)
146+
}
147+
91148
@Test
92149
fun overreadThrows() = runTest {
93150
var threwOnce = false

0 commit comments

Comments
 (0)