Skip to content

Commit 8110ddc

Browse files
authored
fix the pagination issue in checkpointBatcher (#54)
1 parent aaa7ff3 commit 8110ddc

3 files changed

Lines changed: 48 additions & 55 deletions

File tree

sdk/src/main/java/com/amazonaws/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
import com.amazonaws.lambda.durable.client.DurableExecutionClient;
66
import java.util.ArrayList;
77
import java.util.List;
8+
import java.util.Objects;
89
import java.util.concurrent.BlockingQueue;
910
import java.util.concurrent.CompletableFuture;
1011
import java.util.concurrent.LinkedBlockingQueue;
1112
import java.util.concurrent.atomic.AtomicBoolean;
12-
import java.util.function.Supplier;
1313
import org.slf4j.Logger;
1414
import org.slf4j.LoggerFactory;
15+
import software.amazon.awssdk.services.lambda.model.Operation;
1516
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
1617

1718
/**
@@ -30,31 +31,31 @@ class CheckpointBatcher {
3031
private static final Logger logger = LoggerFactory.getLogger(CheckpointBatcher.class);
3132

3233
private final CheckpointCallback callback;
33-
private final Supplier<String> tokenSupplier;
3434
private final String durableExecutionArn;
3535
private final DurableExecutionClient client;
3636
private final BlockingQueue<CheckpointRequest> queue = new LinkedBlockingQueue<>();
3737
private final AtomicBoolean isProcessing = new AtomicBoolean(false);
38+
private String checkpointToken;
3839

3940
record CheckpointRequest(OperationUpdate update, CompletableFuture<Void> completion) {}
4041

4142
CheckpointBatcher(
4243
DurableExecutionClient client,
4344
String durableExecutionArn,
44-
Supplier<String> tokenSupplier,
45+
String checkpointToken,
4546
CheckpointCallback callback) {
4647
this.client = client;
4748
this.durableExecutionArn = durableExecutionArn;
48-
this.tokenSupplier = tokenSupplier;
4949
this.callback = callback;
50+
this.checkpointToken = checkpointToken;
5051
}
5152

5253
CompletableFuture<Void> checkpoint(OperationUpdate update) {
5354
logger.debug(
5455
"Checkpoint request received: Action {}",
5556
update != null ? update.action() : "NULL (Checkpoint request)");
5657
var future = new CompletableFuture<Void>();
57-
queue.offer(new CheckpointRequest(update, future));
58+
queue.add(new CheckpointRequest(update, future));
5859

5960
if (isProcessing.compareAndSet(false, true)) {
6061
InternalExecutor.INSTANCE.execute(this::processQueue);
@@ -70,32 +71,46 @@ void shutdown() {
7071
req -> req.completion().completeExceptionally(new IllegalStateException("CheckpointManager shutdown")));
7172
}
7273

74+
public List<Operation> fetchAllPages(List<Operation> initialOperations, String nextMarker) {
75+
List<Operation> operations = new ArrayList<>();
76+
if (initialOperations != null) {
77+
operations.addAll(initialOperations);
78+
}
79+
while (nextMarker != null && !nextMarker.isEmpty()) {
80+
var response = client.getExecutionState(durableExecutionArn, checkpointToken, nextMarker);
81+
logger.debug("DAR getExecutionState called: {}.", response);
82+
operations.addAll(response.operations());
83+
nextMarker = response.nextMarker();
84+
}
85+
return operations;
86+
}
87+
7388
private void processQueue() {
7489
try {
7590
var batch = collectBatch();
7691
if (!batch.isEmpty()) {
7792
// Filter out null updates (empty checkpoints for polling)
7893
var updates = batch.stream()
7994
.map(CheckpointRequest::update)
80-
.filter(u -> u != null)
95+
.filter(Objects::nonNull)
8196
.toList();
8297

83-
var response = client.checkpoint(durableExecutionArn, tokenSupplier.get(), updates);
84-
logger.debug("DAR backend called: {}.", response);
98+
var response = client.checkpoint(durableExecutionArn, checkpointToken, updates);
99+
logger.debug("DAR checkpointDurableExecution called: {}.", response);
85100

86101
// Notify callback of completion
87102
// TODO: sam local backend returns no new execution state when called with zero
88103
// updates. WHY?
89104
// This means the polling will never receive an operation update and complete
90105
// the Phaser.
91-
if (response.newExecutionState() != null
92-
&& response.newExecutionState().operations() != null
93-
&& !response.newExecutionState().operations().isEmpty()) {
94-
callback.onComplete(
95-
response.checkpointToken(),
96-
response.newExecutionState().operations());
97-
} else {
98-
callback.onComplete(response.checkpointToken(), List.of());
106+
checkpointToken = response.checkpointToken();
107+
if (response.newExecutionState() != null) {
108+
var operations = fetchAllPages(
109+
response.newExecutionState().operations(),
110+
response.newExecutionState().nextMarker());
111+
if (!operations.isEmpty()) {
112+
callback.onComplete(operations);
113+
}
99114
}
100115

101116
batch.forEach(req -> req.completion().complete(null));
@@ -122,7 +137,7 @@ private List<CheckpointRequest> collectBatch() {
122137
var itemSize = estimateSize(req.update());
123138

124139
if (currentSize + itemSize > MAX_BATCH_SIZE_BYTES && !batch.isEmpty()) {
125-
queue.offer(req);
140+
queue.add(req);
126141
break;
127142
}
128143

sdk/src/main/java/com/amazonaws/lambda/durable/execution/CheckpointCallback.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ interface CheckpointCallback {
1717
/**
1818
* Called when a checkpoint completes successfully.
1919
*
20-
* @param newToken New checkpoint token from backend
2120
* @param newOperations Updated operations from backend
2221
*/
23-
void onComplete(String newToken, List<Operation> newOperations);
22+
void onComplete(List<Operation> newOperations);
2423
}

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 15 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
import java.util.List;
1212
import java.util.Map;
1313
import java.util.concurrent.CompletableFuture;
14-
import java.util.concurrent.ConcurrentHashMap;
1514
import java.util.concurrent.Phaser;
1615
import java.util.concurrent.atomic.AtomicReference;
16+
import java.util.stream.Collectors;
1717
import org.slf4j.Logger;
1818
import org.slf4j.LoggerFactory;
1919
import software.amazon.awssdk.services.lambda.model.Operation;
@@ -43,9 +43,8 @@ public class ExecutionManager {
4343
private static final Logger logger = LoggerFactory.getLogger(ExecutionManager.class);
4444

4545
// ===== Execution State =====
46-
private final Map<String, Operation> operations = new ConcurrentHashMap<>();
46+
private final Map<String, Operation> operations;
4747
private final String executionOperationId;
48-
private volatile String checkpointToken;
4948
private final String durableExecutionArn;
5049
private final AtomicReference<ExecutionMode> executionMode;
5150

@@ -57,46 +56,28 @@ public class ExecutionManager {
5756

5857
// ===== Checkpoint Batching =====
5958
private final CheckpointBatcher checkpointBatcher;
60-
private final DurableExecutionClient client;
6159

6260
public ExecutionManager(
6361
String durableExecutionArn,
6462
String checkpointToken,
6563
InitialExecutionState initialExecutionState,
6664
DurableExecutionClient client) {
6765
this.durableExecutionArn = durableExecutionArn;
68-
this.checkpointToken = checkpointToken;
69-
this.client = client;
7066
this.executionOperationId = initialExecutionState.operations().get(0).id();
71-
loadAllOperations(initialExecutionState);
67+
68+
// Create checkpoint batcher for internal coordination
69+
this.checkpointBatcher =
70+
new CheckpointBatcher(client, durableExecutionArn, checkpointToken, this::onCheckpointComplete);
71+
72+
this.operations =
73+
checkpointBatcher
74+
.fetchAllPages(initialExecutionState.operations(), initialExecutionState.nextMarker())
75+
.stream()
76+
.collect(Collectors.toConcurrentMap(Operation::id, op -> op));
7277

7378
// Start in REPLAY mode if we have more than just the initial EXECUTION operation
7479
this.executionMode =
7580
new AtomicReference<>(operations.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION);
76-
77-
// Create checkpoint manager using common pool for internal coordination
78-
this.checkpointBatcher = new CheckpointBatcher(
79-
client, durableExecutionArn, this::getCheckpointToken, this::onCheckpointComplete);
80-
}
81-
82-
private void loadAllOperations(InitialExecutionState initialExecutionState) {
83-
var initialOperations = initialExecutionState.operations();
84-
initialOperations.forEach(op -> operations.put(op.id(), op));
85-
86-
var nextMarker = initialExecutionState.nextMarker();
87-
while (nextMarker != null && !nextMarker.isEmpty()) {
88-
var response = client.getExecutionState(durableExecutionArn, checkpointToken, nextMarker);
89-
response.operations().forEach(op -> operations.put(op.id(), op));
90-
nextMarker = response.nextMarker();
91-
}
92-
}
93-
94-
// ===== Checkpoint Completion Handler =====
95-
96-
/** Called by CheckpointManager when a checkpoint completes. Updates state and advances phasers. */
97-
private void onCheckpointComplete(String newToken, List<Operation> newOperations) {
98-
this.checkpointToken = newToken;
99-
updateOperations(newOperations);
10081
}
10182

10283
// ===== State Management =====
@@ -109,11 +90,9 @@ public boolean isReplaying() {
10990
return executionMode.get() == ExecutionMode.REPLAY;
11091
}
11192

112-
public String getCheckpointToken() {
113-
return checkpointToken;
114-
}
115-
116-
private void updateOperations(List<Operation> newOperations) {
93+
// ===== Checkpoint Completion Handler =====
94+
/** Called by CheckpointBatcher when a checkpoint completes. Updates state and advances phasers. */
95+
private void onCheckpointComplete(List<Operation> newOperations) {
11796
// Update operation storage
11897
newOperations.forEach(op -> operations.put(op.id(), op));
11998

0 commit comments

Comments
 (0)