title: "Concurrency Pattern 3: Coordinate Multiple Fibers with Latch"
id: concurrency-pattern-coordinate-with-latch
skillLevel: intermediate
applicationPatternId: concurrency
summary: Use Latch to synchronize multiple fibers, enabling patterns like coordinating N async tasks, fan-out/fan-in, and barrier synchronization.
tags:
  - concurrency
  - latch
  - synchronization
  - fan-out-fan-in
  - barrier
  - multi-fiber
rule:
  description: Use Latch to coordinate multiple fibers awaiting a common completion signal, enabling fan-out/fan-in and barrier synchronization patterns.
related:
  - concurrency-pattern-coordinate-with-deferred
  - concurrency-pattern-rate-limit-with-semaphore
  - run-background-tasks-with-fork
author: effect_website
lessonOrder: 4

Guideline

When you need multiple fibers to coordinate and wait for a shared completion condition, use Latch. A Latch is a countdown synchronization object: you initialize it with N, each fiber calls countDown(), and all waiting fibers are released when the count reaches zero. This enables fan-out/fan-in patterns and barrier synchronization.
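
The countdown behavior described above can be sketched on top of Effect's open/close latch. This is a minimal sketch, assuming Effect 3.8 or later, where `Effect.makeLatch` is available; `makeCountdown` and its `countDown` / `await` fields are hypothetical names chosen for illustration and are not part of the library.

```typescript
import { Effect, Fiber, Ref } from "effect";

// Hypothetical helper: a countdown built from a Ref counter and a closed gate.
// Assumes Effect.makeLatch (an open/close latch) from Effect 3.8+.
const makeCountdown = (count: number) =>
  Effect.gen(function* () {
    const remaining = yield* Ref.make(count);
    // Start open when there is nothing to wait for
    const gate = yield* Effect.makeLatch(count <= 0);

    // Decrement the counter; the call that reaches zero opens the gate
    const countDown = Ref.updateAndGet(remaining, (n) => Math.max(0, n - 1)).pipe(
      Effect.flatMap((n) => (n === 0 ? gate.open : Effect.void)),
      Effect.asVoid
    );

    return { countDown, await: gate.await };
  });

const demo = Effect.gen(function* () {
  const latch = yield* makeCountdown(3);
  const released = yield* Ref.make(0);

  // Two waiters block until the count reaches zero
  const waiters = yield* Effect.all(
    [1, 2].map(() =>
      latch.await.pipe(
        Effect.zipRight(Ref.update(released, (n) => n + 1)),
        Effect.fork
      )
    )
  );

  // Three signalers each count down once
  yield* Effect.all(
    [1, 2, 3].map(() => latch.countDown),
    { concurrency: "unbounded" }
  );

  yield* Effect.all(waiters.map((f) => Fiber.join(f)));
  return yield* Ref.get(released);
});
```

With three `countDown` calls against a count of 3, both waiters are released, so `Effect.runPromise(demo)` resolves to `2`.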


Rationale

Multi-fiber coordination requires synchronization:

  • Parallel initialization: Wait for all services to start before proceeding
  • Fan-out/fan-in: Spawn multiple workers, collect results when all done
  • Barrier synchronization: All fibers wait at a checkpoint before proceeding
  • Graceful shutdown: Wait for all active fibers to complete
  • Aggregation patterns: Process streams in parallel, combine when ready

Unlike Deferred (one producer signals once), Latch:

  • Supports multiple signalers (each one calls countDown())
  • Works with a known number of participants (counts down from N to 0)
  • Enables barrier patterns (all wait for all)
  • Releases every waiting fiber once the count reaches zero
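
For contrast, the single-signal case that Deferred covers can be sketched as follows. This is an illustrative example only: the `deferredSignal` name and the `"config loaded"` value are made up for the sketch. It shows one producer completing a `Deferred` once while several consumers await the same value.

```typescript
import { Deferred, Effect, Fiber } from "effect";

const deferredSignal = Effect.gen(function* () {
  // One producer completes the Deferred exactly once
  const ready = yield* Deferred.make<string>();

  // Any number of consumers can await the same value
  const consumers = yield* Effect.all(
    [1, 2, 3].map((id) =>
      Deferred.await(ready).pipe(
        Effect.map((value) => `consumer ${id} saw ${value}`),
        Effect.fork
      )
    )
  );

  yield* Deferred.succeed(ready, "config loaded");

  // A second completion is ignored: succeed returns false once a value is set
  const secondAttempt = yield* Deferred.succeed(ready, "ignored");

  const messages = yield* Effect.all(consumers.map((f) => Fiber.join(f)));
  return { messages, secondAttempt };
});
```

There is no count to track here: the first `Deferred.succeed` releases every consumer, which is why a countdown is needed when several fibers must all signal before anyone proceeds.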

Good Example

This example demonstrates a fan-out/fan-in pattern: spawn 5 worker fibers that process tasks in parallel, and coordinate to know when all are complete.

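import { Deferred, Effect, Fiber, Ref } from "effect";

// Assumption: the countdown latch used in these examples is defined locally
// rather than imported. This sketch builds it from a Ref counter and a
// Deferred that completes when the count reaches zero.
interface Latch {
  readonly remaining: Ref.Ref<number>;
  readonly done: Deferred.Deferred<void>;
}

const Latch = {
  // Create a latch that opens after `count` calls to countDown
  make: (count: number): Effect.Effect<Latch> =>
    Effect.gen(function* () {
      const remaining = yield* Ref.make(count);
      const done = yield* Deferred.make<void>();
      if (count <= 0) {
        yield* Deferred.succeed(done, undefined);
      }
      return { remaining, done };
    }),

  // Decrement the count; the call that reaches zero releases all waiters
  countDown: (latch: Latch): Effect.Effect<void> =>
    Ref.updateAndGet(latch.remaining, (n) => Math.max(0, n - 1)).pipe(
      Effect.flatMap((n) =>
        n === 0 ? Deferred.succeed(latch.done, undefined) : Effect.succeed(false)
      ),
      Effect.asVoid
    ),

  // Suspend until the count reaches zero
  await: (latch: Latch): Effect.Effect<void> => Deferred.await(latch.done),
};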

interface WorkResult {
  readonly workerId: number;
  readonly taskId: number;
  readonly result: string;
  readonly duration: number;
}

// Simulate a long-running task
const processTask = (
  workerId: number,
  taskId: number
): Effect.Effect<WorkResult> =>
  Effect.gen(function* () {
    const startTime = Date.now();
    const duration = 100 + Math.random() * 400; // 100-500ms

    yield* Effect.log(
      `[Worker ${workerId}] Starting task ${taskId} (duration: ${Math.round(duration)}ms)`
    );

    yield* Effect.sleep(`${Math.round(duration)} millis`);

    const elapsed = Date.now() - startTime;

    yield* Effect.log(
      `[Worker ${workerId}] ✓ Completed task ${taskId} in ${elapsed}ms`
    );

    return {
      workerId,
      taskId,
      result: `Result from worker ${workerId} on task ${taskId}`,
      duration: elapsed,
    };
  });

// Fan-out/Fan-in with Latch
const fanOutFanIn = Effect.gen(function* () {
  const numWorkers = 5;
  const tasksPerWorker = 3;

  // Create latch: counts down from numWorkers and opens once every worker has signaled
  const workersCompleteLatch = yield* Latch.make(numWorkers);

  // Track results from all workers
  const results = yield* Ref.make<WorkResult[]>([]);

  // Worker fiber that processes tasks sequentially
  const createWorker = (workerId: number) =>
    Effect.gen(function* () {
      yield* Effect.log(`[Worker ${workerId}] ▶ Starting`);

      // Process multiple tasks
      for (let i = 1; i <= tasksPerWorker; i++) {
        const result = yield* processTask(workerId, i);
        yield* Ref.update(results, (rs) => [...rs, result]);
      }

      yield* Effect.log(`[Worker ${workerId}] ✓ All tasks completed`);
    }).pipe(
      // Signal the latch on success, failure, or interruption.
      // A try/finally inside Effect.gen does not run when the fiber is interrupted.
      Effect.ensuring(
        Latch.countDown(workersCompleteLatch).pipe(
          Effect.zipRight(Effect.log(`[Worker ${workerId}] Signaled latch`))
        )
      )
    );

  // Spawn all workers as background fibers
  console.log(`\n[COORDINATOR] Spawning ${numWorkers} workers...\n`);

  const workerFibers = yield* Effect.all(
    Array.from({ length: numWorkers }, (_, i) =>
      createWorker(i + 1).pipe(Effect.fork)
    )
  );

  // Wait for all workers to complete
  console.log(`\n[COORDINATOR] Waiting for all workers to finish...\n`);

  yield* Latch.await(workersCompleteLatch);

  console.log(`\n[COORDINATOR] All workers completed!\n`);

  // Join all fibers to ensure cleanup
  yield* Effect.all(workerFibers.map((fiber) => Fiber.join(fiber)));

  // Aggregate results
  const allResults = yield* Ref.get(results);

  console.log(`[SUMMARY]`);
  console.log(`  Total workers: ${numWorkers}`);
  console.log(`  Tasks per worker: ${tasksPerWorker}`);
  console.log(`  Total tasks: ${allResults.length}`);
  console.log(
    `  Avg task duration: ${Math.round(
      allResults.reduce((sum, r) => sum + r.duration, 0) / allResults.length
    )}ms`
  );
});

Effect.runPromise(fanOutFanIn);

This pattern:

  1. Creates Latch with count = number of workers
  2. Spawns worker fibers as background tasks
  3. Each worker processes tasks independently
  4. Signals Latch when work completes (countDown)
  5. Coordinator awaits until all workers signal
  6. Aggregates results from all workers

Advanced: Barrier Synchronization

All fibers wait at a checkpoint before proceeding:

interface WorkerConfig {
  readonly workerId: number;
  readonly phaseDuration: number;
}

const barrierSynchronization = (workers: WorkerConfig[]) =>
  Effect.gen(function* () {
    const phases = 3;
    const barrierLatches = yield* Effect.all(
      Array.from({ length: phases }, () => Latch.make(workers.length))
    );

    // Worker that processes phases with barrier synchronization
    const createBarrierWorker = (config: WorkerConfig, barriers: Latch[]) =>
      Effect.gen(function* () {
        for (const [phase, barrier] of barriers.entries()) {
          yield* Effect.log(
            `[Worker ${config.workerId}] Phase ${phase + 1}: Working...`
          );

          // Simulate work
          yield* Effect.sleep(`${config.phaseDuration} millis`);

          yield* Effect.log(
            `[Worker ${config.workerId}] Phase ${phase + 1}: Done, waiting at barrier`
          );

          // Signal completion for this phase
          yield* Latch.countDown(barrier);

          // Wait for all workers to reach barrier
          yield* Latch.await(barrier);

          yield* Effect.log(
            `[Worker ${config.workerId}] Phase ${phase + 1}: All workers ready, proceeding`
          );
        }
      });

    // Spawn all workers with barrier coordination
    const fibers = yield* Effect.all(
      workers.map((w) =>
        createBarrierWorker(w, barrierLatches).pipe(Effect.fork)
      )
    );

    // Wait for all to complete
    yield* Effect.all(fibers.map((f) => Fiber.join(f)));
  });

const barrierExample = barrierSynchronization([
  { workerId: 1, phaseDuration: 100 },
  { workerId: 2, phaseDuration: 150 },
  { workerId: 3, phaseDuration: 120 },
]);

Advanced: Hierarchical Coordination (Tree Join)

Coordinate multiple stages of workers:

interface TreeJoinConfig {
  readonly stageCount: number;
  readonly workersPerStage: number;
}

const treeJoinCoordination = (config: TreeJoinConfig) =>
  Effect.gen(function* () {
    const stageLocks: Latch[] = [];

    // Create latch for each stage
    for (let stage = 0; stage < config.stageCount; stage++) {
      const latch = yield* Latch.make(config.workersPerStage);
      stageLocks.push(latch);
    }

    // Worker that participates in stage-based coordination
    const createStageWorker = (
      stageIndex: number,
      workerId: number
    ): Effect.Effect<void> =>
      Effect.gen(function* () {
        // Guard: only run while the stage index is in range
        if (stageIndex < config.stageCount) {
          yield* Effect.log(
            `[Stage ${stageIndex}] Worker ${workerId} processing...`
          );

          // Simulate work (rounded to whole milliseconds, 50-150ms)
          yield* Effect.sleep(`${Math.round(50 + Math.random() * 100)} millis`);

          yield* Effect.log(
            `[Stage ${stageIndex}] Worker ${workerId} done, signaling`
          );

          // Signal completion
          yield* Latch.countDown(stageLocks[stageIndex]);

          // Wait for all workers in this stage
          yield* Latch.await(stageLocks[stageIndex]);

          yield* Effect.log(
            `[Stage ${stageIndex}] All workers ready, proceeding to next stage`
          );

          // Recursively proceed to next stage
          if (stageIndex + 1 < config.stageCount) {
            yield* createStageWorker(stageIndex + 1, workerId);
          }
        }
      });

    // Start all workers at stage 0
    const fibers = yield* Effect.all(
      Array.from({ length: config.workersPerStage }, (_, i) =>
        createStageWorker(0, i + 1).pipe(Effect.fork)
      )
    );

    // Wait for completion
    yield* Effect.all(fibers.map((f) => Fiber.join(f)));
  });

Advanced: Error Propagation with Latch

Handle failures in coordinated fibers:

interface CoordinatedTask {
  readonly taskId: number;
  readonly shouldFail: boolean;
}

const coordinatedWithErrorHandling = (
  tasks: CoordinatedTask[]
) =>
  Effect.gen(function* () {
    const completionLatch = yield* Latch.make(tasks.length);
    const errors = yield* Ref.make<Error[]>([]);
    const results = yield* Ref.make<Map<number, string>>(new Map());

    // Worker that can fail
    const executeTask = (task: CoordinatedTask) =>
      Effect.gen(function* () {
        yield* Effect.log(`[Task ${task.taskId}] Starting...`);

        if (task.shouldFail) {
          yield* Effect.sleep("50 millis");
          // Fail through the error channel instead of throwing
          return yield* Effect.fail(
            new Error(`Task ${task.taskId} intentionally failed`)
          );
        }

        yield* Effect.sleep("100 millis");
        // Copy the Map rather than mutating the value held in the Ref
        yield* Ref.update(results, (m) =>
          new Map(m).set(task.taskId, `Success from task ${task.taskId}`)
        );

        yield* Effect.log(`[Task ${task.taskId}] ✓ Completed`);
      }).pipe(
        // Record the failure so the fiber itself still succeeds
        Effect.catchAll((error) =>
          Ref.update(errors, (errs) => [...errs, error]).pipe(
            Effect.zipRight(
              Effect.log(`[Task ${task.taskId}] ✗ Failed: ${error.message}`)
            )
          )
        ),
        // Always signal completion, even on interruption
        Effect.ensuring(Latch.countDown(completionLatch))
      );

    // Execute all tasks
    const fibers = yield* Effect.all(
      tasks.map((t) => executeTask(t).pipe(Effect.fork))
    );

    // Wait for all to complete
    yield* Latch.await(completionLatch);
    yield* Effect.all(fibers.map((f) => Fiber.join(f)));

    // Check for errors
    const taskErrors = yield* Ref.get(errors);
    if (taskErrors.length > 0) {
      yield* Effect.log(
        `\n⚠ ${taskErrors.length} task(s) failed during coordination:`
      );
      taskErrors.forEach((err, idx) =>
        console.log(`  ${idx + 1}. ${err.message}`)
      );
    }

    const allResults = yield* Ref.get(results);
    yield* Effect.log(
      `\n✓ ${allResults.size} task(s) succeeded during coordination`
    );
  });

Advanced: Timeout-Based Coordination

Proceed even if some fibers don't complete:

const coordinatedWithTimeout = (
  taskCount: number,
  timeoutMs: number
) =>
  Effect.gen(function* () {
    const completionLatch = yield* Latch.make(taskCount);
    const completed = yield* Ref.make<number>(0);

    // Task that might hang
    const unreliableTask = (taskId: number) =>
      Effect.gen(function* () {
        // 50% chance of a slow task (1000ms) that may exceed the timeout
        const delay = Math.random() > 0.5 ? 1000 : 200;

        yield* Effect.log(
          `[Task ${taskId}] Starting with ${delay}ms delay`
        );

        yield* Effect.sleep(`${delay} millis`);

        yield* Ref.update(completed, (c) => c + 1);
        yield* Latch.countDown(completionLatch);

        yield* Effect.log(`[Task ${taskId}] Completed`);
      });

    // Spawn tasks
    const fibers = yield* Effect.all(
      Array.from({ length: taskCount }, (_, i) =>
        unreliableTask(i + 1).pipe(Effect.fork)
      )
    );

    // Wait with timeout
    const waitResult = yield* Latch.await(completionLatch).pipe(
      Effect.timeout(`${timeoutMs} millis`),
      Effect.either
    );

    if (waitResult._tag === "Left") {
      yield* Effect.log(
        `⚠ Coordination timeout after ${timeoutMs}ms`
      );
    } else {
      yield* Effect.log(`✓ All tasks completed within timeout`);
    }

    // Interrupt fibers that are still running; joining them here would
    // block until every task finishes and defeat the timeout
    yield* Fiber.interruptAll(fibers);

    const completedCount = yield* Ref.get(completed);
    yield* Effect.log(
      `[RESULT] ${completedCount}/${taskCount} tasks completed`
    );
  });

When to Use This Pattern

Use Latch when:

  • Coordinating N fibers with a completion condition
  • Implementing fan-out/fan-in patterns
  • Creating barrier synchronization (all wait for all)
  • Waiting for parallel initialization to complete
  • Coordinating multi-stage pipelines
  • Detecting when all workers are done

⚠️ Trade-offs:

  • Latch is one-time use (can't reset count)
  • All N fibers must reach latch for waiters to proceed
  • If any fiber hangs, coordinated fibers wait indefinitely
  • Requires explicit countDown() calls (can be forgotten)
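
When the coordinator launches the workers itself, the last two risks above can be reduced without manual counting. The following is a minimal sketch under that assumption, using `Effect.forEach` with a `concurrency` option and `Effect.timeout`; `runWorker` and `fanInWithoutLatch` are hypothetical names used only for illustration.

```typescript
import { Effect } from "effect";

// Hypothetical worker for illustration: sleeps briefly, then reports
const runWorker = (workerId: number) =>
  Effect.sleep(`${workerId * 10} millis`).pipe(
    Effect.as(`worker ${workerId} done`)
  );

// Fan-out/fan-in without manual counting: forEach starts every worker
// concurrently and completes only when all of them have finished
const fanInWithoutLatch = Effect.forEach([3, 1, 2], runWorker, {
  concurrency: "unbounded",
}).pipe(
  // Guard against a hung worker instead of waiting indefinitely
  Effect.timeout("5 seconds")
);
```

Results come back in input order, so there is no shared `Ref` to update and no `countDown()` call to forget. A latch remains the better fit when the signaling fibers are started elsewhere or when the waiters are not the fibers' parent.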

When to Choose Between Latch and Deferred

| Scenario | Latch | Deferred |
| --- | --- | --- |
| One producer signals many consumers | Deferred | ✓ Simpler |
| N producers signal N consumers | ✓ Latch | - |
| One-time coordination | Both | Both |
| Reusable coordination | - | Use multiple Deferreds |
| Known participant count | ✓ Latch | - |
| Unknown participant count | - | ✓ Deferred |

See Also