|
| 1 | +# ADR-002: Phaser-Based Operation Coordination |
| 2 | + |
| 3 | +**Status:** Accepted |
| 4 | +**Date:** 2025-12-29 |
| 5 | + |
| 6 | +## Context |
| 7 | + |
| 8 | +Durable operations need to coordinate: |
| 9 | +1. Background thread executing the operation |
| 10 | +2. Main thread calling `get()` waiting for result |
| 11 | +3. Checkpoint system persisting results before signaling completion |
| 12 | + |
| 13 | +The result must be checkpointed BEFORE `get()` returns, otherwise a Lambda crash loses the result. |
| 14 | + |
| 15 | +## Decision |
| 16 | + |
| 17 | +Use Java `Phaser` for operation coordination with checkpoint-driven advancement. |
| 18 | + |
| 19 | +```java |
| 20 | +// Operation executes but doesn't signal completion |
| 21 | +T result = function.get(); |
| 22 | +executionManager.sendOperationUpdate(successUpdate); // Async checkpoint |
| 23 | +// Phaser stays in Phase 0 |
| 24 | + |
| 25 | +// ExecutionManager advances phaser AFTER checkpoint succeeds |
| 26 | +private void onCheckpointComplete(String newToken, List<Operation> ops) { |
| 27 | + if (isTerminalStatus(op.status())) { |
| 28 | + phaser.arriveAndAwaitAdvance(); // Phase 0 → 1 |
| 29 | + phaser.arriveAndAwaitAdvance(); // Phase 1 → 2 |
| 30 | + } |
| 31 | +} |
| 32 | +``` |
| 33 | + |
| 34 | +### Two-Phase Completion Protocol |
| 35 | + |
| 36 | +| Phase | State | Purpose | |
| 37 | +|-------|-------|---------| |
| 38 | +| 0 | RUNNING | Operation executing | |
| 39 | +| 1 | COMPLETING | Waiters unblock, reactivate | |
| 40 | +| 2 | DONE | Step thread deregisters | |
| 41 | + |
| 42 | +**Why two phases?** Prevents race condition: |
| 43 | +- Phase 0→1: Main thread unblocks and re-registers as active |
| 44 | +- Phase 1→2: Step thread deregisters |
| 45 | + |
| 46 | +Without this, step thread could deregister before main thread re-registers, causing premature suspension. |
| 47 | + |
| 48 | +## Alternatives Considered |
| 49 | + |
| 50 | +### CompletableFuture |
| 51 | + |
| 52 | +```java |
| 53 | +private final CompletableFuture<T> resultFuture = new CompletableFuture<>(); |
| 54 | + |
| 55 | +public void execute() { |
| 56 | + executor.execute(() -> { |
| 57 | + T result = function.get(); |
| 58 | + resultFuture.complete(result); // When to complete? |
| 59 | + }); |
| 60 | +} |
| 61 | +``` |
| 62 | + |
| 63 | +**Rejected because:** |
| 64 | + |
| 65 | +1. **Checkpoint timing:** Complete before checkpoint = race condition. Complete after = CheckpointBatcher needs to know about operation futures (tight coupling). |
| 66 | + |
| 67 | +2. **Retry handling:** CompletableFuture is single-completion. Retries need multiple attempts: |
| 68 | + ``` |
| 69 | + Attempt 1 → FAIL → PENDING (retry in 30s) |
| 70 | + Attempt 2 → FAIL → PENDING (retry in 60s) |
| 71 | + Attempt 3 → SUCCESS |
| 72 | + ``` |
| 73 | + Phaser stays in Phase 0 across all attempts naturally. |
| 74 | + |
| 75 | +3. **Thread management:** Phaser integrates cleanly with thread deregistration during `get()`: |
| 76 | + ```java |
| 77 | + public T get() { |
| 78 | + phaser.register(); |
| 79 | + executionManager.deregisterActiveThread("Root"); // Allow suspension |
| 80 | + phaser.arriveAndAwaitAdvance(); |
| 81 | + executionManager.registerActiveThread("Root"); // Reactivate |
| 82 | + phaser.arriveAndDeregister(); |
| 83 | + } |
| 84 | + ``` |
| 85 | + |
| 86 | +## Consequences |
| 87 | + |
| 88 | +**Positive:** |
| 89 | +- Checkpoint-driven completion ensures durability |
| 90 | +- Clean separation: ExecutionManager controls completion, not operations |
| 91 | +- Handles retries without special cases |
| 92 | +- Unified replay handling (same `get()` logic for new and replayed operations) |
| 93 | + |
| 94 | +**Negative:** |
| 95 | +- Phaser is a complex primitive with steeper learning curve |
| 96 | +- Two-phase protocol adds cognitive overhead |
| 97 | + |
| 98 | +## Relationship to Suspension Model |
| 99 | + |
| 100 | +The two-phase protocol directly prevents a race condition with the suspension mechanism. |
| 101 | + |
| 102 | +### Why Thread-Counting for Suspension? |
| 103 | + |
| 104 | +Suspension is triggered when `activeThreads.isEmpty()`. An alternative would be explicit suspension flags (e.g., `executionManager.requestSuspension()`), but thread-counting is necessary for correctness: |
| 105 | + |
| 106 | +**Scenario: Blocking on a retrying step** |
| 107 | +```java |
| 108 | +var future1 = context.stepAsync("step1", () -> failsAndRetries()); |
| 109 | +var result = future1.get(); // Root blocks here |
| 110 | +``` |
| 111 | + |
| 112 | +With explicit flags: |
| 113 | +- step1 sets `suspensionRequested = true`, deregisters |
| 114 | +- Root is blocked but still registered → `activeThreads = {Root}` |
| 115 | +- Not empty → no suspension → **deadlock** |
| 116 | + |
| 117 | +Root must deregister when blocking to allow suspension. This brings us back to thread-counting. |
| 118 | + |
| 119 | +**Nested blocking also works:** |
| 120 | +```java |
| 121 | +var future1 = context.stepAsync("step1", () -> failsAndRetries()); |
| 122 | +context.step("step2", () -> { |
| 123 | + return future1.get() + "-processed"; // step2-thread blocks on step1 |
| 124 | +}); |
| 125 | +``` |
| 126 | + |
| 127 | +The phaser approach supports this - step2-thread deregisters while waiting, re-registers after. (Note: current implementation hardcodes "Root" - see TODO in `StepOperation.get()`). |
| 128 | + |
| 129 | +### The Race Condition |
| 130 | + |
| 131 | +**Without two phases:** |
| 132 | +``` |
| 133 | +1. Root calls get(), deregisters itself, blocks on phaser |
| 134 | +2. Step completes, checkpoint succeeds |
| 135 | +3. Step thread deregisters ← activeThreads is now EMPTY → SUSPENSION TRIGGERED |
| 136 | +4. Root unblocks, tries to re-register ← TOO LATE |
| 137 | +``` |
| 138 | + |
| 139 | +**With two phases:** |
| 140 | +``` |
| 141 | +1. Root calls get(), deregisters itself, blocks on phaser (Phase 0) |
| 142 | +2. Step completes, checkpoint succeeds |
| 143 | +3. Phaser advances to Phase 1 ← Root unblocks HERE |
| 144 | +4. Root re-registers as active ← Back to 1 active thread |
| 145 | +5. Phaser advances to Phase 2 |
| 146 | +6. Step thread deregisters ← Safe, Root is still active |
| 147 | +``` |
| 148 | + |
| 149 | +The `get()` code shows this dance: |
| 150 | +```java |
| 151 | +executionManager.deregisterActiveThread("Root"); // Allow suspension if step retrying |
| 152 | +phaser.arriveAndAwaitAdvance(); // Wait for step |
| 153 | +executionManager.registerActiveThread("Root"); // Reactivate BEFORE step deregisters |
| 154 | +``` |
| 155 | + |
| 156 | +This allows suspension when appropriate (step is retrying, no active threads) while preventing false suspensions when the step completes normally. |
0 commit comments