-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathStepOperation.java
More file actions
229 lines (201 loc) · 9.9 KB
/
Copy pathStepOperation.java
File metadata and controls
229 lines (201 loc) · 9.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.operation;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationAction;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
import software.amazon.awssdk.services.lambda.model.StepOptions;
import software.amazon.lambda.durable.StepContext;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.config.StepConfig;
import software.amazon.lambda.durable.config.StepSemantics;
import software.amazon.lambda.durable.context.BaseContextImpl;
import software.amazon.lambda.durable.context.DurableContextImpl;
import software.amazon.lambda.durable.exception.DurableOperationException;
import software.amazon.lambda.durable.exception.StepFailedException;
import software.amazon.lambda.durable.exception.StepInterruptedException;
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
import software.amazon.lambda.durable.execution.SuspendExecutionException;
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.logging.DurableLogger;
import software.amazon.lambda.durable.model.OperationIdentifier;
import software.amazon.lambda.durable.util.ExceptionHelper;
/**
 * Durable operation that executes a user-provided function with retry support.
 *
 * <p>Steps are the primary unit of work in a durable execution. Each attempt checkpoints START before the user
 * function runs and SUCCEED, RETRY or FAIL after it, enabling automatic retry on failure and replay on
 * re-invocation.
 *
 * @param <T> the result type of the step function
 */
public class StepOperation<T> extends SerializableDurableOperation<T> {
    /** Attempt number used for the first execution of a step; later attempts count up from here. */
    private static final int FIRST_ATTEMPT = 1;

    private final Function<StepContext, T> function;
    private final StepConfig config;

    /**
     * Creates a step operation.
     *
     * @param operationIdentifier identifier of this operation, passed to the superclass
     * @param function the user function to execute; it receives the {@link StepContext} of the current attempt
     * @param resultTypeToken type token describing {@code T}, passed to the superclass
     * @param config step configuration; supplies the serializer, the retry strategy and the per-retry semantics
     * @param durableContext the durable context this operation belongs to
     */
    public StepOperation(
            OperationIdentifier operationIdentifier,
            Function<StepContext, T> function,
            TypeToken<T> resultTypeToken,
            StepConfig config,
            DurableContextImpl durableContext) {
        super(operationIdentifier, resultTypeToken, config.serDes(), durableContext);
        this.function = function;
        this.config = config;
    }

    /** Starts the operation by executing the first attempt of the step. */
    @Override
    protected void start() {
        executeStepLogic(FIRST_ATTEMPT);
    }

    /**
     * Resumes the operation from its checkpointed state.
     *
     * <p>The attempt to run next is the checkpointed attempt plus one, or {@link #FIRST_ATTEMPT} when no attempt
     * was recorded.
     *
     * @param existing the checkpointed state of this operation
     */
    @Override
    protected void replay(Operation existing) {
        var stepDetails = existing.stepDetails();
        int attempt = stepDetails != null && stepDetails.attempt() != null
                ? stepDetails.attempt() + 1
                : FIRST_ATTEMPT;
        switch (existing.status()) {
            case SUCCEEDED, FAILED -> markAlreadyCompleted();
            case STARTED -> {
                if (isAtMostOnce()) {
                    // AT_MOST_ONCE: treat as interrupted, go through retry logic
                    handleStepFailure(new StepInterruptedException(existing), attempt);
                } else {
                    // AT_LEAST_ONCE: re-execute the step
                    executeStepLogic(attempt);
                }
            }
            case PENDING -> {
                // Step is pending retry - start polling for the PENDING -> READY transition
                if (stepDetails != null && stepDetails.nextAttemptTimestamp() != null) {
                    pollReadyAndExecuteStepLogic(stepDetails.nextAttemptTimestamp(), attempt);
                } else {
                    throw terminateExecutionWithIllegalDurableOperationException(
                            "Unexpected PENDING step without nextAttemptTimestamp: " + getOperationId());
                }
            }
            // Execute with the current attempt
            case READY -> executeStepLogic(attempt);
            default ->
                throw terminateExecutionWithIllegalDurableOperationException(
                        "Unexpected step status: " + existing.status());
        }
    }

    /**
     * Polls for operation updates from {@code nextAttemptInstant} onward, then executes the given attempt.
     *
     * @param nextAttemptInstant the instant passed to the poll, i.e. when the next attempt is due
     * @param attempt the attempt number to execute
     */
    private void pollReadyAndExecuteStepLogic(Instant nextAttemptInstant, int attempt) {
        // Poll once; if the operation is not READY yet, poll a second time before executing.
        // NOTE(review): the attempt runs after the second poll whatever status it returns, and the resulting
        // future is discarded, so an exceptionally-completed poll skips execution silently — confirm intended.
        pollForOperationUpdates(nextAttemptInstant)
                .thenCompose(op -> op.status() == OperationStatus.READY
                        ? CompletableFuture.completedFuture(op)
                        : pollForOperationUpdates(nextAttemptInstant))
                .thenRun(() -> executeStepLogic(attempt));
    }

    /**
     * Runs one attempt of the user function through {@code runUserHandler} with {@link ThreadType#STEP}.
     *
     * <p>The attempt checkpoints START, applies the user function and checkpoints success. Any {@link Throwable}
     * raised along the way, including from the START or SUCCEED checkpoint, goes to
     * {@link #handleStepFailure(Throwable, int)}.
     *
     * @param attempt the attempt number being executed
     */
    private void executeStepLogic(int attempt) {
        Runnable userHandler = () -> {
            StepContext stepContext = getContext().createStepContext(getOperationId(), getName(), attempt);
            BaseContextImpl.setCurrentContext(stepContext);
            // try-with-resources so that
            // - thread id/type are added to the thread local when the step starts
            // - logger properties are cleared when the step finishes
            try (var ignored = DurableLogger.attachContext()) {
                try {
                    checkpointStarted();
                    T result = function.apply(stepContext);
                    handleStepSucceeded(result);
                } catch (Throwable e) {
                    handleStepFailure(e, attempt);
                }
            }
        };
        // Execute user provided step code in the user-configured executor
        runUserHandler(userHandler, ThreadType.STEP, attempt);
    }

