Skip to content

Commit 94ed5a7

Browse files
committed
add more tests
1 parent de17465 commit 94ed5a7

2 files changed

Lines changed: 116 additions & 47 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java

Lines changed: 105 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import software.amazon.lambda.durable.model.MapResult;
2121
import software.amazon.lambda.durable.model.WaitForConditionResult;
2222
import software.amazon.lambda.durable.retry.WaitStrategies;
23+
import software.amazon.lambda.durable.serde.JacksonSerDes;
2324
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
2425

2526
class MapIntegrationTest {
@@ -725,8 +726,8 @@ void testMapWithWaitInsideBranches_replay(NestingType nestingType, int events) {
725726
}
726727

727728
@ParameterizedTest
728-
@CsvSource({"FLAT, 6", "NESTED, 18"})
729-
void testNestedMap_replay(NestingType nestingType, int events) {
729+
@CsvSource({"FLAT, FLAT, 6", "FLAT, NESTED, 14", "NESTED, FLAT, 10", "NESTED, NESTED, 18"})
730+
void testNestedMap_replay(NestingType outerNestingType, NestingType innerNestingType, int events) {
730731
var executionCount = new AtomicInteger(0);
731732

732733
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
@@ -745,10 +746,12 @@ void testNestedMap_replay(NestingType nestingType, int events) {
745746
executionCount.incrementAndGet();
746747
return item.toUpperCase();
747748
},
748-
MapConfig.builder().nestingType(nestingType).build());
749+
MapConfig.builder()
750+
.nestingType(innerNestingType)
751+
.build());
749752
return String.join("+", innerResult.results());
750753
},
751-
MapConfig.builder().nestingType(nestingType).build());
754+
MapConfig.builder().nestingType(outerNestingType).build());
752755

753756
return String.join("|", outerResult.results());
754757
});
@@ -766,12 +769,14 @@ void testNestedMap_replay(NestingType nestingType, int events) {
766769
assertEquals(events, result2.getHistoryEvents().size());
767770
}
768771

