Skip to content

Commit c42f4f4

Browse files
authored
[feat]: support FLAT nesting type for map/parallel (#335)
* [feat]: support FLAT nesting type for map/parallel
1 parent 8712f6b commit c42f4f4

23 files changed

Lines changed: 1248 additions & 485 deletions

File tree

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.map;
4+
5+
import java.time.Duration;
6+
import java.util.List;
7+
import java.util.stream.Collectors;
8+
import java.util.stream.IntStream;
9+
import software.amazon.lambda.durable.DurableContext;
10+
import software.amazon.lambda.durable.DurableHandler;
11+
import software.amazon.lambda.durable.config.CompletionConfig;
12+
import software.amazon.lambda.durable.config.MapConfig;
13+
import software.amazon.lambda.durable.config.NestingType;
14+
15+
/**
16+
* Example demonstrating advanced map features: wait operations inside branches, error handling, and early termination.
17+
*
18+
* <ol>
19+
* <li>Concurrent map with step + wait + step inside each branch — simulates multi-stage order processing with a
20+
* cooldown between stages
21+
* <li>Early termination with {@code minSuccessful(2)} — finds 2 healthy servers then stops
22+
* </ol>
23+
*/
24+
public class ComplexFlatMapExample extends DurableHandler<Integer, String> {
25+
26+
@Override
27+
public String handleRequest(Integer input, DurableContext context) {
28+
context.getLogger().info("Starting complex map example with {} items", input);
29+
30+
// Part 1: Concurrent map with step + wait inside each branch
31+
var orderIds = IntStream.range(1, input + 1).mapToObj(x -> "order-" + x).collect(Collectors.toList());
32+
33+
var orderResult = context.map(
34+
"process-orders",
35+
orderIds,
36+
String.class,
37+
(orderId, index, ctx) -> {
38+
// Step 1: validate the order
39+
var validated = ctx.step("validate-" + index, String.class, stepCtx -> "validated:" + orderId);
40+
41+
// Wait between stages (simulates a cooldown or external dependency)
42+
ctx.wait("cooldown-" + index, Duration.ofSeconds(1));
43+
44+
// Step 2: finalize the order
45+
return ctx.step("finalize-" + index, String.class, stepCtx -> "done:" + validated);
46+
},
47+
MapConfig.builder().nestingType(NestingType.FLAT).build());
48+
49+
var orderSummary = String.join(", ", orderResult.results());
50+
51+
// Part 2: Early termination — find 2 healthy servers then stop
52+
var servers = List.of("server-1", "server-2", "server-3", "server-4", "server-5");
53+
var earlyTermConfig = MapConfig.builder()
54+
.completionConfig(CompletionConfig.minSuccessful(2))
55+
.nestingType(NestingType.FLAT)
56+
.build();
57+
58+
var serverResult = context.map(
59+
"find-healthy-servers",
60+
servers,
61+
String.class,
62+
(server, index, ctx) -> ctx.step("health-check-" + index, String.class, stepCtx -> server + ":healthy"),
63+
earlyTermConfig);
64+
65+
var healthyServers = serverResult.succeeded().stream().collect(Collectors.joining(", "));
66+
67+
return String.format(
68+
"orders=[%s] | servers=[%s] reason=%s", orderSummary, healthyServers, serverResult.completionReason());
69+
}
70+
}

examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,25 @@ void testComplexMapExample() {
674674
assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED"));
675675
}
676676

677+
@Test
678+
void testComplexFlatMapExample() {
679+
var runner = CloudDurableTestRunner.create(arn("complex-flat-map-example"), Integer.class, String.class);
680+
var result = runner.run(100);
681+
682+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
683+
var output = result.getResult();
684+
assertNotNull(output);
685+
686+
// Part 1: Concurrent order processing with step + wait + step
687+
assertTrue(output.contains("done:validated:order-1"));
688+
assertTrue(output.contains("done:validated:order-2"));
689+
assertTrue(output.contains("done:validated:order-100"));
690+
691+
// Part 2: Early termination — find 2 healthy servers then stop
692+
assertTrue(output.contains("healthy"));
693+
assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED"));
694+
}
695+
677696
@Test
678697
void testWaitForConditionExample() {
679698
var runner = CloudDurableTestRunner.create(
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.map;
4+
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
import static org.junit.jupiter.api.Assertions.assertTrue;
7+
8+
import org.junit.jupiter.api.Test;
9+
import software.amazon.lambda.durable.model.ExecutionStatus;
10+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
11+
12+
class ComplexFlatMapExampleTest {
13+
14+
@Test
15+
void testComplexMapExample() {
16+
var handler = new ComplexFlatMapExample();
17+
var runner = LocalDurableTestRunner.create(Integer.class, handler);
18+
19+
var result = runner.runUntilComplete(50);
20+
21+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
22+
var output = result.getResult(String.class);
23+
24+
// Part 1: all 3 orders processed with step + wait + step
25+
assertTrue(output.contains("done:validated:order-1"));
26+
assertTrue(output.contains("done:validated:order-2"));
27+
assertTrue(output.contains("done:validated:order-50"));
28+
29+
// Part 2: early termination after 2 healthy servers
30+
assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED"));
31+
assertTrue(output.contains("healthy"));
32+
}
33+
34+
@Test
35+
void testReplay() {
36+
var handler = new ComplexFlatMapExample();
37+
var runner = LocalDurableTestRunner.create(Integer.class, handler);
38+
39+
var result1 = runner.runUntilComplete(50);
40+
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
41+
42+
// Replay — should use cached results.
43+
// Structural assertion because the first map has wait() inside branches with unlimited
44+
// concurrency, which can cause non-deterministic thread scheduling across invocations.
45+
var result2 = runner.runUntilComplete(50);
46+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
47+
var output = result2.getResult(String.class);
48+
assertTrue(output.contains("done:validated:order-1"));
49+
assertTrue(output.contains("done:validated:order-2"));
50+
assertTrue(output.contains("done:validated:order-50"));
51+
assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED"));
52+
assertTrue(output.contains("healthy"));
53+
}
54+
}

examples/template.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,23 @@ Resources:
364364
- lambda:GetDurableExecutionState
365365
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:complex-map-example-${JavaVersion}-runtime"
366366

367+
ComplexFlatMapExampleFunction:
368+
Type: AWS::Serverless::Function
369+
Properties:
370+
FunctionName: !Join
371+
- '-'
372+
- - 'complex-flat-map-example'
373+
- !Ref JavaVersion
374+
- runtime
375+
Handler: "software.amazon.lambda.durable.examples.map.ComplexFlatMapExample"
376+
Policies:
377+
- Statement:
378+
- Effect: Allow
379+
Action:
380+
- lambda:CheckpointDurableExecutions
381+
- lambda:GetDurableExecutionState
382+
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:complex-flat-map-example-${JavaVersion}-runtime"
383+
367384
WaitForConditionExampleFunction:
368385
Type: AWS::Serverless::Function
369386
Properties:
@@ -497,6 +514,10 @@ Outputs:
497514
Description: Complex Map Example Function ARN
498515
Value: !GetAtt ComplexMapExampleFunction.Arn
499516

517+
ComplexFlatMapExampleFunction:
518+
Description: Complex Flat Map Example Function ARN
519+
Value: !GetAtt ComplexFlatMapExampleFunction.Arn
520+
500521
WaitForConditionExampleFunction:
501522
Description: Wait For Condition Example Function ARN
502523
Value: !GetAtt WaitForConditionExampleFunction.Arn

0 commit comments

Comments
 (0)