Skip to content

Latest commit

 

History

History
135 lines (108 loc) · 4.77 KB

File metadata and controls

135 lines (108 loc) · 4.77 KB
title Process Items Concurrently
id stream-process-concurrently
skillLevel intermediate
applicationPatternId building-data-pipelines
summary Perform an asynchronous action for each item in a stream with controlled parallelism to dramatically improve performance.
tags
stream
concurrency
performance
mapEffect
parallelism
rule
description
Use Stream.mapEffect with the `concurrency` option to process stream items in parallel.
author PaulJPhilp
related
stream-from-iterable
stream-from-paginated-api
stream-retry-on-failure
lessonOrder 5

Guideline

To process items in a stream concurrently, use Stream.mapEffect and provide a value greater than 1 to its concurrency option.


Rationale

For many data pipelines, the most time-consuming step is performing an I/O-bound operation for each item, such as calling an API or querying a database. Processing these items one by one (sequentially) is safe but slow, as the entire pipeline waits for each operation to complete before starting the next.

Stream.mapEffect's concurrency option is the solution. It provides a simple, declarative way to introduce controlled parallelism into your pipeline.

  1. Performance Boost: It allows the stream to work on multiple items at once, drastically reducing the total execution time for I/O-bound tasks.
  2. Controlled Parallelism: Unlike Promise.all which runs everything at once, you specify the exact number of concurrent operations. This is crucial for stability, as it prevents your application from overwhelming downstream services or exhausting its own resources (like file handles or network sockets).
  3. Automatic Backpressure: The stream will not pull new items from the source faster than the concurrent slots can process them. This backpressure is handled automatically, preventing memory issues.
  4. Structured Concurrency: It's fully integrated with Effect's runtime. If any concurrent operation fails, all other in-flight operations for that stream are immediately and reliably interrupted, preventing wasted work and ensuring clean shutdowns.

Good Example

This example processes four items, each taking one second. By setting concurrency: 2, the total runtime is approximately two seconds instead of four, because items are processed in parallel pairs.

import { Effect, Stream } from "effect";

// A mock function that simulates a slow I/O operation
const processItem = (id: number): Effect.Effect<string, Error> =>
  Effect.log(`Starting item ${id}...`).pipe(
    Effect.delay("1 second"),
    Effect.map(() => `Finished item ${id}`),
    Effect.tap(Effect.log)
  );

const ids = [1, 2, 3, 4];

const program = Stream.fromIterable(ids).pipe(
  // Process up to 2 items concurrently
  Stream.mapEffect(processItem, { concurrency: 2 }),
  Stream.runDrain
);

// Measure the total time taken
const timedProgram = Effect.timed(program);

const programWithLogging = Effect.gen(function* () {
  const [duration, _] = yield* timedProgram;
  const durationMs = Number(duration);
  yield* Effect.log(`\nTotal time: ${Math.round(durationMs / 1000)} seconds`);
  return duration;
}).pipe(
  Effect.catchAll((error) =>
    Effect.gen(function* () {
      yield* Effect.logError(`Program error: ${error}`);
      return null;
    })
  )
);

Effect.runPromise(programWithLogging);
/*
Output:
... level=INFO msg="Starting item 1..."
... level=INFO msg="Starting item 2..."
... level=INFO msg="Finished item 1"
... level=INFO msg="Starting item 3..."
... level=INFO msg="Finished item 2"
... level=INFO msg="Starting item 4..."
... level=INFO msg="Finished item 3"
... level=INFO msg="Finished item 4"

Total time: 2 seconds
*/

Anti-Pattern

The anti-pattern is to process I/O-bound tasks sequentially. This is the default behavior of Stream.mapEffect if you don't specify a concurrency level, and it leads to poor performance.

import { Effect, Stream } from "effect";
// ... same processItem function ...

const ids = [1, 2, 3, 4];

// Processing sequentially (default concurrency is 1)
const program = Stream.fromIterable(ids).pipe(
  Stream.mapEffect(processItem), // No concurrency option
  Stream.runDrain
);

const timedProgram = Effect.timed(program);

Effect.runPromise(timedProgram).then(([duration, _]) => {
  console.log(`\nTotal time: ${Math.round(duration.millis / 1000)} seconds`);
});
/*
Output:
... level=INFO msg="Starting item 1..."
... level=INFO msg="Finished item 1"
... level=INFO msg="Starting item 2..."
... level=INFO msg="Finished item 2"
... etc.

Total time: 4 seconds
*/

While sequential processing is sometimes necessary to preserve order or avoid race conditions, it is a performance anti-pattern for independent, I/O-bound tasks. The concurrent approach is almost always preferable in such cases.