Skip to content

Commit dca06a1

Browse files
committed
Add MessageBufferConfig.
1 parent 00be390 commit dca06a1

7 files changed

Lines changed: 359 additions & 26 deletions

File tree

stream-chat-android-state/api/stream-chat-android-state.api

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,34 @@ public final class io/getstream/chat/android/state/plugin/config/ChannelMessageL
109109
public fun toString ()Ljava/lang/String;
110110
}
111111

112+
public final class io/getstream/chat/android/state/plugin/config/MessageBufferConfig {
113+
public fun <init> ()V
114+
public fun <init> (Ljava/util/Set;ILkotlinx/coroutines/channels/BufferOverflow;)V
115+
public synthetic fun <init> (Ljava/util/Set;ILkotlinx/coroutines/channels/BufferOverflow;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
116+
public final fun component1 ()Ljava/util/Set;
117+
public final fun component2 ()I
118+
public final fun component3 ()Lkotlinx/coroutines/channels/BufferOverflow;
119+
public final fun copy (Ljava/util/Set;ILkotlinx/coroutines/channels/BufferOverflow;)Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;
120+
public static synthetic fun copy$default (Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;Ljava/util/Set;ILkotlinx/coroutines/channels/BufferOverflow;ILjava/lang/Object;)Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;
121+
public fun equals (Ljava/lang/Object;)Z
122+
public final fun getCapacity ()I
123+
public final fun getChannelTypes ()Ljava/util/Set;
124+
public final fun getOverflow ()Lkotlinx/coroutines/channels/BufferOverflow;
125+
public fun hashCode ()I
126+
public fun toString ()Ljava/lang/String;
127+
}
128+
112129
public final class io/getstream/chat/android/state/plugin/config/MessageLimitConfig {
113130
public fun <init> ()V
114-
public fun <init> (Ljava/util/Set;)V
115-
public synthetic fun <init> (Ljava/util/Set;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
131+
public fun <init> (Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;)V
132+
public synthetic fun <init> (Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
116133
public final fun component1 ()Ljava/util/Set;
117-
public final fun copy (Ljava/util/Set;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;
118-
public static synthetic fun copy$default (Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;Ljava/util/Set;ILjava/lang/Object;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;
134+
public final fun component2 ()Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;
135+
public final fun copy (Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;
136+
public static synthetic fun copy$default (Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;ILjava/lang/Object;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;
119137
public fun equals (Ljava/lang/Object;)Z
120138
public final fun getChannelMessageLimits ()Ljava/util/Set;
139+
public final fun getMessageBufferConfig ()Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;
121140
public fun hashCode ()I
122141
public fun toString ()Ljava/lang/String;
123142
}

stream-chat-android-state/src/main/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequential.kt

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ import io.getstream.chat.android.state.event.handler.internal.batch.BatchEvent
114114
import io.getstream.chat.android.state.event.handler.internal.batch.SocketEventCollector
115115
import io.getstream.chat.android.state.event.handler.internal.utils.realType
116116
import io.getstream.chat.android.state.event.handler.internal.utils.toChannelUserRead
117+
import io.getstream.chat.android.state.plugin.config.MessageBufferConfig
117118
import io.getstream.chat.android.state.plugin.logic.channel.internal.ChannelLogic
118119
import io.getstream.chat.android.state.plugin.logic.internal.LogicRegistry
119120
import io.getstream.chat.android.state.plugin.logic.querychannels.internal.QueryChannelsLogic
@@ -157,6 +158,7 @@ internal class EventHandlerSequential(
157158
private val repos: RepositoryFacade,
158159
private val sideEffect: suspend () -> Unit,
159160
private val syncedEvents: Flow<List<ChatEvent>>,
161+
private val bufferConfig: MessageBufferConfig,
160162
scope: CoroutineScope,
161163
) : EventHandler {
162164

@@ -167,12 +169,58 @@ internal class EventHandlerSequential(
167169

168170
private val mutex = Mutex()
169171
private val socketEvents = MutableSharedFlow<ChatEvent>(extraBufferCapacity = Int.MAX_VALUE)
172+
173+
/**
174+
* Secondary flow used only when [bufferConfig] opts specific channel types into a bounded buffer.
175+
* Allocated lazily so the default configuration pays no cost for it.
176+
*/
177+
private val bufferedNewMessageEvents: MutableSharedFlow<ChatEvent> by lazy {
178+
MutableSharedFlow(
179+
extraBufferCapacity = bufferConfig.capacity,
180+
onBufferOverflow = bufferConfig.overflow,
181+
)
182+
}
170183
private val socketEventCollector = SocketEventCollector(scope) { batchEvent ->
171184
handleBatchEvent(batchEvent)
172185
}
173186

174187
private var eventsDisposable: Disposable = EMPTY_DISPOSABLE
175188

189+
/**
190+
* Default listener — emits every event into the unbuffered [socketEvents] flow without
191+
* inspecting [bufferConfig]. Used whenever no channel types are opted in for buffering.
192+
*/
193+
private val defaultSocketEventListener: ChatEventListener<ChatEvent> = ChatEventListener { event ->
194+
logEmitOutcome(event, socketEvents.tryEmit(event))
195+
}
196+
197+
/**
198+
* Listener used only when [bufferConfig] opts specific channel types into a bounded buffer.
199+
* Routes matching [NewMessageEvent]s to [bufferedNewMessageEvents] and everything else to
200+
* [socketEvents].
201+
*/
202+
private val bufferedSocketEventListener: ChatEventListener<ChatEvent> = ChatEventListener { event ->
203+
val target = if (event is NewMessageEvent && event.channelType in bufferConfig.channelTypes) {
204+
bufferedNewMessageEvents
205+
} else {
206+
socketEvents
207+
}
208+
logEmitOutcome(event, target.tryEmit(event))
209+
}
210+
211+
private fun logEmitOutcome(event: ChatEvent, emitted: Boolean) {
212+
if (emitted) {
213+
val cCount = collectedCount.get()
214+
val eCount = emittedCount.incrementAndGet()
215+
val ratio = eCount.toDouble() / cCount.toDouble()
216+
StreamLog.v(TAG_SOCKET) {
217+
"[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)"
218+
}
219+
} else {
220+
StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" }
221+
}
222+
}
223+
176224
init {
177225
logger.d { "<init> no args" }
178226
}
@@ -199,26 +247,23 @@ internal class EventHandlerSequential(
199247
)
200248
}
201249
}
202-
scope.launch {
203-
socketEvents.collect { event ->
204-
collectedCount.incrementAndGet()
205-
initJob.join()
206-
sideEffect()
207-
socketEventCollector.collect(event)
208-
}
250+
val collectSocketEvent: suspend (ChatEvent) -> Unit = { event ->
251+
collectedCount.incrementAndGet()
252+
initJob.join()
253+
sideEffect()
254+
socketEventCollector.collect(event)
209255
}
210-
eventsDisposable = subscribeForEvents { event ->
211-
if (socketEvents.tryEmit(event)) {
212-
val cCount = collectedCount.get()
213-
val eCount = emittedCount.incrementAndGet()
214-
val ratio = eCount.toDouble() / cCount.toDouble()
215-
StreamLog.v(TAG_SOCKET) {
216-
"[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)"
217-
}
218-
} else {
219-
StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" }
220-
}
256+
scope.launch { socketEvents.collect(collectSocketEvent) }
257+
val isBufferingEnabled = bufferConfig.channelTypes.isNotEmpty()
258+
if (isBufferingEnabled) {
259+
scope.launch { bufferedNewMessageEvents.collect(collectSocketEvent) }
260+
}
261+
val activeListener = if (isBufferingEnabled) {
262+
bufferedSocketEventListener
263+
} else {
264+
defaultSocketEventListener
221265
}
266+
eventsDisposable = subscribeForEvents(activeListener)
222267
}
223268
}
224269

stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/config/StatePluginConfig.kt

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import io.getstream.chat.android.models.TimeDuration
2222
import io.getstream.chat.android.state.extensions.queryChannelsAsState
2323
import io.getstream.chat.android.state.extensions.watchChannelAsState
2424
import io.getstream.chat.android.state.plugin.internal.StatePlugin
25+
import kotlinx.coroutines.channels.BufferOverflow
2526

2627
/**
2728
* Provides a configuration for [io.getstream.chat.android.state.plugin.internal.StatePlugin].
@@ -128,9 +129,14 @@ public data class StatePluginConfig @JvmOverloads constructor(
128129
* @param channelMessageLimits A set of [ChannelMessageLimit] defining the maximum number of messages to keep in
129130
* memory for different channel types. By default, this is an empty set, meaning no limits are applied and all
130131
* messages are kept in memory. Each channel type can have its own limit configured independently.
132+
*
133+
* @param messageBufferConfig Configuration for bounding the inbound `NewMessageEvent` buffer on selected channel
134+
* types. By default, no buffering is applied — events flow through the unbuffered path. See [MessageBufferConfig]
135+
* for details and trade-offs.
131136
*/
132137
public data class MessageLimitConfig(
133138
public val channelMessageLimits: Set<ChannelMessageLimit> = setOf(),
139+
public val messageBufferConfig: MessageBufferConfig = MessageBufferConfig(),
134140
)
135141

136142
/**
@@ -161,3 +167,54 @@ public data class ChannelMessageLimit(
161167
public val channelType: String,
162168
public val baseLimit: Int,
163169
)
170+
171+
/**
172+
* Configuration for buffering inbound `NewMessageEvent`s for specific channel types before they
173+
* are dispatched to the sequential event-handling pipeline.
174+
*
175+
* High-traffic channel types (e.g. livestreams) can produce a flood of new-message events that
176+
* arrive faster than they can be processed sequentially. This configuration applies a bounded
177+
* buffer with a configurable overflow strategy (e.g. drop oldest) for `NewMessageEvent`s on the
178+
* configured channel types only. Events for other channel types — and all non-`NewMessageEvent`
179+
* events — continue to flow through the default unbuffered path with `Int.MAX_VALUE` capacity,
180+
* so signal-critical events such as reads, bans or member updates are never dropped.
181+
*
182+
* By default this is a no-op: no channel types are configured, so the buffered code path is not
183+
* active and the SDK behaves exactly as if this configuration did not exist.
184+
*
185+
* Example — drop the oldest pending `NewMessageEvent` for `messaging` channels when more than
186+
* 100 are queued:
187+
* ```kotlin
188+
* StatePluginConfig(
189+
* messageLimitConfig = MessageLimitConfig(
190+
* messageBufferConfig = MessageBufferConfig(
191+
* channelTypes = setOf("messaging"),
192+
* capacity = 100,
193+
* overflow = BufferOverflow.DROP_OLDEST,
194+
* ),
195+
* ),
196+
* )
197+
* ```
198+
*
199+
* @param channelTypes The set of channel types whose `NewMessageEvent`s should be routed through
200+
* the bounded buffer. Channel types not in this set continue to use the unbuffered path. When
201+
* this set is empty (the default), buffering is disabled entirely and the per-event channel-type
202+
* check is skipped.
203+
*
204+
* @param capacity The maximum number of `NewMessageEvent`s that can be queued in the buffer
205+
* while the consumer is busy. Once exceeded, [overflow] decides which event to drop or whether
206+
* to suspend. Defaults to `Int.MAX_VALUE`, which effectively disables overflow.
207+
*
208+
* @param overflow The strategy applied when the buffer is full:
209+
* - [BufferOverflow.SUSPEND] (default): the producer suspends until space is available. Note
210+
* that the underlying socket listener uses non-suspending `tryEmit`, so with `SUSPEND` an
211+
* overflowing emission is simply reported as a failed emit rather than blocking the socket.
212+
* - [BufferOverflow.DROP_OLDEST]: the oldest queued event is evicted to make room for the new
213+
* one. Useful for live channels where freshness matters more than completeness.
214+
* - [BufferOverflow.DROP_LATEST]: the newest event is discarded and the queued events are kept.
215+
*/
216+
public data class MessageBufferConfig(
217+
public val channelTypes: Set<String> = emptySet(),
218+
public val capacity: Int = Int.MAX_VALUE,
219+
public val overflow: BufferOverflow = BufferOverflow.SUSPEND,
220+
)

stream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/factory/StreamStatePluginFactory.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import io.getstream.chat.android.models.User
2929
import io.getstream.chat.android.state.errorhandler.StateErrorHandlerFactory
3030
import io.getstream.chat.android.state.event.handler.internal.EventHandler
3131
import io.getstream.chat.android.state.event.handler.internal.EventHandlerSequential
32+
import io.getstream.chat.android.state.plugin.config.MessageBufferConfig
3233
import io.getstream.chat.android.state.plugin.config.StatePluginConfig
3334
import io.getstream.chat.android.state.plugin.internal.StatePlugin
3435
import io.getstream.chat.android.state.plugin.logic.internal.LogicRegistry
@@ -151,6 +152,7 @@ public class StreamStatePluginFactory(
151152
repos = repositoryFacade,
152153
syncedEvents = syncManager.syncedEvents,
153154
sideEffect = syncManager::awaitSyncing,
155+
bufferConfig = config.messageLimitConfig.messageBufferConfig,
154156
)
155157

156158
if (config.backgroundSyncEnabled) {
@@ -192,6 +194,7 @@ public class StreamStatePluginFactory(
192194
repos: RepositoryFacade,
193195
sideEffect: suspend () -> Unit,
194196
syncedEvents: Flow<List<ChatEvent>>,
197+
bufferConfig: MessageBufferConfig,
195198
): EventHandler {
196199
return EventHandlerSequential(
197200
scope = scope,
@@ -204,6 +207,7 @@ public class StreamStatePluginFactory(
204207
repos = repos,
205208
syncedEvents = syncedEvents,
206209
sideEffect = sideEffect,
210+
bufferConfig = bufferConfig,
207211
)
208212
}
209213
}

stream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/TotalUnreadCountTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import io.getstream.chat.android.models.ChannelCapabilities
2525
import io.getstream.chat.android.models.User
2626
import io.getstream.chat.android.state.event.handler.internal.EventHandler
2727
import io.getstream.chat.android.state.event.handler.internal.EventHandlerSequential
28+
import io.getstream.chat.android.state.plugin.config.MessageBufferConfig
2829
import io.getstream.chat.android.state.plugin.state.global.internal.MutableGlobalState
2930
import io.getstream.chat.android.test.TestCoroutineExtension
3031
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -146,6 +147,7 @@ internal class TotalUnreadCountTest {
146147
repos = repos,
147148
sideEffect = sideEffect,
148149
syncedEvents = syncedEvents,
150+
bufferConfig = MessageBufferConfig(),
149151
)
150152

151153
fun givenMockedRepositories(): Fixture {

0 commit comments

Comments
 (0)