---
title: "Stream Pattern 3: Control Backpressure in Streams"
id: stream-pattern-backpressure-control
skillLevel: intermediate
applicationPatternId: streams
summary: Use Stream throttling, buffering, and chunk operations to manage backpressure, preventing upstream from overwhelming downstream consumers.
tags:
  - streams
  - backpressure
  - buffering
  - throttling
  - flow-control
  - performance
rule:
  description: Use backpressure control to manage flow between fast producers and slow consumers, preventing memory exhaustion and resource overflow.
related:
  - stream-pattern-map-filter-transformations
  - concurrency-pattern-rate-limit-with-semaphore
  - process-streaming-data-with-stream
author: effect_website
lessonOrder: 8
---

Guideline

Backpressure is flow control: the slow consumer tells the fast producer to slow down.

Techniques:

  • Buffering: store items temporarily in a bounded queue
  • Throttling: rate-limit item emission
  • Chunking: process in fixed-size batches
  • Debouncing: skip rapid duplicates

Pattern: stream.pipe(Stream.throttle(...), Stream.buffer(...))


Rationale

Without backpressure management, mismatched producer/consumer speeds cause:

  • Memory exhaustion: Producer faster than consumer → queue grows unbounded
  • Garbage collection pauses: Large buffers cause GC pressure
  • Resource leaks: Open connections/file handles accumulate
  • Cascade failures: One slow consumer blocks entire pipeline

Backpressure enables:

  • Memory safety: Bounded buffers prevent overflow
  • Resource efficiency: Consumers pace producers naturally
  • Performance: Tuning buffer sizes improves throughput
  • Observability: Monitor backpressure as health indicator

Real-world example: reading a large file vs. writing to a database

  • No backpressure: read the entire file into memory, then write slowly → memory exhaustion
  • With backpressure: read 1000 lines, wait for the database write, then read the next batch

Good Example

This example demonstrates managing backpressure when consuming events at different rates.

import { Stream, Effect, Chunk } from "effect";

interface DataPoint {
  readonly id: number;
  readonly value: number;
}

// Fast producer: generates 100 items per second
const fastProducer = (): Stream.Stream<DataPoint> =>
  Stream.fromIterable(Array.from({ length: 100 }, (_, i) => ({ id: i, value: Math.random() }))).pipe(
    Stream.tap(() => Effect.sleep("10 millis")) // 10ms per item = 100/sec
  );

// Slow consumer: processes 10 items per second
const slowConsumer = (item: DataPoint): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.sleep("100 millis"); // 100ms per item = 10/sec
  });

// Without backpressure (DANGEROUS - queue grows unbounded)
const unbufferedStream = (): Stream.Stream<DataPoint> =>
  fastProducer().pipe(
    Stream.tap((item) =>
      Effect.log(`[UNBUFFERED] Produced item ${item.id}`)
    )
  );

// With bounded buffer (backpressure kicks in)
const bufferedStream = (bufferSize: number): Stream.Stream<DataPoint> =>
  fastProducer().pipe(
    // Buffer at most `bufferSize` items; if full, producer waits
    Stream.buffer({ capacity: bufferSize }),
    Stream.tap((item) =>
      Effect.log(`[BUFFERED] Consumed item ${item.id}`)
    )
  );

// With throttling (rate limit emission)
const throttledStream = (): Stream.Stream<DataPoint> =>
  fastProducer().pipe(
    // Emit at most 1 element per 50ms (20/sec)
    Stream.throttle({ units: 1, duration: "50 millis", cost: Chunk.size }),
    Stream.tap((item) =>
      Effect.log(`[THROTTLED] Item ${item.id}`)
    )
  );

