Skip to content

Commit fba65a6

Browse files
wangyb-AAlex Wang
andauthored
Add parallel wait example (#231)
Co-authored-by: Alex Wang <wangyb@amazon.com>
1 parent a221910 commit fba65a6

10 files changed

Lines changed: 384 additions & 5 deletions

File tree

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.parallel;
4+
5+
import java.time.Duration;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import software.amazon.lambda.durable.DurableContext;
9+
import software.amazon.lambda.durable.DurableFuture;
10+
import software.amazon.lambda.durable.DurableHandler;
11+
import software.amazon.lambda.durable.ParallelConfig;
12+
13+
/**
14+
* Example demonstrating parallel branches where some branches include wait operations.
15+
*
16+
* <p>This models a notification fan-out pattern where different channels have different delivery delays:
17+
*
18+
* <ul>
19+
* <li>Email — sent immediately
20+
* <li>SMS — waits for a rate-limit window before sending
21+
* <li>Push notification — waits for a quiet-hours window before sending
22+
* </ul>
23+
*
24+
* <p>All three branches run concurrently. Branches with waits suspend without consuming compute resources and resume
25+
* automatically once the wait elapses. The parallel operation completes once all branches finish.
26+
*/
27+
public class ParallelWithWaitExample
28+
extends DurableHandler<ParallelWithWaitExample.Input, ParallelWithWaitExample.Output> {
29+
30+
public record Input(String userId, String message) {}
31+
32+
public record Output(List<String> deliveries) {}
33+
34+
@Override
35+
public Output handleRequest(Input input, DurableContext context) {
36+
var logger = context.getLogger();
37+
logger.info("Sending notifications to user {}", input.userId());
38+
39+
var config = ParallelConfig.builder().build();
40+
var futures = new ArrayList<DurableFuture<String>>(3);
41+
42+
try (var parallel = context.parallel("notify", config)) {
43+
44+
// Branch 1: email — no wait, deliver immediately
45+
futures.add(parallel.branch("email", String.class, ctx -> {
46+
ctx.wait("email-rate-limit-delay", Duration.ofSeconds(10));
47+
return ctx.step("send-email", String.class, stepCtx -> "email:" + input.message());
48+
}));
49+
50+
// Branch 2: SMS — wait for rate-limit window, then send
51+
futures.add(parallel.branch("sms", String.class, ctx -> {
52+
ctx.wait("sms-rate-limit-delay", Duration.ofSeconds(10));
53+
return ctx.step("send-sms", String.class, stepCtx -> "sms:" + input.message());
54+
}));
55+
56+
// Branch 3: push notification — wait for quiet-hours window, then send
57+
futures.add(parallel.branch("push", String.class, ctx -> {
58+
ctx.wait("push-quiet-delay", Duration.ofSeconds(10));
59+
return ctx.step("send-push", String.class, stepCtx -> "push:" + input.message());
60+
}));
61+
}
62+
63+
var deliveries = futures.stream().map(DurableFuture::get).toList();
64+
logger.info("All {} notifications delivered", deliveries.size());
65+
return new Output(deliveries);
66+
}
67+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.parallel;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import java.util.List;
8+
import org.junit.jupiter.api.Test;
9+
import software.amazon.lambda.durable.model.ExecutionStatus;
10+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
11+
12+
class ParallelWithWaitExampleTest {
13+
@Test
14+
void completesAfterManuallyAdvancingWaits() {
15+
var handler = new ParallelWithWaitExample();
16+
var runner = LocalDurableTestRunner.create(ParallelWithWaitExample.Input.class, handler);
17+
18+
var input = new ParallelWithWaitExample.Input("user-456", "world");
19+
20+
// First run suspends on wait branches
21+
var first = runner.run(input);
22+
assertEquals(ExecutionStatus.PENDING, first.getStatus());
23+
24+
// Advance waits and re-run to completion
25+
runner.advanceTime();
26+
var result = runner.runUntilComplete(input);
27+
28+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
29+
30+
var output = result.getResult(ParallelWithWaitExample.Output.class);
31+
assertEquals(List.of("email:world", "sms:world", "push:world"), output.deliveries());
32+
}
33+
}

sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,28 @@ protected BaseContextImpl(
3737
String contextId,
3838
String contextName,
3939
ThreadType threadType) {
40+
this(executionManager, durableConfig, lambdaContext, contextId, contextName, threadType, true);
41+
}
42+
43+
/**
44+
* Creates a new BaseContext instance.
45+
*
46+
* @param executionManager the execution manager for thread coordination and state management
47+
* @param durableConfig the durable execution configuration
48+
* @param lambdaContext the AWS Lambda runtime context
49+
* @param contextId the context ID, null for root context, set for child contexts
50+
* @param contextName the human-readable name for this context
51+
* @param threadType the type of thread this context runs on
52+
* @param setCurrentThreadContext whether to call setCurrentThreadContext on the execution manager
53+
*/
54+
protected BaseContextImpl(
55+
ExecutionManager executionManager,
56+
DurableConfig durableConfig,
57+
Context lambdaContext,
58+
String contextId,
59+
String contextName,
60+
ThreadType threadType,
61+
boolean setCurrentThreadContext) {
4062
this.executionManager = executionManager;
4163
this.durableConfig = durableConfig;
4264
this.lambdaContext = lambdaContext;
@@ -45,8 +67,10 @@ protected BaseContextImpl(
4567
this.isReplaying = executionManager.hasOperationsForContext(contextId);
4668
this.threadType = threadType;
4769

48-
// write the thread id and type to thread local
49-
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
70+
if (setCurrentThreadContext) {
71+
// write the thread id and type to thread local
72+
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
73+
}
5074
}
5175

5276
// =============== accessors ================

sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,24 @@ private DurableContextImpl(
6868
Context lambdaContext,
6969
String contextId,
7070
String contextName) {
71-
super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT);
71+
this(executionManager, durableConfig, lambdaContext, contextId, contextName, true);
72+
}
73+
74+
private DurableContextImpl(
75+
ExecutionManager executionManager,
76+
DurableConfig durableConfig,
77+
Context lambdaContext,
78+
String contextId,
79+
String contextName,
80+
boolean setCurrentThreadContext) {
81+
super(
82+
executionManager,
83+
durableConfig,
84+
lambdaContext,
85+
contextId,
86+
contextName,
87+
ThreadType.CONTEXT,
88+
setCurrentThreadContext);
7289
operationIdGenerator = new OperationIdGenerator(contextId);
7390
}
7491

@@ -99,6 +116,22 @@ public DurableContextImpl createChildContext(String childContextId, String child
99116
getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName);
100117
}
101118

119+
/**
120+
* Creates a child context without setting the current thread context.
121+
*
122+
* <p>Use this when the child context is being created on a thread that should not have its thread-local context
123+
* overwritten (e.g. when constructing the context ahead of running it on a separate thread).
124+
*
125+
* @param childContextId the child context's ID (the CONTEXT operation's operation ID)
126+
* @param childContextName the name of the child context
127+
* @return a new DurableContext for the child context
128+
*/
129+
public DurableContextImpl createChildContextWithoutSettingThreadContext(
130+
String childContextId, String childContextName) {
131+
return new DurableContextImpl(
132+
getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName, false);
133+
}
134+
102135
/**
103136
* Creates a step context for executing step operations.
104137
*

sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ protected ConcurrencyOperation(
7474
this.toleratedFailureCount = toleratedFailureCount;
7575
this.failureRateThreshold = failureRateThreshold;
7676
this.operationIdGenerator = new OperationIdGenerator(getOperationId());
77-
this.rootContext = durableContext.createChildContext(getOperationId(), getName());
77+
this.rootContext = durableContext.createChildContextWithoutSettingThreadContext(getOperationId(), getName());
7878
}
7979

8080
protected ConcurrencyOperation(

sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package software.amazon.lambda.durable.operation;
44

55
import java.util.function.Function;
6+
import software.amazon.awssdk.services.lambda.model.ContextOptions;
67
import software.amazon.awssdk.services.lambda.model.Operation;
78
import software.amazon.awssdk.services.lambda.model.OperationAction;
89
import software.amazon.awssdk.services.lambda.model.OperationType;
@@ -80,7 +81,8 @@ protected <R> ChildContextOperation<R> createItem(
8081
protected void handleSuccess() {
8182
sendOperationUpdate(OperationUpdate.builder()
8283
.action(OperationAction.SUCCEED)
83-
.subType(getSubType().getValue()));
84+
.subType(getSubType().getValue())
85+
.contextOptions(ContextOptions.builder().replayChildren(true).build()));
8486
}
8587

8688
@Override
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.context;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
12+
import software.amazon.awssdk.services.lambda.model.Operation;
13+
import software.amazon.awssdk.services.lambda.model.OperationStatus;
14+
import software.amazon.awssdk.services.lambda.model.OperationType;
15+
import software.amazon.lambda.durable.DurableConfig;
16+
import software.amazon.lambda.durable.TestUtils;
17+
import software.amazon.lambda.durable.execution.ExecutionManager;
18+
import software.amazon.lambda.durable.execution.ThreadContext;
19+
import software.amazon.lambda.durable.execution.ThreadType;
20+
import software.amazon.lambda.durable.model.DurableExecutionInput;
21+
22+
class BaseContextImplTest {
23+
24+
private static final String INVOCATION_ID = "20dae574-53da-37a1-bfd5-b0e2e6ec715d";
25+
private static final String EXECUTION_NAME = "349beff4-a89d-4bc8-a56f-af7a8af67a5f";
26+
private static final Operation EXECUTION_OP = Operation.builder()
27+
.id(INVOCATION_ID)
28+
.type(OperationType.EXECUTION)
29+
.status(OperationStatus.STARTED)
30+
.build();
31+
32+
@BeforeEach
33+
void clearThreadContext() {
34+
// currentThreadContext is a static ThreadLocal on ExecutionManager — clear it
35+
// before each test to prevent bleed-through from other tests on the same thread.
36+
createExecutionManager().setCurrentThreadContext(null);
37+
}
38+
39+
private ExecutionManager createExecutionManager() {
40+
var client = TestUtils.createMockClient();
41+
var initialState = CheckpointUpdatedExecutionState.builder()
42+
.operations(new ArrayList<>(List.of(EXECUTION_OP)))
43+
.build();
44+
return new ExecutionManager(
45+
new DurableExecutionInput(
46+
"arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/"
47+
+ EXECUTION_NAME + "/" + INVOCATION_ID,
48+
"test-token",
49+
initialState),
50+
DurableConfig.builder().withDurableExecutionClient(client).build());
51+
}
52+
53+
@Test
54+
void defaultConstructor_setsCurrentThreadContext() {
55+
var executionManager = createExecutionManager();
56+
// Precondition: no thread context set yet
57+
assertNull(executionManager.getCurrentThreadContext());
58+
59+
// Creating a root context with the default constructor should set the thread context
60+
DurableContextImpl.createRootContext(
61+
executionManager, DurableConfig.builder().build(), null);
62+
63+
var threadContext = executionManager.getCurrentThreadContext();
64+
assertNotNull(threadContext);
65+
assertEquals(ThreadType.CONTEXT, threadContext.threadType());
66+
assertNull(threadContext.threadId());
67+
}
68+
69+
@Test
70+
void constructorWithSetCurrentThreadContextTrue_setsCurrentThreadContext() {
71+
var executionManager = createExecutionManager();
72+
73+
// createRootContext sets thread context to root (threadId=null)
74+
var rootContext = DurableContextImpl.createRootContext(
75+
executionManager, DurableConfig.builder().build(), null);
76+
assertEquals(
77+
ThreadType.CONTEXT, executionManager.getCurrentThreadContext().threadType());
78+
assertNull(executionManager.getCurrentThreadContext().threadId());
79+
80+
// createChildContext (setCurrentThreadContext=true) should overwrite with child's context
81+
rootContext.createChildContext("child-id", "child-name");
82+
83+
var threadContext = executionManager.getCurrentThreadContext();
84+
assertNotNull(threadContext);
85+
assertEquals(ThreadType.CONTEXT, threadContext.threadType());
86+
assertEquals("child-id", threadContext.threadId());
87+
}
88+
89+
@Test
90+
void constructorWithSetCurrentThreadContextFalse_doesNotOverwriteThreadContext() {
91+
var executionManager = createExecutionManager();
92+
93+
// Create root context first (it will set thread context to null/root)
94+
var rootContext = DurableContextImpl.createRootContext(
95+
executionManager, DurableConfig.builder().build(), null);
96+
97+
// Now set a sentinel — simulating a caller thread that already has context established
98+
var sentinel = new ThreadContext("original-context", ThreadType.CONTEXT);
99+
executionManager.setCurrentThreadContext(sentinel);
100+
101+
// createChildContextWithoutSettingThreadContext should NOT overwrite the sentinel
102+
rootContext.createChildContextWithoutSettingThreadContext("child-id", "child-name");
103+
104+
// Thread context should still be the sentinel, not the child's context
105+
var threadContext = executionManager.getCurrentThreadContext();
106+
assertNotNull(threadContext);
107+
assertEquals("original-context", threadContext.threadId());
108+
}
109+
110+
@Test
111+
void createChildContextWithoutSettingThreadContext_returnsValidChildContext() {
112+
var executionManager = createExecutionManager();
113+
executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT));
114+
var rootContext = DurableContextImpl.createRootContext(
115+
executionManager, DurableConfig.builder().build(), null);
116+
117+
var childContext = rootContext.createChildContextWithoutSettingThreadContext("child-id", "child-name");
118+
119+
assertNotNull(childContext);
120+
assertEquals("child-id", childContext.getContextId());
121+
assertEquals("child-name", childContext.getContextName());
122+
}
123+
}

0 commit comments

Comments
 (0)