Skip to content

Commit d1cd716

Browse files
wangyb-AAlex Wang
andauthored
fix: [Parallel] fix NPE in handle complete (#279)
* Add basic parallel integration tests * Fix parallel NPE in handle complete --------- Co-authored-by: Alex Wang <wangyb@amazon.com>
1 parent 2edb2f0 commit d1cd716

2 files changed

Lines changed: 98 additions & 3 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/ParallelIntegrationTest.java

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ void testParallelReplayAfterInterruption_cachedResultsUsed() {
227227
var firstRunCount = executionCounts.get();
228228
assertTrue(firstRunCount >= 3, "Expected at least 3 executions on first run but got " + firstRunCount);
229229

230-
var result2 = runner.run("test");
230+
var result2 = runner.runUntilComplete("test");
231231
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
232232
assertEquals("A,B,C", result2.getResult(String.class));
233233
assertEquals(firstRunCount, executionCounts.get(), "Branch functions should not re-execute on replay");
@@ -536,4 +536,97 @@ void testParallelResultSummary_succeededAndFailedCounts() {
536536
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
537537
assertEquals("3/2", result.getResult(String.class));
538538
}
539+
540+
@Test
541+
void testParallelWithToleratedFailureCount_earlyTermination() {
542+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
543+
var config = ParallelConfig.builder()
544+
.maxConcurrency(1)
545+
.completionConfig(CompletionConfig.toleratedFailureCount(1))
546+
.build();
547+
var futures = new ArrayList<DurableFuture<String>>();
548+
var parallel = context.parallel("tolerated-fail", config);
549+
550+
try (parallel) {
551+
futures.add(parallel.branch("branch-ok", String.class, ctx -> "OK"));
552+
futures.add(parallel.branch("branch-fail1", String.class, ctx -> {
553+
throw new RuntimeException("failed: fail1");
554+
}));
555+
futures.add(parallel.branch("branch-fail2", String.class, ctx -> {
556+
throw new RuntimeException("failed: fail2");
557+
}));
558+
futures.add(parallel.branch("branch-ok2", String.class, ctx -> "OK2"));
559+
}
560+
561+
var result = parallel.get();
562+
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionStatus());
563+
assertFalse(result.completionStatus().isSucceeded());
564+
assertEquals(4, result.size());
565+
assertEquals("OK", futures.get(0).get());
566+
567+
return "done";
568+
});
569+
570+
var result = runner.runUntilComplete("test");
571+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
572+
}
573+
574+
@Test
575+
void testParallelWithMinSuccessful_earlyTermination() {
576+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
577+
var config = ParallelConfig.builder()
578+
.maxConcurrency(1)
579+
.completionConfig(CompletionConfig.minSuccessful(2))
580+
.build();
581+
var futures = new ArrayList<DurableFuture<String>>();
582+
var parallel = context.parallel("min-successful", config);
583+
584+
try (parallel) {
585+
for (var item : List.of("a", "b", "c", "d", "e")) {
586+
futures.add(parallel.branch("branch-" + item, String.class, ctx -> item.toUpperCase()));
587+
}
588+
}
589+
590+
var result = parallel.get();
591+
assertEquals(ConcurrencyCompletionStatus.MIN_SUCCESSFUL_REACHED, result.completionStatus());
592+
assertTrue(result.completionStatus().isSucceeded());
593+
assertEquals(5, result.size());
594+
assertEquals("A", futures.get(0).get());
595+
assertEquals("B", futures.get(1).get());
596+
597+
return "done";
598+
});
599+
600+
var result = runner.runUntilComplete("test");
601+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
602+
}
603+
604+
@Test
605+
void testParallelWithAllSuccessful_stopsOnFirstFailure() {
606+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
607+
var config = ParallelConfig.builder()
608+
.maxConcurrency(1)
609+
.completionConfig(CompletionConfig.allSuccessful())
610+
.build();
611+
var futures = new ArrayList<DurableFuture<String>>();
612+
var parallel = context.parallel("all-successful", config);
613+
614+
try (parallel) {
615+
futures.add(parallel.branch("branch-ok1", String.class, ctx -> "OK1"));
616+
futures.add(parallel.branch("branch-fail", String.class, ctx -> {
617+
throw new RuntimeException("failed");
618+
}));
619+
futures.add(parallel.branch("branch-ok2", String.class, ctx -> "OK2"));
620+
}
621+
622+
var result = parallel.get();
623+
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionStatus());
624+
assertEquals("OK1", futures.get(0).get());
625+
626+
return "done";
627+
});
628+
629+
var result = runner.runUntilComplete("test");
630+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
631+
}
539632
}

sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,12 @@ public ParallelOperation(
6868
protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletionStatus) {
6969
var items = getBranches();
7070
int succeededCount = Math.toIntExact(items.stream()
71-
.filter(item -> item.getOperation().status() == OperationStatus.SUCCEEDED)
71+
.filter(item ->
72+
item.getOperation() != null && item.getOperation().status() == OperationStatus.SUCCEEDED)
7273
.count());
7374
int failedCount = Math.toIntExact(items.stream()
74-
.filter(item -> item.getOperation().status() != OperationStatus.SUCCEEDED)
75+
.filter(item ->
76+
item.getOperation() != null && item.getOperation().status() != OperationStatus.SUCCEEDED)
7577
.count());
7678
this.cachedResult = new ParallelResult(items.size(), succeededCount, failedCount, concurrencyCompletionStatus);
7779
if (skipCheckpoint) {

0 commit comments

Comments
 (0)