Skip to content

Commit f162a27

Browse files
VelikovPetarclaudeandremion
authored
Add MessageBufferConfig to allow custom back-pressure config for message.new events (#6472)
Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: André Mion <andremion@gmail.com>
1 parent aba5f4b commit f162a27

7 files changed

Lines changed: 392 additions & 26 deletions

File tree

stream-chat-android-client/api/stream-chat-android-client.api

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -355,15 +355,42 @@ public final class io/getstream/chat/android/client/api/ChatClientConfig {
355355
public fun toString ()Ljava/lang/String;
356356
}
357357

358+
public final class io/getstream/chat/android/client/api/MessageBufferConfig {
359+
public fun <init> ()V
360+
public fun <init> (Ljava/util/Set;ILio/getstream/chat/android/client/api/MessageBufferOverflow;)V
361+
public synthetic fun <init> (Ljava/util/Set;ILio/getstream/chat/android/client/api/MessageBufferOverflow;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
362+
public final fun component1 ()Ljava/util/Set;
363+
public final fun component2 ()I
364+
public final fun component3 ()Lio/getstream/chat/android/client/api/MessageBufferOverflow;
365+
public final fun copy (Ljava/util/Set;ILio/getstream/chat/android/client/api/MessageBufferOverflow;)Lio/getstream/chat/android/client/api/MessageBufferConfig;
366+
public static synthetic fun copy$default (Lio/getstream/chat/android/client/api/MessageBufferConfig;Ljava/util/Set;ILio/getstream/chat/android/client/api/MessageBufferOverflow;ILjava/lang/Object;)Lio/getstream/chat/android/client/api/MessageBufferConfig;
367+
public fun equals (Ljava/lang/Object;)Z
368+
public final fun getCapacity ()I
369+
public final fun getChannelTypes ()Ljava/util/Set;
370+
public final fun getOverflow ()Lio/getstream/chat/android/client/api/MessageBufferOverflow;
371+
public fun hashCode ()I
372+
public fun toString ()Ljava/lang/String;
373+
}
374+
375+
public final class io/getstream/chat/android/client/api/MessageBufferOverflow : java/lang/Enum {
376+
public static final field DROP_LATEST Lio/getstream/chat/android/client/api/MessageBufferOverflow;
377+
public static final field DROP_OLDEST Lio/getstream/chat/android/client/api/MessageBufferOverflow;
378+
public static fun getEntries ()Lkotlin/enums/EnumEntries;
379+
public static fun valueOf (Ljava/lang/String;)Lio/getstream/chat/android/client/api/MessageBufferOverflow;
380+
public static fun values ()[Lio/getstream/chat/android/client/api/MessageBufferOverflow;
381+
}
382+
358383
public final class io/getstream/chat/android/client/api/MessageLimitConfig {
359384
public fun <init> ()V
360-
public fun <init> (Ljava/util/Set;)V
361-
public synthetic fun <init> (Ljava/util/Set;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
385+
public fun <init> (Ljava/util/Set;Lio/getstream/chat/android/client/api/MessageBufferConfig;)V
386+
public synthetic fun <init> (Ljava/util/Set;Lio/getstream/chat/android/client/api/MessageBufferConfig;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
362387
public final fun component1 ()Ljava/util/Set;
363-
public final fun copy (Ljava/util/Set;)Lio/getstream/chat/android/client/api/MessageLimitConfig;
364-
public static synthetic fun copy$default (Lio/getstream/chat/android/client/api/MessageLimitConfig;Ljava/util/Set;ILjava/lang/Object;)Lio/getstream/chat/android/client/api/MessageLimitConfig;
388+
public final fun component2 ()Lio/getstream/chat/android/client/api/MessageBufferConfig;
389+
public final fun copy (Ljava/util/Set;Lio/getstream/chat/android/client/api/MessageBufferConfig;)Lio/getstream/chat/android/client/api/MessageLimitConfig;
390+
public static synthetic fun copy$default (Lio/getstream/chat/android/client/api/MessageLimitConfig;Ljava/util/Set;Lio/getstream/chat/android/client/api/MessageBufferConfig;ILjava/lang/Object;)Lio/getstream/chat/android/client/api/MessageLimitConfig;
365391
public fun equals (Ljava/lang/Object;)Z
366392
public final fun getChannelMessageLimits ()Ljava/util/Set;
393+
public final fun getMessageBufferConfig ()Lio/getstream/chat/android/client/api/MessageBufferConfig;
367394
public fun hashCode ()I
368395
public fun toString ()Ljava/lang/String;
369396
}

stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,14 @@ public data class ChatClientConfig @JvmOverloads constructor(
125125
* @param channelMessageLimits A set of [ChannelMessageLimit] defining the maximum number of messages to keep in
126126
* memory for different channel types. By default, this is an empty set, meaning no limits are applied and all
127127
* messages are kept in memory. Each channel type can have its own limit configured independently.
128+
*
129+
* @param messageBufferConfig Configuration for bounding the inbound `NewMessageEvent` buffer on selected channel
130+
* types. By default, no buffering is applied — events flow through the unbuffered path. See [MessageBufferConfig]
131+
* for details and trade-offs.
128132
*/
129133
public data class MessageLimitConfig(
130134
public val channelMessageLimits: Set<ChannelMessageLimit> = setOf(),
135+
public val messageBufferConfig: MessageBufferConfig = MessageBufferConfig(),
131136
)
132137

133138
/**
@@ -158,3 +163,75 @@ public data class ChannelMessageLimit(
158163
public val channelType: String,
159164
public val baseLimit: Int,
160165
)
166+
167+
/**
168+
* Configuration for buffering inbound `NewMessageEvent`s for specific channel types before they
169+
* are dispatched to the sequential event-handling pipeline.
170+
*
171+
* High-traffic channel types (e.g. livestreams) can produce a flood of new-message events that
172+
* arrive faster than they can be processed sequentially. This configuration applies a bounded
173+
* buffer with a configurable overflow strategy (e.g. drop oldest) for `NewMessageEvent`s on the
174+
* configured channel types only. Events for other channel types — and all non-`NewMessageEvent`
175+
* events — continue to flow through the default unbuffered path with `Int.MAX_VALUE` capacity,
176+
* so signal-critical events such as reads, bans or member updates are never dropped.
177+
*
178+
* By default this is a no-op: no channel types are configured, so the buffered code path is not
179+
* active and the SDK behaves exactly as if this configuration did not exist.
180+
*
181+
* **Event ordering caveat.** When buffering is active, `NewMessageEvent`s for opted-in channel
182+
* types flow through a separate buffer from all other events. As a consequence, the relative
183+
* ordering between buffered `NewMessageEvent`s and non-buffered events (e.g. `ReactionNewEvent`,
184+
* `MessageUpdatedEvent`) for the same channel is **not guaranteed** — a reaction added to
185+
* message X may be processed before the `NewMessageEvent` for X. Because this configuration
186+
* already tolerates dropping events on overflow, callers opting in are expected to tolerate
187+
* this consistency relaxation as well.
188+
*
189+
* Example — drop the oldest pending `NewMessageEvent` for `messaging` channels when more than
190+
* 100 are queued:
191+
* ```kotlin
192+
* ChatClientConfig(
193+
* messageLimitConfig = MessageLimitConfig(
194+
* messageBufferConfig = MessageBufferConfig(
195+
* channelTypes = setOf("messaging"),
196+
* capacity = 100,
197+
* overflow = MessageBufferOverflow.DROP_OLDEST,
198+
* ),
199+
* ),
200+
* )
201+
* ```
202+
*
203+
* @param channelTypes The set of channel types whose `NewMessageEvent`s should be routed through
204+
* the bounded buffer. Channel types not in this set continue to use the unbuffered path. When
205+
* this set is empty (the default), buffering is disabled entirely and the per-event channel-type
206+
* check is skipped.
207+
*
208+
* @param capacity The maximum number of `NewMessageEvent`s that can be queued in the buffer
209+
* while the consumer is busy. Once exceeded, [overflow] decides which event to drop. Defaults to
210+
* `Int.MAX_VALUE`, which effectively disables overflow.
211+
*
212+
* @param overflow The strategy applied when the buffer is full:
213+
* - [MessageBufferOverflow.DROP_OLDEST] (default): the oldest queued event is evicted to make
214+
* room for the new one. Useful for live channels where freshness matters more than completeness.
215+
* - [MessageBufferOverflow.DROP_LATEST]: the newest event is discarded and the queued events are
216+
* kept.
217+
*/
218+
public data class MessageBufferConfig(
219+
public val channelTypes: Set<String> = emptySet(),
220+
public val capacity: Int = Int.MAX_VALUE,
221+
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
222+
)
223+
224+
/**
225+
* Strategy applied when the [MessageBufferConfig] buffer is full.
226+
*
227+
* Mirrors a subset of [kotlinx.coroutines.channels.BufferOverflow]: the suspending strategy is
228+
* intentionally excluded because the SDK emits into the buffer via non-suspending `tryEmit`,
229+
* which makes the suspending semantics unreachable.
230+
*/
231+
public enum class MessageBufferOverflow {
232+
/** Evict the oldest queued event to make room for the new one. */
233+
DROP_OLDEST,
234+
235+
/** Discard the newest event and keep the queued events. */
236+
DROP_LATEST,
237+
}

stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package io.getstream.chat.android.client.internal.state.event.handler.internal
1818

1919
import androidx.annotation.VisibleForTesting
2020
import io.getstream.chat.android.client.ChatEventListener
21+
import io.getstream.chat.android.client.api.MessageBufferConfig
22+
import io.getstream.chat.android.client.api.MessageBufferOverflow
2123
import io.getstream.chat.android.client.api.event.EventHandlingResult
2224
import io.getstream.chat.android.client.api.state.StateRegistry
2325
import io.getstream.chat.android.client.events.AnswerCastedEvent
@@ -130,6 +132,7 @@ import kotlinx.coroutines.SupervisorJob
130132
import kotlinx.coroutines.async
131133
import kotlinx.coroutines.awaitAll
132134
import kotlinx.coroutines.cancelChildren
135+
import kotlinx.coroutines.channels.BufferOverflow
133136
import kotlinx.coroutines.flow.Flow
134137
import kotlinx.coroutines.flow.MutableSharedFlow
135138
import kotlinx.coroutines.flow.StateFlow
@@ -159,6 +162,7 @@ internal class EventHandlerSequential(
159162
private val repos: RepositoryFacade,
160163
private val sideEffect: suspend () -> Unit,
161164
private val syncedEvents: Flow<List<ChatEvent>>,
165+
private val bufferConfig: MessageBufferConfig,
162166
scope: CoroutineScope,
163167
) : EventHandler {
164168

@@ -169,12 +173,61 @@ internal class EventHandlerSequential(
169173

170174
private val mutex = Mutex()
171175
private val socketEvents = MutableSharedFlow<ChatEvent>(extraBufferCapacity = Int.MAX_VALUE)
176+
177+
/**
178+
* Secondary flow used only when [bufferConfig] opts specific channel types into a bounded buffer.
179+
* Allocated lazily so the default configuration pays no cost for it.
180+
*/
181+
private val bufferedNewMessageEvents: MutableSharedFlow<ChatEvent> by lazy {
182+
MutableSharedFlow(
183+
extraBufferCapacity = bufferConfig.capacity,
184+
onBufferOverflow = when (bufferConfig.overflow) {
185+
MessageBufferOverflow.DROP_OLDEST -> BufferOverflow.DROP_OLDEST
186+
MessageBufferOverflow.DROP_LATEST -> BufferOverflow.DROP_LATEST
187+
},
188+
)
189+
}
172190
private val socketEventCollector = SocketEventCollector(scope) { batchEvent ->
173191
handleBatchEvent(batchEvent)
174192
}
175193

176194
private var eventsDisposable: Disposable = EMPTY_DISPOSABLE
177195

196+
/**
197+
* Default listener — emits every event into the unbuffered [socketEvents] flow without
198+
* inspecting [bufferConfig]. Used whenever no channel types are opted in for buffering.
199+
*/
200+
private val defaultSocketEventListener: ChatEventListener<ChatEvent> = ChatEventListener { event ->
201+
logEmitOutcome(event, socketEvents.tryEmit(event))
202+
}
203+
204+
/**
205+
* Listener used only when [bufferConfig] opts specific channel types into a bounded buffer.
206+
* Routes matching [NewMessageEvent]s to [bufferedNewMessageEvents] and everything else to
207+
* [socketEvents].
208+
*/
209+
private val bufferedSocketEventListener: ChatEventListener<ChatEvent> = ChatEventListener { event ->
210+
val target = if (event is NewMessageEvent && event.channelType in bufferConfig.channelTypes) {
211+
bufferedNewMessageEvents
212+
} else {
213+
socketEvents
214+
}
215+
logEmitOutcome(event, target.tryEmit(event))
216+
}
217+
218+
private fun logEmitOutcome(event: ChatEvent, emitted: Boolean) {
219+
if (emitted) {
220+
val cCount = collectedCount.get()
221+
val eCount = emittedCount.incrementAndGet()
222+
val ratio = eCount.toDouble() / cCount.toDouble()
223+
StreamLog.v(TAG_SOCKET) {
224+
"[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)"
225+
}
226+
} else {
227+
StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" }
228+
}
229+
}
230+
178231
init {
179232
logger.d { "<init> no args" }
180233
}
@@ -201,26 +254,23 @@ internal class EventHandlerSequential(
201254
)
202255
}
203256
}
204-
scope.launch {
205-
socketEvents.collect { event ->
206-
collectedCount.incrementAndGet()
207-
initJob.join()
208-
sideEffect()
209-
socketEventCollector.collect(event)
210-
}
257+
val collectSocketEvent: suspend (ChatEvent) -> Unit = { event ->
258+
collectedCount.incrementAndGet()
259+
initJob.join()
260+
sideEffect()
261+
socketEventCollector.collect(event)
211262
}
212-
eventsDisposable = subscribeForEvents { event ->
213-
if (socketEvents.tryEmit(event)) {
214-
val cCount = collectedCount.get()
215-
val eCount = emittedCount.incrementAndGet()
216-
val ratio = eCount.toDouble() / cCount.toDouble()
217-
StreamLog.v(TAG_SOCKET) {
218-
"[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)"
219-
}
220-
} else {
221-
StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" }
222-
}
263+
scope.launch { socketEvents.collect(collectSocketEvent) }
264+
val isBufferingEnabled = bufferConfig.channelTypes.isNotEmpty()
265+
if (isBufferingEnabled) {
266+
scope.launch { bufferedNewMessageEvents.collect(collectSocketEvent) }
267+
}
268+
val activeListener = if (isBufferingEnabled) {
269+
bufferedSocketEventListener
270+
} else {
271+
defaultSocketEventListener
223272
}
273+
eventsDisposable = subscribeForEvents(activeListener)
224274
}
225275
}
226276

stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/plugin/factory/StreamStatePluginFactory.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package io.getstream.chat.android.client.internal.state.plugin.factory
1919
import android.content.Context
2020
import io.getstream.chat.android.client.ChatClient
2121
import io.getstream.chat.android.client.api.ChatClientConfig
22+
import io.getstream.chat.android.client.api.MessageBufferConfig
2223
import io.getstream.chat.android.client.api.state.StateRegistry
2324
import io.getstream.chat.android.client.events.ChatEvent
2425
import io.getstream.chat.android.client.internal.state.errorhandler.StateErrorHandlerFactory
@@ -154,6 +155,7 @@ public class StreamStatePluginFactory(
154155
repos = repositoryFacade,
155156
syncedEvents = syncManager.syncedEvents,
156157
sideEffect = syncManager::awaitSyncing,
158+
bufferConfig = config.messageLimitConfig.messageBufferConfig,
157159
)
158160

159161
val stateErrorHandlerFactory = StateErrorHandlerFactory(
@@ -189,6 +191,7 @@ public class StreamStatePluginFactory(
189191
repos: RepositoryFacade,
190192
sideEffect: suspend () -> Unit,
191193
syncedEvents: Flow<List<ChatEvent>>,
194+
bufferConfig: MessageBufferConfig,
192195
): EventHandler {
193196
return EventHandlerSequential(
194197
scope = scope,
@@ -201,6 +204,7 @@ public class StreamStatePluginFactory(
201204
repos = repos,
202205
syncedEvents = syncedEvents,
203206
sideEffect = sideEffect,
207+
bufferConfig = bufferConfig,
204208
)
205209
}
206210
}

stream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/TotalUnreadCountTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.getstream.chat.android.client.internal.state.event
1818

19+
import io.getstream.chat.android.client.api.MessageBufferConfig
1920
import io.getstream.chat.android.client.events.ChatEvent
2021
import io.getstream.chat.android.client.internal.state.event.handler.internal.EventHandler
2122
import io.getstream.chat.android.client.internal.state.event.handler.internal.EventHandlerSequential
@@ -144,6 +145,7 @@ internal class TotalUnreadCountTest {
144145
repos = repos,
145146
sideEffect = sideEffect,
146147
syncedEvents = syncedEvents,
148+
bufferConfig = MessageBufferConfig(),
147149
)
148150

149151
fun givenMockedRepositories(): Fixture {

0 commit comments

Comments
 (0)