Skip to content

Commit f2bdcb9

Browse files
committed
fix exception type
1 parent f2a1ac3 commit f2bdcb9

9 files changed

Lines changed: 228 additions & 14 deletions

File tree

examples/src/test/java/software/amazon/lambda/durable/examples/CallbackExampleTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static org.junit.jupiter.api.Assertions.*;
66

77
import org.junit.jupiter.api.Test;
8+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
89
import software.amazon.awssdk.services.lambda.model.OperationStatus;
910
import software.amazon.awssdk.services.lambda.model.OperationType;
1011
import software.amazon.lambda.durable.model.ExecutionStatus;
@@ -61,4 +62,38 @@ void testCallbackExampleCompletesAfterApproval() {
6162
"Approval request for: New laptop ($1500.0) - Sent to preapprover - Approved by manager",
6263
result.getResult(String.class));
6364
}
65+
66+
@Test
67+
void testCallbackExampleFail() {
68+
var handler = new CallbackExample();
69+
var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler);
70+
71+
var input = new ApprovalRequest("New laptop", 1500.00);
72+
73+
// First run - suspends waiting for callback
74+
var result = runner.run(input);
75+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
76+
77+
// Simulate external system approving the request
78+
var callbackId = runner.getCallbackId("approval");
79+
runner.completeCallback(callbackId, "\"Approved by manager\"");
80+
81+
result = runner.run(input);
82+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
83+
84+
// second run - pending preapproval
85+
var preapprovalCallbackId = runner.getCallbackId("preapproval-callback");
86+
runner.failCallback(
87+
preapprovalCallbackId,
88+
ErrorObject.builder()
89+
.errorType("error type")
90+
.errorMessage("error message")
91+
.build());
92+
93+
// third run - callback complete, finishes processing
94+
result = runner.run(input);
95+
96+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
97+
assertEquals("error message", result.getError().get().errorMessage());
98+
}
6499
}

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/CallbackIntegrationTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import software.amazon.awssdk.services.lambda.model.ErrorObject;
1111
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1212
import software.amazon.awssdk.services.lambda.model.OperationType;
13+
import software.amazon.lambda.durable.exception.CallbackFailedException;
14+
import software.amazon.lambda.durable.exception.CallbackTimeoutException;
1315
import software.amazon.lambda.durable.model.ExecutionStatus;
1416
import software.amazon.lambda.durable.serde.JacksonSerDes;
1517
import software.amazon.lambda.durable.serde.SerDes;
@@ -259,4 +261,92 @@ void callbackFailedExceptionHandlesVariousErrorFormats() {
259261
assertNotNull(result.getError().get().stackTrace());
260262
assertEquals(1, result.getError().get().stackTrace().size());
261263
}
264+
265+
@Test
266+
void waitForCallbackCallbackFailed() {
267+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
268+
try {
269+
ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {});
270+
fail();
271+
return "should not reach here";
272+
} catch (Exception e) {
273+
assertInstanceOf(CallbackFailedException.class, e);
274+
throw e;
275+
}
276+
});
277+
278+
// First run - creates callback
279+
runner.run("test");
280+
281+
// Fail callback with errorType, errorMessage, and stack trace
282+
var callbackId = runner.getCallbackId("approval-callback");
283+
var error = ErrorObject.builder()
284+
.errorType("ValidationError")
285+
.errorMessage("Invalid input data")
286+
.stackTrace(java.util.List.of("com.example.Service|validate|Service.java|42"))
287+
.build();
288+
runner.failCallback(callbackId, error);
289+
290+
// Second run - should fail with formatted message and preserved stack trace
291+
var result = runner.run("test");
292+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
293+
assertTrue(result.getError().isPresent());
294+
assertEquals("ValidationError", result.getError().get().errorType());
295+
assertEquals("Invalid input data", result.getError().get().errorMessage());
296+
assertNotNull(result.getError().get().stackTrace());
297+
assertEquals(1, result.getError().get().stackTrace().size());
298+
}
299+
300+
@Test
301+
void waitForCallbackCallbackTimeout() {
302+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
303+
try {
304+
ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {});
305+
fail();
306+
return "should not reach here";
307+
} catch (Exception e) {
308+
assertInstanceOf(CallbackTimeoutException.class, e);
309+
throw e;
310+
}
311+
});
312+
313+
// First run - creates callback
314+
runner.run("test");
315+
316+
// Fail callback with errorType, errorMessage, and stack trace
317+
var callbackId = runner.getCallbackId("approval-callback");
318+
runner.timeoutCallback(callbackId);
319+
320+
// Second run - should fail with formatted message and preserved stack trace
321+
var result = runner.run("test");
322+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
323+
}
324+
325+
@Test
326+
void waitForCallbackCallbackFailedWithUserException() {
327+
var runner = LocalDurableTestRunner.create(String.class, (input, ctx) -> {
328+
var errorMessage = "user exception";
329+
try {
330+
return ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> {
331+
// original exception
332+
throw new IllegalArgumentException(errorMessage);
333+
});
334+
} catch (Exception e) {
335+
assertInstanceOf(IllegalArgumentException.class, e);
336+
assertEquals(errorMessage, e.getMessage());
337+
throw e;
338+
}
339+
});
340+
341+
// First run - creates callback
342+
runner.run("test");
343+
344+
// Fail callback with errorType, errorMessage, and stack trace
345+
var callbackId = runner.getCallbackId("approval-callback");
346+
runner.timeoutCallback(callbackId);
347+
348+
// Second run - should fail with formatted message and preserved stack trace
349+
var result = runner.runUntilComplete("test");
350+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
351+
}
262352
}

