Skip to content

Commit 4061d4c

Browse files
authored
add retryable exception (#350)
1 parent aae569d commit 4061d4c

3 files changed

Lines changed: 87 additions & 20 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/exception/UnrecoverableDurableExecutionException.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,25 @@
77
/** Exception thrown when the execution is not recoverable. The durable execution will be immediately terminated. */
88
public class UnrecoverableDurableExecutionException extends DurableExecutionException {
99
private final ErrorObject errorObject;
10+
private final boolean retryable;
1011

11-
public UnrecoverableDurableExecutionException(ErrorObject errorObject) {
12+
public UnrecoverableDurableExecutionException(ErrorObject errorObject, boolean retryable) {
1213
super(errorObject.errorMessage());
1314
this.errorObject = errorObject;
15+
this.retryable = retryable;
16+
}
17+
18+
public UnrecoverableDurableExecutionException(ErrorObject errorObject) {
19+
this(errorObject, false);
1420
}
1521

1622
/** Returns the error details for this unrecoverable exception. */
1723
public ErrorObject getErrorObject() {
1824
return errorObject;
1925
}
26+
27+
/** Returns true if the execution can be retried. */
28+
public boolean isRetryable() {
29+
return retryable;
30+
}
2031
}

sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.amazonaws.services.lambda.runtime.RequestHandler;
77
import java.nio.charset.StandardCharsets;
88
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.CompletionException;
910
import java.util.function.BiFunction;
1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
@@ -65,26 +66,43 @@ public static <I, O> DurableExecutionOutput execute(
6566
// Execute the handlerFuture in ExecutionManager. If it completes successfully, the output of user function
6667
// will be returned. Otherwise, it will complete exceptionally with a SuspendExecutionException or a
6768
// failure.
68-
return executionManager
69-
.runUntilCompleteOrSuspend(handlerFuture)
70-
.handle((result, ex) -> {
71-
if (ex != null) {
72-
// an exception thrown from handlerFuture or suspension/termination occurred
73-
Throwable cause = ExceptionHelper.unwrapCompletableFuture(ex);
74-
if (cause instanceof SuspendExecutionException) {
75-
return DurableExecutionOutput.pending();
69+
try {
70+
return executionManager
71+
.runUntilCompleteOrSuspend(handlerFuture)
72+
.handle((result, ex) -> {
73+
if (ex != null) {
74+
// an exception thrown from handlerFuture or suspension/termination occurred
75+
Throwable cause = ExceptionHelper.unwrapCompletableFuture(ex);
76+
77+
// return PENDING if it's SuspendExecutionException
78+
if (cause instanceof SuspendExecutionException) {
79+
return DurableExecutionOutput.pending();
80+
}
81+
82+
// let the backend retry the invocation if the exception is retryable
83+
if (cause
84+
instanceof
85+
UnrecoverableDurableExecutionException
86+
unrecoverableDurableExecutionException
87+
&& unrecoverableDurableExecutionException.isRetryable()) {
88+
throw unrecoverableDurableExecutionException;
89+
}
90+
91+
// fail the execution otherwise
92+
logger.debug("Execution failed: {}", cause.getMessage());
93+
return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes()));
7694
}
77-
78-
logger.debug("Execution failed: {}", cause.getMessage());
79-
return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes()));
80-
}
81-
// user handler complete successfully
82-
var outputPayload = config.getSerDes().serialize(result);
83-
84-
logger.debug("Execution completed");
85-
return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload));
86-
})
87-
.join();
95+
// user handler complete successfully
96+
logger.debug("Execution completed");
97+
var outputPayload = config.getSerDes().serialize(result);
98+
return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload));
99+
})
100+
.join();
101+
} catch (CompletionException e) {
102+
// unwrap the CompletionException and rethrow the wrapped exception
103+
ExceptionHelper.sneakyThrow(ExceptionHelper.unwrapCompletableFuture(e));
104+
return null;
105+
}
88106
}
89107
}
90108

sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@
1414
import java.util.concurrent.ExecutorService;
1515
import org.junit.jupiter.api.Test;
1616
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
17+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
1718
import software.amazon.awssdk.services.lambda.model.ExecutionDetails;
1819
import software.amazon.awssdk.services.lambda.model.Operation;
1920
import software.amazon.awssdk.services.lambda.model.OperationStatus;
2021
import software.amazon.awssdk.services.lambda.model.OperationType;
2122
import software.amazon.awssdk.services.lambda.model.StepDetails;
2223
import software.amazon.lambda.durable.DurableConfig;
2324
import software.amazon.lambda.durable.TestUtils;
25+
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
2426
import software.amazon.lambda.durable.model.DurableExecutionInput;
2527
import software.amazon.lambda.durable.model.ExecutionStatus;
2628

@@ -134,6 +136,42 @@ void testExecuteFailure() {
134136
assertEquals("Test error", output.error().errorMessage());
135137
}
136138

139+
@Test
140+
void testRetryableExceptions() {
141+
var executionOp = Operation.builder()
142+
.id(EXECUTION_OP_ID)
143+
.type(OperationType.EXECUTION)
144+
.status(OperationStatus.STARTED)
145+
.executionDetails(ExecutionDetails.builder()
146+
.inputPayload("\"test-input\"")
147+
.build())
148+
.build();
149+
150+
var input = new DurableExecutionInput(
151+
EXECUTION_ARN,
152+
"token1",
153+
CheckpointUpdatedExecutionState.builder()
154+
.operations(List.of(executionOp))
155+
.build());
156+
157+
UnrecoverableDurableExecutionException ex = assertThrows(
158+
UnrecoverableDurableExecutionException.class,
159+
() -> DurableExecutor.execute(
160+
input,
161+
null,
162+
get(String.class),
163+
(userInput, ctx) -> {
164+
throw new UnrecoverableDurableExecutionException(
165+
ErrorObject.builder()
166+
.errorMessage("Test error")
167+
.build(),
168+
true);
169+
},
170+
configWithMockClient()));
171+
172+
assertTrue(ex.isRetryable());
173+
}
174+
137175
@Test
138176
void testExecuteReplay() {
139177
var executionOp = Operation.builder()

0 commit comments

Comments
 (0)