Skip to content

Commit 478226d

Browse files
authored
[bugfix]: fix inconsistency result when replay concurrency operations (#348)
* fix inconsistency result when replay concurrency operations * add tests for parallel
1 parent f3a2344 commit 478226d

11 files changed

Lines changed: 580 additions & 133 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.ArrayList;
99
import java.util.List;
1010
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.concurrent.atomic.AtomicReference;
1112
import org.junit.jupiter.api.Test;
1213
import org.junit.jupiter.params.ParameterizedTest;
1314
import org.junit.jupiter.params.provider.CsvSource;
@@ -1003,6 +1004,130 @@ void testMapWithMinSuccessful_replay(NestingType nestingType, int events) {
10031004
assertEquals(events, result2.getHistoryEvents().size());
10041005
}
10051006

1007+
@ParameterizedTest
1008+
@CsvSource({"FLAT, 2", "NESTED, 9"})
1009+
void testMapWithMinSuccessful_replayLargePayload(NestingType nestingType, int events) {
1010+
var executionCount = new AtomicInteger(0);
1011+
1012+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
1013+
var items = List.of("a", "b", "c", "d", "e");
1014+
var config = MapConfig.builder()
1015+
.maxConcurrency(5)
1016+
.completionConfig(CompletionConfig.minSuccessful(2))
1017+
.nestingType(nestingType)
1018+
.build();
1019+
var result = context.map(
1020+
"min-success-replay",
1021+
items,
1022+
String.class,
1023+
(item, index, ctx) -> {
1024+
executionCount.incrementAndGet();
1025+
// add delays to first 3 items when starting, and
1026+
// add a delay to the last item only (index > 3, i.e. index 4) when replaying
1027+
try {
1028+
if (executionCount.get() <= 5 && index < 3 || executionCount.get() > 5 && index > 3) {
1029+
Thread.sleep(1000);
1030+
}
1031+
} catch (InterruptedException e) {
1032+
throw new RuntimeException(e);
1033+
}
1034+
return item.toUpperCase().repeat(1000000);
1035+
},
1036+
config);
1037+
1038+
// the same items must always be skipped on replay
1039+
assertEquals(
1040+
MapResult.MapResultItem.Status.SKIPPED, result.getItem(0).status());
1041+
assertEquals(
1042+
MapResult.MapResultItem.Status.SKIPPED, result.getItem(1).status());
1043+
assertEquals(
1044+
MapResult.MapResultItem.Status.SKIPPED, result.getItem(2).status());
1045+
assertEquals(
1046+
MapResult.MapResultItem.Status.SUCCEEDED, result.getItem(3).status());
1047+
assertEquals(
1048+
MapResult.MapResultItem.Status.SUCCEEDED, result.getItem(4).status());
1049+
1050+
return "done";
1051+
});
1052+
1053+
var result1 = runner.runUntilComplete("test");
1054+
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
1055+
var firstRunCount = executionCount.get();
1056+
assertEquals(events, result1.getHistoryEvents().size());
1057+
1058+
// Replay — large-payload path: the map functions are expected to re-execute on replay
1059+
var result2 = runner.run("test");
1060+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
1061+
assertEquals(firstRunCount + 2, executionCount.get(), "Map functions should re-execute on replay");
1062+
assertEquals(events, result2.getHistoryEvents().size());
1063+
}
1064+
1065+
@ParameterizedTest
1066+
@CsvSource({"FLAT, 2", "NESTED, 9"})
1067+
void testMapWithMinSuccessful_replayLargePayloadResultConsistency(NestingType nestingType, int events) {
1068+
var executionCount = new AtomicInteger(0);
1069+
var initialResult = new AtomicReference<MapResult<String>>();
1070+
1071+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
1072+
var items = List.of("a", "b", "c", "d", "e");
1073+
var config = MapConfig.builder()
1074+
.maxConcurrency(5)
1075+
.completionConfig(CompletionConfig.minSuccessful(2))
1076+
.nestingType(nestingType)
1077+
.build();
1078+
var result = context.map(
1079+
"min-success-replay",
1080+
items,
1081+
String.class,
1082+
(item, index, ctx) -> {
1083+
executionCount.incrementAndGet();
1084+
// add delays to first 2 items when starting, and
1085+
// add delays to the last 2 items (index > 2) when replaying; index 2 is never delayed
1086+
try {
1087+
if (executionCount.get() <= 5 && index < 2 || executionCount.get() > 5 && index > 2) {
1088+
Thread.sleep(1000);
1089+
}
1090+
} catch (InterruptedException e) {
1091+
throw new RuntimeException(e);
1092+
}
1093+
return item.toUpperCase().repeat(1000000);
1094+
},
1095+
config);
1096+
1097+
// the replayed result must equal the result captured on the first run
1098+
if (initialResult.get() == null) {
1099+
initialResult.set(result);
1100+
} else {
1101+
assertEquals(initialResult.get(), result);
1102+
}
1103+
1104+
return "done";
1105+
});
1106+
1107+
var result1 = runner.runUntilComplete("test");
1108+
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
1109+
var firstRunCount = executionCount.get();
1110+
if (nestingType == NestingType.FLAT) {
1111+
assertEquals(events, result1.getHistoryEvents().size());
1112+
} else {
1113+
// 9 events if 2 branches completed, 10 if 3 completed
1114+
assertTrue(events <= result1.getHistoryEvents().size());
1115+
}
1116+
1117+
// Replay — large-payload path: the map functions are expected to re-execute on replay
1118+
var result2 = runner.run("test");
1119+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
1120+
// 2 or 3 more map-function executions during replay
1121+
assertTrue(firstRunCount + 2 <= executionCount.get(), "Map functions should re-execute on replay");
1122+
if (nestingType == NestingType.FLAT) {
1123+
assertEquals(events, result2.getHistoryEvents().size());
1124+
} else {
1125+
// 9 events if 2 branches completed, 10 if 3 completed
1126+
assertTrue(events <= result2.getHistoryEvents().size());
1127+
assertTrue(result2.getHistoryEvents().size() <= events + 1);
1128+
}
1129+
}
1130+
10061131
@ParameterizedTest
10071132
@CsvSource({"FLAT, 8", "NESTED, 12"})
10081133
void testMapAsyncWithInterleavedWork_replay(NestingType nestingType, int events) {

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/ParallelIntegrationTest.java

Lines changed: 80 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.ArrayList;
99
import java.util.List;
1010
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.concurrent.atomic.AtomicReference;
1112
import org.junit.jupiter.params.ParameterizedTest;
1213
import org.junit.jupiter.params.provider.CsvSource;
1314
import software.amazon.lambda.durable.config.CompletionConfig;
@@ -16,6 +17,7 @@
1617
import software.amazon.lambda.durable.config.WaitForConditionConfig;
1718
import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus;
1819
import software.amazon.lambda.durable.model.ExecutionStatus;
20+
import software.amazon.lambda.durable.model.ParallelResult;
1921
import software.amazon.lambda.durable.model.WaitForConditionResult;
2022
import software.amazon.lambda.durable.retry.WaitStrategies;
2123
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
@@ -29,7 +31,7 @@ void testSimpleParallel(NestingType nestingType, int events) {
2931
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
3032
var config = ParallelConfig.builder().nestingType(nestingType).build();
3133
var futures = new ArrayList<DurableFuture<String>>();
32-
var parallel = context.parallel("process-items", config);
34+
ParallelDurableFuture parallel = context.parallel("process-items", config);
3335

3436
try (parallel) {
3537
for (var item : List.of("a", "b", "c")) {
@@ -58,7 +60,7 @@ void testParallelWithStepsInsideBranches(NestingType nestingType, int events) {
5860
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
5961
var config = ParallelConfig.builder().nestingType(nestingType).build();
6062
var futures = new ArrayList<DurableFuture<String>>();
61-
var parallel = context.parallel("parallel-with-steps", config);
63+
ParallelDurableFuture parallel = context.parallel("parallel-with-steps", config);
6264

6365
try (parallel) {
6466
for (var item : List.of("hello", "world")) {
@@ -118,7 +120,7 @@ void testParallelPartialFailure_failedBranchDoesNotPreventOthers(NestingType nes
118120
void testParallelAllBranchesFail(NestingType nestingType, int events) {
119121
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
120122
var config = ParallelConfig.builder().nestingType(nestingType).build();
121-
var parallel = context.parallel("all-fail", config);
123+
ParallelDurableFuture parallel = context.parallel("all-fail", config);
122124

123125
try (parallel) {
124126
parallel.branch("branch-x", String.class, ctx -> {
@@ -155,7 +157,7 @@ void testParallelWithMaxConcurrency1_sequentialExecution(NestingType nestingType
155157
.nestingType(nestingType)
156158
.build();
157159
var futures = new ArrayList<DurableFuture<String>>();
158-
var parallel = context.parallel("sequential-parallel", config);
160+
ParallelDurableFuture parallel = context.parallel("sequential-parallel", config);
159161

160162
try (parallel) {
161163
for (var item : List.of("a", "b", "c", "d")) {
@@ -194,7 +196,7 @@ void testParallelWithMaxConcurrency2_limitedConcurrency(NestingType nestingType,
194196
.nestingType(nestingType)
195197
.build();
196198
var futures = new ArrayList<DurableFuture<String>>();
197-
var parallel = context.parallel("limited-parallel", config);
199+
ParallelDurableFuture parallel = context.parallel("limited-parallel", config);
198200

199201
try (parallel) {
200202
for (var item : List.of("a", "b", "c", "d", "e")) {
@@ -229,7 +231,7 @@ void testParallelReplayAfterInterruption_cachedResultsUsed(NestingType nestingTy
229231
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
230232
var config = ParallelConfig.builder().nestingType(nestingType).build();
231233
var futures = new ArrayList<DurableFuture<String>>();
232-
var parallel = context.parallel("replay-parallel", config);
234+
ParallelDurableFuture parallel = context.parallel("replay-parallel", config);
233235

234236
try (parallel) {
235237
for (var item : List.of("a", "b", "c")) {
@@ -531,7 +533,7 @@ void testParallelUnlimitedConcurrencyWithToleratedFailureCount(NestingType nesti
531533
void testParallelBranchesReturnDifferentTypes(NestingType nestingType, int events) {
532534
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
533535
var config = ParallelConfig.builder().nestingType(nestingType).build();
534-
var parallel = context.parallel("mixed-types", config);
536+
ParallelDurableFuture parallel = context.parallel("mixed-types", config);
535537

536538
DurableFuture<String> strFuture;
537539
DurableFuture<Integer> intFuture;
@@ -600,7 +602,7 @@ void testParallel50BranchesWithWaitForCallback(NestingType nestingType, int even
600602
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
601603
var config = ParallelConfig.builder().nestingType(nestingType).build();
602604
var futures = new ArrayList<DurableFuture<String>>();
603-
var parallel = context.parallel("50-callbacks", config);
605+
ParallelDurableFuture parallel = context.parallel("50-callbacks", config);
604606

605607
try (parallel) {
606608
for (int i = 0; i < branchCount; i++) {
@@ -655,7 +657,7 @@ void testParallel50BranchesWithWaitForCallback_maxConcurrency5(NestingType nesti
655657
.maxConcurrency(5)
656658
.nestingType(nestingType)
657659
.build();
658-
var parallel = context.parallel("50-callbacks-limited", config);
660+
ParallelDurableFuture parallel = context.parallel("50-callbacks-limited", config);
659661

660662
try (parallel) {
661663
for (int i = 0; i < branchCount; i++) {
@@ -705,7 +707,7 @@ void testParallel50BranchesWithWaitForCallback_partialFailure(NestingType nestin
705707

706708
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
707709
var config = ParallelConfig.builder().nestingType(nestingType).build();
708-
var parallel = context.parallel("50-callbacks-partial-fail", config);
710+
ParallelDurableFuture parallel = context.parallel("50-callbacks-partial-fail", config);
709711

710712
try (parallel) {
711713
for (int i = 0; i < branchCount; i++) {
@@ -859,7 +861,7 @@ void testParallel50BranchesWithWaitForCondition_someExceedMaxAttempts(NestingTyp
859861

860862
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
861863
var config = ParallelConfig.builder().nestingType(nestingType).build();
862-
var parallel = context.parallel("50-conditions-some-fail", config);
864+
ParallelDurableFuture parallel = context.parallel("50-conditions-some-fail", config);
863865

864866
try (parallel) {
865867
for (int i = 0; i < branchCount; i++) {
@@ -910,7 +912,7 @@ void testParallel50BranchesWithWaitForCondition_replay(NestingType nestingType,
910912

911913
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
912914
var config = ParallelConfig.builder().nestingType(nestingType).build();
913-
var parallel = context.parallel("50-conditions-replay", config);
915+
ParallelDurableFuture parallel = context.parallel("50-conditions-replay", config);
914916

915917
try (parallel) {
916918
for (int i = 0; i < branchCount; i++) {
@@ -960,7 +962,7 @@ void testParallel50BranchesMixed_callbackAndCondition(NestingType nestingType, i
960962

961963
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
962964
var config = ParallelConfig.builder().nestingType(nestingType).build();
963-
var parallel = context.parallel("50-mixed", config);
965+
ParallelDurableFuture parallel = context.parallel("50-mixed", config);
964966

965967
try (parallel) {
966968
for (int i = 0; i < branchCount; i++) {
@@ -1089,6 +1091,71 @@ void testParallelWithMinSuccessful_earlyTermination(NestingType nestingType, int
10891091
assertEquals(events, result.getHistoryEvents().size());
10901092
}
10911093

1094+
@ParameterizedTest
1095+
@CsvSource({"FLAT, 2", "NESTED, 8"})
1096+
void testParallelWithMinSuccessful_earlyTermination_consistentResult(NestingType nestingType, int events) {
1097+
var executionCount = new AtomicInteger(0);
1098+
var initialResult = new AtomicReference<ParallelResult>();
1099+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
1100+
var config = ParallelConfig.builder()
1101+
.maxConcurrency(5)
1102+
.completionConfig(CompletionConfig.minSuccessful(2))
1103+
.nestingType(nestingType)
1104+
.build();
1105+
var futures = new ArrayList<DurableFuture<String>>();
1106+
ParallelDurableFuture parallel = context.parallel("min-successful", config);
1107+
1108+
try (parallel) {
1109+
for (var item : List.of("a", "b", "c", "d", "e")) {
1110+
futures.add(parallel.branch("branch-" + item, String.class, ctx -> {
1111+
executionCount.incrementAndGet();
1112+
try {
1113+
if (executionCount.get() <= 5 && (item.equals("a") || item.equals("b"))
1114+
|| executionCount.get() > 5 && (item.equals("c") || item.equals("d"))) {
1115+
Thread.sleep(1000);
1116+
}
1117+
} catch (InterruptedException e) {
1118+
throw new RuntimeException(e);
1119+
}
1120+
1121+
return item.toUpperCase();
1122+
}));
1123+
}
1124+
}
1125+
1126+
var result = parallel.get();
1127+
if (initialResult.get() == null) {
1128+
initialResult.set(result);
1129+
} else {
1130+
assertEquals(initialResult.get(), result);
1131+
}
1132+
assertEquals(ConcurrencyCompletionStatus.MIN_SUCCESSFUL_REACHED, result.completionStatus());
1133+
assertTrue(result.completionStatus().isSucceeded());
1134+
assertTrue(result.size() >= 2 && result.size() <= 5);
1135+
assertEquals(ParallelResult.Status.SKIPPED, result.statuses().get(0));
1136+
assertEquals(ParallelResult.Status.SKIPPED, result.statuses().get(1));
1137+
1138+
return "done";
1139+
});
1140+
1141+
var result = runner.runUntilComplete("test");
1142+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
1143+
if (nestingType == NestingType.FLAT) {
1144+
assertEquals(events, result.getHistoryEvents().size());
1145+
} else {
1146+
assertTrue(events <= result.getHistoryEvents().size());
1147+
assertTrue(result.getHistoryEvents().size() <= events + 2);
1148+
}
1149+
1150+
var result2 = runner.run("test");
1151+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
1152+
if (nestingType == NestingType.FLAT) {
1153+
assertEquals(events, result2.getHistoryEvents().size());
1154+
} else {
1155+
assertTrue(result2.getHistoryEvents().size() <= events + 2);
1156+
}
1157+
}
1158+
10921159
@ParameterizedTest
10931160
@CsvSource({"FLAT, 2", "NESTED, 6"})
10941161
void testParallelWithAllSuccessful_stopsOnFirstFailure(NestingType nestingType, int events) {

sdk/src/main/java/software/amazon/lambda/durable/model/ParallelResult.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,26 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.model;
44

5+
import java.util.List;
6+
57
/**
68
* Summary result of a parallel operation.
79
*
810
* <p>Captures the aggregate outcome of a parallel execution: how many branches were registered, how many succeeded, how
911
* many failed, how many were skipped, the status of each branch, and why the operation completed.
1012
*/
11-
public record ParallelResult(int size, int succeeded, int failed, ConcurrencyCompletionStatus completionStatus) {}
13+
public record ParallelResult(
14+
int size,
15+
int succeeded,
16+
int failed,
17+
int skipped,
18+
ConcurrencyCompletionStatus completionStatus,
19+
List<Status> statuses) {
20+
21+
/** Status of an individual parallel branch. */
22+
public enum Status {
23+
SUCCEEDED,
24+
FAILED,
25+
SKIPPED
26+
}
27+
}

0 commit comments

Comments
 (0)