Skip to content

Latest commit

 

History

History
479 lines (378 loc) · 12.9 KB

File metadata and controls

479 lines (378 loc) · 12.9 KB
title Concurrency Pattern 4: Distribute Work with Queue
id concurrency-pattern-queue-work-distribution
skillLevel intermediate
applicationPatternId concurrency
summary Use Queue to decouple producers and consumers, enabling work distribution, pipeline stages, and backpressure handling across concurrent fibers.
tags
concurrency
queue
producer-consumer
backpressure
work-distribution
decoupling
rule
description
Use Queue to distribute work between producers and consumers with built-in backpressure, enabling flexible pipeline coordination.
related
decouple-fibers-with-queue-pubsub
concurrency-pattern-coordinate-with-deferred
concurrency-pattern-rate-limit-with-semaphore
author effect_website
lessonOrder 5

Guideline

When multiple fibers need to coordinate work asynchronously, use Queue:

  • Producers add items (enqueue)
  • Consumers remove and process items (dequeue)
  • Backpressure built-in: producers wait if queue is full
  • Decoupling: Producers don't block on consumer speed

Queue variants: bounded (size limit), unbounded (unlimited), dropping (discards on overflow).


Rationale

Direct producer-consumer coordination creates problems:

  • Blocking: Producer waits for consumer to finish
  • Tight coupling: Producer depends on consumer speed
  • Memory pressure: Fast producer floods memory with results
  • No backpressure: Downstream overload propagates upstream

Queue solves these:

  • Asynchronous: Producer enqueues and continues
  • Decoupled: Producer/consumer independent
  • Backpressure: Producer waits when queue full (natural flow control)
  • Throughput: Consumer processes at own pace

Real-world example: API request handler + database writer

  • Direct: Handler waits for DB write (blocking, slow requests)
  • Queue: Handler enqueues write and returns immediately (responsive)

Good Example

This example demonstrates a producer-consumer pipeline with a bounded queue for buffering work items.

import { Effect, Queue, Fiber, Ref } from "effect";

interface WorkItem {
  readonly id: number;
  readonly data: string;
  readonly timestamp: number;
}

interface WorkResult {
  readonly itemId: number;
  readonly processed: string;
  readonly duration: number;
}

