-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathDurableExecutor.java
More file actions
160 lines (140 loc) · 8.05 KB
/
Copy pathDurableExecutor.java
File metadata and controls
160 lines (140 loc) · 8.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazonaws.lambda.durable;
import com.amazonaws.lambda.durable.exception.DurableOperationException;
import com.amazonaws.lambda.durable.exception.IllegalDurableOperationException;
import com.amazonaws.lambda.durable.exception.UnrecoverableDurableExecutionException;
import com.amazonaws.lambda.durable.execution.ExecutionManager;
import com.amazonaws.lambda.durable.execution.SuspendExecutionException;
import com.amazonaws.lambda.durable.model.DurableExecutionInput;
import com.amazonaws.lambda.durable.model.DurableExecutionOutput;
import com.amazonaws.lambda.durable.serde.SerDes;
import com.amazonaws.lambda.durable.util.ExceptionHelper;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationAction;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
/**
 * Static entry points for running a user handler as a durable execution.
 *
 * <p>{@link #execute} validates the incoming execution state, runs the user handler on the executor
 * supplied by {@link DurableConfig}, and maps the outcome to a {@link DurableExecutionOutput}
 * (pending, failure, or success). {@link #wrap} adapts the same logic to a Lambda
 * {@link RequestHandler}.
 */
public class DurableExecutor {
    private static final Logger logger = LoggerFactory.getLogger(DurableExecutor.class);

    // Lambda response size limit is 6MB minus small epsilon for envelope
    private static final int LAMBDA_RESPONSE_SIZE_LIMIT = 6 * 1024 * 1024 - 50;

