| title | Process Items Concurrently |
|---|---|
| id | stream-process-concurrently |
| skillLevel | intermediate |
| applicationPatternId | building-data-pipelines |
| summary | Perform an asynchronous action for each item in a stream with controlled parallelism to dramatically improve performance. |
| tags | |
| rule | |
| author | PaulJPhilp |
| related | |
| lessonOrder | 5 |
To process items in a stream concurrently, use `Stream.mapEffect` and provide a value greater than 1 to its `concurrency` option.

For many data pipelines, the most time-consuming step is performing an I/O-bound operation for each item, such as calling an API or querying a database. Processing these items one by one (sequentially) is safe but slow, as the entire pipeline waits for each operation to complete before starting the next.

`Stream.mapEffect`'s `concurrency` option is the solution. It provides a simple, declarative way to introduce controlled parallelism into your pipeline.
- Performance Boost: It allows the stream to work on multiple items at once, drastically reducing the total execution time for I/O-bound tasks.
- Controlled Parallelism: Unlike `Promise.all`, which runs everything at once, you specify the exact number of concurrent operations. This is crucial for stability, as it prevents your application from overwhelming downstream services or exhausting its own resources (like file handles or network sockets).
- Automatic Backpressure: The stream will not pull new items from the source faster than the concurrent slots can process them. This backpressure is handled automatically, preventing memory issues.
- Structured Concurrency: It's fully integrated with Effect's runtime. If any concurrent operation fails, all other in-flight operations for that stream are immediately and reliably interrupted, preventing wasted work and ensuring clean shutdowns.
This example processes four items, each taking one second. By setting `concurrency: 2`, the total runtime is approximately two seconds instead of four, because items are processed in parallel pairs.
```typescript
import { Duration, Effect, Stream } from "effect";

// A mock function that simulates a slow I/O operation
const processItem = (id: number): Effect.Effect<string, Error> =>
  Effect.log(`Starting item ${id}...`).pipe(
    Effect.delay("1 second"),
    Effect.map(() => `Finished item ${id}`),
    Effect.tap(Effect.log)
  );

const ids = [1, 2, 3, 4];

const program = Stream.fromIterable(ids).pipe(
  // Process up to 2 items concurrently
  Stream.mapEffect(processItem, { concurrency: 2 }),
  Stream.runDrain
);

// Measure the total time taken
const timedProgram = Effect.timed(program);

const programWithLogging = Effect.gen(function* () {
  const [duration] = yield* timedProgram;
  const durationMs = Duration.toMillis(duration);
  yield* Effect.log(`\nTotal time: ${Math.round(durationMs / 1000)} seconds`);
  return duration;
}).pipe(
  Effect.catchAll((error) =>
    Effect.gen(function* () {
      yield* Effect.logError(`Program error: ${error}`);
      return null;
    })
  )
);

Effect.runPromise(programWithLogging);

/*
Output:
... level=INFO msg="Starting item 1..."
... level=INFO msg="Starting item 2..."
... level=INFO msg="Finished item 1"
... level=INFO msg="Starting item 3..."
... level=INFO msg="Finished item 2"
... level=INFO msg="Starting item 4..."
... level=INFO msg="Finished item 3"
... level=INFO msg="Finished item 4"
Total time: 2 seconds
*/
```

The anti-pattern is to process I/O-bound tasks sequentially. This is the default behavior of `Stream.mapEffect` when you don't specify a concurrency level, and it leads to poor performance.
```typescript
import { Duration, Effect, Stream } from "effect";

// ... same processItem function ...

const ids = [1, 2, 3, 4];

// Processing sequentially (default concurrency is 1)
const program = Stream.fromIterable(ids).pipe(
  Stream.mapEffect(processItem), // No concurrency option
  Stream.runDrain
);

const timedProgram = Effect.timed(program);

Effect.runPromise(timedProgram).then(([duration]) => {
  console.log(
    `\nTotal time: ${Math.round(Duration.toMillis(duration) / 1000)} seconds`
  );
});

/*
Output:
... level=INFO msg="Starting item 1..."
... level=INFO msg="Finished item 1"
... level=INFO msg="Starting item 2..."
... level=INFO msg="Finished item 2"
... etc.
Total time: 4 seconds
*/
```

While sequential processing is sometimes necessary to preserve order or avoid race conditions, it is a performance anti-pattern for independent, I/O-bound tasks. The concurrent approach is almost always preferable in such cases.