| title | Stream Pattern 3: Control Backpressure in Streams |
|---|---|
| id | stream-pattern-backpressure-control |
| skillLevel | intermediate |
| applicationPatternId | streams |
| summary | Use Stream throttling, buffering, and chunk operations to manage backpressure, preventing upstream from overwhelming downstream consumers. |
| tags | |
| rule | |
| related | |
| author | effect_website |
| lessonOrder | 8 |
Backpressure is flow control: a slow consumer tells a fast producer to slow down.
Techniques:
- Buffering: Store items temporarily (limited queue)
- Throttling: Rate limit item emission
- Chunking: Process in fixed-size batches
- Debouncing: Skip rapid duplicates
Pattern: stream.pipe(Stream.throttle(...), Stream.buffer(...))
Without backpressure management, mismatched producer/consumer speeds cause:
- Memory exhaustion: Producer faster than consumer → queue grows unbounded
- Garbage collection pauses: Large buffers cause GC pressure
- Resource leaks: Open connections/file handles accumulate
- Cascade failures: One slow consumer blocks entire pipeline
Backpressure management enables:
- Memory safety: Bounded buffers prevent overflow
- Resource efficiency: Consumers pace producers naturally
- Performance: Tuning buffer sizes improves throughput
- Observability: Monitor backpressure as health indicator
Real-world example: Reading large file vs. writing to database
- No backpressure: Read entire file into memory, write slowly → memory exhaustion
- With backpressure: Read 1000 lines, wait for database, read next batch
This example demonstrates managing backpressure when consuming events at different rates.
import { Stream, Effect, Chunk } from "effect";
interface DataPoint {
  readonly id: number;
  readonly value: number;
}
// Fast producer: generates 100 items per second
const fastProducer = (): Stream.Stream<DataPoint> =>
  Stream.fromIterable(
    Array.from({ length: 100 }, (_, i) => ({ id: i, value: Math.random() }))
  ).pipe(
    Stream.tap(() => Effect.sleep("10 millis")) // 10ms per item = 100/sec
  );
// Slow consumer: processes 10 items per second
const slowConsumer = (_item: DataPoint): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.sleep("100 millis"); // 100ms per item = 10/sec
  });
// Without backpressure (DANGEROUS - queue grows unbounded)
const unbufferedStream = (): Stream.Stream<DataPoint> =>
  fastProducer().pipe(
    Stream.tap((item) =>
      Effect.log(`[UNBUFFERED] Produced item ${item.id}`)
    )
  );
// With bounded buffer (backpressure kicks in)
const bufferedStream = (bufferSize: number): Stream.Stream<DataPoint> =>
  fastProducer().pipe(
    // Hold at most bufferSize items; when full, the producer waits
    Stream.buffer({ capacity: bufferSize }),
    Stream.tap((item) =>
      Effect.log(`[BUFFERED] Consumed item ${item.id}`)
    )
  );
// With throttling (rate limit emission)
const throttledStream = (): Stream.Stream<DataPoint> =>
  fastProducer().pipe(
    // Emit at most 1 item per 50ms (20/sec); cost is measured per chunk
    Stream.throttle({ cost: Chunk.size, units: 1, duration: "50 millis" }),
    Stream.tap((item) =>
      Effect.log(`[THROTTLED] Item ${item.id}`)
    )
  );
