Skip to content

Commit 9c44a29

Browse files
Merge pull request #6 from caplin/flatMapLatestAndMerge-fix
Remove flatMapLatestAndMerge
2 parents 439d8d3 + b49132a commit 9c44a29

File tree

3 files changed

+1
-121
lines changed

3 files changed

+1
-121
lines changed

util/api/datasourcex-util.api

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,6 @@ public final class com/caplin/integration/datasourcex/util/flow/SetEvent$Populat
362362
}
363363

364364
public final class com/caplin/integration/datasourcex/util/flow/SetEventKt {
365-
public static final fun flatMapLatestAndMerge (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
366-
public static final fun flatMapLatestAndMergeSet (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
367365
public static final fun runningFoldToSet (Lkotlinx/coroutines/flow/Flow;ZZ)Lkotlinx/coroutines/flow/Flow;
368366
public static synthetic fun runningFoldToSet$default (Lkotlinx/coroutines/flow/Flow;ZZILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
369367
public static final fun toEvents (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,9 @@ import com.caplin.integration.datasourcex.util.flow.SetEvent.EntryEvent
44
import com.caplin.integration.datasourcex.util.flow.SetEvent.EntryEvent.Insert
55
import com.caplin.integration.datasourcex.util.flow.SetEvent.EntryEvent.Removed
66
import com.caplin.integration.datasourcex.util.flow.SetEvent.Populated
7-
import java.util.concurrent.ConcurrentHashMap
87
import kotlinx.collections.immutable.persistentSetOf
9-
import kotlinx.coroutines.Job
10-
import kotlinx.coroutines.cancelAndJoin
118
import kotlinx.coroutines.flow.Flow
12-
import kotlinx.coroutines.flow.channelFlow
139
import kotlinx.coroutines.flow.flow
14-
import kotlinx.coroutines.flow.launchIn
15-
import kotlinx.coroutines.flow.onCompletion
16-
import kotlinx.coroutines.flow.onEach
1710

1811
/** Events representing a mutation to a [Set]. */
1912
sealed interface SetEvent<out V : Any> {
@@ -155,43 +148,3 @@ fun <V : Any> Flow<SetEvent<V>>.runningFoldToSet(
155148
}
156149
}
157150
}
158-
159-
/**
160-
* Transforms a flow of sets into a merged flow by applying [entryEventTransformer] to each entry
161-
* event (insert or remove). When a value is inserted, a new flow is created and merged. When a
162-
* value is removed, the corresponding flow is cancelled.
163-
*/
164-
@JvmName("flatMapLatestAndMergeSet")
165-
fun <V : Any, R> Flow<Set<V>>.flatMapLatestAndMerge(
166-
entryEventTransformer: (EntryEvent<V>) -> Flow<R>
167-
): Flow<R> = toEvents().flatMapLatestAndMerge(entryEventTransformer)
168-
169-
/**
170-
* Transforms a flow of [SetEvent] into a merged flow by applying [entryEventTransformer] to each
171-
* entry event. When an [Insert] event is received, a new flow is created and its emissions are
172-
* merged into the resulting flow. When a [Removed] event is received, the previously created flow
173-
* for that value is cancelled.
174-
*/
175-
fun <V : Any, R> Flow<SetEvent<V>>.flatMapLatestAndMerge(
176-
entryEventTransformer: (EntryEvent<V>) -> Flow<R>
177-
) = channelFlow {
178-
val jobs = ConcurrentHashMap<V, Job>()
179-
collect { setEvent ->
180-
when (setEvent) {
181-
is Insert<V> -> {
182-
jobs[setEvent.value]?.cancelAndJoin()
183-
jobs[setEvent.value] =
184-
entryEventTransformer(setEvent)
185-
.onEach { send(it) }
186-
.onCompletion { jobs.remove(setEvent.value) }
187-
.launchIn(this@channelFlow)
188-
}
189-
190-
is Removed<V> -> {
191-
jobs.remove(setEvent.value)?.cancelAndJoin()
192-
}
193-
194-
is Populated -> {}
195-
}
196-
}
197-
}

util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/SetEventKtTest.kt

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@ import io.kotest.core.spec.style.FunSpec
1010
import io.kotest.matchers.equals.shouldBeEqual
1111
import io.kotest.matchers.types.shouldBeInstanceOf
1212
import kotlinx.coroutines.ExperimentalCoroutinesApi
13-
import kotlinx.coroutines.channels.Channel
14-
import kotlinx.coroutines.flow.consumeAsFlow
15-
import kotlinx.coroutines.flow.emptyFlow
1613
import kotlinx.coroutines.flow.flowOf
1714

1815
class SetEventKtTest :
@@ -41,7 +38,7 @@ class SetEventKtTest :
4138

4239
test("runningFoldToSet with emitPartials") {
4340
flowOf(Insert("A"), Insert("B"), Populated).runningFoldToSet(emitPartials = true).test {
44-
awaitItem().toSet() shouldBeEqual emptySet<String>()
41+
awaitItem().toSet() shouldBeEqual emptySet()
4542
awaitItem().toSet() shouldBeEqual setOf("A")
4643
awaitItem().toSet() shouldBeEqual setOf("A", "B")
4744
awaitComplete()
@@ -60,72 +57,4 @@ class SetEventKtTest :
6057
awaitError().shouldBeInstanceOf<IllegalStateException>()
6158
}
6259
}
63-
64-
test("flatMapLatestAndMerge - When upstream completes first, await inner complete") {
65-
val setFlow = Channel<Set<String>>()
66-
val aFlow = Channel<String>()
67-
68-
setFlow
69-
.consumeAsFlow()
70-
.flatMapLatestAndMerge {
71-
when (it) {
72-
is Insert ->
73-
when (it.value) {
74-
"a" -> aFlow.consumeAsFlow()
75-
else -> throw IllegalArgumentException(it.value)
76-
}
77-
is Removed -> emptyFlow()
78-
}
79-
}
80-
.test {
81-
expectNoEvents()
82-
83-
setFlow.send(setOf("a"))
84-
expectNoEvents()
85-
86-
aFlow.send("A")
87-
88-
awaitItem() shouldBeEqual "A"
89-
90-
setFlow.close()
91-
92-
expectNoEvents()
93-
94-
aFlow.send("B")
95-
96-
awaitItem() shouldBeEqual "B"
97-
98-
aFlow.close()
99-
100-
awaitComplete()
101-
}
102-
}
103-
104-
test("When inner completes first, await upstream complete") {
105-
val setFlow = Channel<Set<String>>()
106-
107-
setFlow
108-
.consumeAsFlow()
109-
.flatMapLatestAndMerge {
110-
when (it) {
111-
is Insert ->
112-
when (it.value) {
113-
"a" -> flowOf("A")
114-
else -> throw IllegalArgumentException(it.value)
115-
}
116-
is Removed -> emptyFlow()
117-
}
118-
}
119-
.test {
120-
expectNoEvents()
121-
122-
setFlow.send(setOf("a"))
123-
124-
awaitItem() shouldBeEqual "A"
125-
126-
setFlow.close()
127-
128-
awaitComplete()
129-
}
130-
}
13160
})

0 commit comments

Comments
 (0)