From a1662f74d25d1c685d8498855d8aaec3e57eb8d4 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Thu, 30 Apr 2026 11:10:44 -0700 Subject: [PATCH 1/2] fix ConcurrentModificationException when completing invocations --- .../amazon/lambda/durable/execution/ExecutionManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java index 92e1ff1c4..71be61656 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java @@ -5,7 +5,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -13,6 +12,7 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -59,7 +59,7 @@ public class ExecutionManager implements AutoCloseable { private final DurableConfig durableConfig; // ===== Thread Coordination ===== - private final Map registeredOperations = Collections.synchronizedMap(new HashMap<>()); + private final Map registeredOperations = new ConcurrentHashMap<>(); private final Set activeThreads = Collections.synchronizedSet(new HashSet<>()); private static final ThreadLocal currentThreadContext = new ThreadLocal<>(); private final CompletableFuture executionExceptionFuture = new CompletableFuture<>(); From d7536c769134862b892b2c56a1d18f8d6daf9593 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Thu, 30 Apr 2026 12:21:47 -0700 Subject: [PATCH 2/2] fix non-existent branches in checkpointed result --- .../durable/operation/ParallelOperation.java | 16 +++++++++++++--- .../durable/operation/ParallelOperationTest.java | 2 +- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java index 96b33e125..4ae7b8386 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable.operation; +import java.util.List; import java.util.function.Function; import software.amazon.awssdk.services.lambda.model.ContextOptions; import software.amazon.awssdk.services.lambda.model.Operation; @@ -66,7 +67,8 @@ public ParallelOperation( @Override protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletionStatus) { - var items = getBranches(); + + var items = List.copyOf(getBranches()); var statuses = items.stream().map(this::getParallelItemStatus).toList(); int succeededCount = Math.toIntExact(statuses.stream() .filter(s -> s == ParallelResult.Status.SUCCEEDED) @@ -76,6 +78,9 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio int skippedCount = items.size() - succeededCount - failedCount; cachedResult = new ParallelResult( items.size(), succeededCount, failedCount, skippedCount, concurrencyCompletionStatus, statuses); + + // Branches added after checkpoint will not exist in the checkpointed result, but they'll be in the returned + // value from get() method. sendOperationUpdate(OperationUpdate.builder() .action(OperationAction.SUCCEED) .subType(getSubType().getValue()) @@ -157,9 +162,14 @@ public DurableFuture branch( throw new IllegalStateException("Cannot add branches after join() has been called"); } - // ConcurrencyOperation will skip this branch if skip=true + var nextBranchIndex = getBranches().size(); + + // ConcurrencyOperation will skip this branch if skip=true: + // 1. if the parallel operation is already completed (partialResult is not null) + // 2. if the branch is already skipped in the partialResult or nonexistent in the partialResult var skip = partialResult != null - && partialResult.statuses().get(getBranches().size()) == ParallelResult.Status.SKIPPED; + && (partialResult.statuses().size() <= nextBranchIndex + || partialResult.statuses().get(nextBranchIndex) == ParallelResult.Status.SKIPPED); var serDes = config.serDes() == null ? getContext().getDurableConfig().getSerDes() : config.serDes(); return enqueueItem(name, func, resultType, serDes, OperationSubType.PARALLEL_BRANCH, skip); } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index 20b3c28ef..32a41f2cd 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -270,7 +270,7 @@ void minSuccessful_notExecuteSkippedBranchWhenReplay() { .status(OperationStatus.SUCCEEDED) .contextDetails(ContextDetails.builder() .result( - "{\"succeeded\": 1, \"completionStatus\": \"MIN_SUCCESSFUL_REACHED\", \"statuses\":[\"SKIPPED\", \"SUCCEEDED\"]}") + "{\"size\": 2, \"skipped\": 1, \"succeeded\": 1, \"completionStatus\": \"MIN_SUCCESSFUL_REACHED\", \"statuses\":[\"SKIPPED\", \"SUCCEEDED\"]}") .build()) .build()); when(executionManager.getOperationAndUpdateReplayState(CHILD_OP_2))