// Main: compare approaches
const program = Effect.gen(function* () {
  console.log(`\n[START] Demonstrating backpressure management\n`);
  // Test buffered approach
  console.log(`[TEST 1] Buffered stream (buffer size 5):\n`);
  const startBuffer = Date.now();
  yield* bufferedStream(5).pipe(
    Stream.take(20), // Take only 20 items
    Stream.runForEach(slowConsumer)
  );
  const bufferTime = Date.now() - startBuffer;
  console.log(`\n[RESULT] Buffered approach took ${bufferTime}ms\n`);
  // Test throttled approach
  console.log(`[TEST 2] Throttled stream (1 item per 50ms):\n`);
  const startThrottle = Date.now();
  yield* throttledStream().pipe(
    Stream.take(20),
    Stream.runForEach(slowConsumer)
  );
  const throttleTime = Date.now() - startThrottle;
  console.log(`\n[RESULT] Throttled approach took ${throttleTime}ms\n`);
  // Summary
  console.log(`[SUMMARY]`);
  console.log(`  Without backpressure control:`);
  console.log(`    - Queue would grow to 100 items (memory risk)`);
  console.log(`    - Producer/consumer operate independently`);
  console.log(`  With buffering:`);
  console.log(`    - Queue bounded to 5 items (safe)`);
  console.log(`    - Producer waits when buffer full`);
  console.log(`  With throttling:`);
  console.log(`    - Production rate limited to 20/sec`);
  console.log(`    - Smooth, controlled flow`);
});
Effect.runPromise(program);
Adjust buffering based on consumption rate:
interface BackpressureMetrics {
  readonly bufferSize: number;
  readonly avgWaitTime: number;
  readonly drainRate: number;
}
// Note: a stream's buffer capacity is fixed when the pipeline is built, so
// Stream.buffer alone cannot resize itself mid-flight. This sketch monitors
// the drain rate and logs a suggested buffer size instead of resizing live.
const adaptiveBuffer = <A>(
  stream: Stream.Stream<A>,
  initialBufferSize: number
): Stream.Stream<A> =>
  Stream.unwrapScoped(
    Effect.gen(function* () {
      const metrics = { itemsProcessed: 0, suggestedSize: initialBufferSize };
      // Monitor: report the drain rate once per second and adjust the suggestion
      const monitor = Effect.forever(
        Effect.gen(function* () {
          yield* Effect.sleep("1 second");
          const drainRate = metrics.itemsProcessed; // items per second
          yield* Effect.log(
            `[ADAPTIVE] Buffer: ${initialBufferSize}, Suggested: ${metrics.suggestedSize}, Rate: ${drainRate} items/sec`
          );
          // Suggest a larger buffer when draining fast, a smaller one when slow
          if (drainRate > metrics.suggestedSize * 2 && metrics.suggestedSize < 100) {
            metrics.suggestedSize = Math.ceil(metrics.suggestedSize * 1.5);
          } else if (drainRate < metrics.suggestedSize && metrics.suggestedSize > initialBufferSize) {
            metrics.suggestedSize = Math.max(initialBufferSize, Math.floor(metrics.suggestedSize / 1.5));
          }
          metrics.itemsProcessed = 0;
        })
      );
      yield* Effect.forkScoped(monitor);
      return stream.pipe(
        Stream.buffer({ capacity: initialBufferSize }),
        Stream.tap(() => Effect.sync(() => metrics.itemsProcessed++))
      );
    })
  );
Process items in fixed-size chunks while managing backpressure:
const chunkedProcessing = <A, B>(
  stream: Stream.Stream<A>,
  chunkSize: number,
  processChunk: (chunk: Chunk.Chunk<A>) => Effect.Effect<B>
): Stream.Stream<B> =>
  stream.pipe(
    // Group elements into fixed-size batches (the final batch may be smaller)
    Stream.grouped(chunkSize),
    // Process each batch
    Stream.mapEffect(processChunk),
    // Bounded buffer so processed results do not pile up
    Stream.buffer({ capacity: 5 })
  );
