We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent d6d9275 commit fa9dde3Copy full SHA for fa9dde3
1 file changed
core/designsystem/src/main/java/com/terning/core/designsystem/extension/FlowExt.kt
@@ -8,16 +8,17 @@ import kotlinx.coroutines.flow.flow
8
9
fun <T, K> Flow<T>.groupBy(getKey: (T) -> K): Flow<Pair<K, Flow<T>>> = flow {
10
val storage = mutableMapOf<K, SendChannel<T>>()
11
-
12
- collect { t ->
13
- val key = getKey(t)
14
- val channel = storage.getOrPut(key) {
15
- Channel<T>(capacity = Channel.BUFFERED).also {
16
- emit(key to it.consumeAsFlow())
+ try {
+ collect { t ->
+ val key = getKey(t)
+ val channel = storage.getOrPut(key) {
+ Channel<T>(capacity = Channel.BUFFERED).also {
+ emit(key to it.consumeAsFlow())
17
+ }
18
}
19
+ channel.send(t)
20
- channel.send(t)
21
+ } finally {
22
+ storage.values.forEach { it.close() }
23
- storage.values.forEach { it.close() }
24
0 commit comments