769-
@Test
770-
void testMapWithToleratedFailurePercentage() {
772+
@ParameterizedTest
773+
@CsvSource({"FLAT, 2", "NESTED, 16"})
774+
void testMapWithToleratedFailurePercentage(NestingType nestingType, int events) {
771775
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
772776
var items = List.of("ok1", "FAIL1", "ok2", "FAIL2", "ok3", "FAIL3", "ok4");
773777
var config = MapConfig.builder()
774778
.completionConfig(CompletionConfig.toleratedFailurePercentage(0.3))
779+
.nestingType(nestingType)
775780
.build();
776781
var result = context.map(
777782
"pct-fail",
@@ -791,16 +796,19 @@ void testMapWithToleratedFailurePercentage() {
791796

792797
var result = runner.runUntilComplete("test");
793798
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
799+
assertEquals(events, result.getHistoryEvents().size());
794800
}
795801

796-
@Test
797-
void testMapWithToleratedFailurePercentage_replay() {
802+
@ParameterizedTest
803+
@CsvSource({"FLAT, 2", "NESTED, 16"})
804+
void testMapWithToleratedFailurePercentage_replay(NestingType nestingType, int events) {
798805
var executionCount = new AtomicInteger(0);
799806

800807
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
801808
var items = List.of("ok1", "FAIL1", "ok2", "FAIL2", "ok3", "FAIL3", "ok4");
802809
var config = MapConfig.builder()
803810
.completionConfig(CompletionConfig.toleratedFailurePercentage(0.3))
811+
.nestingType(nestingType)
804812
.build();
805813
var result = context.map(
806814
"pct-fail-replay",
@@ -822,23 +830,31 @@ void testMapWithToleratedFailurePercentage_replay() {
822830
var result1 = runner.runUntilComplete("test");
823831
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
824832
var firstRunCount = executionCount.get();
833+
assertEquals(events, result1.getHistoryEvents().size());
825834

826835
// Replay — with unlimited concurrency, children replay simultaneously.
827836
// Verify completionReason is consistent and no re-execution occurs.
828837
var result2 = runner.run("test");
829838
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
830839
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
840+
assertEquals(events, result2.getHistoryEvents().size());
831841
}
832842

833-
@Test
834-
void testMapAsyncWithWaitInsideBranches() {
843+
@ParameterizedTest
844+
@CsvSource({"FLAT, 12", "NESTED, 16"})
845+
void testMapAsyncWithWaitInsideBranches(NestingType nestingType, int events) {
835846
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
836847
var items = List.of("a", "b");
837-
var future = context.mapAsync("async-wait-map", items, String.class, (item, index, ctx) -> {
838-
var stepped = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase());
839-
ctx.wait("pause-" + index, Duration.ofSeconds(1));
840-
return stepped + "-done";
841-
});
848+
var future = context.mapAsync(
849+
"async-wait-map",
850+
items,
851+
String.class,
852+
(item, index, ctx) -> {
853+
var stepped = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase());
854+
ctx.wait("pause-" + index, Duration.ofSeconds(1));
855+
return stepped + "-done";
856+
},
857+
MapConfig.builder().nestingType(nestingType).build());
842858

843859
var other = context.step("other", String.class, stepCtx -> "OTHER");
844860
var mapResult = future.get();
@@ -850,14 +866,19 @@ void testMapAsyncWithWaitInsideBranches() {
850866
var result = runner.runUntilComplete("test");
851867
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
852868
assertEquals("OTHER:A-done,B-done", result.getResult(String.class));
869+
assertEquals(events, result.getHistoryEvents().size());
853870
}
854871

855-
@Test
856-
void testMapWithCustomSerDes() {
857-
var customSerDes = new software.amazon.lambda.durable.serde.JacksonSerDes();
872+
@ParameterizedTest
873+
@CsvSource({"FLAT, 2", "NESTED, 6"})
874+
void testMapWithCustomSerDes(NestingType nestingType, int events) {
875+
var customSerDes = new JacksonSerDes();
858876
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
859877
var items = List.of("a", "b");
860-
var config = MapConfig.builder().serDes(customSerDes).build();
878+
var config = MapConfig.builder()
879+
.serDes(customSerDes)
880+
.nestingType(nestingType)
881+
.build();
861882
var result = context.map(
862883
"custom-serdes-map", items, String.class, (item, index, ctx) -> item.toUpperCase(), config);
863884

@@ -868,16 +889,25 @@ void testMapWithCustomSerDes() {
868889
var result = runner.runUntilComplete("test");
869890
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
870891
assertEquals("A,B", result.getResult(String.class));
892+
assertEquals(events, result.getHistoryEvents().size());
871893
}
872894

873-
@Test
874-
void testMapWithGenericResultType() {
895+
@ParameterizedTest
896+
@CsvSource({"FLAT, 6", "NESTED, 10"})
897+
void testMapWithGenericResultType(NestingType nestingType, int events) {
875898
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
876899
var items = List.of("a,b", "c,d");
877-
var result = context.map("generic-map", items, new TypeToken<List<String>>() {}, (item, index, ctx) -> {
878-
return ctx.step(
879-
"split-" + index, new TypeToken<List<String>>() {}, stepCtx -> List.of(item.split(",")));
880-
});
900+
var result = context.map(
901+
"generic-map",
902+
items,
903+
new TypeToken<List<String>>() {},
904+
(item, index, ctx) -> {
905+
return ctx.step(
906+
"split-" + index,
907+
new TypeToken<List<String>>() {},
908+
stepCtx -> List.of(item.split(",")));
909+
},
910+
MapConfig.builder().nestingType(nestingType).build());
881911

882912
assertTrue(result.allSucceeded());
883913
assertEquals(List.of("a", "b"), result.getResult(0));
@@ -887,13 +917,18 @@ void testMapWithGenericResultType() {
887917

888918
var result = runner.runUntilComplete("test");
889919
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
920+
assertEquals(events, result.getHistoryEvents().size());
890921
}
891922

892-
@Test
893-
void testMapWithWaitInsideBranches_maxConcurrency1() {
923+
@ParameterizedTest
924+
@CsvSource({"FLAT, 10", "NESTED, 14"})
925+
void testMapWithWaitInsideBranches_maxConcurrency1(NestingType nestingType, int events) {
894926
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
895927
var items = List.of("a", "b");
896-
var config = MapConfig.builder().maxConcurrency(1).build();
928+
var config = MapConfig.builder()
929+
.maxConcurrency(1)
930+
.nestingType(nestingType)
931+
.build();
897932
var result = context.map(
898933
"seq-wait-map",
899934
items,
@@ -920,22 +955,25 @@ void testMapWithWaitInsideBranches_maxConcurrency1() {
920955
if (result.getStatus() != ExecutionStatus.PENDING) {
921956
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
922957
assertEquals("A-done,B-done", result.getResult(String.class));
958+
assertEquals(events, result.getHistoryEvents().size());
923959
return;
924960
}
925961
runner.advanceTime();
926962
}
927963
fail("Expected SUCCEEDED within 10 invocations");
928964
}
929965

930-
@Test
931-
void testMapWithMinSuccessful_replay() {
966+
@ParameterizedTest
967+
@CsvSource({"FLAT, 2", "NESTED, 6"})
968+
void testMapWithMinSuccessful_replay(NestingType nestingType, int events) {
932969
var executionCount = new AtomicInteger(0);
933970

934971
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
935972
var items = List.of("a", "b", "c", "d", "e");
936973
var config = MapConfig.builder()
937974
.maxConcurrency(1)
938975
.completionConfig(CompletionConfig.minSuccessful(2))
976+
.nestingType(nestingType)
939977
.build();
940978
var result = context.map(
941979
"min-success-replay",
@@ -956,23 +994,31 @@ void testMapWithMinSuccessful_replay() {
956994
var result1 = runner.runUntilComplete("test");
957995
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
958996
var firstRunCount = executionCount.get();
997+
assertEquals(events, result1.getHistoryEvents().size());
959998

960999
// Replay — small result path: deserialize MapResult from payload, no child replay
9611000
var result2 = runner.run("test");
9621001
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
9631002
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
1003+
assertEquals(events, result2.getHistoryEvents().size());
9641004
}
9651005

966-
@Test
967-
void testMapAsyncWithInterleavedWork_replay() {
1006+
@ParameterizedTest
1007+
@CsvSource({"FLAT, 8", "NESTED, 12"})
1008+
void testMapAsyncWithInterleavedWork_replay(NestingType nestingType, int events) {
9681009
var executionCount = new AtomicInteger(0);
9691010

9701011
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
9711012
var items = List.of("x", "y");
972-
var future = context.mapAsync("async-replay-map", items, String.class, (item, index, ctx) -> {
973-
executionCount.incrementAndGet();
974-
return ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase());
975-
});
1013+
var future = context.mapAsync(
1014+
"async-replay-map",
1015+
items,
1016+
String.class,
1017+
(item, index, ctx) -> {
1018+
executionCount.incrementAndGet();
1019+
return ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase());
1020+
},
1021+
MapConfig.builder().nestingType(nestingType).build());
9761022

9771023
var other = context.step("other-work", String.class, stepCtx -> "OTHER");
9781024
var mapResult = future.get();
@@ -984,17 +1030,20 @@ void testMapAsyncWithInterleavedWork_replay() {
9841030
var result1 = runner.runUntilComplete("test");
9851031
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
9861032
assertEquals("OTHER:X,Y", result1.getResult(String.class));
1033+
assertEquals(events, result1.getHistoryEvents().size());
9871034
var firstRunCount = executionCount.get();
9881035

9891036
// Replay — async map + interleaved step should all use cached results
9901037
var result2 = runner.run("test");
9911038
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
9921039
assertEquals("OTHER:X,Y", result2.getResult(String.class));
9931040
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
1041+
assertEquals(events, result2.getHistoryEvents().size());
9941042
}
9951043

996-
@Test
997-
void testMapWithLargeResult_replayChildren() {
1044+
@ParameterizedTest
1045+
@CsvSource({"FLAT, 2", "NESTED, 202"})
1046+
void testMapWithLargeResult_replayChildren(NestingType nestingType, int events) {
9981047
var executionCount = new AtomicInteger(0);
9991048
// Generate items that produce results exceeding 256KB total to trigger replayChildren path
10001049
var items = new ArrayList<String>();
@@ -1003,11 +1052,16 @@ void testMapWithLargeResult_replayChildren() {
10031052
}
10041053

10051054
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
1006-
var result = context.map("large-result-map", items, String.class, (item, index, ctx) -> {
1007-
executionCount.incrementAndGet();
1008-
// Each item returns ~3KB string to push total well over 256KB
1009-
return item + "-" + "x".repeat(3000);
1010-
});
1055+
var result = context.map(
1056+
"large-result-map",
1057+
items,
1058+
String.class,
1059+
(item, index, ctx) -> {
1060+
executionCount.incrementAndGet();
1061+
// Each item returns ~3KB string to push total well over 256KB
1062+
return item + "-" + "x".repeat(3000);
1063+
},
1064+
MapConfig.builder().nestingType(nestingType).build());
10111065

10121066
assertTrue(result.allSucceeded());
10131067
assertEquals(100, result.size());
@@ -1018,13 +1072,20 @@ void testMapWithLargeResult_replayChildren() {
10181072

10191073
var result1 = runner.runUntilComplete("test");
10201074
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
1075+
assertEquals(events, result1.getHistoryEvents().size());
10211076
var firstRunCount = executionCount.get();
10221077
assertTrue(firstRunCount >= 100);
10231078

10241079
// Replay — large result path: replayChildren=true, children replay from cache
10251080
var result2 = runner.run("test");
10261081
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
1027-
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
1082+
if (nestingType == NestingType.FLAT) {
1083+
// in FLAT mode, children will always be replayed
1084+
assertEquals(firstRunCount * 2, executionCount.get(), "Map functions should re-execute on replay");
1085+
} else {
1086+
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
1087+
}
1088+
assertEquals(events, result2.getHistoryEvents().size());
10281089
}
10291090

10301091
@Test

sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.ArrayList;
66
import java.util.Collections;
77
import java.util.List;
8+
import java.util.concurrent.atomic.AtomicBoolean;
89
import software.amazon.awssdk.services.lambda.model.ContextOptions;
910
import software.amazon.awssdk.services.lambda.model.Operation;
1011
import software.amazon.awssdk.services.lambda.model.OperationAction;
@@ -42,7 +43,8 @@ public class MapOperation<I, O> extends ConcurrencyOperation<MapResult<O>> {
4243
private final DurableContext.MapFunction<I, O> function;
4344
private final TypeToken<O> itemResultType;
4445
private final SerDes serDes;
45-
private boolean replayFromPayload;
46+
private final AtomicBoolean replayFromPayload = new AtomicBoolean(false);
47+
private final AtomicBoolean replayForLargeResult = new AtomicBoolean(false);
4648
private volatile MapResult<O> cachedResult;
4749

4850
public MapOperation(
@@ -134,10 +136,11 @@ protected void replay(Operation existing) {
134136
if (existing.contextDetails() != null
135137
&& Boolean.TRUE.equals(existing.contextDetails().replayChildren())) {
136138
// Large result: re-execute children to reconstruct MapResult
139+
replayForLargeResult.set(true);
137140
executeItems();
138141
} else {
139142
// Small result: MapResult is in the payload, skip child replay
140-
replayFromPayload = true;
143+
replayFromPayload.set(true);
141144
markAlreadyCompleted();
142145
}
143146
}
@@ -182,6 +185,11 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio
182185
}
183186

184187
this.cachedResult = new MapResult<>(resultItems, concurrencyCompletionStatus);
188+
// avoid checkpointing because the children are replayed due to large result
189+
if (replayForLargeResult.get()) {
190+
markAlreadyCompleted();
191+
return;
192+
}
185193
var serialized = serializeResult(cachedResult);
186194
var serializedBytes = serialized.getBytes(java.nio.charset.StandardCharsets.UTF_8);
187195

@@ -206,7 +214,7 @@ public MapResult<O> get() {
206214
if (items.isEmpty()) {
207215
return MapResult.empty();
208216
}
209-
if (replayFromPayload) {
217+
if (replayFromPayload.get()) {
210218
// Small result replay: deserialize MapResult directly from checkpoint payload
211219
var op = waitForOperationCompletion();
212220
var result = (op.contextDetails() != null) ? op.contextDetails().result() : null;

0 commit comments

Comments
 (0)