Skip to content

Commit 8c443bd

Browse files
committed
add ExecutionOperation as a special durable operation
1 parent 73c482c commit 8c443bd

9 files changed

Lines changed: 226 additions & 144 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@
66
import java.time.Duration;
77
import java.util.Objects;
88
import java.util.concurrent.atomic.AtomicInteger;
9+
import java.util.function.BiFunction;
910
import java.util.function.Function;
1011
import java.util.function.Supplier;
1112
import org.slf4j.LoggerFactory;
1213
import software.amazon.lambda.durable.execution.ExecutionManager;
1314
import software.amazon.lambda.durable.logging.DurableLogger;
15+
import software.amazon.lambda.durable.model.DurableExecutionOutput;
1416
import software.amazon.lambda.durable.operation.CallbackOperation;
1517
import software.amazon.lambda.durable.operation.ChildContextOperation;
18+
import software.amazon.lambda.durable.operation.ExecutionOperation;
1619
import software.amazon.lambda.durable.operation.InvokeOperation;
1720
import software.amazon.lambda.durable.operation.StepOperation;
1821
import software.amazon.lambda.durable.operation.WaitOperation;
@@ -298,6 +301,28 @@ public <T> DurableFuture<T> runInChildContextAsync(
298301
return operation;
299302
}
300303

