title: Process Items in Batches
id: stream-process-in-batches
skillLevel: intermediate
applicationPatternId: building-data-pipelines
summary: Group items into chunks for efficient bulk operations, like database inserts or batch API calls.
tags: stream, batch, chunk, performance, grouped
rule:
  description: Use Stream.grouped(n) to transform a stream of items into a stream of batched chunks.
author: PaulJPhilp
related: stream-process-concurrently, stream-run-for-effects
lessonOrder: 6

Guideline

To process items in fixed-size batches for performance, use the Stream.grouped(batchSize) operator to transform a stream of individual items into a stream of Chunks.


Rationale

When interacting with external systems like databases or APIs, making one request per item is usually inefficient: the network latency and fixed overhead of each call can dominate the total processing time. Most high-performance systems offer bulk or batch endpoints to mitigate this.

Stream.grouped(n) provides a simple, declarative way to prepare your data for these bulk operations:

  1. Performance Optimization: It dramatically reduces the number of network roundtrips. A single API call with 100 items is far faster than 100 individual API calls.
  2. Declarative Batching: It abstracts away the tedious and error-prone manual logic of counting items, managing temporary buffers, and deciding when to send a batch.
  3. Seamless Composition: It transforms a Stream<A> into a Stream<Chunk<A>>. This new stream of chunks can be piped directly into Stream.mapEffect, which processes the batches one at a time by default or several at once when you raise its concurrency option.
  4. Handles Leftovers: The operator automatically handles the final, smaller batch if the total number of items is not perfectly divisible by the batch size.
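
As a small illustration of the leftover handling described in point 4, the sketch below groups 10 numbers into batches of 3 and collects the size of each batch. The name batchSizes is made up for this example; only the batch sizes matter here.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Ten items do not divide evenly by 3, so the last batch is smaller.
const items = Array.from({ length: 10 }, (_, i) => i + 1);

// batchSizes is an illustrative name, not part of the pattern itself.
const batchSizes = Stream.fromIterable(items).pipe(
  // Stream<number> becomes Stream<Chunk<number>>
  Stream.grouped(3),
  // Keep only the size of each batch
  Stream.map((batch) => Chunk.size(batch)),
  Stream.runCollect,
  // Convert the collected Chunk into a plain array
  Effect.map((sizes) => Array.from(sizes))
);

Effect.runPromise(batchSizes).then((sizes) => console.log(sizes));
// → [ 3, 3, 3, 1 ]
```

No extra code is needed to flush the trailing batch of 1; it is emitted when the source stream ends.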

Good Example

This example processes 10 users. By using Stream.grouped(5), it transforms the stream of 10 individual users into a stream of two chunks (each a batch of 5). The saveUsersInBulk function is then called only twice, once for each batch.

import { Effect, Stream, Chunk } from "effect";

// A mock function that simulates a bulk database insert
const saveUsersInBulk = (
  userBatch: Chunk.Chunk<{ id: number }>
): Effect.Effect<void, Error> =>
  Effect.log(
    `Saving batch of ${userBatch.length} users: ${Chunk.toArray(userBatch)
      .map((u) => u.id)
      .join(", ")}`
  );

// Ten mock user records with ids 1 through 10
const users = Array.from({ length: 10 }, (_, i) => ({ id: i + 1 }));

const program = Stream.fromIterable(users).pipe(
  // Group the stream of users into batches of 5
  Stream.grouped(5),
  // Process each batch with our bulk save function
  Stream.mapEffect(saveUsersInBulk, { concurrency: 1 }),
  Stream.runDrain
);

Effect.runPromise(program);
/*
Output:
... level=INFO fiber=#0 message="Saving batch of 5 users: 1, 2, 3, 4, 5"
... level=INFO fiber=#0 message="Saving batch of 5 users: 6, 7, 8, 9, 10"
*/
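
The example above saves one batch at a time. If the target system tolerates parallel bulk writes, the same pipeline can process several batches at once by raising the concurrency option. The following is a minimal sketch under that assumption: the mock save here is a hypothetical variant that sleeps for 10ms to stand in for network latency and returns the number of users it received, and savedCounts is an illustrative name.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Hypothetical bulk save: waits briefly, then reports how many users it got.
const saveUsersInBulk = (
  userBatch: Chunk.Chunk<{ id: number }>
): Effect.Effect<number> =>
  Effect.sleep("10 millis").pipe(Effect.as(Chunk.size(userBatch)));

const users = Array.from({ length: 10 }, (_, i) => ({ id: i + 1 }));

const savedCounts = Stream.fromIterable(users).pipe(
  Stream.grouped(5),
  // Allow up to 2 bulk saves to be in flight at the same time
  Stream.mapEffect(saveUsersInBulk, { concurrency: 2 }),
  Stream.runCollect,
  Effect.map((counts) => Array.from(counts))
);

Effect.runPromise(savedCounts).then((counts) => console.log(counts));
// → [ 5, 5 ]
```

Whether a concurrency above 1 actually helps depends on the downstream system; a database with a small connection pool or a rate-limited API may be better served by sequential batches.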

Anti-Pattern

The anti-pattern is to process items one by one when a more efficient bulk operation is available. This is a common performance bottleneck.

import { Effect, Stream } from "effect";

// A mock function that saves one user at a time
const saveUser = (user: { id: number }): Effect.Effect<void, Error> =>
  Effect.log(`Saving single user: ${user.id}`);

// Ten mock user records with ids 1 through 10
const users = Array.from({ length: 10 }, (_, i) => ({ id: i + 1 }));

const program = Stream.fromIterable(users).pipe(
  // Process each user individually, leading to 10 separate "saves"
  Stream.mapEffect(saveUser, { concurrency: 1 }),
  Stream.runDrain
);

Effect.runPromise(program);
/*
Output:
... level=INFO fiber=#0 message="Saving single user: 1"
... level=INFO fiber=#0 message="Saving single user: 2"
... (and so on for all 10 users)
*/

This individual processing approach is an anti-pattern because it pays the per-call overhead once for every item. If each saveUser call incurred 50ms of network latency, the 10 sequential calls would take at least 500ms (10 calls * 50ms). Assuming a bulk call has similar latency, the batched approach would take about 100ms (2 batches * 50ms), roughly a 5x improvement.
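
The latency comparison can be written as a small back-of-the-envelope helper. This is a rough model, not a benchmark: it assumes calls run sequentially and that every call costs the same fixed latency regardless of how many items it carries. The function name estimateTotalMs is made up for illustration.

```typescript
// Rough sequential-latency model: number of calls times latency per call.
// Assumes each call costs the same latency no matter how many items it carries.
const estimateTotalMs = (
  itemCount: number,
  batchSize: number,
  latencyMsPerCall: number
): number => Math.ceil(itemCount / batchSize) * latencyMsPerCall;

const oneByOneMs = estimateTotalMs(10, 1, 50); // 10 calls * 50ms
const batchedMs = estimateTotalMs(10, 5, 50); // 2 calls * 50ms

console.log(oneByOneMs, batchedMs, oneByOneMs / batchedMs);
// → 500 100 5
```

In practice a bulk call usually takes somewhat longer than a single-item call, so the real speedup tends to be smaller than this model suggests.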