Skip to content

Commit a6df5b2

Browse files
committed
poll again if the previous poll doesn't receive an update
1 parent d66a854 commit a6df5b2

4 files changed

Lines changed: 18 additions & 12 deletions

File tree

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ApiRequestBatcher.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package com.amazonaws.lambda.durable.execution;
44

5-
import com.amazonaws.lambda.durable.exception.IllegalDurableOperationException;
65
import java.time.Duration;
76
import java.util.ArrayList;
87
import java.util.List;
@@ -140,14 +139,14 @@ private void flushNow() {
140139
execute();
141140
}
142141

142+
/** Flushes pending batch and waits for completion */
143143
public void shutdown() {
144-
var ex = new IllegalDurableOperationException("Batch cancelled");
145144
synchronized (items) {
146-
for (Item<T> item : items) {
147-
item.result().completeExceptionally(ex);
148-
}
149-
initializeBatch();
145+
flushNow();
150146
}
147+
148+
// wait for previous batches to be flushed
149+
previousBatchFuture.join();
151150
}
152151

153152
/** Executes batch and completes all item futures */

sdk/src/main/java/com/amazonaws/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
package com.amazonaws.lambda.durable.execution;
44

55
import com.amazonaws.lambda.durable.DurableConfig;
6-
76
import java.time.Duration;
87
import java.util.ArrayList;
98
import java.util.Collections;
@@ -59,10 +58,12 @@ CompletableFuture<Void> checkpoint(OperationUpdate update) {
5958
return checkpointApiRequestBatcher.submit(update, config.getCheckpointDelay());
6059
}
6160

61+
/** Poll for updates of the specified operation with preconfigured intervals */
6262
CompletableFuture<Operation> pollForUpdate(String operationId) {
6363
return pollForUpdate(operationId, config.getPollingInterval());
6464
}
6565

66+
/** Poll for updates of the specified operation with specified delay */
6667
CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
6768
logger.debug("Polling request received: operation id {}", operationId);
6869
var future = new CompletableFuture<Operation>();
@@ -72,14 +73,21 @@ CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
7273
.computeIfAbsent(operationId, k -> Collections.synchronizedList(new ArrayList<>()))
7374
.add(future);
7475
}
75-
checkpointApiRequestBatcher.submit(null, delay);
76+
checkpointApiRequestBatcher.submit(null, delay).thenCompose(v -> {
77+
if (future.isDone()) {
78+
return CompletableFuture.completedFuture(null);
79+
}
80+
return checkpointApiRequestBatcher.submit(null, delay);
81+
});
7682
return future;
7783
}
7884

7985
void shutdown() {
80-
List<List<CompletableFuture<Operation>>> allFutures;
86+
// wait for all checkpoint requests to complete
8187
checkpointApiRequestBatcher.shutdown();
8288

89+
// complete all polling futures with an exception
90+
List<List<CompletableFuture<Operation>>> allFutures;
8391
synchronized (pollingFutures) {
8492
allFutures = new ArrayList<>(pollingFutures.values());
8593
pollingFutures.clear();
@@ -90,6 +98,7 @@ void shutdown() {
9098
}
9199
}
92100

101+
/** Calling GetExecutionState API to get all pages of operations given the nextMarker */
93102
public List<Operation> fetchAllPages(List<Operation> initialOperations, String nextMarker) {
94103
List<Operation> operations = new ArrayList<>();
95104
if (initialOperations != null) {

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.amazonaws.lambda.durable.DurableConfig;
66
import com.amazonaws.lambda.durable.exception.UnrecoverableDurableExecutionException;
77
import com.amazonaws.lambda.durable.model.DurableExecutionInput.InitialExecutionState;
8-
98
import java.time.Duration;
109
import java.util.Collections;
1110
import java.util.HashMap;

sdk/src/main/java/com/amazonaws/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.amazonaws.lambda.durable.execution.ThreadType;
1414
import com.amazonaws.lambda.durable.serde.SerDes;
1515
import com.amazonaws.lambda.durable.util.ExceptionHelper;
16-
1716
import java.time.Duration;
1817
import java.util.Objects;
1918
import java.util.concurrent.CompletableFuture;
@@ -209,7 +208,7 @@ protected CompletableFuture<Operation> pollForOperationUpdates() {
209208
return executionManager.pollForOperationUpdates(operationId);
210209
}
211210

212-
protected CompletableFuture<Operation> pollForOperationUpdates(Duration delay) {
211+
protected CompletableFuture<Operation> pollForOperationUpdates(Duration delay) {
213212
return executionManager.pollForOperationUpdates(operationId, delay);
214213
}
215214

0 commit comments

Comments
 (0)