Skip to content

Commit aefcd0e

Browse files
committed
avoid marking branch done when parent is done
1 parent c3316dd commit aefcd0e

3 files changed

Lines changed: 15 additions & 20 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -266,15 +266,8 @@ public void onCheckpointComplete(Operation operation) {
266266
// This method handles only terminal status updates. Override this method if a DurableOperation needs to
267267
// handle other updates.
268268
logger.trace("In onCheckpointComplete, completing operation {} ({})", getOperationId(), completionFuture);
269-
// It's important that we synchronize access to the future, otherwise the processing could happen
270-
// on someone else's thread and cause a race condition.
271-
synchronized (completionFuture) {
272-
// Completing the future here will also run any other completion stages that have been attached
273-
// to the future. In our case, other contexts may have attached a function to reactivate themselves,
274-
// so they will definitely have a chance to reactivate before we finish completing and deactivating
275-
// whatever operations were just checkpointed.
276-
completionFuture.complete(this);
277-
}
269+
270+
markCompletionFutureCompleted();
278271
}
279272
}
280273

@@ -283,10 +276,17 @@ protected void markAlreadyCompleted() {
283276
// When the operation is already completed in a replay, we complete completionFuture immediately
284277
// so that the `get` method will be unblocked and the context thread will be registered
285278
logger.trace("In markAlreadyCompleted, completing operation: {} ({}).", getOperationId(), completionFuture);
279+
markCompletionFutureCompleted();
280+
}
286281

282+
private void markCompletionFutureCompleted() {
287283
// It's important that we synchronize access to the future, otherwise the processing could happen
288284
// on someone else's thread and cause a race condition.
289285
synchronized (completionFuture) {
286+
// Completing the future here will also run any other completion stages that have been attached
287+
// to the future. In our case, other contexts may have attached a function to reactivate themselves,
288+
// so they will definitely have a chance to reactivate before we finish completing and deactivating
289+
// whatever operations were just checkpointed.
290290
completionFuture.complete(this);
291291
}
292292
}

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static software.amazon.lambda.durable.execution.ExecutionManager.isTerminalStatus;
66

77
import java.nio.charset.StandardCharsets;
8+
import java.util.concurrent.atomic.AtomicBoolean;
89
import java.util.function.Function;
910
import software.amazon.awssdk.services.lambda.model.ContextOptions;
1011
import software.amazon.awssdk.services.lambda.model.ErrorObject;
@@ -46,7 +47,7 @@ public class ChildContextOperation<T> extends SerializableDurableOperation<T> {
4647

4748
private final Function<DurableContext, T> function;
4849
private final ConcurrencyOperation<?> parentOperation;
49-
private boolean replayChildContext;
50+
private final AtomicBoolean replayChildren = new AtomicBoolean(false);
5051
private T reconstructedResult;
5152

5253
public ChildContextOperation(
@@ -86,7 +87,7 @@ protected void replay(Operation existing) {
8687
if (existing.contextDetails() != null
8788
&& Boolean.TRUE.equals(existing.contextDetails().replayChildren())) {
8889
// Large result: re-execute child context to reconstruct result
89-
replayChildContext = true;
90+
replayChildren.set(true);
9091
executeChildContext();
9192
} else {
9293
markAlreadyCompleted();
@@ -100,11 +101,6 @@ protected void replay(Operation existing) {
100101
}
101102
}
102103

103-
@Override
104-
protected void markAlreadyCompleted() {
105-
super.markAlreadyCompleted();
106-
}
107-
108104
private void executeChildContext() {
109105
// The operationId is already globally unique (prefixed by parent context path via
110106
// DurableContext.nextOperationId), so we use it directly as the contextId.
@@ -137,7 +133,7 @@ private void executeChildContext() {
137133
}
138134

139135
private void handleChildContextSuccess(T result) {
140-
if (replayChildContext) {
136+
if (replayChildren.get()) {
141137
// Replaying a SUCCEEDED child with replayChildren=true — skip checkpointing.
142138
// Mark the completableFuture completed so get() doesn't block waiting for a checkpoint response.
143139
this.reconstructedResult = result;
@@ -151,8 +147,6 @@ private void checkpointSuccess(T result) {
151147
// Skip checkpointing if parent ConcurrencyOperation has already completed —
152148
// prevents race conditions where a child finishes after the parent has already completed.
153149
if (parentOperation != null && parentOperation.isOperationCompleted()) {
154-
this.reconstructedResult = result;
155-
markAlreadyCompleted();
156150
return;
157151
}
158152

@@ -187,7 +181,6 @@ private void handleChildContextFailure(Throwable exception) {
187181
// Skip checkpointing if parent ConcurrencyOperation has already completed —
188182
// prevents race conditions where a child finishes after the parent has already succeeded.
189183
if (parentOperation != null && parentOperation.isOperationCompleted()) {
190-
markAlreadyCompleted();
191184
return;
192185
}
193186

sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ protected void executeItems() {
174174
next.execute();
175175
}
176176
var child = waitForChildCompletion(succeededCount, failedCount, runningChildren);
177+
// child may be null if the consumer thread is woken up due to a new item being added
177178
if (child != null) {
178179
if (runningChildren.contains(child)) {
179180
runningChildren.remove(child);
@@ -213,6 +214,7 @@ private BaseDurableOperation waitForChildCompletion(
213214
.map(BaseDurableOperation::getCompletionFuture)
214215
.toList());
215216
if (futures.size() < maxConcurrency) {
217+
// add a future to listen to the new items if there is a vacancy
216218
consumerThreadListener.compareAndSet(null, new CompletableFuture<>());
217219
futures.add(consumerThreadListener.get());
218220
}

0 commit comments

Comments
 (0)