    /**
     * Runs {@code handler} for the given durable execution input and blocks until it completes,
     * suspends, or fails.
     *
     * @param input durable execution input carrying the execution ARN, checkpoint token and initial state
     * @param lambdaContext Lambda context handed to the {@link DurableContext} created for the handler
     * @param inputType class used to deserialize the user input payload
     * @param handler user function to invoke with the deserialized input and a {@link DurableContext}
     * @param config configuration providing the serializer and the executor service for user code
     * @param <I> user input type
     * @param <O> user output type
     * @return {@code pending} when the execution suspended, {@code failure} when the handler (or the
     *     execution) failed, otherwise {@code success} with the serialized result (or an empty result
     *     when the payload was too large and was checkpointed instead)
     * @throws IllegalStateException if the initial state is missing, has no operations, or its first
     *     operation is not of type {@link OperationType#EXECUTION}
     */
    public static <I, O> DurableExecutionOutput execute(
            DurableExecutionInput input,
            Context lambdaContext,
            Class<I> inputType,
            BiFunction<I, DurableContext, O> handler,
            DurableConfig config) {
        logger.debug("DurableExecution.execute() called");
        logger.debug("DurableExecutionArn: {}", input.durableExecutionArn());

        // Read the initial state once and reuse it for logging, validation and the ExecutionManager.
        var initialState = input.initialExecutionState();
        var initialOperations = initialState != null ? initialState.operations() : null;
        logger.debug("Initial operations count: {}", initialOperations != null ? initialOperations.size() : 0);

        // Validate initial operation is an EXECUTION operation
        if (initialOperations == null
                || initialOperations.isEmpty()
                || initialOperations.get(0).type() != OperationType.EXECUTION) {
            throw new IllegalStateException("First operation must be EXECUTION");
        }

        var executionManager = new ExecutionManager(
                input.durableExecutionArn(), input.checkpointToken(), initialState, config);
        logger.debug(
                "EXECUTION operation found: {}",
                executionManager.getExecutionOperation().id());

        var handlerFuture = CompletableFuture.supplyAsync(
                () -> {
                    var userInput = extractUserInput(
                            executionManager.getExecutionOperation(), config.getSerDes(), inputType);
                    // Create context in the executor thread so it detects the correct thread name
                    var context = new DurableContext(executionManager, config, lambdaContext);
                    return handler.apply(userInput, context);
                },
                config.getExecutorService()); // Get executor from config for running user code

        // Execute the handlerFuture in ExecutionManager. If it completes successfully, the output of user function
        // will be returned. Otherwise, it will complete exceptionally with a SuspendExecutionException or a failure.
        // NOTE(review): an exception thrown while serializing or checkpointing the result inside handle()
        // is not mapped to a failure output; it surfaces from join() as a CompletionException - confirm
        // this is intended.
        return executionManager
                .runUntilCompleteOrSuspend(handlerFuture)
                .handle((result, ex) -> {
                    if (ex != null) {
                        // an exception thrown from handlerFuture or suspension/termination occurred
                        Throwable cause = ExceptionHelper.unwrapCompletableFuture(ex);
                        if (cause instanceof SuspendExecutionException) {
                            return DurableExecutionOutput.pending();
                        }
                        // Pass the throwable as the trailing argument so the stack trace is logged too,
                        // not just the (possibly null) message.
                        logger.debug("Execution failed: {}", cause.getMessage(), cause);
                        return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes()));
                    }
                    // user handler complete successfully
                    var outputPayload = config.getSerDes().serialize(result);
                    logger.debug("Execution completed");
                    return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload));
                })
                .whenComplete((v, ex) -> {
                    // We shutdown the execution to make sure remaining checkpoint calls in the queue are drained.
                    // We DO NOT shutdown the user executor since it should stay warm for re-invokes against a
                    // warm Lambda runtime. For example, a re-invoke after a wait should re-use the same
                    // executor instance from DurableConfig.
                    executionManager.shutdown();
                })
                .join();
    }

    /**
     * Returns the payload to place in the Lambda response, checkpointing it first when it is too large.
     *
     * <p>When the UTF-8 encoded payload exceeds {@link #LAMBDA_RESPONSE_SIZE_LIMIT}, a SUCCEED update
     * for the EXECUTION operation carrying the payload is sent through {@code executionManager}, and
     * this method blocks until that update completes.
     *
     * @param executionManager manager used to send the checkpoint update
     * @param outputPayload serialized handler result; {@code null} is treated as size zero
     * @return {@code outputPayload} unchanged when it fits, or an empty string after checkpointing
     */
    private static String handleLargePayload(ExecutionManager executionManager, String outputPayload) {
        // Check if the serialized payload exceeds Lambda response size limit
        var payloadSize = outputPayload != null ? outputPayload.getBytes(StandardCharsets.UTF_8).length : 0;
        if (payloadSize > LAMBDA_RESPONSE_SIZE_LIMIT) {
            logger.debug(
                    "Response size ({} bytes) exceeds Lambda limit ({} bytes). Checkpointing result.",
                    payloadSize,
                    LAMBDA_RESPONSE_SIZE_LIMIT);
            // Checkpoint the large result and wait for it to complete
            executionManager
                    .sendOperationUpdate(OperationUpdate.builder()
                            .type(OperationType.EXECUTION)
                            .id(executionManager.getExecutionOperation().id())
                            .action(OperationAction.SUCCEED)
                            .payload(outputPayload)
                            .build())
                    .join();
            // Return empty result, we checkpointed the data manually
            logger.debug("Execution completed (large response checkpointed)");
            return "";
        }
        // If response size is acceptable, return the result directly
        return outputPayload;
    }

    /**
     * Converts a failure cause into the {@link ErrorObject} reported in the failure output.
     *
     * @param e the unwrapped failure cause
     * @param serDes serializer passed to {@link ExceptionHelper#buildErrorObject} for generic exceptions
     * @return the error object carried by the exception when it is a {@link DurableOperationException}
     *     or {@link UnrecoverableDurableExecutionException}; otherwise one built by {@link ExceptionHelper}
     */
    private static ErrorObject buildErrorObject(Throwable e, SerDes serDes) {
        // exceptions thrown from operations, e.g. Step
        if (e instanceof DurableOperationException) {
            return ((DurableOperationException) e).getErrorObject();
        }
        if (e instanceof UnrecoverableDurableExecutionException) {
            return ((UnrecoverableDurableExecutionException) e).getErrorObject();
        }
        // exceptions thrown from non-operation code
        return ExceptionHelper.buildErrorObject(e, serDes);
    }

    /**
     * Deserializes the user input from the EXECUTION operation's input payload.
     *
     * @param executionOp the EXECUTION operation of the current invocation
     * @param serDes serializer used to deserialize the payload
     * @param inputType target class of the user input
     * @param <I> user input type
     * @return the deserialized user input
     * @throws IllegalDurableOperationException if the operation has no execution details
     */
    private static <I> I extractUserInput(Operation executionOp, SerDes serDes, Class<I> inputType) {
        if (executionOp.executionDetails() == null) {
            throw new IllegalDurableOperationException("EXECUTION operation missing executionDetails");
        }
        var inputPayload = executionOp.executionDetails().inputPayload();
        return serDes.deserialize(inputPayload, TypeToken.get(inputType));
    }

    /**
     * Adapts a user handler to a Lambda {@link RequestHandler} that delegates to {@link #execute}.
     *
     * @param inputType class used to deserialize the user input payload
     * @param handler user function to invoke
     * @param config configuration passed through to {@link #execute}
     * @param <I> user input type
     * @param <O> user output type
     * @return a request handler that calls {@code execute(input, context, inputType, handler, config)}
     */
    public static <I, O> RequestHandler<DurableExecutionInput, DurableExecutionOutput> wrap(
            Class<I> inputType, BiFunction<I, DurableContext, O> handler, DurableConfig config) {
        return (input, context) -> execute(input, context, inputType, handler, config);
    }
}