Skip to content

Commit 8712f6b

Browse files
authored
[bugfix]: avoid checkpoint when replay in large result mode (#336)
1 parent 4ece540 commit 8712f6b

2 files changed

Lines changed: 12 additions & 3 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -862,12 +862,14 @@ void testMapWithLargeResult_replayChildren() {
862862

863863
var result1 = runner.runUntilComplete("test");
864864
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
865+
assertEquals(202, result1.getHistoryEvents().size());
865866
var firstRunCount = executionCount.get();
866867
assertTrue(firstRunCount >= 100);
867868

868869
// Replay — large result path: replayChildren=true, children replay from cache
869870
var result2 = runner.run("test");
870871
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
872+
assertEquals(202, result2.getHistoryEvents().size());
871873
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
872874
}
873875

sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
import java.util.ArrayList;
66
import java.util.Collections;
77
import java.util.List;
8+
import java.util.concurrent.atomic.AtomicBoolean;
89
import software.amazon.awssdk.services.lambda.model.ContextOptions;
910
import software.amazon.awssdk.services.lambda.model.Operation;
1011
import software.amazon.awssdk.services.lambda.model.OperationAction;
@@ -42,7 +43,8 @@ public class MapOperation<I, O> extends ConcurrencyOperation<MapResult<O>> {
4243
private final DurableContext.MapFunction<I, O> function;
4344
private final TypeToken<O> itemResultType;
4445
private final SerDes serDes;
45-
private boolean replayFromPayload;
46+
private final AtomicBoolean replayFromPayload = new AtomicBoolean(false);
47+
private final AtomicBoolean replayForLargeResult = new AtomicBoolean(false);
4648
private volatile MapResult<O> cachedResult;
4749

4850
public MapOperation(
@@ -133,10 +135,11 @@ protected void replay(Operation existing) {
133135
if (existing.contextDetails() != null
134136
&& Boolean.TRUE.equals(existing.contextDetails().replayChildren())) {
135137
// Large result: re-execute children to reconstruct MapResult
138+
replayForLargeResult.set(true);
136139
executeItems();
137140
} else {
138141
// Small result: MapResult is in the payload, skip child replay
139-
replayFromPayload = true;
142+
replayFromPayload.set(true);
140143
markAlreadyCompleted();
141144
}
142145
}
@@ -181,6 +184,10 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio
181184
}
182185

183186
this.cachedResult = new MapResult<>(resultItems, concurrencyCompletionStatus);
187+
if (replayForLargeResult.get()) {
188+
markAlreadyCompleted();
189+
return;
190+
}
184191
var serialized = serializeResult(cachedResult);
185192
var serializedBytes = serialized.getBytes(java.nio.charset.StandardCharsets.UTF_8);
186193

@@ -205,7 +212,7 @@ public MapResult<O> get() {
205212
if (items.isEmpty()) {
206213
return MapResult.empty();
207214
}
208-
if (replayFromPayload) {
215+
if (replayFromPayload.get()) {
209216
// Small result replay: deserialize MapResult directly from checkpoint payload
210217
var op = waitForOperationCompletion();
211218
var result = (op.contextDetails() != null) ? op.contextDetails().result() : null;

0 commit comments

Comments
 (0)