// Main: compare approaches
const program = Effect.gen(function* () {
  console.log(`\n[START] Demonstrating backpressure management\n`);

  // Test buffered approach
  console.log(`[TEST 1] Buffered stream (buffer size 5):\n`);

  const startBuffer = Date.now();

  yield* bufferedStream(5).pipe(
    Stream.take(20), // Take only 20 items
    Stream.runForEach(slowConsumer)
  );

  const bufferTime = Date.now() - startBuffer;
  console.log(`\n[RESULT] Buffered approach took ${bufferTime}ms\n`);

  // Test throttled approach
  console.log(`[TEST 2] Throttled stream (1 item per 50ms):\n`);

  const startThrottle = Date.now();

  yield* throttledStream().pipe(
    Stream.take(20),
    Stream.runForEach(slowConsumer)
  );

  const throttleTime = Date.now() - startThrottle;
  console.log(`\n[RESULT] Throttled approach took ${throttleTime}ms\n`);

  // Summary
  console.log(`[SUMMARY]`);
  console.log(`  Without backpressure control:`);
  console.log(`    - Queue would grow to 100 items (memory risk)`);
  console.log(`    - Producer/consumer operate independently`);
  console.log(`  With buffering:`);
  console.log(`    - Queue bounded to 5 items (safe)`);
  console.log(`    - Producer waits when buffer full`);
  console.log(`  With throttling:`);
  console.log(`    - Production rate limited to 20/sec`);
  console.log(`    - Smooth controlled flow`);
});

Effect.runPromise(program);

Advanced: Adaptive Backpressure

Adjust buffering based on consumption rate:

interface BackpressureMetrics {
  readonly bufferSize: number;
  readonly avgWaitTime: number;
  readonly drainRate: number;
}

const adaptiveBuffer = <A>(
  stream: Stream.Stream<A>,
  initialBufferSize: number
): Stream.Stream<A> =>
  Stream.unwrapScoped(
    Effect.gen(function* () {
      // Mutable metrics shared with the monitor fiber
      const metrics = {
        bufferSize: initialBufferSize,
        itemsProcessed: 0,
        totalWaitTime: 0, // left for wait-time instrumentation
      };

      // Note: the buffer capacity is fixed when the stream is built, so
      // the monitor's adjustments inform the next construction rather
      // than resizing the live buffer.
      const adaptiveStream = stream.pipe(
        Stream.buffer({ capacity: metrics.bufferSize }),
        Stream.tap(() =>
          Effect.sync(() => {
            metrics.itemsProcessed++;
          })
        )
      );

      // Monitor and adjust once per second
      const monitor = Effect.gen(function* () {
        while (true) {
          yield* Effect.sleep("1 second");

          const avgWait =
            metrics.totalWaitTime / Math.max(1, metrics.itemsProcessed);
          const drainRate = metrics.itemsProcessed; // items in the last second

          yield* Effect.log(
            `[ADAPTIVE] Buffer: ${metrics.bufferSize}, Wait: ${avgWait.toFixed(1)}ms, Rate: ${drainRate} items/sec`
          );

          // Increase buffer if wait times are high
          if (avgWait > 100 && metrics.bufferSize < 100) {
            metrics.bufferSize = Math.ceil(metrics.bufferSize * 1.5);
            yield* Effect.log(
              `[ADAPTIVE] Increased buffer to ${metrics.bufferSize}`
            );
          }

          // Decrease buffer if wait times are low
          if (avgWait < 10 && metrics.bufferSize > initialBufferSize) {
            metrics.bufferSize = Math.max(
              initialBufferSize,
              Math.floor(metrics.bufferSize / 1.5)
            );
            yield* Effect.log(
              `[ADAPTIVE] Decreased buffer to ${metrics.bufferSize}`
            );
          }

          metrics.itemsProcessed = 0;
          metrics.totalWaitTime = 0;
        }
      });

      // Tie the monitor's lifetime to the stream's scope
      yield* Effect.forkScoped(monitor);
      return adaptiveStream;
    })
  );

Advanced: Chunk Processing with Backpressure

Process items in fixed-size chunks while managing backpressure:

const chunkedProcessing = <A, B>(
  stream: Stream.Stream<A>,
  chunkSize: number,
  processChunk: (chunk: Chunk.Chunk<A>) => Effect.Effect<B>
): Stream.Stream<B> =>
  stream.pipe(
    // Collect into fixed-size chunks (the final chunk may be smaller)
    Stream.grouped(chunkSize),
    // Process each chunk
    Stream.mapEffect(processChunk),
    // Bounded buffer so results don't pile up ahead of the consumer
    Stream.buffer({ capacity: 5 })
  );

