Skip to content

Commit 3ae487b

Browse files
authored
fix exception types for child executions (aws#265)
1 parent 9a014e1 commit 3ae487b

13 files changed

Lines changed: 337 additions & 11 deletions

File tree

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.map;
4+
5+
import java.time.Duration;
6+
import java.util.List;
7+
import software.amazon.lambda.durable.DurableContext;
8+
import software.amazon.lambda.durable.DurableHandler;
9+
import software.amazon.lambda.durable.TypeToken;
10+
import software.amazon.lambda.durable.config.MapConfig;
11+
import software.amazon.lambda.durable.examples.types.GreetingRequest;
12+
import software.amazon.lambda.durable.exception.SerDesException;
13+
import software.amazon.lambda.durable.serde.JacksonSerDes;
14+
15+
/**
16+
* Example demonstrating the map operation with the Durable Execution SDK.
17+
*
18+
* <p>This handler processes a list of names concurrently using {@code map()}, where each item runs in its own child
19+
* context with full checkpoint-and-replay support.
20+
*
21+
* <ol>
22+
* <li>Create a list of names from the input
23+
* <li>Map over each name concurrently, applying a greeting transformation via a durable step
24+
* <li>Collect and join the results
25+
* </ol>
26+
*/
27+
public class DeserializationFailedMapExample extends DurableHandler<GreetingRequest, String> {
28+
29+
@Override
30+
public String handleRequest(GreetingRequest input, DurableContext context) {
31+
var name = input.getName();
32+
context.getLogger().info("Starting map example for {}", name);
33+
34+
var names = List.of(name, name.toUpperCase(), name.toLowerCase());
35+
36+
// Map over each name concurrently — each iteration runs in its own child context
37+
var result = context.map(
38+
"greet-all",
39+
names,
40+
String.class,
41+
(item, index, ctx) -> {
42+
return ctx.step("greet-" + index, String.class, stepCtx -> {
43+
throw new RuntimeException("Failure from " + item + "!");
44+
});
45+
},
46+
MapConfig.builder().serDes(new FailedSerDes()).build());
47+
48+
context.getLogger().info("Map completed: allSucceeded={}, size={}", result.allSucceeded(), result.size());
49+
50+
context.wait("suspend and replay", Duration.ofSeconds(1));
51+
52+
return result.getError(0).errorMessage();
53+
}
54+
55+
private static class FailedSerDes extends JacksonSerDes {
56+
57+
@Override
58+
public <T> T deserialize(String json, TypeToken<T> typeToken) {
59+
T result = super.deserialize(json, typeToken);
60+
if (result instanceof RuntimeException ex) {
61+
throw new SerDesException("Deserialization failed", ex);
62+
}
63+
return result;
64+
}
65+
}
66+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.parallel;
4+
5+
import java.util.List;
6+
import software.amazon.lambda.durable.DurableContext;
7+
import software.amazon.lambda.durable.DurableHandler;
8+
import software.amazon.lambda.durable.ParallelDurableFuture;
9+
import software.amazon.lambda.durable.TypeToken;
10+
import software.amazon.lambda.durable.config.ParallelBranchConfig;
11+
import software.amazon.lambda.durable.config.ParallelConfig;
12+
import software.amazon.lambda.durable.exception.SerDesException;
13+
import software.amazon.lambda.durable.serde.JacksonSerDes;
14+
15+
/**
16+
* Example demonstrating parallel branch execution with the Durable Execution SDK.
17+
*
18+
* <p>This handler processes a list of items concurrently using {@code context.parallel()}:
19+
*
20+
* <ol>
21+
* <li>Each item is processed in its own branch (child context)
22+
* <li>All branches run concurrently and their results are collected
23+
* <li>A final step combines the results into a summary
24+
* </ol>
25+
*
26+
* <p>The {@link ParallelDurableFuture} implements {@link AutoCloseable}, so try-with-resources guarantees
27+
* {@code join()} is called even if an exception occurs.
28+
*/
29+
public class DeserializationFailedParallelExample
30+
extends DurableHandler<DeserializationFailedParallelExample.Input, String> {
31+
32+
public record Input(List<String> items) {}
33+
34+
@Override
35+
public String handleRequest(Input input, DurableContext context) {
36+
var logger = context.getLogger();
37+
var items = input.items();
38+
logger.info("Starting parallel processing of {} items", items.size());
39+
40+
var config = ParallelConfig.builder().build();
41+
42+
var parallel = context.parallel("process-items", config);
43+
44+
try (parallel) {
45+
var future = parallel.branch(
46+
"process",
47+
String.class,
48+
branchCtx -> {
49+
return branchCtx.step("transform", String.class, stepCtx -> {
50+
throw new RuntimeException("Intentional failure for transform");
51+
});
52+
},
53+
ParallelBranchConfig.builder().serDes(new FailedSerDes()).build());
54+
55+
parallel.get();
56+
try {
57+
return future.get();
58+
} catch (Exception e) {
59+
return e.getMessage();
60+
}
61+
}
62+
}
63+
64+
private static class FailedSerDes extends JacksonSerDes {
65+
66+
@Override
67+
public <T> T deserialize(String json, TypeToken<T> typeToken) {
68+
T result = super.deserialize(json, typeToken);
69+
if (result instanceof RuntimeException ex) {
70+
throw new SerDesException("Deserialization failed", ex);
71+
}
72+
return result;
73+
}
74+
}
75+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.step;
4+
5+
import java.time.Duration;
6+
import software.amazon.lambda.durable.DurableContext;
7+
import software.amazon.lambda.durable.DurableHandler;
8+
import software.amazon.lambda.durable.TypeToken;
9+
import software.amazon.lambda.durable.config.StepConfig;
10+
import software.amazon.lambda.durable.exception.SerDesException;
11+
import software.amazon.lambda.durable.serde.JacksonSerDes;
12+
13+
public class DeserializationFailureExample extends DurableHandler<String, String> {
14+
15+
@Override
16+
public String handleRequest(String input, DurableContext context) {
17+
try {
18+
context.step(
19+
"fail-step",
20+
String.class,
21+
stepCtx -> {
22+
throw new RuntimeException("this is a test");
23+
},
24+
StepConfig.builder().serDes(new FailedSerDes()).build());
25+
} catch (Exception e) {
26+
context.wait("suspend and replay", Duration.ofSeconds(1));
27+
return e.getClass().getSimpleName() + ":" + e.getMessage();
28+
}
29+
30+
throw new IllegalStateException("should not reach here");
31+
}
32+
33+
private static class FailedSerDes extends JacksonSerDes {
34+
35+
@Override
36+
public <T> T deserialize(String json, TypeToken<T> typeToken) {
37+
T result = super.deserialize(json, typeToken);
38+
if (result instanceof RuntimeException ex) {
39+
throw new SerDesException("Deserialization failed", ex);
40+
}
41+
return result;
42+
}
43+
}
44+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.map;
4+
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
7+
import org.junit.jupiter.api.Test;
8+
import software.amazon.lambda.durable.examples.types.GreetingRequest;
9+
import software.amazon.lambda.durable.model.ExecutionStatus;
10+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
11+
12+
class DeserializationFailedMapExampleTest {
13+
14+
@Test
15+
void testDeserializationFailedExample() {
16+
var handler = new DeserializationFailedMapExample();
17+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
18+
19+
var result = runner.runUntilComplete(new GreetingRequest("Alice"));
20+
21+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
22+
assertEquals(
23+
"Map iteration failed with error of type java.lang.RuntimeException. Message: Failure from Alice!",
24+
result.getResult(String.class));
25+
}
26+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.parallel;
4+
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
7+
import java.util.List;
8+
import org.junit.jupiter.api.Test;
9+
import software.amazon.lambda.durable.model.ExecutionStatus;
10+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
11+
12+
class DeserializationFailedParallelExampleTest {
13+
14+
@Test
15+
void testDeserializationFailedParallelExample() {
16+
var handler = new DeserializationFailedParallelExample();
17+
var runner = LocalDurableTestRunner.create(DeserializationFailedParallelExample.Input.class, handler);
18+
19+
var input = new DeserializationFailedParallelExample.Input(List.of("apple", "banana", "cherry"));
20+
var result = runner.runUntilComplete(input);
21+
22+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
23+
24+
var output = result.getResult(String.class);
25+
assertEquals(
26+
"Parallel branch failed with error of type java.lang.RuntimeException. Message: Intentional failure for transform",
27+
output);
28+
}
29+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.step;
4+
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
7+
import org.junit.jupiter.api.Test;
8+
import software.amazon.lambda.durable.model.ExecutionStatus;
9+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
10+
11+
class DeserializationFailureExampleTest {
12+
13+
@Test
14+
void testDeserializationExample() {
15+
var handler = new DeserializationFailureExample();
16+
17+
// Create test runner from handler (automatically extracts config)
18+
var runner = LocalDurableTestRunner.create(String.class, handler);
19+
20+
// Run with input
21+
var result = runner.runUntilComplete("test-input");
22+
23+
// Verify result
24+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
25+
26+
// assert StepFailedException is thrown when SerDes fails to deserialize the exception
27+
assertEquals(
28+
"StepFailedException:Step failed with error of type java.lang.RuntimeException. Message: this is a test",
29+
result.getResult(String.class));
30+
}
31+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.exception;
4+
5+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
6+
import software.amazon.awssdk.services.lambda.model.Operation;
7+
8+
/** Thrown when a map iteration fails and deserialization of the original exception also fails. */
9+
public class MapIterationFailedException extends DurableOperationException {
10+
public MapIterationFailedException(Operation operation) {
11+
super(operation, getError(operation), formatMessage(getError(operation)));
12+
}
13+
14+
private static ErrorObject getError(Operation operation) {
15+
return operation.contextDetails() != null ? operation.contextDetails().error() : null;
16+
}
17+
18+
private static String formatMessage(ErrorObject errorObject) {
19+
if (errorObject == null) {
20+
return "Map iteration failed without an error";
21+
}
22+
return String.format(
23+
"Map iteration failed with error of type %s. Message: %s",
24+
errorObject.errorType(), errorObject.errorMessage());
25+
}
26+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.exception;
4+
5+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
6+
import software.amazon.awssdk.services.lambda.model.Operation;
7+
8+
/** Thrown when a parallel branch fails and deserialization of the original exception also fails. */
9+
public class ParallelBranchFailedException extends DurableOperationException {
10+
public ParallelBranchFailedException(Operation operation) {
11+
super(operation, getError(operation), formatMessage(getError(operation)));
12+
}
13+
14+
private static ErrorObject getError(Operation operation) {
15+
return operation.contextDetails() != null ? operation.contextDetails().error() : null;
16+
}
17+
18+
private static String formatMessage(ErrorObject errorObject) {
19+
if (errorObject == null) {
20+
return "Parallel branch failed without an error";
21+
}
22+
return String.format(
23+
"Parallel branch failed with error of type %s. Message: %s",
24+
errorObject.errorType(), errorObject.errorMessage());
25+
}
26+
}

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import software.amazon.lambda.durable.exception.CallbackTimeoutException;
2424
import software.amazon.lambda.durable.exception.ChildContextFailedException;
2525
import software.amazon.lambda.durable.exception.DurableOperationException;
26+
import software.amazon.lambda.durable.exception.MapIterationFailedException;
27+
import software.amazon.lambda.durable.exception.ParallelBranchFailedException;
2628
import software.amazon.lambda.durable.exception.StepFailedException;
2729
import software.amazon.lambda.durable.exception.StepInterruptedException;
2830
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
@@ -217,12 +219,13 @@ public T get() {
217219
// throw a general failed exception if a user exception is not reconstructed
218220
return switch (getSubType()) {
219221
case WAIT_FOR_CALLBACK -> handleWaitForCallbackFailure();
220-
case MAP -> throw new ChildContextFailedException(op);
221-
case MAP_ITERATION -> throw new ChildContextFailedException(op);
222-
case PARALLEL -> throw new ChildContextFailedException(op);
223-
case PARALLEL_BRANCH -> throw new ChildContextFailedException(op);
222+
case MAP_ITERATION -> throw new MapIterationFailedException(op);
223+
case PARALLEL_BRANCH -> throw new ParallelBranchFailedException(op);
224224
case RUN_IN_CHILD_CONTEXT -> throw new ChildContextFailedException(op);
225-
case WAIT_FOR_CONDITION -> throw new ChildContextFailedException(op);
225+
226+
// the following subtypes should not be able to reach here
227+
case PARALLEL, MAP, WAIT_FOR_CONDITION ->
228+
throw new IllegalStateException("Unexpected sub-type: " + getSubType());
226229
};
227230
}
228231
}

sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ protected <R> ChildContextOperation<R> createItem(
116116
this);
117117
}
118118

119-
/** Called when the concurrency operation succeeds. Subclasses define checkpointing behavior. */
120-
protected abstract void handleSuccess(ConcurrencyCompletionStatus concurrencyCompletionStatus);
119+
/** Called when the concurrency operation completes. Subclasses define checkpointing behavior. */
120+
protected abstract void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletionStatus);
121121

122122
// ========== Concurrency control ==========
123123

@@ -173,7 +173,7 @@ protected void executeItems() {
173173
}
174174
var completionStatus = canComplete(succeededCount, failedCount, runningChildren);
175175
if (completionStatus != null) {
176-
handleSuccess(completionStatus);
176+
handleCompletion(completionStatus);
177177
return;
178178
}
179179

0 commit comments

Comments
 (0)