// Producer: generates work items
const producer = (
  queue: Queue.Enqueue<WorkItem>,
  count: number
): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[PRODUCER] Starting, generating ${count} items`);

    for (let i = 1; i <= count; i++) {
      const item: WorkItem = {
        id: i,
        data: `Item ${i}`,
        timestamp: Date.now(),
      };

      const start = Date.now();

      // Enqueue - will block if queue is full (backpressure)
      yield* Queue.offer(queue, item);

      const delay = Date.now() - start;

      if (delay > 0) {
        yield* Effect.log(
          `[PRODUCER] Item ${i} enqueued (waited ${delay}ms due to backpressure)`
        );
      } else {
        yield* Effect.log(`[PRODUCER] Item ${i} enqueued`);
      }

      // Simulate work
      yield* Effect.sleep("50 millis");
    }

    yield* Effect.log(`[PRODUCER] ✓ All items enqueued`);
  });

// Consumer: processes work items
const consumer = (
  queue: Queue.Dequeue<WorkItem>,
  consumerId: number,
  results: Ref.Ref<WorkResult[]>
): Effect.Effect<void> =>
  Effect.gen(function* () {
    yield* Effect.log(`[CONSUMER ${consumerId}] Starting`);

    while (true) {
      // Dequeue - will block if queue is empty
      const item = yield* Queue.take(queue).pipe(Effect.either);

      if (item._tag === "Left") {
        yield* Effect.log(`[CONSUMER ${consumerId}] Queue closed, stopping`);
        return;
      }

      const workItem = item.right;
      const startTime = Date.now();

      yield* Effect.log(
        `[CONSUMER ${consumerId}] Processing ${workItem.data}`
      );

      // Simulate processing
      yield* Effect.sleep("150 millis");

      const duration = Date.now() - startTime;
      const result: WorkResult = {
        itemId: workItem.id,
        processed: `${workItem.data} [processed by consumer ${consumerId}]`,
        duration,
      };

      yield* Ref.update(results, (rs) => [...rs, result]);

      yield* Effect.log(
        `[CONSUMER ${consumerId}] ✓ Completed ${workItem.data} in ${duration}ms`
      );
    }
  });

// Main: coordinate producer and consumers
const program = Effect.gen(function* () {
  // Create bounded queue with capacity 3
  const queue = yield* Queue.bounded<WorkItem>(3);
  const results = yield* Ref.make<WorkResult[]>([]);

  console.log(`\n[MAIN] Starting producer-consumer pipeline with queue size 3\n`);

  // Spawn producer
  const producerFiber = yield* producer(queue, 10).pipe(Effect.fork);

  // Spawn 2 consumers
  const consumer1 = yield* consumer(queue, 1, results).pipe(Effect.fork);
  const consumer2 = yield* consumer(queue, 2, results).pipe(Effect.fork);

  // Wait for producer to finish
  yield* Fiber.join(producerFiber);

  // Give consumers time to finish
  yield* Effect.sleep("3 seconds");

  // Close queue and wait for consumers
  yield* Queue.shutdown(queue);
  yield* Fiber.join(consumer1);
  yield* Fiber.join(consumer2);

  // Summary
  const allResults = yield* Ref.get(results);
  const totalDuration = allResults.reduce((sum, r) => sum + r.duration, 0);

  console.log(`\n[SUMMARY]`);
  console.log(`  Items processed: ${allResults.length}`);
  console.log(
    `  Avg processing time: ${Math.round(totalDuration / allResults.length)}ms`
  );
});

Effect.runPromise(program);

This pattern:

  1. Creates bounded queue with capacity (backpressure point)
  2. Producer enqueues items (blocks if full)
  3. Consumers dequeue and process (each at own pace)
  4. Queue coordinates flow automatically

Advanced: Dynamic Consumer Pool

Scale consumer count based on queue depth:

const adaptiveConsumerPool = (
  queue: Queue.Dequeue<WorkItem>,
  maxConsumers: number
) =>
  Effect.gen(function* () {
    const activeConsumers = yield* Ref.make(1);
    const metrics = yield* Ref.make({
      itemsProcessed: 0,
      queueDepth: 0,
      avgProcessTime: 0,
    });

    // Monitor queue and scale consumers
    const scaler = Effect.gen(function* () {
      while (true) {
        yield* Effect.sleep("500 millis");

        const depth = yield* Queue.size(queue);
        const consumers = yield* Ref.get(activeConsumers);

        // Scale up if queue building
        if (depth > consumers * 2 && consumers < maxConsumers) {
          yield* Ref.update(activeConsumers, (c) => c + 1);
          yield* Effect.log(
            `[SCALER] Increased to ${consumers + 1} consumers (queue depth: ${depth})`
          );
        }

        // Scale down if queue draining
        if (depth < consumers / 2 && consumers > 1) {
          yield* Ref.update(activeConsumers, (c) => c - 1);
          yield* Effect.log(
            `[SCALER] Decreased to ${consumers - 1} consumers (queue depth: ${depth})`
          );
        }
      }
    });

    return scaler;
  });

Advanced: Priority Queue with Multiple Queues

Separate queues for different priority levels:

interface PriorityWorkItem extends WorkItem {
  readonly priority: number; // Higher = more important
}

const createPriorityQueueSystem = () =>
  Effect.gen(function* () {
    // High priority queue (size 10), normal queue (size 50)
    const highPriorityQueue = yield* Queue.bounded<PriorityWorkItem>(10);
    const normalQueue = yield* Queue.bounded<PriorityWorkItem>(50);

    const enqueueWithPriority = (item: PriorityWorkItem) =>
      Effect.gen(function* () {
        if (item.priority > 5) {
          yield* Queue.offer(highPriorityQueue, item);
          yield* Effect.log(
            `[PRIORITY] Item ${item.id} enqueued to HIGH priority queue`
          );
        } else {
          yield* Queue.offer(normalQueue, item);
          yield* Effect.log(
            `[PRIORITY] Item ${item.id} enqueued to NORMAL queue`
          );
        }
      });

    // Consumer prioritizes high-priority queue
    const priorityConsumer = (consumerId: number) =>
      Effect.gen(function* () {
        while (true) {
          // Try high priority first
          const highItem = yield* Queue.poll(highPriorityQueue);

          if (highItem._tag === "Some") {
            yield* Effect.log(
              `[CONSUMER ${consumerId}] Processing HIGH priority item ${highItem.value.id}`
            );
            yield* Effect.sleep("100 millis");
            continue;
          }

          // Fall back to normal priority
          const normalItem = yield* Queue.poll(normalQueue);

          if (normalItem._tag === "Some") {
            yield* Effect.log(
              `[CONSUMER ${consumerId}] Processing NORMAL priority item ${normalItem.value.id}`
            );
            yield* Effect.sleep("100 millis");
            continue;
          }

          // No items available, wait a bit
          yield* Effect.sleep("100 millis");
        }
      });

    return {
      enqueueWithPriority,
      priorityConsumer,
      highPriorityQueue,
      normalQueue,
    };
  });

Advanced: Queue with Batch Processing

Dequeue multiple items and process as batch:

const batchConsumer = (
  queue: Queue.Dequeue<WorkItem>,
  batchSize: number,
  processBatch: (items: WorkItem[]) => Effect.Effect<void>
): Effect.Effect<void> =>
  Effect.gen(function* () {
    while (true) {
      const batch: WorkItem[] = [];

      // Collect up to batchSize items (with timeout)
      for (let i = 0; i < batchSize; i++) {
        const item = yield* Queue.take(queue).pipe(
          Effect.timeout("100 millis"),
          Effect.either
        );

        if (item._tag === "Left") {
          // Timeout - process whatever we have
          break;
        }

        batch.push(item.right);
      }

      if (batch.length > 0) {
        yield* Effect.log(
          `[BATCH] Processing ${batch.length} items`
        );

        yield* processBatch(batch);

        yield* Effect.log(
          `[BATCH] ✓ Completed batch of ${batch.length}`
        );
      } else {
        // No items, wait before retrying
        yield* Effect.sleep("100 millis");
      }
    }
  });

Advanced: Async Pipeline with Multiple Stages

Chain multiple queue-based processing stages:

interface PipelineStage<I, O> {
  readonly name: string;
  readonly inputQueue: Queue.Dequeue<I>;
  readonly outputQueue: Queue.Enqueue<O>;
  readonly process: (item: I) => Effect.Effect<O>;
}

const createPipeline = <I, O>(
  inputQueue: Queue.Dequeue<I>,
  stages: Array<(item: I) => Effect.Effect<I>>,
  outputQueue: Queue.Enqueue<O>
): Effect.Effect<void> =>
  Effect.gen(function* () {
    while (true) {
      const item = yield* Queue.take(inputQueue);
      let current: any = item;

      // Process through all stages
      for (const stage of stages) {
        current = yield* stage(current);
      }

      yield* Queue.offer(outputQueue, current);
    }
  });

// Example: 3-stage pipeline
const pipelineExample = Effect.gen(function* () {
  const inputQueue = yield* Queue.bounded<string>(10);
  const outputQueue = yield* Queue.bounded<string>(10);

  const stages = [
    (item: string) =>
      Effect.gen(function* () {
        yield* Effect.log(`[STAGE 1] Validating: ${item}`);
        return item.toUpperCase();
      }),

    (item: string) =>
      Effect.gen(function* () {
        yield* Effect.log(`[STAGE 2] Enriching: ${item}`);
        return `${item}-ENRICHED`;
      }),

    (item: string) =>
      Effect.gen(function* () {
        yield* Effect.log(`[STAGE 3] Formatting: ${item}`);
        return `[${item}]`;
      }),
  ];

  yield* createPipeline(inputQueue, stages, outputQueue);
});

When to Use This Pattern

Use Queue when:

  • Decoupling producers from consumers
  • Implementing pipeline stages
  • Handling backpressure naturally
  • Work distribution across fibers
  • Async work buffering
  • Load balancing across workers

⚠️ Trade-offs:

  • Queue copy overhead for large items
  • Memory usage grows with queue size
  • Bounded queues block producers
  • Unbounded queues can exhaust memory

Queue Type Guide

Type Behavior Use Case
bounded(n) Backpressure when full Producer/consumer control
unbounded() Grows unbounded Unbounded work streams
dropping() Discards on overflow High-frequency events (sampling)
sliding() Removes oldest on overflow Sliding window buffering

See Also