    /**
     * Sends a START checkpoint unless the operation is already recorded as STARTED.
     *
     * <p>For AT_MOST_ONCE semantics the checkpoint is sent synchronously, before user code runs. For
     * AT_LEAST_ONCE it is sent asynchronously.
     */
    private void checkpointStarted() {
        var existing = getOperation();
        if (existing == null || existing.status() != OperationStatus.STARTED) {
            var startUpdate = OperationUpdate.builder().action(OperationAction.START);
            if (isAtMostOnce()) {
                // AT_MOST_ONCE: await START checkpoint before executing user code
                sendOperationUpdate(startUpdate);
            } else {
                // AT_LEAST_ONCE: fire-and-forget START checkpoint
                sendOperationUpdateAsync(startUpdate);
            }
        }
    }

    /**
     * Serializes the step result and checkpoints SUCCEED with it as the payload.
     *
     * @param result the value returned by the user function
     */
    private void handleStepSucceeded(T result) {
        var serializedResult = serializeResultWithDeserializedValue(result);
        var successUpdate =
                OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializedResult.serialized());
        // sendOperationUpdate must be synchronous here. When waiting for the return of this call,
        // the context threads waiting for the result of this step operation will be woken up and registered.
        sendOperationUpdate(successUpdate);
    }

    /**
     * Handles a failed attempt: rethrows suspensions, terminates on unrecoverable errors, and otherwise
     * checkpoints either RETRY (with a delay) or FAIL, as decided by the configured retry strategy.
     *
     * @param exception the failure raised by the attempt; it is first unwrapped via
     *     {@code ExceptionHelper.unwrapCompletableFuture}
     * @param attempt the attempt number that failed
     * @throws SuspendExecutionException rethrown unchanged when the failure is a suspension
     */
    private void handleStepFailure(Throwable exception, int attempt) {
        exception = ExceptionHelper.unwrapCompletableFuture(exception);
        if (exception instanceof SuspendExecutionException suspendExecutionException) {
            throw suspendExecutionException;
        }
        if (exception instanceof UnrecoverableDurableExecutionException unrecoverableDurableExecutionException) {
            // terminate the execution and throw the exception if it's not recoverable
            throw terminateExecution(unrecoverableDurableExecutionException);
        }

        final ErrorObject errorObject;
        if (exception instanceof DurableOperationException durableOperationException) {
            errorObject = durableOperationException.getErrorObject();
        } else {
            errorObject = serializeException(exception);
        }

        var retryDecision = config.retryStrategy().makeRetryDecision(exception, attempt);
        if (retryDecision.shouldRetry()) {
            var retryDelayInSeconds = Math.toIntExact(retryDecision.delay().toSeconds());
            var retryUpdate = OperationUpdate.builder()
                    .action(OperationAction.RETRY)
                    .error(errorObject)
                    .stepOptions(StepOptions.builder()
                            // RetryDecisions always produce an integer number of seconds greater than or
                            // equal to 1 (no sub-second values)
                            .nextAttemptDelaySeconds(retryDelayInSeconds)
                            .build());
            sendOperationUpdate(retryUpdate);
            // Poll for READY status and then execute the step again
            pollReadyAndExecuteStepLogic(Instant.now().plusSeconds(retryDelayInSeconds), attempt + 1);
        } else {
            // Retries exhausted: checkpoint FAIL
            var failUpdate = OperationUpdate.builder().action(OperationAction.FAIL).error(errorObject);
            sendOperationUpdate(failUpdate);
        }
    }

    /**
     * Waits for the step to complete (via {@code waitForOperationCompletion}) and returns its result.
     *
     * @return the deserialized step result; the stored payload passed to deserialization may be {@code null}
     * @throws StepInterruptedException if the stored error marks an interrupted AT_MOST_ONCE step
     * @throws StepFailedException if the step did not succeed and no original exception can be reconstructed,
     *     including when the completed operation carries no step details. Otherwise the reconstructed original
     *     exception is rethrown as-is.
     */
    @Override
    public T get() {
        var op = waitForOperationCompletion();
        var stepDetails = op.stepDetails();
        if (op.status() == OperationStatus.SUCCEEDED) {
            var result = (stepDetails != null) ? stepDetails.result() : null;
            return deserializeResult(result);
        }

        // Not SUCCEEDED. Without step details there is no stored error to inspect, so use the
        // StepFailedException fallback rather than dereferencing null.
        if (stepDetails == null) {
            throw new StepFailedException(op);
        }
        var errorObject = stepDetails.error();
        // Throw StepInterruptedException directly for AT_MOST_ONCE interrupted steps
        if (StepInterruptedException.isStepInterruptedException(errorObject)) {
            throw new StepInterruptedException(op);
        }
        // Attempt to reconstruct and throw the original exception
        Throwable originalException = deserializeException(errorObject);
        if (originalException != null) {
            ExceptionHelper.sneakyThrow(originalException);
        }
        // Fallback: wrap in StepFailedException
        throw new StepFailedException(op);
    }

    /** Returns whether this step is configured with AT_MOST_ONCE_PER_RETRY semantics. */
    private boolean isAtMostOnce() {
        return config.semanticsPerRetry() == StepSemantics.AT_MOST_ONCE_PER_RETRY;
    }
}