diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt index c5c7c7f7..70325902 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt @@ -72,3 +72,311 @@ public fun Flow.withLatestFrom( @Suppress("NOTHING_TO_INLINE") public inline fun Flow.withLatestFrom(other: Flow): Flow> = withLatestFrom(other, ::Pair) + +/** + * Merges multiple [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param others Array of other [Flow]s + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + others: Array>, + transform: suspend (A, Array) -> R, +): Flow { + return flow { + val refs = Array>(others.size) { AtomicRef(null) } + + try { + coroutineScope { + val jobs = others.mapIndexed { index, flow -> + launch(start = CoroutineStart.UNDISPATCHED) { + flow.collect { refs[index].value = it ?: INTERNAL_NULL_VALUE } + } + } + + collect { value -> + val values = Array(refs.size) { index -> + refs[index].value ?: return@collect + } + + val unboxedValues = Array(values.size) { index -> + INTERNAL_NULL_VALUE.unbox(values[index]) + } + + emit(transform(value, unboxedValues)) + } + + jobs.forEach { it.cancelAndJoin() } + } + } finally { + refs.forEach { it.value = null } + } + } +} + +/** + * Merges three [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + transform: suspend (A, B, C) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + ) + } +} + +@Suppress("NOTHING_TO_INLINE") +public inline fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, +): Flow> = withLatestFrom(other2, other3, ::Triple) + +/** + * Merges four [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + transform: suspend (A, B, C, D) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3, other4)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + others[2] as D, + ) + } +} + +/** + * Merges five [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + transform: suspend (A, B, C, D, E) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3, other4, other5)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + others[2] as D, + others[3] as E, + ) + } +} + +/** + * Merges six [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + transform: suspend (A, B, C, D, E, F) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3, other4, other5, other6)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + others[2] as D, + others[3] as E, + others[4] as F, + ) + } +} + +/** + * Merges seven [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param other7 Seventh [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + other7: Flow, + transform: suspend (A, B, C, D, E, F, G) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3, other4, other5, other6, other7)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + others[2] as D, + others[3] as E, + others[4] as F, + others[5] as G, + ) + } +} + +/** + * Merges eight [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param other7 Seventh [Flow] + * @param other8 Eighth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + other7: Flow, + other8: Flow, + transform: suspend (A, B, C, D, E, F, G, H) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3, other4, other5, other6, other7, other8)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + others[2] as D, + others[3] as E, + others[4] as F, + others[5] as G, + others[6] as H, + ) + } +} + +/** + * Merges nine [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param other7 Seventh [Flow] + * @param other8 Eighth [Flow] + * @param other9 Ninth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + other7: Flow, + other8: Flow, + other9: Flow, + transform: suspend (A, B, C, D, E, F, G, H, I) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3, other4, other5, other6, other7, other8, other9)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + others[2] as D, + others[3] as E, + others[4] as F, + others[5] as G, + others[6] as H, + others[7] as I, + ) + } +} + +/** + * Merges ten [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param other7 Seventh [Flow] + * @param other8 Eighth [Flow] + * @param other9 Ninth [Flow] + * @param other10 Tenth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + other7: Flow, + other8: Flow, + other9: Flow, + other10: Flow, + transform: suspend (A, B, C, D, E, F, G, H, I, J) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3, other4, other5, other6, other7, other8, other9, other10)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + others[2] as D, + others[3] as E, + others[4] as F, + others[5] as G, + others[6] as H, + others[7] as I, + others[8] as J, + ) + } +} diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt.backup b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt.backup new file mode 100644 index 00000000..038be650 --- /dev/null +++ b/src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt.backup @@ -0,0 +1,682 @@ +/* + * MIT License + * + * Copyright (c) 2021-2024 Petrus Nguyễn Thái Học + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.hoc081098.flowext + +import com.hoc081098.flowext.internal.AtomicRef +import com.hoc081098.flowext.internal.INTERNAL_NULL_VALUE +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.launch + +/** + * Merges two [Flow]s into one [Flow] by combining each value from self with the latest value from the second [Flow], if any. + * Values emitted by self before the second [Flow] has emitted any values will be omitted. + * + * @param other Second [Flow] + * @param transform A transform function to apply to each value from self combined with the latest value from the second [Flow], if any. + */ +public fun Flow.withLatestFrom( + other: Flow, + transform: suspend (A, B) -> R, +): Flow { + return flow { + val otherRef = AtomicRef(null) + + try { + coroutineScope { + val otherCollectionJob = launch(start = CoroutineStart.UNDISPATCHED) { + other.collect { otherRef.value = it ?: INTERNAL_NULL_VALUE } + } + + collect { value -> + emit( + transform( + value, + INTERNAL_NULL_VALUE.unbox(otherRef.value ?: return@collect), + ), + ) + } + otherCollectionJob.cancelAndJoin() + } + } finally { + otherRef.value = null + } + } +} + +@Suppress("NOTHING_TO_INLINE") +public inline fun Flow.withLatestFrom(other: Flow): Flow> = + withLatestFrom(other, ::Pair) + +/** + * Merges multiple [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param others Array of other [Flow]s + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + others: Array>, + transform: suspend (A, Array) -> R, +): Flow { + return flow { + val refs = Array>(others.size) { AtomicRef(null) } + + try { + coroutineScope { + val jobs = others.mapIndexed { index, flow -> + launch(start = CoroutineStart.UNDISPATCHED) { + flow.collect { refs[index].value = it ?: INTERNAL_NULL_VALUE } + } + } + + collect { value -> + val values = Array(refs.size) { index -> + refs[index].value ?: return@collect + } + + val unboxedValues = Array(values.size) { index -> + INTERNAL_NULL_VALUE.unbox(values[index]) + } + + emit(transform(value, unboxedValues)) + } + + jobs.forEach { it.cancelAndJoin() } + } + } finally { + refs.forEach { it.value = null } + } + } +} + +/** + * Merges three [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + transform: suspend (A, B, C) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + ) + } +} + +@Suppress("NOTHING_TO_INLINE") +public inline fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, +): Flow> = withLatestFrom(other2, other3, ::Triple) + +/** + * Merges four [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + transform: suspend (A, B, C, D) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3, other4)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + others[2] as D, + ) + } +} + +/** + * Merges five [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + transform: suspend (A, B, C, D, E) -> R, +): Flow { + return withLatestFrom(arrayOf(other2, other3, other4, other5)) { value, others -> + transform( + value, + others[0] as B, + others[1] as C, + others[2] as D, + others[3] as E, + ) + } +} + +/** + * Merges six [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + transform: suspend (A, B, C, D, E, F) -> R, +): Flow { + return flow { + val other2Ref = AtomicRef(null) + val other3Ref = AtomicRef(null) + val other4Ref = AtomicRef(null) + val other5Ref = AtomicRef(null) + val other6Ref = AtomicRef(null) + + try { + coroutineScope { + val other2Job = launch(start = CoroutineStart.UNDISPATCHED) { + other2.collect { other2Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other3Job = launch(start = CoroutineStart.UNDISPATCHED) { + other3.collect { other3Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other4Job = launch(start = CoroutineStart.UNDISPATCHED) { + other4.collect { other4Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other5Job = launch(start = CoroutineStart.UNDISPATCHED) { + other5.collect { other5Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other6Job = launch(start = CoroutineStart.UNDISPATCHED) { + other6.collect { other6Ref.value = it ?: INTERNAL_NULL_VALUE } + } + + collect { value -> + val value2 = other2Ref.value ?: return@collect + val value3 = other3Ref.value ?: return@collect + val value4 = other4Ref.value ?: return@collect + val value5 = other5Ref.value ?: return@collect + val value6 = other6Ref.value ?: return@collect + emit( + transform( + value, + INTERNAL_NULL_VALUE.unbox(value2), + INTERNAL_NULL_VALUE.unbox(value3), + INTERNAL_NULL_VALUE.unbox(value4), + INTERNAL_NULL_VALUE.unbox(value5), + INTERNAL_NULL_VALUE.unbox(value6), + ), + ) + } + other2Job.cancelAndJoin() + other3Job.cancelAndJoin() + other4Job.cancelAndJoin() + other5Job.cancelAndJoin() + other6Job.cancelAndJoin() + } + } finally { + other2Ref.value = null + other3Ref.value = null + other4Ref.value = null + other5Ref.value = null + other6Ref.value = null + } + } +} + +/** + * Merges seven [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param other7 Seventh [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + other7: Flow, + transform: suspend (A, B, C, D, E, F, G) -> R, +): Flow { + return flow { + val other2Ref = AtomicRef(null) + val other3Ref = AtomicRef(null) + val other4Ref = AtomicRef(null) + val other5Ref = AtomicRef(null) + val other6Ref = AtomicRef(null) + val other7Ref = AtomicRef(null) + + try { + coroutineScope { + val other2Job = launch(start = CoroutineStart.UNDISPATCHED) { + other2.collect { other2Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other3Job = launch(start = CoroutineStart.UNDISPATCHED) { + other3.collect { other3Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other4Job = launch(start = CoroutineStart.UNDISPATCHED) { + other4.collect { other4Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other5Job = launch(start = CoroutineStart.UNDISPATCHED) { + other5.collect { other5Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other6Job = launch(start = CoroutineStart.UNDISPATCHED) { + other6.collect { other6Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other7Job = launch(start = CoroutineStart.UNDISPATCHED) { + other7.collect { other7Ref.value = it ?: INTERNAL_NULL_VALUE } + } + + collect { value -> + val value2 = other2Ref.value ?: return@collect + val value3 = other3Ref.value ?: return@collect + val value4 = other4Ref.value ?: return@collect + val value5 = other5Ref.value ?: return@collect + val value6 = other6Ref.value ?: return@collect + val value7 = other7Ref.value ?: return@collect + emit( + transform( + value, + INTERNAL_NULL_VALUE.unbox(value2), + INTERNAL_NULL_VALUE.unbox(value3), + INTERNAL_NULL_VALUE.unbox(value4), + INTERNAL_NULL_VALUE.unbox(value5), + INTERNAL_NULL_VALUE.unbox(value6), + INTERNAL_NULL_VALUE.unbox(value7), + ), + ) + } + other2Job.cancelAndJoin() + other3Job.cancelAndJoin() + other4Job.cancelAndJoin() + other5Job.cancelAndJoin() + other6Job.cancelAndJoin() + other7Job.cancelAndJoin() + } + } finally { + other2Ref.value = null + other3Ref.value = null + other4Ref.value = null + other5Ref.value = null + other6Ref.value = null + other7Ref.value = null + } + } +} + +/** + * Merges eight [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param other7 Seventh [Flow] + * @param other8 Eighth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + other7: Flow, + other8: Flow, + transform: suspend (A, B, C, D, E, F, G, H) -> R, +): Flow { + return flow { + val other2Ref = AtomicRef(null) + val other3Ref = AtomicRef(null) + val other4Ref = AtomicRef(null) + val other5Ref = AtomicRef(null) + val other6Ref = AtomicRef(null) + val other7Ref = AtomicRef(null) + val other8Ref = AtomicRef(null) + + try { + coroutineScope { + val other2Job = launch(start = CoroutineStart.UNDISPATCHED) { + other2.collect { other2Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other3Job = launch(start = CoroutineStart.UNDISPATCHED) { + other3.collect { other3Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other4Job = launch(start = CoroutineStart.UNDISPATCHED) { + other4.collect { other4Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other5Job = launch(start = CoroutineStart.UNDISPATCHED) { + other5.collect { other5Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other6Job = launch(start = CoroutineStart.UNDISPATCHED) { + other6.collect { other6Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other7Job = launch(start = CoroutineStart.UNDISPATCHED) { + other7.collect { other7Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other8Job = launch(start = CoroutineStart.UNDISPATCHED) { + other8.collect { other8Ref.value = it ?: INTERNAL_NULL_VALUE } + } + + collect { value -> + val value2 = other2Ref.value ?: return@collect + val value3 = other3Ref.value ?: return@collect + val value4 = other4Ref.value ?: return@collect + val value5 = other5Ref.value ?: return@collect + val value6 = other6Ref.value ?: return@collect + val value7 = other7Ref.value ?: return@collect + val value8 = other8Ref.value ?: return@collect + emit( + transform( + value, + INTERNAL_NULL_VALUE.unbox(value2), + INTERNAL_NULL_VALUE.unbox(value3), + INTERNAL_NULL_VALUE.unbox(value4), + INTERNAL_NULL_VALUE.unbox(value5), + INTERNAL_NULL_VALUE.unbox(value6), + INTERNAL_NULL_VALUE.unbox(value7), + INTERNAL_NULL_VALUE.unbox(value8), + ), + ) + } + other2Job.cancelAndJoin() + other3Job.cancelAndJoin() + other4Job.cancelAndJoin() + other5Job.cancelAndJoin() + other6Job.cancelAndJoin() + other7Job.cancelAndJoin() + other8Job.cancelAndJoin() + } + } finally { + other2Ref.value = null + other3Ref.value = null + other4Ref.value = null + other5Ref.value = null + other6Ref.value = null + other7Ref.value = null + other8Ref.value = null + } + } +} + +/** + * Merges nine [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param other7 Seventh [Flow] + * @param other8 Eighth [Flow] + * @param other9 Ninth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + other7: Flow, + other8: Flow, + other9: Flow, + transform: suspend (A, B, C, D, E, F, G, H, I) -> R, +): Flow { + return flow { + val other2Ref = AtomicRef(null) + val other3Ref = AtomicRef(null) + val other4Ref = AtomicRef(null) + val other5Ref = AtomicRef(null) + val other6Ref = AtomicRef(null) + val other7Ref = AtomicRef(null) + val other8Ref = AtomicRef(null) + val other9Ref = AtomicRef(null) + + try { + coroutineScope { + val other2Job = launch(start = CoroutineStart.UNDISPATCHED) { + other2.collect { other2Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other3Job = launch(start = CoroutineStart.UNDISPATCHED) { + other3.collect { other3Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other4Job = launch(start = CoroutineStart.UNDISPATCHED) { + other4.collect { other4Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other5Job = launch(start = CoroutineStart.UNDISPATCHED) { + other5.collect { other5Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other6Job = launch(start = CoroutineStart.UNDISPATCHED) { + other6.collect { other6Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other7Job = launch(start = CoroutineStart.UNDISPATCHED) { + other7.collect { other7Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other8Job = launch(start = CoroutineStart.UNDISPATCHED) { + other8.collect { other8Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other9Job = launch(start = CoroutineStart.UNDISPATCHED) { + other9.collect { other9Ref.value = it ?: INTERNAL_NULL_VALUE } + } + + collect { value -> + val value2 = other2Ref.value ?: return@collect + val value3 = other3Ref.value ?: return@collect + val value4 = other4Ref.value ?: return@collect + val value5 = other5Ref.value ?: return@collect + val value6 = other6Ref.value ?: return@collect + val value7 = other7Ref.value ?: return@collect + val value8 = other8Ref.value ?: return@collect + val value9 = other9Ref.value ?: return@collect + emit( + transform( + value, + INTERNAL_NULL_VALUE.unbox(value2), + INTERNAL_NULL_VALUE.unbox(value3), + INTERNAL_NULL_VALUE.unbox(value4), + INTERNAL_NULL_VALUE.unbox(value5), + INTERNAL_NULL_VALUE.unbox(value6), + INTERNAL_NULL_VALUE.unbox(value7), + INTERNAL_NULL_VALUE.unbox(value8), + INTERNAL_NULL_VALUE.unbox(value9), + ), + ) + } + other2Job.cancelAndJoin() + other3Job.cancelAndJoin() + other4Job.cancelAndJoin() + other5Job.cancelAndJoin() + other6Job.cancelAndJoin() + other7Job.cancelAndJoin() + other8Job.cancelAndJoin() + other9Job.cancelAndJoin() + } + } finally { + other2Ref.value = null + other3Ref.value = null + other4Ref.value = null + other5Ref.value = null + other6Ref.value = null + other7Ref.value = null + other8Ref.value = null + other9Ref.value = null + } + } +} + +/** + * Merges ten [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any. + * Values emitted by self before all other [Flow]s have emitted any values will be omitted. + * + * @param other2 Second [Flow] + * @param other3 Third [Flow] + * @param other4 Fourth [Flow] + * @param other5 Fifth [Flow] + * @param other6 Sixth [Flow] + * @param other7 Seventh [Flow] + * @param other8 Eighth [Flow] + * @param other9 Ninth [Flow] + * @param other10 Tenth [Flow] + * @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any. + */ +public fun Flow.withLatestFrom( + other2: Flow, + other3: Flow, + other4: Flow, + other5: Flow, + other6: Flow, + other7: Flow, + other8: Flow, + other9: Flow, + other10: Flow, + transform: suspend (A, B, C, D, E, F, G, H, I, J) -> R, +): Flow { + return flow { + val other2Ref = AtomicRef(null) + val other3Ref = AtomicRef(null) + val other4Ref = AtomicRef(null) + val other5Ref = AtomicRef(null) + val other6Ref = AtomicRef(null) + val other7Ref = AtomicRef(null) + val other8Ref = AtomicRef(null) + val other9Ref = AtomicRef(null) + val other10Ref = AtomicRef(null) + + try { + coroutineScope { + val other2Job = launch(start = CoroutineStart.UNDISPATCHED) { + other2.collect { other2Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other3Job = launch(start = CoroutineStart.UNDISPATCHED) { + other3.collect { other3Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other4Job = launch(start = CoroutineStart.UNDISPATCHED) { + other4.collect { other4Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other5Job = launch(start = CoroutineStart.UNDISPATCHED) { + other5.collect { other5Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other6Job = launch(start = CoroutineStart.UNDISPATCHED) { + other6.collect { other6Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other7Job = launch(start = CoroutineStart.UNDISPATCHED) { + other7.collect { other7Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other8Job = launch(start = CoroutineStart.UNDISPATCHED) { + other8.collect { other8Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other9Job = launch(start = CoroutineStart.UNDISPATCHED) { + other9.collect { other9Ref.value = it ?: INTERNAL_NULL_VALUE } + } + val other10Job = launch(start = CoroutineStart.UNDISPATCHED) { + other10.collect { other10Ref.value = it ?: INTERNAL_NULL_VALUE } + } + + collect { value -> + val value2 = other2Ref.value ?: return@collect + val value3 = other3Ref.value ?: return@collect + val value4 = other4Ref.value ?: return@collect + val value5 = other5Ref.value ?: return@collect + val value6 = other6Ref.value ?: return@collect + val value7 = other7Ref.value ?: return@collect + val value8 = other8Ref.value ?: return@collect + val value9 = other9Ref.value ?: return@collect + val value10 = other10Ref.value ?: return@collect + emit( + transform( + value, + INTERNAL_NULL_VALUE.unbox(value2), + INTERNAL_NULL_VALUE.unbox(value3), + INTERNAL_NULL_VALUE.unbox(value4), + INTERNAL_NULL_VALUE.unbox(value5), + INTERNAL_NULL_VALUE.unbox(value6), + INTERNAL_NULL_VALUE.unbox(value7), + INTERNAL_NULL_VALUE.unbox(value8), + INTERNAL_NULL_VALUE.unbox(value9), + INTERNAL_NULL_VALUE.unbox(value10), + ), + ) + } + other2Job.cancelAndJoin() + other3Job.cancelAndJoin() + other4Job.cancelAndJoin() + other5Job.cancelAndJoin() + other6Job.cancelAndJoin() + other7Job.cancelAndJoin() + other8Job.cancelAndJoin() + other9Job.cancelAndJoin() + other10Job.cancelAndJoin() + } + } finally { + other2Ref.value = null + other3Ref.value = null + other4Ref.value = null + other5Ref.value = null + other6Ref.value = null + other7Ref.value = null + other8Ref.value = null + other9Ref.value = null + other10Ref.value = null + } + } +} diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt index 6ad4d53b..69134545 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt @@ -142,4 +142,265 @@ class WithLatestFromTest : BaseTest() { ), ) } + + @Test + fun testWithLatestFrom3() = runTest { + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, true) + assertEquals( + f2.withLatestFrom(f1, f3).toList(), + listOf( + Triple("a", 4, true), + Triple("b", 4, true), + Triple("c", 4, true), + Triple("d", 4, true), + Triple("e", 4, true), + ), + ) + } + + @Test + fun testWithLatestFrom3WithTransform() = runTest { + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, true) + assertEquals( + f2.withLatestFrom(f1, f3) { a, b, c -> "$a-$b-$c" }.toList(), + listOf( + "a-4-true", + "b-4-true", + "c-4-true", + "d-4-true", + "e-4-true", + ), + ) + } + + @Test + fun testWithLatestFrom3WithTiming() = runTest { + val f1 = flowOf(1, 2, 3, 4).onEach { delay(300) } + val f2 = flowOf("a", "b", "c", "d", "e").onEach { delay(100) } + val f3 = flowOf(true, false, true).onEach { delay(150) } + assertEquals( + f2.withLatestFrom(f1, f3).toList(), + listOf( + Triple("c", 1, false), + Triple("d", 1, false), + Triple("e", 1, true), + ), + ) + } + + @Test + fun testWithLatestFrom4() = runTest { + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, true) + val f4 = flowOf('x', 'y', 'z') + assertEquals( + f2.withLatestFrom(f1, f3, f4) { a, b, c, d -> "$a-$b-$c-$d" }.toList(), + listOf( + "a-4-true-z", + "b-4-true-z", + "c-4-true-z", + "d-4-true-z", + "e-4-true-z", + ), + ) + } + + @Test + fun testWithLatestFrom5() = runTest { + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, true) + val f4 = flowOf('x', 'y', 'z') + val f5 = flowOf(10.0, 20.0, 30.0) + assertEquals( + f2.withLatestFrom(f1, f3, f4, f5) { a, b, c, d, e -> "$a-$b-$c-$d-$e" }.toList(), + listOf( + "a-4-true-z-30", + "b-4-true-z-30", + "c-4-true-z-30", + "d-4-true-z-30", + "e-4-true-z-30", + ), + ) + } + + @Test + fun testWithLatestFrom6() = runTest { + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, true) + val f4 = flowOf('x', 'y', 'z') + val f5 = flowOf(10.0, 20.0, 30.0) + val f6 = flowOf("first", "second", "third") + assertEquals( + f2.withLatestFrom(f1, f3, f4, f5, f6) { a, b, c, d, e, f -> "$a-$b-$c-$d-$e-$f" }.toList(), + listOf( + "a-4-true-z-30-third", + "b-4-true-z-30-third", + "c-4-true-z-30-third", + "d-4-true-z-30-third", + "e-4-true-z-30-third", + ), + ) + } + + @Test + fun testWithLatestFrom3_failureUpStream() = runTest { + assertFailsWith { + flow { throw TestException() } + .withLatestFrom(neverFlow(), neverFlow()) + .collect() + } + + assertFailsWith { + neverFlow() + .withLatestFrom(flow { throw TestException() }, neverFlow()) + .collect() + } + + assertFailsWith { + neverFlow() + .withLatestFrom(neverFlow(), flow { throw TestException() }) + .collect() + } + } + + @Test + fun testWithLatestFrom3_cancellation() = runTest { + assertFailsWith { + flow { + emit(1) + throw CancellationException("") + } + .withLatestFrom(emptyFlow(), emptyFlow()) + .collect() + } + + flowOf(1) + .withLatestFrom( + flow { + emit(2) + throw CancellationException("") + }, + flowOf(3), + ) + .test( + listOf( + Event.Value(Triple(1, 2, 3)), + Event.Complete, + ), + ) + } + + @Test + fun testWithLatestFrom3_nullValues() = runTest { + val f1 = flowOf(1, 2, 3, 4, null) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, null) + assertEquals( + f2.withLatestFrom(f1, f3).toList(), + listOf( + Triple("a", null, null), + Triple("b", null, null), + Triple("c", null, null), + Triple("d", null, null), + Triple("e", null, null), + ), + ) + } + + @Test + fun testWithLatestFrom7() = runTest { + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, true) + val f4 = flowOf('x', 'y', 'z') + val f5 = flowOf(10.0, 20.0, 30.0) + val f6 = flowOf("first", "second", "third") + val f7 = flowOf(100L, 200L, 300L) + assertEquals( + f2.withLatestFrom(f1, f3, f4, f5, f6, f7) { a, b, c, d, e, f, g -> "$a-$b-$c-$d-$e-$f-$g" }.toList(), + listOf( + "a-4-true-z-30-third-300", + "b-4-true-z-30-third-300", + "c-4-true-z-30-third-300", + "d-4-true-z-30-third-300", + "e-4-true-z-30-third-300", + ), + ) + } + + @Test + fun testWithLatestFrom8() = runTest { + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, true) + val f4 = flowOf('x', 'y', 'z') + val f5 = flowOf(10.0, 20.0, 30.0) + val f6 = flowOf("first", "second", "third") + val f7 = flowOf(100L, 200L, 300L) + val f8 = flowOf(0.1f, 0.2f, 0.3f) + assertEquals( + f2.withLatestFrom(f1, f3, f4, f5, f6, f7, f8) { a, b, c, d, e, f, g, h -> "$a-$b-$c-$d-$e-$f-$g-$h" }.toList(), + listOf( + "a-4-true-z-30-third-300-0.3", + "b-4-true-z-30-third-300-0.3", + "c-4-true-z-30-third-300-0.3", + "d-4-true-z-30-third-300-0.3", + "e-4-true-z-30-third-300-0.3", + ), + ) + } + + @Test + fun testWithLatestFrom9() = runTest { + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, true) + val f4 = flowOf('x', 'y', 'z') + val f5 = flowOf(10.0, 20.0, 30.0) + val f6 = flowOf("first", "second", "third") + val f7 = flowOf(100L, 200L, 300L) + val f8 = flowOf(0.1f, 0.2f, 0.3f) + val f9 = flowOf("i", "ii", "iii") + assertEquals( + f2.withLatestFrom(f1, f3, f4, f5, f6, f7, f8, f9) { a, b, c, d, e, f, g, h, i -> "$a-$b-$c-$d-$e-$f-$g-$h-$i" }.toList(), + listOf( + "a-4-true-z-30-third-300-0.3-iii", + "b-4-true-z-30-third-300-0.3-iii", + "c-4-true-z-30-third-300-0.3-iii", + "d-4-true-z-30-third-300-0.3-iii", + "e-4-true-z-30-third-300-0.3-iii", + ), + ) + } + + @Test + fun testWithLatestFrom10() = runTest { + val f1 = flowOf(1, 2, 3, 4) + val f2 = flowOf("a", "b", "c", "d", "e") + val f3 = flowOf(true, false, true) + val f4 = flowOf('x', 'y', 'z') + val f5 = flowOf(10.0, 20.0, 30.0) + val f6 = flowOf("first", "second", "third") + val f7 = flowOf(100L, 200L, 300L) + val f8 = flowOf(0.1f, 0.2f, 0.3f) + val f9 = flowOf("i", "ii", "iii") + val f10 = flowOf(42, 43, 44) + assertEquals( + f2.withLatestFrom(f1, f3, f4, f5, f6, f7, f8, f9, f10) { a, b, c, d, e, f, g, h, i, j -> "$a-$b-$c-$d-$e-$f-$g-$h-$i-$j" }.toList(), + listOf( + "a-4-true-z-30-third-300-0.3-iii-44", + "b-4-true-z-30-third-300-0.3-iii-44", + "c-4-true-z-30-third-300-0.3-iii-44", + "d-4-true-z-30-third-300-0.3-iii-44", + "e-4-true-z-30-third-300-0.3-iii-44", + ), + ) + } }