Skip to content

Commit 77e20b7

Browse files
committed
add an optional stepContext parameter
1 parent 4251ccb commit 77e20b7

16 files changed

Lines changed: 345 additions & 338 deletions

File tree

examples/src/main/java/software/amazon/lambda/durable/examples/RetryExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class RetryExample extends DurableHandler<Object, String> {
3131
@Override
3232
public String handleRequest(Object input, DurableContext context) {
3333
// Step 1: Record start time
34-
startTime = context.step("record-start-time", Instant.class, Instant::now);
34+
startTime = context.step("record-start-time", Instant.class, () -> Instant.now());
3535
logger.info("Recorded start time: {}", startTime);
3636

3737
// Step 2: Call that never retries (fails immediately)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable;
4+
5+
import com.amazonaws.services.lambda.runtime.Context;
6+
import software.amazon.lambda.durable.execution.ExecutionManager;
7+
import software.amazon.lambda.durable.logging.DurableLogger;
8+
9+
public abstract class BaseContext {
10+
protected final ExecutionManager executionManager;
11+
private final DurableConfig durableConfig;
12+
private final Context lambdaContext;
13+
private final ExecutionContext executionContext;
14+
private final String parentContextId;
15+
private boolean isReplaying;
16+
17+
/** Shared initialization — sets all fields but performs no thread registration. */
18+
protected BaseContext(
19+
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) {
20+
this.executionManager = executionManager;
21+
this.durableConfig = durableConfig;
22+
this.lambdaContext = lambdaContext;
23+
this.parentContextId = contextId;
24+
this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn());
25+
this.isReplaying = executionManager.hasOperationsForContext(contextId);
26+
}
27+
28+
// =============== accessors ================
29+
public abstract DurableLogger getLogger();
30+
31+
public Context getLambdaContext() {
32+
return lambdaContext;
33+
}
34+
35+
/**
36+
* Returns metadata about the current durable execution.
37+
*
38+
* <p>The execution context provides information that remains constant throughout the execution lifecycle, such as
39+
* the durable execution ARN. This is useful for tracking execution progress, correlating logs, and referencing this
40+
* execution in external systems.
41+
*
42+
* @return the execution context
43+
*/
44+
public ExecutionContext getExecutionContext() {
45+
return executionContext;
46+
}
47+
48+
/**
49+
* Returns the configuration for durable execution behavior.
50+
*
51+
* @return the durable configuration
52+
*/
53+
public DurableConfig getDurableConfig() {
54+
return durableConfig;
55+
}
56+
57+
// ============= internal utilities ===============
58+
59+
/** Gets the context ID for this context. Null for root context, set for child contexts. */
60+
public String getParentContextId() {
61+
return parentContextId;
62+
}
63+
64+
public ExecutionManager getExecutionManager() {
65+
return executionManager;
66+
}
67+
68+
/** Returns whether this context is currently in replay mode. */
69+
boolean isReplaying() {
70+
return isReplaying;
71+
}
72+
73+
/**
74+
* Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered.
75+
*/
76+
void setExecutionMode() {
77+
this.isReplaying = false;
78+
}
79+
}

sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java

Lines changed: 78 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,17 @@
2020
import software.amazon.lambda.durable.operation.WaitOperation;
2121
import software.amazon.lambda.durable.validation.ParameterValidator;
2222

23-
public class DurableContext {
24-
private static final String ROOT_CONTEXT = "Root";
25-
26-
private final ExecutionManager executionManager;
27-
private final DurableConfig durableConfig;
28-
private final Context lambdaContext;
23+
public class DurableContext extends BaseContext {
2924
private final AtomicInteger operationCounter;
3025
private final DurableLogger logger;
3126
private final ExecutionContext executionContext;
32-
private final String contextId;
33-
private boolean isReplaying;
3427

3528
/** Shared initialization — sets all fields but performs no thread registration. */
3629
private DurableContext(
3730
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) {
38-
this.executionManager = executionManager;
39-
this.durableConfig = durableConfig;
40-
this.lambdaContext = lambdaContext;
41-
this.contextId = contextId;
31+
super(executionManager, durableConfig, lambdaContext, contextId);
4232
this.operationCounter = new AtomicInteger(0);
4333
this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn());
44-
this.isReplaying = executionManager.hasOperationsForContext(contextId);
4534

4635
var requestId = lambdaContext != null ? lambdaContext.getAwsRequestId() : null;
4736
this.logger = new DurableLogger(
@@ -54,19 +43,18 @@ private DurableContext(
5443
/**
5544
* Creates a root context and registers the current thread for execution coordination.
5645
*
57-
* <p>The context itself always has a null contextId (making it a root context). The thread is registered with the
58-
* ExecutionManager using the default {@link #ROOT_CONTEXT} identifier.
46+
* <p>The context itself always has a null contextId (making it a root context).
5947
*
6048
* @param executionManager the execution manager
6149
* @param durableConfig the durable configuration
6250
* @param lambdaContext the Lambda context
6351
* @return a new root DurableContext
6452
*/
65-
static DurableContext createRootContext(
53+
public static DurableContext createRootContext(
6654
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext) {
6755
var ctx = new DurableContext(executionManager, durableConfig, lambdaContext, null);
68-
executionManager.registerActiveThread(ROOT_CONTEXT);
69-
executionManager.setCurrentThreadContext(new ThreadContext(ROOT_CONTEXT, ThreadType.CONTEXT));
56+
executionManager.registerActiveThread(null);
57+
executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT));
7058
return ctx;
7159
}
7260

@@ -75,67 +63,108 @@ static DurableContext createRootContext(
7563
* ChildContextOperation, which registers on the parent thread before the executor runs and sets the context on the
7664
* child thread inside the executor.
7765
*
78-
* @param executionManager the execution manager
79-
* @param durableConfig the durable configuration
80-
* @param lambdaContext the Lambda context
81-
* @param contextId the child context's ID (the CONTEXT operation's operation ID)
66+
* @param childContextId the child context's ID (the CONTEXT operation's operation ID)
8267
* @return a new DurableContext for the child context
8368
*/
84-
public static DurableContext createChildContext(
85-
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) {
86-
return new DurableContext(executionManager, durableConfig, lambdaContext, contextId);
69+
public DurableContext createChildContext(String childContextId) {
70+
return new DurableContext(executionManager, getDurableConfig(), getLambdaContext(), childContextId);
71+
}
72+
73+
/**
74+
* Creates a step context for executing step operations.
75+
*
76+
* @return a new StepContext instance
77+
*/
78+
public StepContext createStepContext(String stepOperationId) {
79+
return new StepContext(executionManager, getDurableConfig(), getLambdaContext(), stepOperationId);
8780
}
8881

8982
// ========== step methods ==========
9083

91-
public <T> T step(String name, Class<T> resultType, Supplier<T> func) {
84+
public <T> T step(String name, Class<T> resultType, Function<StepContext, T> func) {
9285
return step(name, TypeToken.get(resultType), func, StepConfig.builder().build());
9386
}
9487

95-
public <T> T step(String name, Class<T> resultType, Supplier<T> func, StepConfig config) {
88+
public <T> T step(String name, Class<T> resultType, Function<StepContext, T> func, StepConfig config) {
9689
// Simply delegate to stepAsync and block on the result
9790
return stepAsync(name, resultType, func, config).get();
9891
}
9992

100-
public <T> T step(String name, TypeToken<T> typeToken, Supplier<T> func) {
93+
public <T> T step(String name, TypeToken<T> typeToken, Function<StepContext, T> func) {
10194
return step(name, typeToken, func, StepConfig.builder().build());
10295
}
10396

104-
public <T> T step(String name, TypeToken<T> typeToken, Supplier<T> func, StepConfig config) {
97+
public <T> T step(String name, TypeToken<T> typeToken, Function<StepContext, T> func, StepConfig config) {
10598
// Simply delegate to stepAsync and block on the result
10699
return stepAsync(name, typeToken, func, config).get();
107100
}
108101

109-
public <T> DurableFuture<T> stepAsync(String name, Class<T> resultType, Supplier<T> func) {
102+
public <T> DurableFuture<T> stepAsync(String name, Class<T> resultType, Function<StepContext, T> func) {
110103
return stepAsync(
111104
name, TypeToken.get(resultType), func, StepConfig.builder().build());
112105
}
113106

114-
public <T> DurableFuture<T> stepAsync(String name, Class<T> resultType, Supplier<T> func, StepConfig config) {
107+
public <T> DurableFuture<T> stepAsync(
108+
String name, Class<T> resultType, Function<StepContext, T> func, StepConfig config) {
115109
return stepAsync(name, TypeToken.get(resultType), func, config);
116110
}
117111

118-
public <T> DurableFuture<T> stepAsync(String name, TypeToken<T> typeToken, Supplier<T> func) {
112+
public <T> DurableFuture<T> stepAsync(String name, TypeToken<T> typeToken, Function<StepContext, T> func) {
119113
return stepAsync(name, typeToken, func, StepConfig.builder().build());
120114
}
121115

122-
public <T> DurableFuture<T> stepAsync(String name, TypeToken<T> typeToken, Supplier<T> func, StepConfig config) {
116+
public <T> DurableFuture<T> stepAsync(
117+
String name, TypeToken<T> typeToken, Function<StepContext, T> func, StepConfig config) {
123118
Objects.requireNonNull(config, "config cannot be null");
124119
Objects.requireNonNull(typeToken, "typeToken cannot be null");
125120
if (config.serDes() == null) {
126-
config = config.toBuilder().serDes(durableConfig.getSerDes()).build();
121+
config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build();
127122
}
128123
var operationId = nextOperationId();
129124

130125
// Create and start step operation with TypeToken
131-
var operation = new StepOperation<>(
132-
operationId, name, func, typeToken, config, executionManager, logger, durableConfig, contextId);
126+
var operation = new StepOperation<>(operationId, name, func, typeToken, config, this);
133127

134128
operation.execute(); // Start the step (returns immediately)
135129

136130
return operation;
137131
}
138132

133+
public <T> T step(String name, Class<T> resultType, Supplier<T> func) {
134+
return step(name, TypeToken.get(resultType), func, StepConfig.builder().build());
135+
}
136+
137+
public <T> T step(String name, Class<T> resultType, Supplier<T> func, StepConfig config) {
138+
// Simply delegate to stepAsync and block on the result
139+
return stepAsync(name, resultType, func, config).get();
140+
}
141+
142+
public <T> T step(String name, TypeToken<T> typeToken, Supplier<T> func) {
143+
return step(name, typeToken, func, StepConfig.builder().build());
144+
}
145+
146+
public <T> T step(String name, TypeToken<T> typeToken, Supplier<T> func, StepConfig config) {
147+
// Simply delegate to stepAsync and block on the result
148+
return stepAsync(name, typeToken, func, config).get();
149+
}
150+
151+
public <T> DurableFuture<T> stepAsync(String name, Class<T> resultType, Supplier<T> func) {
152+
return stepAsync(
153+
name, TypeToken.get(resultType), func, StepConfig.builder().build());
154+
}
155+
156+
public <T> DurableFuture<T> stepAsync(String name, Class<T> resultType, Supplier<T> func, StepConfig config) {
157+
return stepAsync(name, TypeToken.get(resultType), func, config);
158+
}
159+
160+
public <T> DurableFuture<T> stepAsync(String name, TypeToken<T> typeToken, Supplier<T> func) {
161+
return stepAsync(name, typeToken, func, StepConfig.builder().build());
162+
}
163+
164+
public <T> DurableFuture<T> stepAsync(String name, TypeToken<T> typeToken, Supplier<T> func, StepConfig config) {
165+
return stepAsync(name, typeToken, stepContext -> func.get(), config);
166+
}
167+
139168
// ========== wait methods ==========
140169

141170
public Void wait(Duration duration) {
@@ -147,7 +176,7 @@ public Void wait(String waitName, Duration duration) {
147176
var operationId = nextOperationId();
148177

149178
// Create and start wait operation
150-
var operation = new WaitOperation(operationId, waitName, duration, executionManager, contextId);
179+
var operation = new WaitOperation(operationId, waitName, duration, this);
151180

152181
operation.execute(); // Checkpoint the wait
153182
return operation.get(); // Block (will throw SuspendExecutionException if needed)
@@ -208,16 +237,17 @@ public <T, U> DurableFuture<T> invokeAsync(
208237
Objects.requireNonNull(config, "config cannot be null");
209238
Objects.requireNonNull(typeToken, "typeToken cannot be null");
210239
if (config.serDes() == null) {
211-
config = config.toBuilder().serDes(durableConfig.getSerDes()).build();
240+
config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build();
212241
}
213242
if (config.payloadSerDes() == null) {
214-
config = config.toBuilder().payloadSerDes(durableConfig.getSerDes()).build();
243+
config = config.toBuilder()
244+
.payloadSerDes(getDurableConfig().getSerDes())
245+
.build();
215246
}
216247
var operationId = nextOperationId();
217248

218249
// Create and start invoke operation
219-
var operation = new InvokeOperation<>(
220-
operationId, name, functionName, payload, typeToken, config, executionManager, contextId);
250+
var operation = new InvokeOperation<>(operationId, name, functionName, payload, typeToken, config, this);
221251

222252
operation.execute(); // checkpoint the invoke operation
223253
return operation; // Block (will throw SuspendExecutionException if needed)
@@ -239,11 +269,11 @@ public <T> DurableCallbackFuture<T> createCallback(String name, Class<T> resultT
239269

240270
public <T> DurableCallbackFuture<T> createCallback(String name, TypeToken<T> typeToken, CallbackConfig config) {
241271
if (config.serDes() == null) {
242-
config = config.toBuilder().serDes(durableConfig.getSerDes()).build();
272+
config = config.toBuilder().serDes(getDurableConfig().getSerDes()).build();
243273
}
244274
var operationId = nextOperationId();
245275

246-
var operation = new CallbackOperation<>(operationId, name, typeToken, config, executionManager, contextId);
276+
var operation = new CallbackOperation<>(operationId, name, typeToken, config, this);
247277
operation.execute();
248278

249279
return operation;
@@ -270,69 +300,25 @@ public <T> DurableFuture<T> runInChildContextAsync(
270300
var operationId = nextOperationId();
271301

272302
var operation = new ChildContextOperation<>(
273-
operationId,
274-
name,
275-
func,
276-
typeToken,
277-
durableConfig.getSerDes(),
278-
executionManager,
279-
durableConfig,
280-
lambdaContext,
281-
contextId);
303+
operationId, name, func, typeToken, getDurableConfig().getSerDes(), this);
282304

283305
operation.execute();
284306
return operation;
285307
}
286308

287309
// =============== accessors ================
288-
289-
public Context getLambdaContext() {
290-
return lambdaContext;
291-
}
292-
293310
public DurableLogger getLogger() {
294311
return logger;
295312
}
296313

297-
/**
298-
* Returns metadata about the current durable execution.
299-
*
300-
* <p>The execution context provides information that remains constant throughout the execution lifecycle, such as
301-
* the durable execution ARN. This is useful for tracking execution progress, correlating logs, and referencing this
302-
* execution in external systems.
303-
*
304-
* @return the execution context
305-
*/
306-
public ExecutionContext getExecutionContext() {
307-
return executionContext;
308-
}
309-
310-
// ============= internal utilities ===============
311-
312-
/** Gets the context ID for this context. Null for root context, set for child contexts. */
313-
String getContextId() {
314-
return contextId;
315-
}
316-
317-
/** Returns whether this context is currently in replay mode. */
318-
boolean isReplaying() {
319-
return isReplaying;
320-
}
321-
322-
/**
323-
* Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered.
324-
*/
325-
void setExecutionMode() {
326-
this.isReplaying = false;
327-
}
328-
329314
/**
330315
* Get the next operationId. For root contexts, returns sequential IDs like "1", "2", "3". For child contexts,
331-
* prefixes with the contextId to ensure global uniqueness, e.g. "1-1", "1-2" for operations inside child context
332-
* "1". This matches the JavaScript SDK's stepPrefix convention and prevents ID collisions in checkpoint batches.
316+
* prefixes with the getContextId() to ensure global uniqueness, e.g. "1-1", "1-2" for operations inside child
317+
* context "1". This matches the JavaScript SDK's stepPrefix convention and prevents ID collisions in checkpoint
318+
* batches.
333319
*/
334320
private String nextOperationId() {
335321
var counter = String.valueOf(operationCounter.incrementAndGet());
336-
return contextId != null ? contextId + "-" + counter : counter;
322+
return getParentContextId() != null ? getParentContextId() + "-" + counter : counter;
337323
}
338324
}

0 commit comments

Comments
 (0)