sdk/src/main/java/software/amazon/lambda/durable/exception/CallbackException.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ public class CallbackException extends DurableOperationException {
88
private final String callbackId;
99

1010
public CallbackException(Operation operation, String message) {
11-
super(operation, operation.callbackDetails().error(), message);
11+
this(operation, message, null);
12+
}
13+
14+
public CallbackException(Operation operation, String message, Throwable cause) {
15+
super(operation, operation.callbackDetails().error(), message, cause);
1216
this.callbackId = operation.callbackDetails().callbackId();
1317
}
1418

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.exception;
4+
5+
import software.amazon.awssdk.services.lambda.model.Operation;
6+
7+
/** Exception thrown when a callback submitter step fails to submit a callback. */
8+
public class CallbackSubmitterException extends CallbackException {
9+
public CallbackSubmitterException(Operation callbackOp, StepException stepEx) {
10+
super(callbackOp, stepEx.getMessage(), stepEx);
11+
}
12+
}

sdk/src/main/java/software/amazon/lambda/durable/exception/DurableOperationException.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,26 @@ public DurableOperationException(Operation operation, ErrorObject errorObject) {
1616
}
1717

1818
public DurableOperationException(Operation operation, ErrorObject errorObject, String errorMessage) {
19+
this(operation, errorObject, errorMessage, null);
20+
}
21+
22+
public DurableOperationException(
23+
Operation operation, ErrorObject errorObject, String errorMessage, Throwable cause) {
1924
this(
2025
operation,
2126
errorObject,
2227
errorMessage,
23-
errorObject != null ? ExceptionHelper.deserializeStackTrace(errorObject.stackTrace()) : null);
28+
errorObject != null ? ExceptionHelper.deserializeStackTrace(errorObject.stackTrace()) : null,
29+
cause);
2430
}
2531

2632
public DurableOperationException(
27-
Operation operation, ErrorObject errorObject, String errorMessage, StackTraceElement[] stackTrace) {
28-
super(errorMessage, null, stackTrace);
33+
Operation operation,
34+
ErrorObject errorObject,
35+
String errorMessage,
36+
StackTraceElement[] stackTrace,
37+
Throwable cause) {
38+
super(errorMessage, cause, stackTrace);
2939
this.operation = operation;
3040
this.errorObject = errorObject;
3141
}

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package software.amazon.lambda.durable.execution;
44

55
import java.time.Duration;
6+
import java.util.ArrayList;
67
import java.util.Collections;
78
import java.util.HashMap;
89
import java.util.HashSet;
@@ -120,6 +121,22 @@ private void onCheckpointComplete(List<Operation> newOperations) {
120121
});
121122
}
122123

