Skip to content

Commit 925a5ab

Browse files
committed
update design doc for completionFuture
1 parent 4a8f349 commit 925a5ab

2 files changed

Lines changed: 43 additions & 41 deletions

File tree

docs/design.md

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ sequenceDiagram
328328
WO->>EM: deregisterActiveThread("Root")
329329
330330
Note over EM: No active threads!
331-
EM->>EM: suspendExecutionFuture.complete()
331+
EM->>EM: executionExceptionFuture.completeExceptionally(SuspendExecutionException)
332332
EM-->>WO: throw SuspendExecutionException
333333
334334
Note over UC: Execution suspended, returns PENDING
@@ -562,36 +562,29 @@ This approach ensures suspension happens precisely when no thread can make progr
562562
#### Advanced Feature: In-Process Completion
563563
In scenarios where waits or step retries would normally suspend execution, but other active threads prevent suspension, the SDK automatically switches to in-process completion by polling the backend until timing conditions are met. This allows complex concurrent workflows to complete efficiently without unnecessary Lambda re-invocations or extended waiting periods.
564564

565-
### From Thread Tracking to Phaser Coordination
565+
### Active Thread Tracking and Operation Completion Coordination
566566

567-
Thread counting handles simple cases, but complex scenarios require sophisticated coordination:
567+
Each piece of user code - main function body, step body or child context body - runs in its own thread. Execution manager tracks active running threads. When a new step or child context is created, a new thread will be created and registered in execution manager. When the user code is blocked on `get()` or synchronous durable operations, the thread will be deregistered from execution manager. When there is no active running thread, the function execution will be suspended.
568568

569-
**Simple case - Wait operations:**
570-
```java
571-
context.wait(Duration.ofMinutes(5)); // Root deregisters → immediate suspension
572-
```
573-
574-
**Complex case - Blocking on retrying operations:**
575-
```java
576-
var future1 = context.stepAsync("step1", () -> failsAndRetries());
577-
var result = context.step("step2", () -> future1.get() + "-processed");
578-
```
579-
580-
**Without phasers:** Simple thread counting fails because step2's thread would stay registered while blocked on `future1.get()`, preventing `activeThreads.isEmpty()` from triggering suspension → Lambda stays active during step1's retry delay instead of suspending.
581-
582-
**What should happen instead:** step2's root thread must deregister when blocked, allow suspension during step1's retry, then coordinate re-registration when step1 completes with checkpointed results.
569+
These user threads and the system thread use CompletableFuture to communicate the completion of operations. When a context executes a step, the communication happens as shown below
583570

584-
**The problem:** When step1 retries, step2's root thread must:
585-
1. Deregister (to allow suspension during retry delay)
586-
2. Block until step1 either completes successfully or wants to suspend for another retry
587-
3. Re-register when step1 finishes or when resuming from suspension
588-
4. Ensure step1's result is checkpointed before proceeding
571+
| Sequence | Context thread | Step Thread | System Thread |
572+
|-----------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
573+
| 1 | create StepOperation, create CompletableFuture | (not created) | (idle) |
574+
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
575+
| 3 | create and register the Step thread | execute user code for the step | (idle) |
576+
| 4 | call `get()`, deregister the context thread and wait for the CompletableFuture to complete | (continue) | (idle) |
577+
| 5 | (blocked) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, it will complete the Step operation CompletableFuture, register and unblock the context thread. |
578+
| 6 | retrieve the result of the step | deregister and terminate the Step thread | (idle) |
589579

590-
**Additional complex scenarios:**
591-
- **Nested blocking:** Multiple threads blocking on each other's results
592-
- **Future operations:** `runInChildContext` with multiple child threads coordinating
593-
- **Race conditions:** Ensuring checkpoint completion before thread lifecycle changes
580+
If the user code completes quickly, an alternative scenario could happen as follows
594581

