| title | Sink Pattern 1: Batch Insert Stream Records into Database |
|---|---|
| id | sink-pattern-batch-insert-stream-records-into-database |
| skillLevel | intermediate |
| applicationPatternId | streams-sinks |
| summary | Use Sink to batch stream records and insert them efficiently into a database in groups, rather than one-by-one, for better performance and resource usage. |
| tags | |
| rule | |
| related | |
| author | effect_website |
| lessonOrder | 1 |
When consuming a stream of records to persist in a database, collect them into batches using Sink before inserting. This reduces the number of database round-trips and transaction overhead, improving overall throughput significantly.
Inserting records one-by-one is inefficient:
- Each insert is a separate database call (network latency, connection overhead)
- Each insert may be a separate transaction (ACID overhead)
- Resource contention and connection pool exhaustion at scale
Batching solves this by:
- Grouping N records into a single bulk insert operation
- Amortizing database overhead across multiple records
- Maintaining throughput even under backpressure
- Enabling efficient transaction semantics for the entire batch
For example, inserting 10,000 records one-by-one might take 100 seconds. Batching in groups of 100 might take just 2-3 seconds.
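Where do numbers like that come from? A rough back-of-envelope sketch in TypeScript, assuming a fixed ~10 ms of per-call overhead (an illustrative figure, not a measurement):
// Assumed fixed cost per database call (network round-trip + transaction setup)
const perCallOverheadMs = 10;
const totalRecords = 10_000;
// One call per record: every record pays the full overhead
const oneByOneMs = totalRecords * perCallOverheadMs; // 100,000 ms ≈ 100 s
// One call per batch of 100: the overhead is paid 100 times instead of 10,000,
// plus a little extra per call for the larger payload (assumed ~15 ms here)
const batchSize = 100;
const batchedMs = (totalRecords / batchSize) * (perCallOverheadMs + 15); // 2,500 ms ≈ 2.5 s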
This example demonstrates streaming user records from a paginated API and batching them for efficient database insertion.
import { Effect, Stream, Sink, Chunk, Option } from "effect";
interface User {
readonly id: number;
readonly name: string;
readonly email: string;
}
interface PaginatedResponse {
readonly users: User[];
readonly nextPage: number | null;
}
// Mock API that returns paginated users
const fetchUserPage = (
page: number
): Effect.Effect<PaginatedResponse> =>
Effect.succeed(
page < 10
? {
users: Array.from({ length: 50 }, (_, i) => ({
id: page * 50 + i,
name: `User ${page * 50 + i}`,
email: `user${page * 50 + i}@example.com`,
})),
nextPage: page + 1,
}
: { users: [], nextPage: null }
).pipe(Effect.delay("10 millis"));
// Mock database insert that takes a batch of users
const insertUserBatch = (
users: readonly User[]
): Effect.Effect<number> =>
Effect.sync(() => {
console.log(`Inserting batch of ${users.length} users`);
return users.length;
}).pipe(Effect.delay("50 millis"));
// Create a stream of users from paginated API
const userStream: Stream.Stream<User> = Stream.paginateChunkEffect(
  0,
  (page) =>
    fetchUserPage(page).pipe(
      Effect.map(
        (response) =>
          [
            Chunk.fromIterable(response.users),
            response.nextPage !== null
              ? Option.some(response.nextPage)
              : Option.none(),
          ] as const
      )
    )
);
// Sink that folds over batches of users, inserting each batch
// and accumulating the total number of inserted records
const batchInsertSink = Sink.foldLeftEffect(
  0,
  (count: number, batch: Chunk.Chunk<User>) =>
    insertUserBatch(Chunk.toReadonlyArray(batch)).pipe(
      Effect.map((inserted) => count + inserted)
    )
);
// Run the stream with the batching sink
const program = Effect.gen(function* () {
  const totalInserted = yield* userStream.pipe(
    // Batch into groups of 100 users
    Stream.grouped(100),
    Stream.run(batchInsertSink)
  );
  console.log(`Total users inserted: ${totalInserted}`);
});
Effect.runPromise(program);
This pattern:
- Creates a stream of users from a paginated API
- Groups the stream into batches of 100 users with Stream.grouped
- Inserts each complete batch into the database in a single operation via the sink
- Tracks total count of inserted records
The batching happens upstream of the sink: Stream.grouped buffers elements until the batch size is reached, then emits the complete batch for the sink to insert.
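With the mock data above (10 pages of 50 users, batched into groups of 100), running the program should log something like:
Inserting batch of 100 users
Inserting batch of 100 users
Inserting batch of 100 users
Inserting batch of 100 users
Inserting batch of 100 users
Total users inserted: 500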
You can make batch size configurable and handle partial batches at stream end:
interface BatchInsertConfig {
readonly batchSize: number;
readonly flushTimeoutMs?: number;
}
const createBatchInsertSink = (config: BatchInsertConfig) =>
  Sink.foldLeftEffect(
    { buffer: [] as User[], count: 0 },
    (state, user: User) =>
      Effect.gen(function* () {
        const buffer = [...state.buffer, user];
        if (buffer.length >= config.batchSize) {
          // Batch full, insert immediately
          const inserted = yield* insertUserBatch(buffer);
          return { buffer: [] as User[], count: state.count + inserted };
        }
        // Not full yet, keep buffering
        return { buffer, count: state.count };
      })
  ).pipe(
    // When the stream ends, flush any remaining buffered records
    // and return the final total
    Sink.mapEffect((state) =>
      state.buffer.length > 0
        ? insertUserBatch(state.buffer).pipe(
            Effect.map((inserted) => state.count + inserted)
          )
        : Effect.succeed(state.count)
    )
  );
// Use with custom batch size
const customBatchSink = createBatchInsertSink({ batchSize: 50 });
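The optional flushTimeoutMs field hints at time-based flushing, which the stream itself can provide. A minimal sketch using Stream.groupedWithin, reusing userStream and batchInsertSink from the first example (the Duration import is the only addition):
import { Duration, Stream } from "effect";
// Emits a batch when it reaches batchSize elements OR when the timeout
// elapses, whichever comes first, so partial batches are flushed during
// slow periods instead of waiting indefinitely
const timedBatchProgram = (config: BatchInsertConfig) =>
  userStream.pipe(
    Stream.groupedWithin(
      config.batchSize,
      Duration.millis(config.flushTimeoutMs ?? 1_000)
    ),
    Stream.run(batchInsertSink)
  );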
✅ Use batching when:
- Inserting many records from a high-volume source
- Database connection/transaction overhead is significant
- You need to optimize throughput over individual latency
- Records arrive faster than they can be inserted individually
⚠️ Trade-offs to consider:
- Increased latency for individual records (buffered until the batch fills)
- Slightly increased memory usage (holding each batch in memory)
- More complex error handling (partial batch failures; see the sketch below)
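One way to soften that last point is to make each batch insert self-contained: retry transient failures and, if a batch still fails, log it and keep going instead of failing the whole stream. A minimal sketch, assuming the insertUserBatch mock above (a real insert would have an actual error channel to recover from):
import { Effect, Schedule } from "effect";
const insertBatchResilient = (users: readonly User[]) =>
  insertUserBatch(users).pipe(
    // Retry transient failures with exponential backoff, at most 3 retries
    Effect.retry(
      Schedule.exponential("100 millis").pipe(
        Schedule.intersect(Schedule.recurs(3))
      )
    ),
    // If the batch still fails, log it and report 0 inserted rows
    // so the running count in the fold stays accurate
    Effect.catchAll((error) =>
      Effect.logError(`Batch insert failed: ${String(error)}`).pipe(
        Effect.as(0)
      )
    )
  );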
Related patterns:
- Process Streaming Data with Stream - Stream fundamentals
- Stream Elements from Paginated API - Creating paginated streams
- Handle Errors with catchTag - Error recovery in streams