// Usage: Process log lines in 100-item batches
const batchedLogProcessing = chunkedProcessing(
  Stream.fromIterable(
    Array.from({ length: 1000 }, (_, i) => `log-line-${i}`)
  ),
  100,
  (chunk) =>
    Effect.gen(function* () {
      const count = Chunk.size(chunk);
      yield* Effect.log(`Processing batch of ${count} items`);
      yield* Effect.sleep("500 millis");
      return count;
    })
).pipe(
  Stream.tap((count) => Effect.log(`Processed ${count} items`)),
  Stream.runDrain
);
Skip duplicates or rapid events:
interface Event {
  readonly id: string;
  readonly timestamp: Date;
}
const debounceStream = <A extends { id: string }>(
  stream: Stream.Stream<A>,
  delayMs: number
): Stream.Stream<A> => {
  // Tracks the last emission time per key (impure, but fine for a sketch)
  const lastEmit = new Map<string, number>();
  return stream.pipe(
    Stream.filter((item) => {
      const now = Date.now();
      const lastTime = lastEmit.get(item.id) ?? 0;
      if (now - lastTime > delayMs) {
        lastEmit.set(item.id, now);
        return true; // Emit
      }
      return false; // Skip (too recent)
    })
  );
};
// Usage: Debounce rapid user activity events
const debouncedEvents = debounceStream(
  Stream.fromIterable([
    { id: "user-123", timestamp: new Date() },
    { id: "user-123", timestamp: new Date() }, // Skipped
    { id: "user-123", timestamp: new Date() }, // Skipped
    { id: "user-456", timestamp: new Date() },
    { id: "user-456", timestamp: new Date() }, // Skipped
  ]),
  100 // Wait 100ms between events for the same user
).pipe(
  Stream.tap((event) => Effect.log(`Debounced: ${event.id}`)),
  Stream.runDrain
);
Combine multiple backpressure techniques:
interface BackpressureStrategy {
  readonly bufferSize: number;
  readonly throttlePerSec: number;
  readonly chunkSize: number;
  readonly debounceMs: number;
}
const applyBackpressureStrategy = <A>(
  stream: Stream.Stream<A>,
  strategy: BackpressureStrategy
): Stream.Stream<Chunk.Chunk<A>> =>
  stream.pipe(
    // Level 1: Debounce rapid items (placeholder; plug in per-key debounce)
    Stream.filter(() => true),
    // Level 2: Throttle emission rate
    Stream.throttle({
      cost: Chunk.size,
      units: strategy.throttlePerSec,
      duration: "1 second"
    }),
    // Level 3: Buffer with a bounded queue
    Stream.buffer({ capacity: strategy.bufferSize }),
    // Level 4: Group into chunks for batch processing
    Stream.grouped(strategy.chunkSize),
    // Monitor backpressure health
    Stream.tap((chunk) =>
      Effect.log(`[BACKPRESSURE] Chunk size: ${Chunk.size(chunk)}`)
    )
  );
// Conservative strategy for high-volume sources
const conservativeStrategy: BackpressureStrategy = {
  bufferSize: 5,
  throttlePerSec: 100,
  chunkSize: 10,
  debounceMs: 50,
};
// Aggressive strategy for normal-volume sources
const aggressiveStrategy: BackpressureStrategy = {
  bufferSize: 100,
  throttlePerSec: 10000,
  chunkSize: 1000,
  debounceMs: 0,
};
✅ Use backpressure control when:
- Producer faster than consumer
- Memory constraints (prevent unbounded growth)
- High-frequency data sources (sensors, market data)
- Network streams (files, APIs)
- Rate limiting requirements
❌ Be aware of the tradeoffs:
- Buffering adds latency
- Throttling reduces throughput
- Tuning buffers requires experimentation
- Monitoring overhead
| Technique | Mechanism | Latency Impact | Memory Impact | Use Case |
|---|---|---|---|---|
| Buffer | Bounded queue | Medium | Controlled | Normal flow control |
| Throttle | Rate limiting | High | Low | Protecting downstream |
| Chunk | Batch processing | Low-Medium | Low | Bulk operations |
| Debounce | Skip rapid repeats | Low | Very low | Duplicate reduction |
- Stream Pattern 1: Map & Filter Transformations - Stream transformations
- Concurrency Pattern 2: Rate Limit with Semaphore - Rate limiting
- Process Streaming Data with Stream - Stream basics
- Stream Collect Results - Collecting stream output