595-
These scenarios are why we chose **phasers** - a multi-party synchronization primitive that coordinates checkpoint-driven completion.
582+
| Sequence | Context thread | Step Thread | System Thread |
583+
|----------|-------------------------------------------------------------------------------|---------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
584+
| 1 | create StepOperation, create CompletableFuture | (not created) | (idle) |
585+
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
586+
| 3 | create and register the Step thread | execute user code for the step and complete quickly | (idle) |
587+
| 5 | (do something else or just get starved) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, it will complete the Step operation CompletableFuture. |
588+
| 4 | call `get()`. It's not blocked because CompletableFuture is already completed | deregister and terminate the Step thread | (idle) |
589+
| 6 | retrieve the result of the step | (ended) | (idle) |
596590

597-
See [ADR-002: Phaser-Based Operation Coordination](adr/002-phaser-based-coordination.md) for detailed implementation and usage patterns.

sdk/src/test/java/com/amazonaws/lambda/durable/operation/BaseDurableOperationTest.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static org.junit.jupiter.api.Assertions.assertNull;
88
import static org.junit.jupiter.api.Assertions.assertThrows;
99
import static org.junit.jupiter.api.Assertions.assertTrue;
10+
import static org.junit.jupiter.api.Assertions.fail;
1011
import static org.mockito.ArgumentMatchers.any;
1112
import static org.mockito.Mockito.mock;
1213
import static org.mockito.Mockito.never;
@@ -23,8 +24,11 @@
2324
import com.amazonaws.lambda.durable.execution.ThreadType;
2425
import com.amazonaws.lambda.durable.serde.JacksonSerDes;
2526
import com.amazonaws.lambda.durable.serde.SerDes;
27+
import java.util.concurrent.ExecutionException;
2628
import java.util.concurrent.ExecutorService;
2729
import java.util.concurrent.Executors;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.TimeoutException;
2832
import org.junit.jupiter.api.BeforeEach;
2933
import org.junit.jupiter.api.Test;
3034
import software.amazon.awssdk.services.lambda.model.ErrorObject;
@@ -120,30 +124,35 @@ public String get() {
120124
}
121125

122126
@Test
123-
void waitForOperationCompletionWhenRunningAndReadyToComplete() throws InterruptedException {
127+
void waitForOperationCompletionWhenRunningAndReadyToComplete()
128+
throws InterruptedException, ExecutionException, TimeoutException {
124129
BaseDurableOperation<String> op =
125130
new BaseDurableOperation<>(
126131
OPERATION_ID, OPERATION_NAME, OPERATION_TYPE, RESULT_TYPE, SER_DES, executionManager) {
127132
@Override
128-
public void execute() {
129-
waitForOperationCompletion();
130-
}
133+
public void execute() {}
131134

132135
@Override
133136
public String get() {
137+
waitForOperationCompletion();
134138
return RESULT;
135139
}
136140
};
137141

138-
// call execute in a separate thread
139-
internalExecutor.execute(op::execute);
140-
// wait for execute to be blocked by the completionFuture and then complete the future
141-
Thread.sleep(500);
142-
op.onCheckpointComplete(
143-
Operation.builder().status(OperationStatus.SUCCEEDED).build());
144-
verify(executionManager).deregisterActiveThreadAndUnsetCurrentContext(CONTEXT_ID);
145-
verify(executionManager).registerActiveThread(CONTEXT_ID, ThreadType.CONTEXT);
146-
verify(executionManager).setCurrentContext(CONTEXT_ID, ThreadType.CONTEXT);
142+
// call get in a separate thread which will be blocked
143+
var future = internalExecutor.submit(op::get);
144+
// wait for execute to be blocked by the completionFuture and then feed the completion event
145+
try {
146+
future.get(500, TimeUnit.MILLISECONDS);
147+
fail();
148+
} catch (TimeoutException e) {
149+
op.onCheckpointComplete(
150+
Operation.builder().status(OperationStatus.SUCCEEDED).build());
151+
assertEquals(RESULT, future.get());
152+
verify(executionManager).deregisterActiveThreadAndUnsetCurrentContext(CONTEXT_ID);
153+
verify(executionManager).registerActiveThread(CONTEXT_ID, ThreadType.CONTEXT);
154+
verify(executionManager).setCurrentContext(CONTEXT_ID, ThreadType.CONTEXT);
155+
}
147156
}
148157

149158
@Test

0 commit comments

Comments
 (0)