Skip to content

Commit ff88c78

Browse files
authored
[bugfix] replace phaser with future (#103)
* replace phaser with future * update design doc for completionFuture * update docs
1 parent 0e4f292 commit ff88c78

16 files changed

Lines changed: 343 additions & 476 deletions

docs/adr/002-phaser-based-coordination.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# ADR-002: Phaser-Based Operation Coordination
22

3-
**Status:** Accepted
3+
**Status:** Superseded by ADR-003 CompletableFuture-Based Operation Coordination
44
**Date:** 2025-12-29
55

66

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# ADR-003: CompletableFuture-Based Operation Coordination
2+
3+
**Status:** Todo
4+
**Date:** 2026-02-18

docs/design.md

Lines changed: 36 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ The SDK uses two separate thread pools with distinct responsibilities:
9999
- Default: cached daemon thread pool
100100

101101
**Internal Executor (`InternalExecutor.INSTANCE`):**
102-
- Runs SDK coordination tasks: checkpoint batching, polling for wait completion, phaser management
102+
- Runs SDK coordination tasks: checkpoint batching, polling for wait completion
103103
- Dedicated cached thread pool with daemon threads named `durable-sdk-internal-*`
104104
- Not configurable by users
105105

@@ -171,14 +171,14 @@ context.step("name", Type.class, supplier,
171171
172172
┌───────────────┴───────────────┐
173173
▼ ▼
174-
┌──────────────────────────────┐ ┌──────────────────────────────┐
175-
│ DurableContext │ │ ExecutionManager │
176-
│ - User-facing API │ │ - State (ops, token) │
177-
│ - step(), stepAsync() │ │ - Thread coordination │
178-
│ - wait() │ │ - Phaser management
179-
│ - Operation ID counter │ │ - Checkpoint batching
180-
└──────────────────────────────┘ │ - Polling │
181-
│ └──────────────────────────────┘
174+
┌──────────────────────────────┐ ┌─────────────────────────────────
175+
│ DurableContext │ │ ExecutionManager
176+
│ - User-facing API │ │ - State (ops, token)
177+
│ - step(), stepAsync(), etc │ │ - Thread coordination
178+
│ - wait() │ │ - Checkpoint batching
179+
│ - Operation ID counter │ │ - Checkpoint response handling
180+
└──────────────────────────────┘ │ - Polling
181+
│ └─────────────────────────────────
182182
│ │
183183
▼ ▼
184184
┌──────────────────────────────┐ ┌──────────────────────────────┐
@@ -213,13 +213,14 @@ com.amazonaws.lambda.durable
213213
│ ├── CheckpointBatcher # Batching (package-private)
214214
│ ├── CheckpointCallback # Callback interface
215215
│ ├── SuspendExecutionException
216-
│ ├── ThreadType # CONTEXT, STEP
217-
│ └── ExecutionPhase # RUNNING(0), COMPLETING(1), DONE(2)
216+
│ └── ThreadType # CONTEXT, STEP
218217
219218
├── operation/
220-
│ ├── DurableOperation<T> # Interface
221-
│ ├── StepOperation<T> # Step logic
222-
│ └── WaitOperation # Wait logic
219+
│ ├── BaseDurableOperation<T> # Common operation logic
220+
│ ├── StepOperation<T> # Step logic
221+
│ ├── InvokeOperation<T> # Invoke logic
222+
│ ├── CallbackOperation<T> # Callback logic
223+
│ └── WaitOperation # Wait logic
223224
224225
├── logging/
225226
│ ├── DurableLogger # Context-aware logger wrapper (MDC-based)
@@ -328,7 +329,7 @@ sequenceDiagram
328329
WO->>EM: deregisterActiveThread("Root")
329330
330331
Note over EM: No active threads!
331-
EM->>EM: suspendExecutionFuture.complete()
332+
EM->>EM: executionExceptionFuture.completeExceptionally(SuspendExecutionException)
332333
EM-->>WO: throw SuspendExecutionException
333334
334335
Note over UC: Execution suspended, returns PENDING
@@ -562,36 +563,29 @@ This approach ensures suspension happens precisely when no thread can make progr
562563
#### Advanced Feature: In-Process Completion
563564
In scenarios where waits or step retries would normally suspend execution, but other active threads prevent suspension, the SDK automatically switches to in-process completion by polling the backend until timing conditions are met. This allows complex concurrent workflows to complete efficiently without unnecessary Lambda re-invocations or extended waiting periods.
564565

565-
### From Thread Tracking to Phaser Coordination
566-
567-
Thread counting handles simple cases, but complex scenarios require sophisticated coordination:
568-
569-
**Simple case - Wait operations:**
570-
```java
571-
context.wait(Duration.ofMinutes(5)); // Root deregisters → immediate suspension
572-
```
573-
574-
**Complex case - Blocking on retrying operations:**
575-
```java
576-
var future1 = context.stepAsync("step1", () -> failsAndRetries());
577-
var result = context.step("step2", () -> future1.get() + "-processed");
578-
```
566+
### Active Thread Tracking and Operation Completion Coordination
579567

580-
**Without phasers:** Simple thread counting fails because step2's thread would stay registered while blocked on `future1.get()`, preventing `activeThreads.isEmpty()` from triggering suspension → Lambda stays active during step1's retry delay instead of suspending.
568+
Each piece of user code - main function body, step body or child context body - runs in its own thread. Execution manager tracks active running threads. When a new step or child context is created, a new thread will be created and registered in execution manager. When the user code is blocked on `get()` or synchronous durable operations, the thread will be deregistered from execution manager. When there is no active running thread, the function execution will be suspended.
581569

582-
**What should happen instead:** step2's root thread must deregister when blocked, allow suspension during step1's retry, then coordinate re-registration when step1 completes with checkpointed results.
570+
These user threads and the system thread use CompletableFuture to communicate the completion of operations. When a context executes a step, the communication happens as shown below
583571

584-
**The problem:** When step1 retries, step2's root thread must:
585-
1. Deregister (to allow suspension during retry delay)
586-
2. Block until step1 either completes successfully or wants to suspend for another retry
587-
3. Re-register when step1 finishes or when resuming from suspension
588-
4. Ensure step1's result is checkpointed before proceeding
572+
| Sequence | Context thread | Step Thread | System Thread |
573+
|-----------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
574+
| 1 | create StepOperation, create CompletableFuture | (not created) | (idle) |
575+
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
576+
| 3 | create and register the Step thread | execute user code for the step | (idle) |
577+
| 4 | call `get()`, deregister the context thread and wait for the CompletableFuture to complete | (continue) | (idle) |
578+
| 5 | (blocked) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, it will complete the Step operation CompletableFuture, register and unblock the context thread. |
579+
| 6 | retrieve the result of the step | deregister and terminate the Step thread | (idle) |
589580

590-
**Additional complex scenarios:**
591-
- **Nested blocking:** Multiple threads blocking on each other's results
592-
- **Future operations:** `runInChildContext` with multiple child threads coordinating
593-
- **Race conditions:** Ensuring checkpoint completion before thread lifecycle changes
581+
If the user code completes quickly, an alternative scenario could happen as follows
594582

595-
These scenarios are why we chose **phasers** - a multi-party synchronization primitive that coordinates checkpoint-driven completion.
583+
| Sequence | Context thread | Step Thread | System Thread |
584+
|----------|-------------------------------------------------------------------------------|---------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
585+
| 1 | create StepOperation, create CompletableFuture | (not created) | (idle) |
586+
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
587+
| 3 | create and register the Step thread | execute user code for the step and complete quickly | (idle) |
588+
| 5 | (do something else or just get starved) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, it will complete the Step operation CompletableFuture. |
589+
| 4 | call `get()`. It's not blocked because CompletableFuture is already completed | deregister and terminate the Step thread | (idle) |
590+
| 6 | retrieve the result of the step | (ended) | (idle) |
596591

597-
See [ADR-002: Phaser-Based Operation Coordination](adr/002-phaser-based-coordination.md) for detailed implementation and usage patterns.

sdk/src/main/java/com/amazonaws/lambda/durable/DurableFuture.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ public interface DurableFuture<T> {
99
/**
1010
* Blocks until the operation completes and returns the result.
1111
*
12-
* <p>This delegates to operation.get() which handles: - Phaser blocking (arriveAndAwaitAdvance) - Thread
13-
* deregistration (allows suspension) - Thread reactivation (resumes execution) - Result retrieval
12+
* <p>This delegates to operation.get() which handles: - Thread deregistration (allows suspension) - Thread
13+
* reactivation (resumes execution) - Result retrieval
1414
*
1515
* @return the operation result
1616
*/

sdk/src/main/java/com/amazonaws/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,6 @@ private void checkpointBatch(List<OperationUpdate> updates) {
139139
logger.debug("Durable checkpoint API called (latency={}ns): {}.", System.nanoTime() - startTime, response);
140140

141141
// Notify callback of completion
142-
// TODO: sam local backend returns no new execution state when called with zero
143-
// updates. WHY?
144-
// This means the polling will never receive an operation update and complete
145-
// the Phaser.
146142
checkpointToken = response.checkpointToken();
147143
if (response.newExecutionState() != null) {
148144
// fetch all pages of operations

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44

55
import com.amazonaws.lambda.durable.DurableConfig;
66
import com.amazonaws.lambda.durable.exception.UnrecoverableDurableExecutionException;
7+
import com.amazonaws.lambda.durable.operation.BaseDurableOperation;
78
import java.time.Duration;
89
import java.util.Collections;
910
import java.util.HashMap;
1011
import java.util.List;
1112
import java.util.Map;
1213
import java.util.concurrent.CompletableFuture;
13-
import java.util.concurrent.Phaser;
1414
import java.util.concurrent.atomic.AtomicReference;
1515
import java.util.stream.Collectors;
1616
import org.slf4j.Logger;
@@ -28,8 +28,8 @@
2828
* <ul>
2929
* <li>Execution state (operations, checkpoint token)
3030
* <li>Thread lifecycle (registration/deregistration)
31-
* <li>Phaser management (coordination)
32-
* <li>Checkpoint batching (via CheckpointManager)
31+
* <li>Checkpoint batching (via CheckpointBatcher)
32+
* <li>Checkpoint result handling (CheckpointBatcher callback)
3333
* <li>Polling (for waits and retries)
3434
* </ul>
3535
*
@@ -43,15 +43,16 @@ public class ExecutionManager {
4343
private static final Logger logger = LoggerFactory.getLogger(ExecutionManager.class);
4444

4545
// ===== Execution State =====
46-
private final Map<String, Operation> operations;
46+
private final Map<String, Operation> operationStorage;
4747
private final String executionOperationId;
4848
private final String durableExecutionArn;
4949
private final AtomicReference<ExecutionMode> executionMode;
5050

5151
// ===== Thread Coordination =====
52+
private final Map<String, BaseDurableOperation<?>> registeredOperations =
53+
Collections.synchronizedMap(new HashMap<>());
5254
private final Map<String, ThreadType> activeThreads = Collections.synchronizedMap(new HashMap<>());
5355
private static final ThreadLocal<OperationContext> currentContext = new ThreadLocal<>();
54-
private final Map<String, Phaser> openPhasers = Collections.synchronizedMap(new HashMap<>());
5556
private final CompletableFuture<Void> executionExceptionFuture = new CompletableFuture<>();
5657

5758
// ===== Checkpoint Batching =====
@@ -69,12 +70,12 @@ public ExecutionManager(
6970
this.checkpointBatcher =
7071
new CheckpointBatcher(config, durableExecutionArn, checkpointToken, this::onCheckpointComplete);
7172

72-
this.operations = checkpointBatcher.fetchAllPages(initialExecutionState).stream()
73+
this.operationStorage = checkpointBatcher.fetchAllPages(initialExecutionState).stream()
7374
.collect(Collectors.toConcurrentMap(Operation::id, op -> op));
7475

7576
// Start in REPLAY mode if we have more than just the initial EXECUTION operation
7677
this.executionMode =
77-
new AtomicReference<>(operations.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION);
78+
new AtomicReference<>(operationStorage.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION);
7879
}
7980

8081
// ===== State Management =====
@@ -87,24 +88,22 @@ public boolean isReplaying() {
8788
return executionMode.get() == ExecutionMode.REPLAY;
8889
}
8990

91+
public void registerOperation(BaseDurableOperation<?> operation) {
92+
registeredOperations.put(operation.getOperationId(), operation);
93+
}
94+
9095
// ===== Checkpoint Completion Handler =====
91-
/** Called by CheckpointManager when a checkpoint completes. Updates state and advances phasers. */
96+
/** Called by CheckpointManager when a checkpoint completes. Updates operationStorage and notify operations . */
9297
private void onCheckpointComplete(List<Operation> newOperations) {
93-
// Update operation storage
94-
newOperations.forEach(op -> operations.put(op.id(), op));
95-
96-
// Advance phasers for completed operations
97-
for (Operation operation : newOperations) {
98-
if (openPhasers.containsKey(operation.id()) && isTerminalStatus(operation.status())) {
99-
var phaser = openPhasers.get(operation.id());
100-
101-
// Two-phase completion
102-
logger.trace("Advancing phaser 0 -> 1: {}", phaser);
103-
phaser.arriveAndAwaitAdvance();
104-
logger.trace("Advancing phaser 1 -> 2: {}", phaser);
105-
phaser.arriveAndAwaitAdvance();
106-
}
107-
}
98+
newOperations.forEach(op -> {
99+
// Update operation storage
100+
operationStorage.put(op.id(), op);
101+
// call registered operation's onCheckpointComplete method for completed operations
102+
registeredOperations.computeIfPresent(op.id(), (id, operation) -> {
103+
operation.onCheckpointComplete(op);
104+
return operation;
105+
});
106+
});
108107
}
109108

110109
/**
@@ -115,7 +114,7 @@ private void onCheckpointComplete(List<Operation> newOperations) {
115114
* @return the existing operation, or null if not found (first execution)
116115
*/
117116
public Operation getOperationAndUpdateReplayState(String operationId) {
118-
var existing = operations.get(operationId);
117+
var existing = operationStorage.get(operationId);
119118
if (executionMode.get() == ExecutionMode.REPLAY) {
120119
if (existing == null || !isTerminalStatus(existing.status())) {
121120
if (executionMode.compareAndSet(ExecutionMode.REPLAY, ExecutionMode.EXECUTION)) {
@@ -127,7 +126,7 @@ public Operation getOperationAndUpdateReplayState(String operationId) {
127126
}
128127

129128
public Operation getExecutionOperation() {
130-
return operations.get(executionOperationId);
129+
return operationStorage.get(executionOperationId);
131130
}
132131

133132
// ===== Thread Coordination =====
@@ -185,15 +184,6 @@ public void deregisterActiveThreadAndUnsetCurrentContext(String threadId) {
185184
}
186185
}
187186

188-
// ===== Phaser Management =====
189-
190-
public Phaser startPhaser(String operationId) {
191-
var phaser = new Phaser(1);
192-
openPhasers.put(operationId, phaser);
193-
logger.trace("Started phaser for operation '{}'", operationId);
194-
return phaser;
195-
}
196-
197187
// ===== Checkpointing =====
198188

199189
// This method will checkpoint the operation updates to the durable backend and return a future which completes

0 commit comments

Comments
 (0)