// Usage: Process log lines in 100-item batches
const batchedLogProcessing = chunkedProcessing(
  Stream.fromIterable(
    Array.from({ length: 1000 }, (_, i) => `log-line-${i}`)
  ),
  100,
  (chunk) =>
    Effect.gen(function* () {
      const count = Chunk.size(chunk);
      yield* Effect.log(
        `Processing batch of ${count} items`
      );
      yield* Effect.sleep("500 millis");
      return count;
    })
).pipe(
  Stream.tap((count) =>
    Effect.log(`Processed ${count} items`)
  ),
  Stream.runDrain
);

Advanced: Debouncing Rapid Events

Skip duplicates or rapid events:

interface Event {
  readonly id: string;
  readonly timestamp: Date;
}

const debounceStream = <A extends { id: string }>(
  stream: Stream.Stream<A>,
  delayMs: number
): Stream.Stream<A> => {
  const lastEmit = new Map<string, number>();

  return stream.pipe(
    Stream.filter((item) => {
      const now = Date.now();
      const lastTime = lastEmit.get(item.id) ?? 0;

      if (now - lastTime > delayMs) {
        lastEmit.set(item.id, now);
        return true; // Emit
      }

      return false; // Skip (too recent)
    })
  );
};

// Usage: Debounce rapid user activity events
const debouncedEvents = debounceStream(
  Stream.fromIterable([
    { id: "user-123", timestamp: new Date() },
    { id: "user-123", timestamp: new Date() }, // Skipped
    { id: "user-123", timestamp: new Date() }, // Skipped
    { id: "user-456", timestamp: new Date() },
    { id: "user-456", timestamp: new Date() }, // Skipped
  ]),
  100 // Wait 100ms between events for same user
).pipe(
  Stream.tap((event) =>
    Effect.log(`Debounced: ${event.id}`)
  ),
  Stream.runDrain
);

Advanced: Multi-Level Backpressure Strategy

Combine multiple backpressure techniques:

interface BackpressureStrategy {
  readonly bufferSize: number;
  readonly throttlePerSec: number;
  readonly chunkSize: number;
  readonly debounceMs: number;
}

const applyBackpressureStrategy = <A>(
  stream: Stream.Stream<A>,
  strategy: BackpressureStrategy
): Stream.Stream<Chunk.Chunk<A>> =>
  stream.pipe(
    // Level 1: Debounce rapid items (placeholder; plug in per-key
    // debounce logic using strategy.debounceMs here)
    Stream.filter(() => true),

    // Level 2: Throttle emission rate
    Stream.throttle({
      units: strategy.throttlePerSec,
      duration: "1 second",
      cost: Chunk.size,
    }),

    // Level 3: Buffer with a bounded queue
    Stream.buffer({ capacity: strategy.bufferSize }),

    // Level 4: Chunk for batch processing
    Stream.grouped(strategy.chunkSize),

    // Monitor backpressure health
    Stream.tap((chunk) =>
      Effect.log(
        `[BACKPRESSURE] Chunk size: ${Chunk.size(chunk)}`
      )
    )
  );

// Conservative strategy for high-volume sources
const conservativeStrategy: BackpressureStrategy = {
  bufferSize: 5,
  throttlePerSec: 100,
  chunkSize: 10,
  debounceMs: 50,
};

// Aggressive strategy for normal-volume sources
const aggressiveStrategy: BackpressureStrategy = {
  bufferSize: 100,
  throttlePerSec: 10000,
  chunkSize: 1000,
  debounceMs: 0,
};

When to Use This Pattern

Use backpressure control when:

  • Producer faster than consumer
  • Memory constraints (prevent unbounded growth)
  • High-frequency data sources (sensors, market data)
  • Network streams (files, APIs)
  • Rate limiting requirements

⚠️ Trade-offs:

  • Buffering adds latency
  • Throttling reduces throughput
  • Tuning buffers requires experimentation
  • Monitoring overhead

Backpressure Techniques Comparison

| Technique | Mechanism        | Latency Impact | Memory Impact | Use Case             |
| --------- | ---------------- | -------------- | ------------- | -------------------- |
| Buffer    | Bounded queue    | Medium         | Controlled    | Normal flow control  |
| Throttle  | Rate limiting    | High           | Low           | Protecting downstream |
| Chunk     | Batch processing | Low-Medium     | Low           | Bulk operations      |
| Debounce  | Skip rapid events | Low           | Very low      | Duplicate reduction  |

See Also