|
3 | 3 | **Status:** Accepted |
4 | 4 | **Date:** 2025-12-29 |
5 | 5 |
|
| 6 | + |
6 | 7 | ## Context |
7 | 8 |
|
8 | | -Durable operations need to coordinate: |
9 | | -1. Background thread executing the operation |
10 | | -2. Main thread calling `get()` waiting for result |
11 | | -3. Checkpoint system persisting results before signaling completion |
| 9 | +The SDK uses a unified suspension mechanism: when `activeThreads.isEmpty()`, execution suspends. This requires careful coordination between: |
| 10 | +1. Background threads executing operations |
| 11 | +2. Main thread calling `get()` and potentially blocking |
| 12 | +3. Checkpoint system ensuring results are durable before completion |
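The suspension rule described above can be sketched roughly as follows. This is a minimal illustration, not the SDK's `ExecutionManager`: the class name `ActiveThreadRegistry` and the `isSuspended()` accessor are hypothetical, and the real implementation presumably does more than set a flag when the set becomes empty.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of thread-counting suspension; names are assumptions.
class ActiveThreadRegistry {
    private final Set<String> activeThreads = new HashSet<>();
    private boolean suspended = false;

    // A thread that is doing useful work registers itself as active.
    synchronized void registerActiveThread(String threadId) {
        activeThreads.add(threadId);
    }

    // A thread that blocks, retries, or finishes deregisters itself.
    // When no active threads remain, execution is marked as suspended.
    synchronized void deregisterActiveThread(String threadId) {
        activeThreads.remove(threadId);
        if (activeThreads.isEmpty()) {
            suspended = true;
        }
    }

    synchronized boolean isSuspended() {
        return suspended;
    }

    public static void main(String[] args) {
        ActiveThreadRegistry registry = new ActiveThreadRegistry();
        registry.registerActiveThread("step1");
        registry.registerActiveThread("step2");
        registry.deregisterActiveThread("step1"); // step2 still active
        registry.deregisterActiveThread("step2"); // set is now empty
        System.out.println(registry.isSuspended());
    }
}
```

The point of the sketch is that suspension is a side effect of the last deregistration, so every blocking thread has to deregister for the rule to fire.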
| 13 | + |
| 14 | +## Problem: Complex Blocking Scenarios |
| 15 | + |
| 16 | +### Critical Case: Blocking on Retrying Operations |
| 17 | +```java |
| 18 | +var future1 = context.stepAsync("step1", () -> failsAndRetries()); |
| 19 | +var result = context.step("step2", () -> future1.get() + "-processed"); |
| 20 | +``` |
| 21 | + |
| 22 | +**What happens:** |
| 23 | +1. step1-thread fails, deregisters (entering retry) |
| 24 | +2. step2-thread calls `future1.get()`, must deregister (blocked waiting) |
| 25 | +3. `activeThreads = {}` → suspension triggered correctly |
| 26 | +4. Lambda re-invokes after retry delay |
| 27 | +5. step1 succeeds → step2 must unblock safely |
12 | 28 |
|
13 | | -The result must be checkpointed BEFORE `get()` returns, otherwise a Lambda crash loses the result. |
| 29 | +**Challenge:** How does step2-thread know when step1 completes? Simple signaling fails because completion must be **checkpoint-driven** (results durable before unblocking). |
| 30 | + |
| 31 | +**Key insight:** Steps always reach phaser coordination because they have background threads keeping execution alive. Waits usually suspend immediately via `deregisterActiveThread()` before reaching phaser logic, only using phasers when other threads are active. |
| 32 | + |
| 33 | +### Future Complex Operations |
| 34 | +Operations like `runInChildContext` will create multiple child threads that may block on each other, requiring sophisticated multi-party coordination. |
14 | 35 |
|
15 | 36 | ## Decision |
16 | 37 |
|
17 | | -Use Java `Phaser` for operation coordination with checkpoint-driven advancement. |
| 38 | +Use Java `Phaser` for checkpoint-driven operation coordination. |
| 39 | + |
| 40 | +### Implementation |
18 | 41 |
|
19 | 42 | ```java |
20 | | -// Operation executes but doesn't signal completion |
| 43 | +// Operation execution doesn't signal completion directly |
21 | 44 | T result = function.get(); |
22 | 45 | executionManager.sendOperationUpdate(successUpdate); // Async checkpoint |
23 | | -// Phaser stays in Phase 0 |
| 46 | +// Phaser stays in Phase 0 (RUNNING) |
24 | 47 |
|
25 | 48 | // ExecutionManager advances phaser AFTER checkpoint succeeds |
26 | 49 | private void onCheckpointComplete(String newToken, List<Operation> ops) { |
27 | 50 | if (isTerminalStatus(op.status())) { |
28 | | - phaser.arriveAndAwaitAdvance(); // Phase 0 → 1 |
29 | | - phaser.arriveAndAwaitAdvance(); // Phase 1 → 2 |
| 51 | + phaser.arriveAndAwaitAdvance(); // Phase 0→1: Unblock waiters |
| 52 | + phaser.arriveAndAwaitAdvance(); // Phase 1→2: Allow step deregistration |
30 | 53 | } |
31 | 54 | } |
32 | 55 | ``` |
33 | 56 |
|
| 57 | +**Usage patterns:** |
| 58 | +- **Steps:** Always use phasers - `get()` blocks until ExecutionManager advances phaser after checkpoint |
| 59 | +- **Waits:** Usually suspend before reaching phasers - only use them when other threads keep execution alive |
| 60 | + |
34 | 61 | ### Two-Phase Completion Protocol |
35 | 62 |
|
36 | | -| Phase | State | Purpose | |
37 | | -|-------|-------|---------| |
38 | | -| 0 | RUNNING | Operation executing | |
39 | | -| 1 | COMPLETING | Waiters unblock, reactivate | |
40 | | -| 2 | DONE | Step thread deregisters | |
| 63 | +**Phase 0 (RUNNING):** Operation executing, waiters blocked |
| 64 | +**Phase 1 (COMPLETING):** Waiters unblock and re-register as active |
| 65 | +**Phase 2 (DONE):** Step threads deregister safely |
41 | 66 |
|
42 | 67 | **Why two phases?** Prevents race condition: |
43 | | -- Phase 0→1: Main thread unblocks and re-registers as active |
44 | | -- Phase 1→2: Step thread deregisters |
45 | | - |
46 | | -Without this, step thread could deregister before main thread re-registers, causing premature suspension. |
47 | | - |
48 | | -## Alternatives Considered |
49 | | - |
50 | | -### CompletableFuture |
51 | | - |
52 | 68 | ```java |
53 | | -private final CompletableFuture<T> resultFuture = new CompletableFuture<>(); |
54 | | - |
55 | | -public void execute() { |
56 | | - executor.execute(() -> { |
57 | | - T result = function.get(); |
58 | | - resultFuture.complete(result); // When to complete? |
59 | | - }); |
60 | | -} |
| 69 | +// Without two phases - RACE:
| 70 | +// 1. step2-thread deregisters, blocks on phaser
| 71 | +// 2. step1 completes, checkpoints
| 72 | +// 3. step1-thread deregisters → suspension triggered
| 73 | +// 4. step2-thread tries to re-register → TOO LATE
| 74 | +
| 75 | +// With two phases - SAFE:
| 76 | +// 1. step2-thread deregisters, blocks on Phase 0
| 77 | +// 2. step1 completes, checkpoints
| 78 | +// 3. ExecutionManager advances Phase 0→1 → step2-thread unblocks and re-registers
| 79 | +// 4. ExecutionManager advances Phase 1→2 → step1-thread deregisters safely
61 | 80 | ``` |
62 | 81 |
|
63 | | -**Rejected because:** |
64 | | - |
65 | | -1. **Checkpoint timing:** Complete before checkpoint = race condition. Complete after = CheckpointBatcher needs to know about operation futures (tight coupling). |
66 | | - |
67 | | -2. **Retry handling:** CompletableFuture is single-completion. Retries need multiple attempts: |
68 | | - ``` |
69 | | - Attempt 1 → FAIL → PENDING (retry in 30s) |
70 | | - Attempt 2 → FAIL → PENDING (retry in 60s) |
71 | | - Attempt 3 → SUCCESS |
72 | | - ``` |
73 | | - Phaser stays in Phase 0 across all attempts naturally. |
74 | | - |
75 | | -3. **Thread management:** Phaser integrates cleanly with thread deregistration during `get()`: |
76 | | - ```java |
77 | | - public T get() { |
78 | | - phaser.register(); |
79 | | - executionManager.deregisterActiveThread("Root"); // Allow suspension |
80 | | - phaser.arriveAndAwaitAdvance(); |
81 | | - executionManager.registerActiveThread("Root"); // Reactivate |
82 | | - phaser.arriveAndDeregister(); |
83 | | - } |
84 | | - ``` |
85 | | - |
86 | | -### Two-Phase Completion Sequence |
87 | | - |
| 82 | +#### Sequence Diagram |
88 | 83 | ```mermaid |
89 | 84 | sequenceDiagram |
90 | | - participant Main as Main Thread |
91 | | - participant Step as Step Thread |
| 85 | + participant Step2 as Step2 Thread |
| 86 | + participant Step1 as Step1 Thread |
92 | 87 | participant Ph as Phaser |
93 | 88 | participant EM as ExecutionManager |
94 | 89 |
|
95 | 90 | Note over Ph: Phase 0 (RUNNING) |
96 | 91 | |
97 | | - Main->>Ph: register() |
98 | | - Main->>EM: deregisterActiveThread("Root") |
99 | | - Main->>Ph: arriveAndAwaitAdvance() |
100 | | - Note over Main: BLOCKED |
| 92 | + Step2->>Ph: register() |
| 93 | + Step2->>EM: deregisterActiveThread("step2") |
| 94 | + Step2->>Ph: arriveAndAwaitAdvance() |
| 95 | + Note over Step2: BLOCKED |
101 | 96 | |
102 | | - Step->>Step: execute user function |
103 | | - Step->>EM: checkpoint SUCCEED |
| 97 | + Step1->>Step1: execute user function |
| 98 | + Step1->>EM: checkpoint SUCCESS |
104 | 99 | EM->>Ph: arriveAndAwaitAdvance() |
105 | 100 | Note over Ph: Phase 1 (COMPLETING) |
106 | 101 | |
107 | | - Note over Main: UNBLOCKED |
108 | | - Main->>EM: registerActiveThread("Root") |
109 | | - Main->>Ph: arriveAndDeregister() |
| 102 | + Note over Step2: UNBLOCKED |
| 103 | + Step2->>EM: registerActiveThread("step2") |
| 104 | + Step2->>Ph: arriveAndDeregister() |
110 | 105 | |
111 | | - Step->>Ph: arriveAndAwaitAdvance() |
| 106 | + EM->>Ph: arriveAndAwaitAdvance() |
112 | 107 | Note over Ph: Phase 2 (DONE) |
113 | | - Step->>EM: deregisterActiveThread("1-step") |
| 108 | + Step1->>EM: deregisterActiveThread("step1") |
114 | 109 | ``` |
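The sequence in the diagram can be reproduced in a small standalone program using `java.util.concurrent.Phaser`. This is a sketch under stated assumptions: `TwoPhaseDemo` and its `run()` method are hypothetical, the `activeThreads` set stands in for the `ExecutionManager` bookkeeping, and the waiter is registered on the phaser before its thread starts so the demo is deterministic (in the diagram the waiter calls `register()` itself).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;

// Hypothetical demo of the two-phase completion protocol; not SDK code.
class TwoPhaseDemo {
    // Returns true if the active set became empty (premature suspension).
    static boolean run() {
        Set<String> activeThreads = ConcurrentHashMap.newKeySet();
        activeThreads.add("step1");
        activeThreads.add("step2");

        Phaser phaser = new Phaser(1); // one party: the coordinator
        phaser.register();             // second party: the step2 waiter

        Thread step2 = new Thread(() -> {
            activeThreads.remove("step2");  // deregister while blocked
            phaser.arriveAndAwaitAdvance(); // blocked in Phase 0
            activeThreads.add("step2");     // re-register in Phase 1
            phaser.arriveAndDeregister();   // lets Phase 1 -> 2 proceed
        });
        step2.start();

        // Coordinator role: called once step1's checkpoint has succeeded.
        phaser.arriveAndAwaitAdvance(); // Phase 0 -> 1: unblock waiters
        phaser.arriveAndAwaitAdvance(); // Phase 1 -> 2: waiter is active again

        // Only now does step1 deregister; step2 is already back in the set.
        activeThreads.remove("step1");
        boolean suspended = activeThreads.isEmpty();

        try {
            step2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return suspended;
    }

    public static void main(String[] args) {
        System.out.println("premature suspension: " + run());
    }
}
```

The second `arriveAndAwaitAdvance()` cannot return until the waiter has called `arriveAndDeregister()`, which it does only after re-adding itself to the active set. That ordering is what keeps the set non-empty when step1 deregisters.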
115 | 110 |
|
116 | | -## Consequences |
117 | | - |
118 | | -**Positive:** |
119 | | -- Checkpoint-driven completion ensures durability |
120 | | -- Clean separation: ExecutionManager controls completion, not operations |
121 | | -- Handles retries without special cases |
122 | | -- Unified replay handling (same `get()` logic for new and replayed operations) |
123 | | - |
124 | | -**Negative:** |
125 | | -- Phaser is a complex primitive with steeper learning curve |
126 | | -- Two-phase protocol adds cognitive overhead |
127 | | - |
128 | | -## Relationship to Suspension Model |
129 | | - |
130 | | -The two-phase protocol directly prevents a race condition with the suspension mechanism. |
131 | | - |
132 | | -### Why Thread-Counting for Suspension? |
133 | | - |
134 | | -Suspension is triggered when `activeThreads.isEmpty()`. An alternative would be explicit suspension flags (e.g., `executionManager.requestSuspension()`), but thread-counting is necessary for correctness: |
135 | | - |
136 | | -**Scenario: Blocking on a retrying step** |
137 | | -```java |
138 | | -var future1 = context.stepAsync("step1", () -> failsAndRetries()); |
139 | | -var result = future1.get(); // Root blocks here |
140 | | -``` |
141 | | - |
142 | | -With explicit flags: |
143 | | -- step1 sets `suspensionRequested = true`, deregisters |
144 | | -- Root is blocked but still registered → `activeThreads = {Root}` |
145 | | -- Not empty → no suspension → **deadlock** |
146 | | - |
147 | | -Root must deregister when blocking to allow suspension. This brings us back to thread-counting. |
148 | | - |
149 | | -**Nested blocking also works:** |
150 | | -```java |
151 | | -var future1 = context.stepAsync("step1", () -> failsAndRetries()); |
152 | | -context.step("step2", () -> { |
153 | | - return future1.get() + "-processed"; // step2-thread blocks on step1 |
154 | | -}); |
155 | | -``` |
| 111 | +## Alternatives Considered |
156 | 112 |
|
157 | | -The phaser approach supports this - step2-thread deregisters while waiting, re-registers after. (Note: current implementation hardcodes "Root" - see TODO in `StepOperation.get()`). |
| 113 | +### Simple Thread Signaling |
| 114 | +**Rejected:** Fails on blocking scenarios. If thread A blocks on thread B's result while B is retrying, A remains registered but inactive, preventing suspension. |
158 | 115 |
|
159 | | -### The Race Condition |
| 116 | +### CompletableFuture |
| 117 | +**Rejected:** |
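| 118 | +- Checkpoint timing: completing before the checkpoint risks losing the result on a crash; completing after requires the checkpoint path to know about operation futures (tight coupling)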
| 119 | +- Single-completion model doesn't handle retry attempts |
| 120 | +- No integration with thread lifecycle management |
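The single-completion point can be seen directly in the standard library: once a `CompletableFuture` has completed, whether normally or exceptionally, later completion attempts are ignored. The sketch below is illustrative only; `SingleCompletionDemo` is a hypothetical name and the strings stand in for retry attempts.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo: a CompletableFuture cannot represent a later successful retry.
class SingleCompletionDemo {
    static String run() {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Attempt 1 fails and completes the future exceptionally.
        boolean first = future.completeExceptionally(new RuntimeException("attempt 1 failed"));

        // Attempt 2 succeeds, but the future is already completed, so this is a no-op.
        boolean second = future.complete("attempt 2 succeeded");

        return first + "," + second + "," + future.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "true,false,true"
    }
}
```

A retrying operation would therefore need a fresh future per attempt, and waiters would need a way to move to the new one, whereas a phaser can simply stay in Phase 0 across attempts.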
160 | 121 |
|
161 | | -**Without two phases:** |
162 | | -``` |
163 | | -1. Root calls get(), deregisters itself, blocks on phaser |
164 | | -2. Step completes, checkpoint succeeds |
165 | | -3. Step thread deregisters ← activeThreads is now EMPTY → SUSPENSION TRIGGERED |
166 | | -4. Root unblocks, tries to re-register ← TOO LATE |
167 | | -``` |
| 122 | +## Consequences |
168 | 123 |
|
169 | | -**With two phases:** |
170 | | -``` |
171 | | -1. Root calls get(), deregisters itself, blocks on phaser (Phase 0) |
172 | | -2. Step completes, checkpoint succeeds |
173 | | -3. Phaser advances to Phase 1 ← Root unblocks HERE |
174 | | -4. Root re-registers as active ← Back to 1 active thread |
175 | | -5. Phaser advances to Phase 2 |
176 | | -6. Step thread deregisters ← Safe, Root is still active |
177 | | -``` |
| 124 | +**Enables:** |
| 125 | +- Correct suspension on blocking scenarios |
| 126 | +- Checkpoint-driven completion ensuring durability |
| 127 | +- Support for future complex operations requiring multi-party coordination |
| 128 | +- Unified `get()` logic for both new and replayed operations |
178 | 129 |
|
179 | | -The `get()` code shows this dance: |
180 | | -```java |
181 | | -executionManager.deregisterActiveThread("Root"); // Allow suspension if step retrying |
182 | | -phaser.arriveAndAwaitAdvance(); // Wait for step |
183 | | -executionManager.registerActiveThread("Root"); // Reactivate BEFORE step deregisters |
184 | | -``` |
| 130 | +**Cost:** |
| 131 | +- Phaser complexity vs simpler alternatives |
| 132 | +- Two-phase protocol cognitive overhead |
185 | 133 |
|
186 | | -This allows suspension when appropriate (step is retrying, no active threads) while preventing false suspensions when the step completes normally. |
| 134 | +The phaser approach is intended to extend to future operations such as `runInChildContext`, so that multi-party coordination can be added without changing the suspension model.