| title | Concurrency Pattern 3: Coordinate Multiple Fibers with Latch |
|---|---|
| id | concurrency-pattern-coordinate-with-latch |
| skillLevel | intermediate |
| applicationPatternId | concurrency |
| summary | Use Latch to synchronize multiple fibers, enabling patterns like coordinating N async tasks, fan-out/fan-in, and barrier synchronization. |
| tags | |
| rule | |
| related | |
| author | effect_website |
| lessonOrder | 4 |

When you need multiple fibers to coordinate and wait for a shared completion condition, use Latch. A Latch is a countdown synchronization object: you initialize it with N, each fiber calls countDown(), and all waiting fibers are released when the count reaches zero. This enables fan-out/fan-in patterns and barrier synchronization.
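
To make the countdown semantics concrete, here is a minimal sketch in plain TypeScript, independent of Effect. `CountdownLatch`, `wait()`, and `demo` are hypothetical names chosen for illustration and are not part of any library API; the class only models the behavior described above (initialize with N, each participant counts down once, waiters are released at zero).

```typescript
// Hypothetical plain-TypeScript model of a countdown latch (not Effect's API).
class CountdownLatch {
  private remaining: number;
  private release!: () => void;
  private readonly released: Promise<void>;

  constructor(count: number) {
    this.remaining = count;
    // The executor runs synchronously, so `release` is set before the constructor returns.
    this.released = new Promise<void>((resolve) => {
      this.release = resolve;
    });
    // A latch created with no participants is already open.
    if (count <= 0) this.release();
  }

  // Decrement the count; release every waiter when it reaches zero.
  countDown(): void {
    if (this.remaining <= 0) return; // extra calls are ignored in this sketch
    this.remaining -= 1;
    if (this.remaining === 0) this.release();
  }

  // Resolves once the count has reached zero.
  wait(): Promise<void> {
    return this.released;
  }

  get count(): number {
    return this.remaining;
  }
}

async function demo(): Promise<string[]> {
  const events: string[] = [];
  const latch = new CountdownLatch(3);
  const waiter = latch.wait().then(() => {
    events.push("released");
  });
  for (let i = 1; i <= 3; i++) {
    events.push(`countDown ${i}`);
    latch.countDown();
  }
  await waiter;
  return events;
}

// Logs ["countDown 1", "countDown 2", "countDown 3", "released"]
demo().then((events) => console.log(events));
```

The waiter is released only after the third `countDown()`, no matter how early it started waiting.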
Several common scenarios require synchronizing multiple fibers:
- Parallel initialization: Wait for all services to start before proceeding
- Fan-out/fan-in: Spawn multiple workers, collect results when all done
- Barrier synchronization: All fibers wait at a checkpoint before proceeding
- Graceful shutdown: Wait for all active fibers to complete
- Aggregation patterns: Process streams in parallel, combine when ready
Unlike Deferred (one producer signals once), Latch:
- Supports multiple signalers (each calls countDown())
- Is used with a known number of participants (counts down from N to 0)
- Enables barrier patterns (all wait for all)
- Queues waiting fibers fairly
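
The "all wait for all" barrier behavior in the list above can be sketched without any library, assuming a tiny hand-rolled latch. `makeSimpleLatch`, `sleep`, and `runBarrier` are hypothetical helpers written for this illustration only; they model the semantics and are not Effect's implementation.

```typescript
// Hypothetical helper: a minimal countdown latch as a plain object.
interface SimpleLatch {
  countDown(): void;
  wait(): Promise<void>;
}

function makeSimpleLatch(count: number): SimpleLatch {
  let remaining = count;
  let release: () => void = () => {};
  const released = new Promise<void>((resolve) => {
    release = resolve;
  });
  if (remaining <= 0) release();
  return {
    countDown() {
      if (remaining > 0 && --remaining === 0) release();
    },
    wait: () => released,
  };
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Each participant works for a different time, signals, then waits for the rest.
async function runBarrier(delaysMs: number[]): Promise<string[]> {
  const log: string[] = [];
  const barrier = makeSimpleLatch(delaysMs.length);
  await Promise.all(
    delaysMs.map(async (ms, i) => {
      await sleep(ms);
      log.push(`arrived ${i + 1}`);
      barrier.countDown(); // signal: "I reached the checkpoint"
      await barrier.wait(); // wait until every participant has signaled
      log.push(`passed ${i + 1}`);
    })
  );
  return log;
}

runBarrier([30, 10, 20]).then((log) => console.log(log));
```

Every "arrived" entry is logged before any "passed" entry, because no participant gets past `wait()` until the slowest one has counted down.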
This example demonstrates a fan-out/fan-in pattern: it spawns 5 worker fibers that each process 3 tasks, and uses a Latch so the coordinator knows when every worker has finished.

import { Effect, Latch, Fiber, Ref } from "effect";

interface WorkResult {
  readonly workerId: number;
  readonly taskId: number;
  readonly result: string;
  readonly duration: number;
}

// Simulate a long-running task
const processTask = (
  workerId: number,
  taskId: number
): Effect.Effect<WorkResult> =>
  Effect.gen(function* () {
    const startTime = Date.now();
    const duration = 100 + Math.random() * 400; // 100-500ms

    yield* Effect.log(
      `[Worker ${workerId}] Starting task ${taskId} (duration: ${Math.round(duration)}ms)`
    );

    yield* Effect.sleep(`${Math.round(duration)} millis`);

    const elapsed = Date.now() - startTime;

    yield* Effect.log(
      `[Worker ${workerId}] ✓ Completed task ${taskId} in ${elapsed}ms`
    );

    return {
      workerId,
      taskId,
      result: `Result from worker ${workerId} on task ${taskId}`,
      duration: elapsed,
    };
  });

// Fan-out/Fan-in with Latch
const fanOutFanIn = Effect.gen(function* () {
  const numWorkers = 5;
  const tasksPerWorker = 3;

  // Create a latch that counts down from numWorkers;
  // it releases once every worker has signaled
  const workersCompleteLatch = yield* Latch.make(numWorkers);

  // Track results from all workers
  const results = yield* Ref.make<WorkResult[]>([]);

  // Worker fiber that processes tasks sequentially
  const createWorker = (workerId: number) =>
    Effect.gen(function* () {
      yield* Effect.log(`[Worker ${workerId}] ▶ Starting`);

      // Process multiple tasks
      for (let i = 1; i <= tasksPerWorker; i++) {
        const result = yield* processTask(workerId, i);
        yield* Ref.update(results, (rs) => [...rs, result]);
      }

      yield* Effect.log(`[Worker ${workerId}] ✓ All tasks completed`);
    }).pipe(
      // Signal the latch whether the worker succeeds, fails, or is interrupted
      Effect.ensuring(
        Latch.countDown(workersCompleteLatch).pipe(
          Effect.zipRight(Effect.log(`[Worker ${workerId}] Signaled latch`))
        )
      )
    );

  // Spawn all workers as background fibers
  console.log(`\n[COORDINATOR] Spawning ${numWorkers} workers...\n`);

  const workerFibers = yield* Effect.all(
    Array.from({ length: numWorkers }, (_, i) =>
      createWorker(i + 1).pipe(Effect.fork)
    )
  );

  // Wait for all workers to complete
  console.log(`\n[COORDINATOR] Waiting for all workers to finish...\n`);

  yield* Latch.await(workersCompleteLatch);

  console.log(`\n[COORDINATOR] All workers completed!\n`);

  // Join all fibers to ensure cleanup
  yield* Effect.all(workerFibers.map((fiber) => Fiber.join(fiber)));

  // Aggregate results
  const allResults = yield* Ref.get(results);

  console.log(`[SUMMARY]`);
  console.log(`  Total workers: ${numWorkers}`);
  console.log(`  Tasks per worker: ${tasksPerWorker}`);
  console.log(`  Total tasks: ${allResults.length}`);
  console.log(
    `  Avg task duration: ${Math.round(
      allResults.reduce((sum, r) => sum + r.duration, 0) / allResults.length
    )}ms`
  );
});

Effect.runPromise(fanOutFanIn);

This pattern:
- Creates Latch with count = number of workers
- Spawns worker fibers as background tasks
- Each worker processes tasks independently
- Signals Latch when work completes (countDown)
- Coordinator awaits until all workers signal
- Aggregates results from all workers
All fibers wait at a checkpoint before proceeding:

interface WorkerConfig {
  readonly workerId: number;
  readonly phaseDuration: number;
}

const barrierSynchronization = (workers: WorkerConfig[]) =>
  Effect.gen(function* () {
    const phases = 3;

    // One latch per phase, since a latch cannot be reset once it reaches zero
    const barrierLatches = yield* Effect.all(
      Array.from({ length: phases }, () => Latch.make(workers.length))
    );

    // Worker that processes phases with barrier synchronization
    const createBarrierWorker = (config: WorkerConfig, barriers: Latch[]) =>
      Effect.gen(function* () {
        for (const [phase, barrier] of barriers.entries()) {
          yield* Effect.log(
            `[Worker ${config.workerId}] Phase ${phase + 1}: Working...`
          );

          // Simulate work
          yield* Effect.sleep(`${config.phaseDuration} millis`);

          yield* Effect.log(
            `[Worker ${config.workerId}] Phase ${phase + 1}: Done, waiting at barrier`
          );

          // Signal completion for this phase
          yield* Latch.countDown(barrier);

          // Wait for all workers to reach barrier
          yield* Latch.await(barrier);

          yield* Effect.log(
            `[Worker ${config.workerId}] Phase ${phase + 1}: All workers ready, proceeding`
          );
        }
      });

    // Spawn all workers with barrier coordination
    const fibers = yield* Effect.all(
      workers.map((w) =>
        createBarrierWorker(w, barrierLatches).pipe(Effect.fork)
      )
    );

    // Wait for all to complete
    yield* Effect.all(fibers.map((f) => Fiber.join(f)));
  });

const barrierExample = barrierSynchronization([
  { workerId: 1, phaseDuration: 100 },
  { workerId: 2, phaseDuration: 150 },
  { workerId: 3, phaseDuration: 120 },
]);

Coordinate multiple stages of workers:

interface TreeJoinConfig {
  readonly stageCount: number;
  readonly workersPerStage: number;
}

const treeJoinCoordination = (config: TreeJoinConfig) =>
  Effect.gen(function* () {
    const stageLocks: Latch[] = [];

    // Create latch for each stage
    for (let stage = 0; stage < config.stageCount; stage++) {
      const latch = yield* Latch.make(config.workersPerStage);
      stageLocks.push(latch);
    }

    // Worker that participates in stage-based coordination
    const createStageWorker = (
      stageIndex: number,
      workerId: number
    ): Effect.Effect<void> =>
      Effect.gen(function* () {
        // Only run if this stage has workers
        if (stageIndex < config.stageCount) {
          yield* Effect.log(
            `[Stage ${stageIndex}] Worker ${workerId} processing...`
          );

          // Simulate work
          yield* Effect.sleep(`${50 + Math.random() * 100} millis`);

          yield* Effect.log(
            `[Stage ${stageIndex}] Worker ${workerId} done, signaling`
          );

          // Signal completion
          yield* Latch.countDown(stageLocks[stageIndex]);

          // Wait for all workers in this stage
          yield* Latch.await(stageLocks[stageIndex]);

          yield* Effect.log(
            `[Stage ${stageIndex}] All workers ready, proceeding to next stage`
          );

          // Recursively proceed to next stage
          if (stageIndex + 1 < config.stageCount) {
            yield* createStageWorker(stageIndex + 1, workerId);
          }
        }
      });

    // Start all workers at stage 0
    const fibers = yield* Effect.all(
      Array.from({ length: config.workersPerStage }, (_, i) =>
        createStageWorker(0, i + 1).pipe(Effect.fork)
      )
    );

    // Wait for completion
    yield* Effect.all(fibers.map((f) => Fiber.join(f)));
  });

Handle failures in coordinated fibers:

interface CoordinatedTask {
  readonly taskId: number;
  readonly shouldFail: boolean;
}

const coordinatedWithErrorHandling = (
  tasks: CoordinatedTask[]
) =>
  Effect.gen(function* () {
    const completionLatch = yield* Latch.make(tasks.length);
    const errors = yield* Ref.make<Error[]>([]);
    const results = yield* Ref.make<Map<number, string>>(new Map());

    // Worker that can fail
    const executeTask = (task: CoordinatedTask) =>
      Effect.gen(function* () {
        yield* Effect.log(`[Task ${task.taskId}] Starting...`);

        if (task.shouldFail) {
          yield* Effect.sleep("50 millis");
          // Fail through the Effect error channel rather than throwing
          return yield* Effect.fail(
            new Error(`Task ${task.taskId} intentionally failed`)
          );
        }

        yield* Effect.sleep("100 millis");
        // Copy the Map so the value held by the Ref is never mutated in place
        yield* Ref.update(results, (m) =>
          new Map(m).set(task.taskId, `Success from task ${task.taskId}`)
        );
        yield* Effect.log(`[Task ${task.taskId}] ✓ Completed`);
      }).pipe(
        // Record the failure instead of propagating it
        Effect.catchAll((error) =>
          Ref.update(errors, (errs) => [...errs, error]).pipe(
            Effect.zipRight(
              Effect.log(`[Task ${task.taskId}] ✗ Failed: ${error.message}`)
            )
          )
        ),
        // Always signal completion, whether the task succeeded or failed
        Effect.ensuring(Latch.countDown(completionLatch))
      );

    // Execute all tasks
    const fibers = yield* Effect.all(
      tasks.map((t) => executeTask(t).pipe(Effect.fork))
    );

    // Wait for all to complete
    yield* Latch.await(completionLatch);
    yield* Effect.all(fibers.map((f) => Fiber.join(f)));

    // Check for errors
    const taskErrors = yield* Ref.get(errors);
    if (taskErrors.length > 0) {
      yield* Effect.log(
        `\n⚠ ${taskErrors.length} task(s) failed during coordination:`
      );
      taskErrors.forEach((err, idx) =>
        console.log(`  ${idx + 1}. ${err.message}`)
      );
    }

    const allResults = yield* Ref.get(results);
    yield* Effect.log(
      `\n✓ ${allResults.size} task(s) succeeded during coordination`
    );
  });

Proceed even if some fibers don't complete:

const coordinatedWithTimeout = (
  taskCount: number,
  timeoutMs: number
) =>
  Effect.gen(function* () {
    const completionLatch = yield* Latch.make(taskCount);
    const completed = yield* Ref.make<number>(0);

    // Task that might take longer than the coordinator is willing to wait
    const unreliableTask = (taskId: number) =>
      Effect.gen(function* () {
        const delay = Math.random() > 0.5 ? 1000 : 200; // 50% chance of a slow task

        yield* Effect.log(
          `[Task ${taskId}] Starting with ${delay}ms delay`
        );

        yield* Effect.sleep(`${delay} millis`);

        yield* Ref.update(completed, (c) => c + 1);
        yield* Latch.countDown(completionLatch);
        yield* Effect.log(`[Task ${taskId}] Completed`);
      });

    // Spawn tasks
    const fibers = yield* Effect.all(
      Array.from({ length: taskCount }, (_, i) =>
        unreliableTask(i + 1).pipe(Effect.fork)
      )
    );

    // Wait with timeout
    const waitResult = yield* Latch.await(completionLatch).pipe(
      Effect.timeout(`${timeoutMs} millis`),
      Effect.either
    );

    if (waitResult._tag === "Left") {
      yield* Effect.log(
        `⚠ Coordination timeout after ${timeoutMs}ms`
      );
    } else {
      yield* Effect.log(`✓ All tasks completed within timeout`);
    }

    // Interrupt any stragglers instead of joining them; joining would
    // wait for every task and defeat the timeout
    yield* Fiber.interruptAll(fibers);

    const completedCount = yield* Ref.get(completed);
    yield* Effect.log(
      `[RESULT] ${completedCount}/${taskCount} tasks completed`
    );
  });

✅ Use Latch when:
- Coordinating N fibers with a completion condition
- Implementing fan-out/fan-in patterns
- Creating barrier synchronization (all wait for all)
- Waiting for parallel initialization to complete
- Coordinating multi-stage pipelines
- Needing to know when all workers are done

Limitations:
- A Latch is one-time use (the count cannot be reset)
- All N fibers must count down before waiters can proceed
- If any fiber hangs, the waiting fibers wait indefinitely
- countDown() must be called explicitly (and is easy to forget)

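
The last two limitations can be illustrated with a small plain-TypeScript sketch, again independent of Effect. `countdownSignal`, `guarded`, `waitWithTimeout`, and `limitationsDemo` are hypothetical helpers invented for this example: counting down in a `finally` block covers failing workers, while a worker that never finishes can only be handled by bounding the wait.

```typescript
// Hypothetical helper: a countdown that exposes a promise resolved at zero.
function countdownSignal(count: number): { countDown: () => void; done: Promise<void> } {
  let remaining = count;
  let resolveDone: () => void = () => {};
  const done = new Promise<void>((resolve) => {
    resolveDone = resolve;
  });
  if (remaining <= 0) resolveDone();
  return {
    countDown: () => {
      if (remaining > 0 && --remaining === 0) resolveDone();
    },
    done,
  };
}

// Run `work` and count down afterwards, whether it resolved or rejected.
async function guarded(work: () => Promise<void>, countDown: () => void): Promise<void> {
  try {
    await work();
  } catch (error) {
    // Swallowed for brevity; a real worker would record the error.
  } finally {
    countDown();
  }
}

// Wait for `signal`, but give up after `ms` milliseconds.
async function waitWithTimeout(signal: Promise<void>, ms: number): Promise<"released" | "timeout"> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timeout">((resolve) => {
    timer = setTimeout(() => resolve("timeout"), ms);
  });
  try {
    return await Promise.race([signal.then(() => "released" as const), timeout]);
  } finally {
    clearTimeout(timer);
  }
}

async function limitationsDemo(): Promise<{ failing: string; hanging: string }> {
  // A failing worker still counts down, so the waiter is released.
  const a = countdownSignal(2);
  void guarded(async () => { throw new Error("boom"); }, a.countDown);
  void guarded(async () => {}, a.countDown);
  const failing = await waitWithTimeout(a.done, 200);

  // A worker that never finishes never counts down; only the timeout ends the wait.
  const b = countdownSignal(2);
  void guarded(() => new Promise<void>(() => {}), b.countDown);
  void guarded(async () => {}, b.countDown);
  const hanging = await waitWithTimeout(b.done, 50);

  return { failing, hanging };
}

// Logs { failing: "released", hanging: "timeout" }
limitationsDemo().then((result) => console.log(result));
```
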
| Scenario | Latch | Deferred |
|---|---|---|
| One producer signals many consumers | - | ✓ Simpler |
| N producers signal N consumers | ✓ Latch | - |
| One-time coordination | Both | Both |
| Reusable coordination | - | Use multiple Deferreds |
| Known participant count | ✓ Latch | - |
| Unknown participant count | - | ✓ Deferred |
- Concurrency Pattern 1: Coordinate with Deferred - One-time async signaling
- Concurrency Pattern 2: Rate Limit with Semaphore - Concurrent resource limiting
- Run Background Tasks with Fork - Background fiber execution
- Process Collection in Parallel with Foreach - Parallel iteration