Skip to content

Commit 06e5233

Browse files
committed
Merge remote-tracking branch 'origin/main' into child-context
2 parents 842a106 + 117934e commit 06e5233

32 files changed

Lines changed: 725 additions & 505 deletions

.github/workflows/e2e-tests.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ permissions:
3030

3131
jobs:
3232
e2e-tests:
33-
if: github.event_name == 'pull_request'
3433
env:
3534
AWS_REGION: us-west-2
3635
runs-on: ubuntu-latest
@@ -74,5 +73,5 @@ jobs:
7473
- name: Build locally
7574
run: mvn -B -q -Dmaven.test.skip=true install --file pom.xml
7675
- name: Cloud Based Integration Tests
77-
run: mvn clean test -B -Dtest.cloud.enabled=true -Dtest=CloudBasedIntegrationTest
78-
working-directory: ./examples
76+
run: mvn clean test -B -Dtest.cloud.enabled=true -Dtest=CloudBasedIntegrationTest -Dtest.function.name.suffix='-java${{ matrix.java }}'
77+
working-directory: ./examples

docs/adr/002-phaser-based-coordination.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# ADR-002: Phaser-Based Operation Coordination
22

3-
**Status:** Accepted
3+
**Status:** Superseded by ADR-003 CompletableFuture-Based Operation Coordination
44
**Date:** 2025-12-29
55

66

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# ADR-003: CompletableFuture-Based Operation Coordination
2+
3+
**Status:** Todo
4+
**Date:** 2026-02-18
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# ADR-003: Child Context Execution (`runInChildContext`)
1+
# ADR-004: Child Context Execution (`runInChildContext`)
22

33
**Status:** Accepted
44
**Date:** 2026-02-16

docs/design.md

Lines changed: 36 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ The SDK uses two separate thread pools with distinct responsibilities:
9999
- Default: cached daemon thread pool
100100

101101
**Internal Executor (`InternalExecutor.INSTANCE`):**
102-
- Runs SDK coordination tasks: checkpoint batching, polling for wait completion, phaser management
102+
- Runs SDK coordination tasks: checkpoint batching, polling for wait completion
103103
- Dedicated cached thread pool with daemon threads named `durable-sdk-internal-*`
104104
- Not configurable by users
105105

