Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 308 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,311 @@ public fun <A, B, R> Flow<A>.withLatestFrom(
@Suppress("NOTHING_TO_INLINE")
public inline fun <A, B> Flow<A>.withLatestFrom(other: Flow<B>): Flow<Pair<A, B>> =
withLatestFrom(other, ::Pair)

/**
* Merges multiple [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any.
* Values emitted by self before all other [Flow]s have emitted any values will be omitted.
*
* @param others Array of other [Flow]s
* @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any.
*/
public fun <A, R> Flow<A>.withLatestFrom(
others: Array<out Flow<*>>,
transform: suspend (A, Array<Any?>) -> R,
): Flow<R> {
return flow {
val refs = Array<AtomicRef<Any?>>(others.size) { AtomicRef(null) }

try {
coroutineScope {
val jobs = others.mapIndexed { index, flow ->
launch(start = CoroutineStart.UNDISPATCHED) {
flow.collect { refs[index].value = it ?: INTERNAL_NULL_VALUE }
}
}

collect { value ->
val values = Array<Any?>(refs.size) { index ->
refs[index].value ?: return@collect
}

val unboxedValues = Array<Any?>(values.size) { index ->
INTERNAL_NULL_VALUE.unbox(values[index])
}

Comment on lines +99 to +106

Copilot AI Sep 27, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code creates an intermediate values array and then immediately creates another unboxedValues array. This could be optimized by combining both operations into a single array creation to reduce memory allocations.

Suggested change
val values = Array<Any?>(refs.size) { index ->
refs[index].value ?: return@collect
}
val unboxedValues = Array<Any?>(values.size) { index ->
INTERNAL_NULL_VALUE.unbox(values[index])
}
val unboxedValues = Array<Any?>(refs.size) { index ->
val v = refs[index].value ?: return@collect
INTERNAL_NULL_VALUE.unbox(v)
}

Copilot uses AI. Check for mistakes.
emit(transform(value, unboxedValues))
}

jobs.forEach { it.cancelAndJoin() }
}
} finally {
refs.forEach { it.value = null }
}
}
}

/**
* Merges three [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any.
* Values emitted by self before all other [Flow]s have emitted any values will be omitted.
*
* @param other2 Second [Flow]
* @param other3 Third [Flow]
* @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any.
*/
public fun <A, B, C, R> Flow<A>.withLatestFrom(
other2: Flow<B>,
other3: Flow<C>,
transform: suspend (A, B, C) -> R,
): Flow<R> {
return withLatestFrom(arrayOf(other2, other3)) { value, others ->
transform(
value,
others[0] as B,
others[1] as C,
Comment on lines +132 to +135

Copilot AI Sep 27, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unchecked cast as B and as C could fail at runtime if the array contains values of different types than expected. Consider adding runtime type checks or documenting this assumption clearly.

Suggested change
transform(
value,
others[0] as B,
others[1] as C,
val b = others[0]
val c = others[1]
if (b !is B) {
throw IllegalStateException("Expected others[0] to be of type B, but was ${b?.let { it::class }}")
}
if (c !is C) {
throw IllegalStateException("Expected others[1] to be of type C, but was ${c?.let { it::class }}")
}
transform(
value,
b,
c,

Copilot uses AI. Check for mistakes.
)
}
}

@Suppress("NOTHING_TO_INLINE")
public inline fun <A, B, C> Flow<A>.withLatestFrom(
other2: Flow<B>,
other3: Flow<C>,
): Flow<Triple<A, B, C>> = withLatestFrom(other2, other3, ::Triple)

/**
* Merges four [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any.
* Values emitted by self before all other [Flow]s have emitted any values will be omitted.
*
* @param other2 Second [Flow]
* @param other3 Third [Flow]
* @param other4 Fourth [Flow]
* @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any.
*/
public fun <A, B, C, D, R> Flow<A>.withLatestFrom(
other2: Flow<B>,
other3: Flow<C>,
other4: Flow<D>,
transform: suspend (A, B, C, D) -> R,
): Flow<R> {
return withLatestFrom(arrayOf(other2, other3, other4)) { value, others ->
transform(
value,
others[0] as B,
others[1] as C,
others[2] as D,
)
}
}

/**
* Merges five [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any.
* Values emitted by self before all other [Flow]s have emitted any values will be omitted.
*
* @param other2 Second [Flow]
* @param other3 Third [Flow]
* @param other4 Fourth [Flow]
* @param other5 Fifth [Flow]
* @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any.
*/
public fun <A, B, C, D, E, R> Flow<A>.withLatestFrom(
other2: Flow<B>,
other3: Flow<C>,
other4: Flow<D>,
other5: Flow<E>,
transform: suspend (A, B, C, D, E) -> R,
): Flow<R> {
return withLatestFrom(arrayOf(other2, other3, other4, other5)) { value, others ->
transform(
value,
others[0] as B,
others[1] as C,
others[2] as D,
others[3] as E,
)
}
}

/**
* Merges six [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any.
* Values emitted by self before all other [Flow]s have emitted any values will be omitted.
*
* @param other2 Second [Flow]
* @param other3 Third [Flow]
* @param other4 Fourth [Flow]
* @param other5 Fifth [Flow]
* @param other6 Sixth [Flow]
* @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any.
*/
public fun <A, B, C, D, E, F, R> Flow<A>.withLatestFrom(
other2: Flow<B>,
other3: Flow<C>,
other4: Flow<D>,
other5: Flow<E>,
other6: Flow<F>,
transform: suspend (A, B, C, D, E, F) -> R,
): Flow<R> {
return withLatestFrom(arrayOf(other2, other3, other4, other5, other6)) { value, others ->
transform(
value,
others[0] as B,
others[1] as C,
others[2] as D,
others[3] as E,
others[4] as F,
)
}
}

/**
* Merges seven [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any.
* Values emitted by self before all other [Flow]s have emitted any values will be omitted.
*
* @param other2 Second [Flow]
* @param other3 Third [Flow]
* @param other4 Fourth [Flow]
* @param other5 Fifth [Flow]
* @param other6 Sixth [Flow]
* @param other7 Seventh [Flow]
* @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any.
*/
public fun <A, B, C, D, E, F, G, R> Flow<A>.withLatestFrom(
other2: Flow<B>,
other3: Flow<C>,
other4: Flow<D>,
other5: Flow<E>,
other6: Flow<F>,
other7: Flow<G>,
transform: suspend (A, B, C, D, E, F, G) -> R,
): Flow<R> {
return withLatestFrom(arrayOf(other2, other3, other4, other5, other6, other7)) { value, others ->
transform(
value,
others[0] as B,
others[1] as C,
others[2] as D,
others[3] as E,
others[4] as F,
others[5] as G,
)
}
}

/**
* Merges eight [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any.
* Values emitted by self before all other [Flow]s have emitted any values will be omitted.
*
* @param other2 Second [Flow]
* @param other3 Third [Flow]
* @param other4 Fourth [Flow]
* @param other5 Fifth [Flow]
* @param other6 Sixth [Flow]
* @param other7 Seventh [Flow]
* @param other8 Eighth [Flow]
* @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any.
*/
public fun <A, B, C, D, E, F, G, H, R> Flow<A>.withLatestFrom(
other2: Flow<B>,
other3: Flow<C>,
other4: Flow<D>,
other5: Flow<E>,
other6: Flow<F>,
other7: Flow<G>,
other8: Flow<H>,
transform: suspend (A, B, C, D, E, F, G, H) -> R,
): Flow<R> {
return withLatestFrom(arrayOf(other2, other3, other4, other5, other6, other7, other8)) { value, others ->
transform(
value,
others[0] as B,
others[1] as C,
others[2] as D,
others[3] as E,
others[4] as F,
others[5] as G,
others[6] as H,
)
}
}

/**
* Merges nine [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any.
* Values emitted by self before all other [Flow]s have emitted any values will be omitted.
*
* @param other2 Second [Flow]
* @param other3 Third [Flow]
* @param other4 Fourth [Flow]
* @param other5 Fifth [Flow]
* @param other6 Sixth [Flow]
* @param other7 Seventh [Flow]
* @param other8 Eighth [Flow]
* @param other9 Ninth [Flow]
* @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any.
*/
public fun <A, B, C, D, E, F, G, H, I, R> Flow<A>.withLatestFrom(
other2: Flow<B>,
other3: Flow<C>,
other4: Flow<D>,
other5: Flow<E>,
other6: Flow<F>,
other7: Flow<G>,
other8: Flow<H>,
other9: Flow<I>,
transform: suspend (A, B, C, D, E, F, G, H, I) -> R,
): Flow<R> {
return withLatestFrom(arrayOf(other2, other3, other4, other5, other6, other7, other8, other9)) { value, others ->
transform(
value,
others[0] as B,
others[1] as C,
others[2] as D,
others[3] as E,
others[4] as F,
others[5] as G,
others[6] as H,
others[7] as I,
)
}
}

/**
* Merges ten [Flow]s into one [Flow] by combining each value from self with the latest values from the other [Flow]s, if any.
* Values emitted by self before all other [Flow]s have emitted any values will be omitted.
*
* @param other2 Second [Flow]
* @param other3 Third [Flow]
* @param other4 Fourth [Flow]
* @param other5 Fifth [Flow]
* @param other6 Sixth [Flow]
* @param other7 Seventh [Flow]
* @param other8 Eighth [Flow]
* @param other9 Ninth [Flow]
* @param other10 Tenth [Flow]
* @param transform A transform function to apply to each value from self combined with the latest values from the other [Flow]s, if any.
*/
public fun <A, B, C, D, E, F, G, H, I, J, R> Flow<A>.withLatestFrom(
other2: Flow<B>,
other3: Flow<C>,
other4: Flow<D>,
other5: Flow<E>,
other6: Flow<F>,
other7: Flow<G>,
other8: Flow<H>,
other9: Flow<I>,
other10: Flow<J>,
transform: suspend (A, B, C, D, E, F, G, H, I, J) -> R,
): Flow<R> {
return withLatestFrom(arrayOf(other2, other3, other4, other5, other6, other7, other8, other9, other10)) { value, others ->
transform(
value,
others[0] as B,
others[1] as C,
others[2] as D,
others[3] as E,
others[4] as F,
others[5] as G,
others[6] as H,
others[7] as I,
others[8] as J,
)
}
}
Loading
Loading