File tree Expand file tree Collapse file tree 1 file changed +4
-9
lines changed
util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow Expand file tree Collapse file tree 1 file changed +4
-9
lines changed Original file line number Diff line number Diff line change @@ -168,29 +168,24 @@ fun <V : Any, R> Flow<Set<V>>.flatMapLatestAndMerge(
168168
169169/* *
170170 * Transforms a flow of [SetEvent] into a merged flow by applying [entryEventTransformer] to each
171- * entry event. When an [Insert] event is received, a new flow is created and its emissions are
172- * merged into the resulting flow. When a [Removed] event is received, the previously created flow
173- * for that value is cancelled.
171+ * entry event. When an event is received, a new flow is created and its emissions are
172+ * merged into the resulting flow.
174173 */
175174fun <V : Any , R > Flow<SetEvent<V>>.flatMapLatestAndMerge (
176175 entryEventTransformer : (EntryEvent <V >) -> Flow <R >
177176) = channelFlow {
178177 val jobs = ConcurrentHashMap <V , Job >()
179178 collect { setEvent ->
180179 when (setEvent) {
181- is Insert <V > -> {
180+ is EntryEvent <V > -> {
182181 jobs[setEvent.value]?.cancelAndJoin()
183182 jobs[setEvent.value] =
184183 entryEventTransformer(setEvent)
185184 .onEach { send(it) }
186- .onCompletion { jobs.remove(setEvent.value) }
185+ .onCompletion { throwable -> if (throwable == null ) jobs.remove(setEvent.value) }
187186 .launchIn(this @channelFlow)
188187 }
189188
190- is Removed <V > -> {
191- jobs.remove(setEvent.value)?.cancelAndJoin()
192- }
193-
194189 is Populated -> {}
195190 }
196191 }
You can’t perform that action at this time.
0 commit comments