@@ -171,14 +171,14 @@ context.step("name", Type.class, supplier,
171171
172172
┌───────────────┴───────────────┐
173173
▼ ▼
174-
┌──────────────────────────────┐ ┌──────────────────────────────┐
175-
│ DurableContext │ │ ExecutionManager │
176-
│ - User-facing API │ │ - State (ops, token) │
177-
│ - step(), stepAsync() │ │ - Thread coordination │
178-
│ - wait() │ │ - Phaser management
179-
│ - Operation ID counter │ │ - Checkpoint batching
180-
└──────────────────────────────┘ │ - Polling │
181-
│ └──────────────────────────────┘
174+
┌──────────────────────────────┐ ┌─────────────────────────────────
175+
│ DurableContext │ │ ExecutionManager
176+
│ - User-facing API │ │ - State (ops, token)
177+
│ - step(), stepAsync(), etc │ │ - Thread coordination
178+
│ - wait() │ │ - Checkpoint batching
179+
│ - Operation ID counter │ │ - Checkpoint response handling
180+
└──────────────────────────────┘ │ - Polling
181+
│ └─────────────────────────────────
182182
│ │
183183
▼ ▼
184184
┌──────────────────────────────┐ ┌──────────────────────────────┐
@@ -213,13 +213,14 @@ com.amazonaws.lambda.durable
213213
│ ├── CheckpointBatcher # Batching (package-private)
214214
│ ├── CheckpointCallback # Callback interface
215215
│ ├── SuspendExecutionException
216-
│ ├── ThreadType # CONTEXT, STEP
217-
│ └── ExecutionPhase # RUNNING(0), COMPLETING(1), DONE(2)
216+
│ └── ThreadType # CONTEXT, STEP
218217
219218
├── operation/
220-
│ ├── DurableOperation<T> # Interface
221-
│ ├── StepOperation<T> # Step logic
222-
│ └── WaitOperation # Wait logic
219+
│ ├── BaseDurableOperation<T> # Common operation logic
220+
│ ├── StepOperation<T> # Step logic
221+
│ ├── InvokeOperation<T> # Invoke logic
222+
│ ├── CallbackOperation<T> # Callback logic
223+
│ └── WaitOperation # Wait logic
223224
224225
├── logging/
225226
│ ├── DurableLogger # Context-aware logger wrapper (MDC-based)
@@ -328,7 +329,7 @@ sequenceDiagram
328329
WO->>EM: deregisterActiveThread("Root")
329330
330331
Note over EM: No active threads!
331-
EM->>EM: suspendExecutionFuture.complete()
332+
EM->>EM: executionExceptionFuture.completeExceptionally(SuspendExecutionException)
332333
EM-->>WO: throw SuspendExecutionException
333334
334335
Note over UC: Execution suspended, returns PENDING
@@ -562,36 +563,29 @@ This approach ensures suspension happens precisely when no thread can make progr
562563
#### Advanced Feature: In-Process Completion
563564
In scenarios where waits or step retries would normally suspend execution, but other active threads prevent suspension, the SDK automatically switches to in-process completion by polling the backend until timing conditions are met. This allows complex concurrent workflows to complete efficiently without unnecessary Lambda re-invocations or extended waiting periods.
564565

565-
### From Thread Tracking to Phaser Coordination
566-
567-
Thread counting handles simple cases, but complex scenarios require sophisticated coordination:
568-
569-
**Simple case - Wait operations:**
570-
```java
571-
context.wait(Duration.ofMinutes(5)); // Root deregisters → immediate suspension
572-
```
573-
574-
**Complex case - Blocking on retrying operations:**
575-
```java
576-
var future1 = context.stepAsync("step1", () -> failsAndRetries());
577-
var result = context.step("step2", () -> future1.get() + "-processed");
578-
```
566+
### Active Thread Tracking and Operation Completion Coordination
579567

580-
**Without phasers:** Simple thread counting fails because step2's thread would stay registered while blocked on `future1.get()`, preventing `activeThreads.isEmpty()` from triggering suspension → Lambda stays active during step1's retry delay instead of suspending.
568+
Each piece of user code - main function body, step body or child context body - runs in its own thread. Execution manager tracks active running threads. When a new step or child context is created, a new thread will be created and registered in execution manager. When the user code is blocked on `get()` or synchronous durable operations, the thread will be deregistered from execution manager. When there is no active running thread, the function execution will be suspended.
581569

582-
**What should happen instead:** step2's root thread must deregister when blocked, allow suspension during step1's retry, then coordinate re-registration when step1 completes with checkpointed results.
570+
These user threads and the system thread use CompletableFuture to communicate the completion of operations. When a context executes a step, the communication happens as shown below
583571

584-
**The problem:** When step1 retries, step2's root thread must:
585-
1. Deregister (to allow suspension during retry delay)
586-
2. Block until step1 either completes successfully or wants to suspend for another retry
587-
3. Re-register when step1 finishes or when resuming from suspension
588-
4. Ensure step1's result is checkpointed before proceeding
572+
| Sequence | Context thread | Step Thread | System Thread |
573+
|-----------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
574+
| 1 | create StepOperation, create CompletableFuture | (not created) | (idle) |
575+
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
576+
| 3 | create and register the Step thread | execute user code for the step | (idle) |
577+
| 4 | call `get()`, deregister the context thread and wait for the CompletableFuture to complete | (continue) | (idle) |
578+
| 5 | (blocked) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, it will complete the Step operation CompletableFuture, register and unblock the context thread. |
579+
| 6 | retrieve the result of the step | deregister and terminate the Step thread | (idle) |
589580

590-
**Additional complex scenarios:**
591-
- **Nested blocking:** Multiple threads blocking on each other's results
592-
- **Future operations:** `runInChildContext` with multiple child threads coordinating
593-
- **Race conditions:** Ensuring checkpoint completion before thread lifecycle changes
581+
If the user code completes quickly, an alternative scenario could happen as follows
594582

595-
These scenarios are why we chose **phasers** - a multi-party synchronization primitive that coordinates checkpoint-driven completion.
583+
| Sequence | Context thread | Step Thread | System Thread |
584+
|----------|-------------------------------------------------------------------------------|---------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
585+
| 1 | create StepOperation, create CompletableFuture | (not created) | (idle) |
586+
| 2 | checkpoint START event (synchronously or asynchronously) | (not created) | call checkpoint API |
587+
| 3 | create and register the Step thread | execute user code for the step and complete quickly | (idle) |
588+
| 5 | (do something else or just get starved) | checkpoint the step result and wait for checkpoint call to complete | call checkpoint API, and handle the API response. If it is a terminal response, it will complete the Step operation CompletableFuture. |
589+
| 4 | call `get()`. It's not blocked because CompletableFuture is already completed | deregister and terminate the Step thread | (idle) |
590+
| 6 | retrieve the result of the step | (ended) | (idle) |
596591

597-
See [ADR-002: Phaser-Based Operation Coordination](adr/002-phaser-based-coordination.md) for detailed implementation and usage patterns.

examples/src/main/java/com/amazonaws/lambda/durable/examples/SimpleInvokeExample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ public String handleRequest(GreetingRequest input, DurableContext context) {
1818
// invoke `simple-step-example` function
1919
var future = context.invokeAsync(
2020
"call-greeting1",
21-
"simple-step-example:$LATEST",
21+
"simple-step-example" + input.getName() + ":$LATEST",
2222
input,
2323
String.class,
2424
InvokeConfig.builder().build());
2525
var result2 = context.invoke(
2626
"call-greeting2",
27-
"simple-step-example:$LATEST",
27+
"simple-step-example" + input.getName() + ":$LATEST",
2828
input,
2929
String.class,
3030
InvokeConfig.builder().build());

examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class CloudBasedIntegrationTest {
2121

2222
private static String account;
2323
private static String region;
24+
private static String functionNameSuffix;
2425

2526
static boolean isEnabled() {
2627
var enabled = "true".equals(System.getProperty("test.cloud.enabled"));
@@ -40,6 +41,7 @@ static void setup() {
4041

4142
account = System.getProperty("test.aws.account");
4243
region = System.getProperty("test.aws.region");
44+
functionNameSuffix = System.getProperty("test.function.name.suffix", "");
4345

4446
if (account == null || region == null) {
4547
var sts = StsClient.create();
@@ -52,7 +54,8 @@ static void setup() {
5254
}
5355

5456
private static String arn(String functionName) {
55-
return "arn:aws:lambda:" + region + ":" + account + ":function:" + functionName + ":$LATEST";
57+
return "arn:aws:lambda:" + region + ":" + account + ":function:" + functionName + functionNameSuffix
58+
+ ":$LATEST";
5659
}
5760

5861
@Test
@@ -71,7 +74,7 @@ void testSimpleStepExample() {
7174
@Test
7275
void testSimpleInvokeExample() {
7376
var runner = CloudDurableTestRunner.create(arn("simple-invoke-example"), Map.class, String.class);
74-
var result = runner.run(Map.of("message", "test"));
77+
var result = runner.run(Map.of("name", functionNameSuffix));
7578

7679
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
7780
assertNotNull(result.getResult(String.class));

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
<maven.compiler.source>17</maven.compiler.source>
2424
<maven.compiler.target>17</maven.compiler.target>
2525
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
26-
<aws.sdk.version>2.41.28</aws.sdk.version>
26+
<aws.sdk.version>2.41.32</aws.sdk.version>
2727
<jackson.version>2.21.0</jackson.version>
28-
<junit.version>6.0.2</junit.version>
28+
<junit.version>6.0.3</junit.version>
2929
<mockito.version>5.21.0</mockito.version>
3030
<slf4j.version>2.0.17</slf4j.version>
3131
<jacoco.version>0.8.14</jacoco.version>

sdk/src/main/java/com/amazonaws/lambda/durable/CallbackConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package com.amazonaws.lambda.durable;
44

55
import com.amazonaws.lambda.durable.serde.SerDes;
6+
import com.amazonaws.lambda.durable.validation.ParameterValidator;
67
import java.time.Duration;
78

89
/** Configuration for callback operations. */
@@ -60,11 +61,13 @@ private Builder(Duration timeout, Duration heartbeatTimeout, SerDes serDes) {
6061
}
6162

6263
public Builder timeout(Duration timeout) {
64+
ParameterValidator.validateOptionalDuration(timeout, "Callback timeout");
6365
this.timeout = timeout;
6466
return this;
6567
}
6668

6769
public Builder heartbeatTimeout(Duration heartbeatTimeout) {
70+
ParameterValidator.validateOptionalDuration(heartbeatTimeout, "Heartbeat timeout");
6871
this.heartbeatTimeout = heartbeatTimeout;
6972
return this;
7073
}

sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ public Builder withLoggerConfig(LoggerConfig loggerConfig) {
312312
* @return This builder
313313
*/
314314
public Builder withPollingInterval(Duration duration) {
315+
// No validation - polling intervals can be less than 1 second (e.g., 200ms with backoff)
315316
this.pollingInterval = duration;
316317
return this;
317318
}

0 commit comments

Comments
 (0)