124+
/**
125+
* Gets all child operations for a given operationId.
126+
*
127+
* @param operationId the operationId to get children for
128+
* @return List of child operations for the given operationId
129+
*/
130+
public List<Operation> getChildOperations(String operationId) {
131+
var children = new ArrayList<Operation>();
132+
for (Operation op : operationStorage.values()) {
133+
if (Objects.equals(op.parentId(), operationId)) {
134+
children.add(op);
135+
}
136+
}
137+
return children;
138+
}
139+
123140
/**
124141
* Gets an operation by its globally unique operationId, and updates replay state. Transitions from REPLAY to
125142
* EXECUTION mode if the operation is not found or is not in a terminal state (still in progress).

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package software.amazon.lambda.durable.operation;
44

55
import java.time.Duration;
6+
import java.util.List;
67
import java.util.Objects;
78
import java.util.concurrent.CompletableFuture;
89
import org.slf4j.Logger;
@@ -128,6 +129,16 @@ protected Operation getOperation() {
128129
return executionManager.getOperationAndUpdateReplayState(getOperationId());
129130
}
130131

132+
/**
133+
* Gets the direct child Operations of a give context operation.
134+
*
135+
* @param operationId the operation id of the context
136+
* @return list of the child Operations
137+
*/
138+
protected List<Operation> getChildOperations(String operationId) {
139+
return executionManager.getChildOperations(operationId);
140+
}
141+
131142
/**
132143
* Checks if it's called from a Step.
133144
*

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@
1515
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
1616
import software.amazon.lambda.durable.DurableContext;
1717
import software.amazon.lambda.durable.TypeToken;
18+
import software.amazon.lambda.durable.exception.CallbackFailedException;
19+
import software.amazon.lambda.durable.exception.CallbackSubmitterException;
20+
import software.amazon.lambda.durable.exception.CallbackTimeoutException;
1821
import software.amazon.lambda.durable.exception.ChildContextFailedException;
1922
import software.amazon.lambda.durable.exception.DurableOperationException;
23+
import software.amazon.lambda.durable.exception.StepFailedException;
24+
import software.amazon.lambda.durable.exception.StepInterruptedException;
2025
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
2126
import software.amazon.lambda.durable.execution.SuspendExecutionException;
2227
import software.amazon.lambda.durable.execution.ThreadContext;
@@ -196,8 +201,39 @@ public T get() {
196201
if (original != null) {
197202
ExceptionHelper.sneakyThrow(original);
198203
}
199-
// Fallback: wrap in ChildContextFailedException
200-
throw new ChildContextFailedException(op);
204+
205+
// throw a general failed exception if a user exception is not reconstructed
206+
switch (subType) {
207+
case WAIT_FOR_CALLBACK: {
208+
var childrenOps = getChildOperations(op.id());
209+
var callbackOp = childrenOps.stream()
210+
.filter(o -> o.type() == OperationType.CALLBACK)
211+
.findFirst()
212+
.orElse(null);
213+
var stepOp = childrenOps.stream()
214+
.filter(o -> o.type() == OperationType.STEP)
215+
.findFirst()
216+
.orElse(null);
217+
if (callbackOp != null) {
218+
switch (callbackOp.status()) {
219+
case FAILED -> throw new CallbackFailedException(callbackOp);
220+
case TIMED_OUT -> throw new CallbackTimeoutException(callbackOp);
221+
default -> throw new ChildContextFailedException(op);
222+
}
223+
}
224+
if (stepOp != null) {
225+
var stepError = stepOp.stepDetails().error();
226+
if (StepInterruptedException.isStepInterruptedException(stepError)) {
227+
throw new CallbackSubmitterException(callbackOp, new StepInterruptedException(stepOp));
228+
} else {
229+
throw new CallbackSubmitterException(callbackOp, new StepFailedException(stepOp));
230+
}
231+
}
232+
}
233+
// todo: add cases for MAP/PARALLEL
234+
default:
235+
throw new ChildContextFailedException(op);
236+
}
201237
}
202238
}
203239
}

sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ private DurableConfig createConfig() {
5050
}
5151

5252
private ChildContextOperation<String> createOperation(
53-
ExecutionManager executionManager,
5453
java.util.function.Function<software.amazon.lambda.durable.DurableContext, String> func) {
5554
return new ChildContextOperation<>(
5655
"1",
@@ -79,7 +78,7 @@ void replaySucceededReturnsCachedResult() {
7978
.build());
8079

8180
var functionCalled = new AtomicBoolean(false);
82-
var operation = createOperation(executionManager, ctx -> {
81+
var operation = createOperation(ctx -> {
8382
functionCalled.set(true);
8483
return "should-not-execute";
8584
});
@@ -116,7 +115,7 @@ void replayFailedThrowsOriginalException() {
116115
.build());
117116

118117
var functionCalled = new AtomicBoolean(false);
119-
var operation = createOperation(executionManager, ctx -> {
118+
var operation = createOperation(ctx -> {
120119
functionCalled.set(true);
121120
return "should-not-execute";
122121
});
@@ -146,7 +145,7 @@ void replayFailedFallsBackToChildContextFailedException() {
146145
.build())
147146
.build());
148147

149-
var operation = createOperation(executionManager, ctx -> "unused");
148+
var operation = createOperation(ctx -> "unused");
150149
operation.execute();
151150

152151
var thrown = assertThrows(ChildContextFailedException.class, operation::get);
@@ -170,7 +169,7 @@ void replayStartedReExecutesChildContext() throws Exception {
170169
when(executionManager.hasOperationsForContext("1")).thenReturn(false);
171170

172171
var functionCalled = new AtomicBoolean(false);
173-
var operation = createOperation(executionManager, ctx -> {
172+
var operation = createOperation(ctx -> {
174173
functionCalled.set(true);
175174
return "re-executed";
176175
});
@@ -199,7 +198,7 @@ void replayChildrenReExecutesToReconstructResult() throws Exception {
199198
when(executionManager.hasOperationsForContext("1")).thenReturn(false);
200199

201200
var functionCalled = new AtomicBoolean(false);
202-
var operation = createOperation(executionManager, ctx -> {
201+
var operation = createOperation(ctx -> {
203202
functionCalled.set(true);
204203
return "reconstructed-value";
205204
});
@@ -224,7 +223,7 @@ void replayWithTypeMismatchTerminatesExecution() {
224223
.status(OperationStatus.SUCCEEDED)
225224
.build());
226225

227-
var operation = createOperation(executionManager, ctx -> "unused");
226+
var operation = createOperation(ctx -> "unused");
228227

229228
assertThrows(NonDeterministicExecutionException.class, operation::execute);
230229
}
@@ -242,7 +241,7 @@ void replayWithNameMismatchTerminatesExecution() {
242241
ContextDetails.builder().result("\"value\"").build())
243242
.build());
244243

245-
var operation = createOperation(executionManager, ctx -> "unused");
244+
var operation = createOperation(ctx -> "unused");
246245

247246
assertThrows(NonDeterministicExecutionException.class, operation::execute);
248247
}

0 commit comments

Comments
 (0)