---
title: "Sink Pattern 1: Batch Insert Stream Records into Database"
id: sink-pattern-batch-insert-stream-records-into-database
skillLevel: intermediate
applicationPatternId: streams-sinks
summary: Use Sink to batch stream records and insert them efficiently into a database in groups, rather than one-by-one, for better performance and resource usage.
tags:
  - sink
  - stream
  - database
  - persistence
  - batching
  - performance
rule:
  description: Batch stream records before database operations to improve throughput and reduce transaction overhead.
related:
  - process-streaming-data-with-stream
  - stream-from-paginated-api
  - handle-errors-with-catch
author: effect_website
lessonOrder: 1
---

## Guideline

When consuming a stream of records to persist in a database, collect them into batches using Sink before inserting. This reduces the number of database round-trips and transaction overhead, improving overall throughput significantly.
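In outline (a minimal sketch, with a hypothetical `insertBatch` effect standing in for your database call): re-chunk the stream into fixed-size groups with `Stream.grouped`, then drain each group into the database in a single call.

```typescript
import { Chunk, Effect, Stream } from "effect";

declare const records: Stream.Stream<{ readonly id: number }>;
declare const insertBatch: (
  batch: ReadonlyArray<{ readonly id: number }>
) => Effect.Effect<void>;

// Group the stream into chunks of up to 100 records,
// then issue one database call per chunk
const persist = records.pipe(
  Stream.grouped(100),
  Stream.runForEach((batch) => insertBatch(Chunk.toReadonlyArray(batch)))
);
```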


## Rationale

Inserting records one-by-one is inefficient:

- Each insert is a separate database call (network latency, connection overhead)
- Each insert may be a separate transaction (ACID overhead)
- Resource contention and connection-pool exhaustion at scale

Batching solves this by:

- Grouping N records into a single bulk insert operation
- Amortizing database overhead across multiple records
- Maintaining throughput even under backpressure
- Enabling efficient transaction semantics for the entire batch

For example, inserting 10,000 records one-by-one might take 100 seconds, while batching in groups of 100 might take just 2-3 seconds.
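Working those figures through (illustrative assumptions, not benchmarks):

```typescript
// One-by-one: 10_000 inserts × ~10 ms per round-trip ≈ 100 s
// Batched:       100 batches × ~25 ms per round-trip ≈ 2.5 s (~40× faster)
```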


## Good Example

This example demonstrates streaming user records from a paginated API and batching them for efficient database insertion.

```typescript
import { Chunk, Effect, Option, Sink, Stream } from "effect";

interface User {
  readonly id: number;
  readonly name: string;
  readonly email: string;
}

interface PaginatedResponse {
  readonly users: User[];
  readonly nextPage: number | null;
}

// Mock API that returns paginated users
const fetchUserPage = (
  page: number
): Effect.Effect<PaginatedResponse> =>
  Effect.succeed(
    page < 10
      ? {
          users: Array.from({ length: 50 }, (_, i) => ({
            id: page * 50 + i,
            name: `User ${page * 50 + i}`,
            email: `user${page * 50 + i}@example.com`,
          })),
          nextPage: page + 1,
        }
      : { users: [], nextPage: null }
  ).pipe(Effect.delay("10 millis"));

// Mock database insert that takes a batch of users
const insertUserBatch = (
  users: readonly User[]
): Effect.Effect<number> =>
  Effect.sync(() => {
    console.log(`Inserting batch of ${users.length} users`);
    return users.length;
  }).pipe(Effect.delay("50 millis"));

// Create a stream of users from the paginated API.
// paginateChunkEffect emits each page's Chunk of users and feeds
// the next page number back in until it is None.
const userStream: Stream.Stream<User> = Stream.paginateChunkEffect(
  0,
  (page) =>
    fetchUserPage(page).pipe(
      Effect.map(
        (response) =>
          [
            Chunk.fromIterable(response.users),
            Option.fromNullable(response.nextPage),
          ] as const
      )
    )
);

// Sink that folds over batches, inserting each batch and
// accumulating the total number of inserted records
const batchInsertSink = Sink.foldLeftEffect(
  0,
  (count: number, batch: Chunk.Chunk<User>) =>
    insertUserBatch(Chunk.toReadonlyArray(batch)).pipe(
      Effect.map((inserted) => count + inserted)
    )
);

// Run the stream with the batching sink.
// Stream.grouped re-chunks the stream into batches of up to 100 users.
const program = Effect.gen(function* () {
  const totalInserted = yield* userStream.pipe(
    Stream.grouped(100),
    Stream.run(batchInsertSink)
  );
  console.log(`Total users inserted: ${totalInserted}`);
});

Effect.runPromise(program);
```

This pattern:

1. Creates a stream of users from a paginated API
2. Re-chunks the stream into batches of 100 with `Stream.grouped`
3. Inserts each batch into the database in a single operation
4. Tracks the total count of inserted records

The batching happens upstream of the sink: `Stream.grouped` buffers elements until the batch size is reached (emitting any final partial batch when the stream ends), and the sink folds over each complete batch.
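With the mock API above (10 pages × 50 users = 500 records), the run should log five batches of 100:

```
Inserting batch of 100 users
Inserting batch of 100 users
Inserting batch of 100 users
Inserting batch of 100 users
Inserting batch of 100 users
Total users inserted: 500
```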


## Advanced: Configurable Batch Size

You can make batch size configurable and handle partial batches at stream end:

```typescript
interface BatchInsertConfig {
  readonly batchSize: number;
  // Optional time-based flush window; not used by this sink directly
  // (see the Stream.groupedWithin variant below)
  readonly flushTimeoutMs?: number;
}

const createBatchInsertSink = (config: BatchInsertConfig) =>
  Sink.foldLeftEffect(
    { buffer: [] as User[], count: 0 },
    (state, user: User) => {
      const buffer = [...state.buffer, user];

      if (buffer.length >= config.batchSize) {
        // Batch full: insert immediately and reset the buffer
        return insertUserBatch(buffer).pipe(
          Effect.map((inserted) => ({
            buffer: [] as User[],
            count: state.count + inserted,
          }))
        );
      }

      // Not full yet: keep buffering
      return Effect.succeed({ buffer, count: state.count });
    }
  ).pipe(
    // On stream end, flush any partial batch left in the buffer
    Sink.mapEffect((state) =>
      state.buffer.length > 0
        ? insertUserBatch(state.buffer).pipe(
            Effect.map((inserted) => state.count + inserted)
          )
        : Effect.succeed(state.count)
    )
  );

// Use with a custom batch size
const customBatchSink = createBatchInsertSink({ batchSize: 50 });
```
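If you also need a time-based flush (the role `flushTimeoutMs` suggests, so a slow stream does not hold a partial batch indefinitely), a simpler route is to batch upstream with `Stream.groupedWithin`, which emits a chunk when either the size limit or the time window is reached. A sketch, reusing `userStream` and `insertUserBatch` from above:

```typescript
// Emit a batch when 100 users accumulate OR 500 ms elapse, whichever comes first
const timedTotal = userStream.pipe(
  Stream.groupedWithin(100, "500 millis"),
  Stream.mapEffect((batch) => insertUserBatch(Chunk.toReadonlyArray(batch))),
  Stream.runFold(0, (count, inserted) => count + inserted)
);
```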

## When to Use This Pattern

Use batching when:

- Inserting many records from a high-volume source
- Database connection/transaction overhead is significant
- You need to optimize throughput over individual latency
- Records arrive faster than they can be inserted individually

⚠️ Trade-offs:

- Increased latency for individual records (buffered until the batch fills)
- Slightly increased memory usage (holding the batch in memory)
- More complex error handling (partial batch failures); see the sketch below
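On the last point, one mitigation (a sketch, assuming `insertUserBatch` can fail with a transient error; the mock above never fails) is to retry each batch with backoff and, as a last resort, skip it rather than fail the whole stream:

```typescript
import { Schedule } from "effect";

const insertBatchResilient = (users: readonly User[]) =>
  insertUserBatch(users).pipe(
    // Retry transient failures: exponential backoff, at most 3 retries
    Effect.retry(
      Schedule.exponential("100 millis").pipe(
        Schedule.intersect(Schedule.recurs(3))
      )
    ),
    // Last resort: log and skip the batch instead of failing the stream
    Effect.catchAll(() =>
      Effect.sync(() => {
        console.error(`Skipping failed batch of ${users.length} users`);
        return 0; // report zero inserted for the skipped batch
      })
    )
  );
```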

## See Also