|
| 1 | +package com.quarkdown.core.util |
| 2 | + |
| 3 | +import kotlinx.coroutines.Dispatchers |
| 4 | +import kotlinx.coroutines.async |
| 5 | +import kotlinx.coroutines.awaitAll |
| 6 | +import kotlinx.coroutines.coroutineScope |
| 7 | +import kotlinx.coroutines.runBlocking |
| 8 | + |
| 9 | +/** |
| 10 | + * Minimum number of items required for parallel execution to be worthwhile. |
| 11 | + * Below this threshold, the overhead of coroutine scheduling exceeds the benefit. |
| 12 | + */ |
| 13 | +private const val MIN_ITEMS_FOR_PARALLELISM = 4 |
| 14 | + |
| 15 | +/** |
| 16 | + * Tracks whether the current thread is already inside a [mapParallel] invocation, |
| 17 | + * preventing nested [runBlocking] calls that would risk thread pool exhaustion. |
| 18 | + */ |
| 19 | +private val insideParallelBlock = ThreadLocal.withInitial { false } |
| 20 | + |
| 21 | +/** |
| 22 | + * Maps each element of this list using [transform], executing transformations in parallel |
| 23 | + * via coroutines when the list is large enough to benefit from concurrency. |
| 24 | + * Falls back to sequential mapping for small lists where coroutine overhead exceeds benefit. |
| 25 | + * |
| 26 | + * Handles nested invocations safely: if already executing inside a parallel block, |
| 27 | + * the inner call uses [coroutineScope] instead of [runBlocking] to avoid thread pool exhaustion. |
| 28 | + * |
| 29 | + * Results are returned in the same order as the input list. |
| 30 | + * @param transform the transformation to apply to each element |
| 31 | + * @return the list of transformed results, preserving input order |
| 32 | + */ |
| 33 | +fun <T, R> List<T>.mapParallel(transform: (T) -> R): List<R> { |
| 34 | + if (size < MIN_ITEMS_FOR_PARALLELISM) { |
| 35 | + return map(transform) |
| 36 | + } |
| 37 | + |
| 38 | + // If already inside a parallel block, delegate to a suspending coroutineScope |
| 39 | + // to participate in the existing dispatcher without blocking a thread. |
| 40 | + if (insideParallelBlock.get()) { |
| 41 | + return runBlocking { |
| 42 | + mapParallelAsync(transform) |
| 43 | + } |
| 44 | + } |
| 45 | + |
| 46 | + return runBlocking(Dispatchers.Default) { |
| 47 | + mapParallelAsync(transform) |
| 48 | + } |
| 49 | +} |
| 50 | + |
| 51 | +/** |
| 52 | + * Suspending implementation of parallel mapping. |
| 53 | + * Launches an [async] coroutine per element and awaits all results in order. |
| 54 | + */ |
| 55 | +private suspend fun <T, R> List<T>.mapParallelAsync(transform: (T) -> R): List<R> = |
| 56 | + coroutineScope { |
| 57 | + map { element -> |
| 58 | + async { |
| 59 | + insideParallelBlock.set(true) |
| 60 | + try { |
| 61 | + transform(element) |
| 62 | + } finally { |
| 63 | + insideParallelBlock.set(false) |
| 64 | + } |
| 65 | + } |
| 66 | + }.awaitAll() |
| 67 | + } |
0 commit comments