304+
// =============== execute ================
305+
306+
/**
307+
* Executes a durable function with automatic checkpointing and replay support.
308+
*
309+
* <p>This is the entry point for durable execution. The function will be executed with the ability to checkpoint
310+
* state and resume from the last checkpoint on subsequent invocations. The execution automatically handles
311+
* serialization, deserialization, and replay of operations.
312+
*
313+
* @param <I> the input type for the durable function
314+
* @param <O> the output type returned by the durable function
315+
* @param inputType the Class object representing the input type, used for deserialization
316+
* @param func the durable function to execute, receiving the deserialized input and this DurableContext
317+
* @return a DurableExecutionOutput containing the execution result and metadata
318+
*/
319+
<I, O> DurableExecutionOutput execute(Class<I> inputType, BiFunction<I, DurableContext, O> func) {
320+
var operation = new ExecutionOperation<>(
321+
getExecutionManager().getExecutionOperation().id(), inputType, func, this);
322+
operation.execute();
323+
return operation.get();
324+
}
325+
301326
// =============== accessors ================
302327
/**
303328
* Returns a logger with execution context information for replay-aware logging.

sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java

Lines changed: 6 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -4,136 +4,27 @@
44

55
import com.amazonaws.services.lambda.runtime.Context;
66
import com.amazonaws.services.lambda.runtime.RequestHandler;
7-
import java.nio.charset.StandardCharsets;
8-
import java.util.concurrent.CompletableFuture;
97
import java.util.function.BiFunction;
10-
import org.slf4j.Logger;
11-
import org.slf4j.LoggerFactory;
12-
import software.amazon.awssdk.services.lambda.model.ErrorObject;
13-
import software.amazon.awssdk.services.lambda.model.Operation;
14-
import software.amazon.awssdk.services.lambda.model.OperationAction;
15-
import software.amazon.awssdk.services.lambda.model.OperationType;
16-
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
17-
import software.amazon.lambda.durable.exception.DurableOperationException;
18-
import software.amazon.lambda.durable.exception.IllegalDurableOperationException;
19-
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
208
import software.amazon.lambda.durable.execution.ExecutionManager;
21-
import software.amazon.lambda.durable.execution.SuspendExecutionException;
22-
import software.amazon.lambda.durable.execution.ThreadContext;
23-
import software.amazon.lambda.durable.execution.ThreadType;
249
import software.amazon.lambda.durable.model.DurableExecutionInput;
2510
import software.amazon.lambda.durable.model.DurableExecutionOutput;
26-
import software.amazon.lambda.durable.serde.SerDes;
27-
import software.amazon.lambda.durable.util.ExceptionHelper;
2811

2912
public class DurableExecutor {
30-
private static final Logger logger = LoggerFactory.getLogger(DurableExecutor.class);
31-
32-
// Lambda response size limit is 6MB minus small epsilon for envelope
33-
private static final int LAMBDA_RESPONSE_SIZE_LIMIT = 6 * 1024 * 1024 - 50;
3413

3514
public static <I, O> DurableExecutionOutput execute(
3615
DurableExecutionInput input,
3716
Context lambdaContext,
3817
Class<I> inputType,
3918
BiFunction<I, DurableContext, O> handler,
4019
DurableConfig config) {
41-
var executionManager = new ExecutionManager(
42-
input.durableExecutionArn(), input.checkpointToken(), input.initialExecutionState(), config);
43-
44-
var handlerFuture = CompletableFuture.supplyAsync(
45-
() -> {
46-
var userInput =
47-
extractUserInput(executionManager.getExecutionOperation(), config.getSerDes(), inputType);
48-
// Create context in the executor thread so it detects the correct thread name
49-
var context = DurableContext.createRootContext(executionManager, config, lambdaContext);
50-
executionManager.registerActiveThread(null);
51-
executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT));
52-
return handler.apply(userInput, context);
53-
},
54-
config.getExecutorService()); // Get executor from config for running user code
55-
56-
// Execute the handlerFuture in ExecutionManager. If it completes successfully, the output of user function
57-
// will be returned. Otherwise, it will complete exceptionally with a SuspendExecutionException or a failure.
58-
return executionManager
59-
.runUntilCompleteOrSuspend(handlerFuture)
60-
.handle((result, ex) -> {
61-
if (ex != null) {
62-
// an exception thrown from handlerFuture or suspension/termination occurred
63-
Throwable cause = ExceptionHelper.unwrapCompletableFuture(ex);
64-
if (cause instanceof SuspendExecutionException) {
65-
return DurableExecutionOutput.pending();
66-
}
67-
68-
logger.debug("Execution failed: {}", cause.getMessage());
69-
return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes()));
70-
}
71-
// user handler complete successfully
72-
var outputPayload = config.getSerDes().serialize(result);
73-
74-
logger.debug("Execution completed");
75-
return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload));
76-
})
77-
.whenComplete((v, ex) -> {
78-
// We shutdown the execution to make sure remaining checkpoint calls in the queue are drained
79-
// We DO NOT shutdown the executor since it should stay warm for re-invokes against a warm Lambda
80-
// runtime.
81-
// For example, a re-invoke after a wait should re-use the same executor instance from
82-
// DurableConfig.
83-
// userExecutor.shutdown();
84-
executionManager.shutdown();
85-
})
86-
.join();
87-
}
88-
89-
private static String handleLargePayload(ExecutionManager executionManager, String outputPayload) {
90-
// Check if the serialized payload exceeds Lambda response size limit
91-
var payloadSize = outputPayload != null ? outputPayload.getBytes(StandardCharsets.UTF_8).length : 0;
92-
93-
if (payloadSize > LAMBDA_RESPONSE_SIZE_LIMIT) {
94-
logger.debug(
95-
"Response size ({} bytes) exceeds Lambda limit ({} bytes). Checkpointing result.",
96-
payloadSize,
97-
LAMBDA_RESPONSE_SIZE_LIMIT);
98-
99-
// Checkpoint the large result and wait for it to complete
100-
executionManager
101-
.sendOperationUpdate(OperationUpdate.builder()
102-
.type(OperationType.EXECUTION)
103-
.id(executionManager.getExecutionOperation().id())
104-
.action(OperationAction.SUCCEED)
105-
.payload(outputPayload)
106-
.build())
107-
.join();
10820

109-
// Return empty result, we checkpointed the data manually
110-
logger.debug("Execution completed (large response checkpointed)");
111-
return "";
21+
// We shut down the executionManager to make sure remaining checkpoint calls in the queue are drained
22+
// We DO NOT shut down the executor since it should stay warm for re-invokes against a warm Lambda runtime.
23+
// For example, a re-invoke after a wait should re-use the same executor instance from DurableConfig.
24+
try (var executionManager = new ExecutionManager(input, config)) {
25+
var context = DurableContext.createRootContext(executionManager, config, lambdaContext);
26+
return context.execute(inputType, handler);
11227
}
113-
114-
// If response size is acceptable, return the result directly
115-
return outputPayload;
116-
}
117-
118-
private static ErrorObject buildErrorObject(Throwable e, SerDes serDes) {
119-
// exceptions thrown from operations, e.g. Step
120-
if (e instanceof DurableOperationException) {
121-
return ((DurableOperationException) e).getErrorObject();
122-
}
123-
if (e instanceof UnrecoverableDurableExecutionException) {
124-
return ((UnrecoverableDurableExecutionException) e).getErrorObject();
125-
}
126-
// exceptions thrown from non-operation code
127-
return ExceptionHelper.buildErrorObject(e, serDes);
128-
}
129-
130-
private static <I> I extractUserInput(Operation executionOp, SerDes serDes, Class<I> inputType) {
131-
if (executionOp.executionDetails() == null) {
132-
throw new IllegalDurableOperationException("EXECUTION operation missing executionDetails");
133-
}
134-
135-
var inputPayload = executionOp.executionDetails().inputPayload();
136-
return serDes.deserialize(inputPayload, TypeToken.get(inputType));
13728
}
13829

13930
public static <I, O> RequestHandler<DurableExecutionInput, DurableExecutionOutput> wrap(

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
2323
import software.amazon.lambda.durable.DurableConfig;
2424
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
25+
import software.amazon.lambda.durable.model.DurableExecutionInput;
2526
import software.amazon.lambda.durable.operation.BaseDurableOperation;
2627

2728
/**
@@ -45,7 +46,7 @@
4546
*
4647
* @see InternalExecutor
4748
*/
48-
public class ExecutionManager {
49+
public class ExecutionManager implements AutoCloseable {
4950

5051
private static final Logger logger = LoggerFactory.getLogger(ExecutionManager.class);
5152

@@ -65,25 +66,21 @@ public class ExecutionManager {
6566
// ===== Checkpoint Batching =====
6667
private final CheckpointBatcher checkpointBatcher;
6768

68-
public ExecutionManager(
69-
String durableExecutionArn,
70-
String checkpointToken,
71-
CheckpointUpdatedExecutionState initialExecutionState,
72-
DurableConfig config) {
73-
this.durableExecutionArn = durableExecutionArn;
69+
public ExecutionManager(DurableExecutionInput input, DurableConfig config) {
70+
this.durableExecutionArn = input.durableExecutionArn();
7471

7572
// Create checkpoint batcher for internal coordination
7673
this.checkpointBatcher =
77-
new CheckpointBatcher(config, durableExecutionArn, checkpointToken, this::onCheckpointComplete);
74+
new CheckpointBatcher(config, durableExecutionArn, input.checkpointToken(), this::onCheckpointComplete);
7875

79-
this.operationStorage = checkpointBatcher.fetchAllPages(initialExecutionState).stream()
76+
this.operationStorage = checkpointBatcher.fetchAllPages(input.initialExecutionState()).stream()
8077
.collect(Collectors.toConcurrentMap(Operation::id, op -> op));
8178

8279
// Start in REPLAY mode if we have more than just the initial EXECUTION operation
8380
this.executionMode =
8481
new AtomicReference<>(operationStorage.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION);
8582

86-
executionOp = findExecutionOp(initialExecutionState);
83+
executionOp = findExecutionOp(input.initialExecutionState());
8784

8885
// Validate initial operation is an EXECUTION operation
8986
if (executionOp == null) {
@@ -248,7 +245,17 @@ public CompletableFuture<Operation> pollForOperationUpdates(String operationId,
248245
}
249246

250247
// ===== Utilities =====
251-
public void shutdown() {
248+
249+
/**
250+
* Shuts down this execution manager by shutting down its checkpoint batcher, so that remaining checkpoint calls
251+
* in the queue are drained.
252+
*
253+
* <p>Unlike {@link AutoCloseable#close()}, this override declares no checked exception, so an
254+
* {@code ExecutionManager} can be used in a try-with-resources statement without a catch clause. This method only
255+
* delegates to the checkpoint batcher's shutdown.
256+
*/
257+
@Override
258+
public void close() {
252259
checkpointBatcher.shutdown();
253260
}
254261

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ private void checkpointSuccess(T result) {
150150
private void handleChildContextFailure(Throwable exception) {
151151
exception = ExceptionHelper.unwrapCompletableFuture(exception);
152152
if (exception instanceof SuspendExecutionException) {
153-
// Rethrow Error immediately — do not checkpoint
154-
ExceptionHelper.sneakyThrow(exception);
153+
// The execution is going to suspend; do not checkpoint, just return.
154+
return;
155155
}
156156
if (exception instanceof UnrecoverableDurableExecutionException) {
157157
terminateExecution((UnrecoverableDurableExecutionException) exception);

0 commit comments

Comments
 (0)