Skip to content

Commit a0eb0fe

Browse files
authored
fix negative polling delay (aws#260)
1 parent 6b1c91b commit a0eb0fe

7 files changed

Lines changed: 47 additions & 27 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/execution/CheckpointManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,14 @@ CompletableFuture<Operation> pollForUpdate(String operationId) {
7272
}
7373

7474
/**
75-
* Polls for updates of the specified operation with specified delay
75+
* Polls for updates of the specified operation at the specified time. If the give time is at the past, SDK will
76+
* immediately make a polling call.
7677
*
78+
* @param at the time to poll for the update
7779
* @return a future that completes when the operation is updated
7880
*/
79-
CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
80-
return pollForUpdate(operationId, PollingStrategies.fixedDelay(delay));
81+
CompletableFuture<Operation> pollForUpdate(String operationId, Instant at) {
82+
return pollForUpdate(operationId, PollingStrategies.at(at));
8183
}
8284

8385
/**

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.execution;
44

5-
import java.time.Duration;
5+
import java.time.Instant;
66
import java.util.ArrayList;
77
import java.util.Collections;
88
import java.util.HashMap;
@@ -247,8 +247,15 @@ public CompletableFuture<Operation> pollForOperationUpdates(String operationId)
247247
return checkpointManager.pollForUpdate(operationId);
248248
}
249249

250-
public CompletableFuture<Operation> pollForOperationUpdates(String operationId, Duration delay) {
251-
return checkpointManager.pollForUpdate(operationId, delay);
250+
/**
251+
* Pools for operation updates at a specific time
252+
*
253+
* @param operationId the operation id to poll for updates
254+
* @param at the time to poll for updates
255+
* @return a completable future that completes with the operation update
256+
*/
257+
public CompletableFuture<Operation> pollForOperationUpdates(String operationId, Instant at) {
258+
return checkpointManager.pollForUpdate(operationId, at);
252259
}
253260

254261
// ===== Utilities =====

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.operation;
44

5-
import java.time.Duration;
5+
import java.time.Instant;
66
import java.util.List;
77
import java.util.Objects;
88
import java.util.concurrent.CompletableFuture;
@@ -341,13 +341,13 @@ protected CompletableFuture<Operation> pollForOperationUpdates() {
341341
}
342342

343343
/**
344-
* Polls the backend for updates to this operation after the specified delay.
344+
* Polls the backend for updates to this operation at a specific time.
345345
*
346-
* @param delay the delay before polling
346+
* @param at the time to poll for updates
347347
* @return a future that completes with the updated operation
348348
*/
349-
protected CompletableFuture<Operation> pollForOperationUpdates(Duration delay) {
350-
return executionManager.pollForOperationUpdates(getOperationId(), delay);
349+
protected CompletableFuture<Operation> pollForOperationUpdates(Instant at) {
350+
return executionManager.pollForOperationUpdates(getOperationId(), at);
351351
}
352352

353353
/** Sends an operation update synchronously (blocks until the update is acknowledged). */

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.operation;
44

5-
import java.time.Duration;
6-
import java.time.Instant;
75
import java.util.concurrent.CompletableFuture;
86
import java.util.function.Function;
97
import software.amazon.awssdk.services.lambda.model.ErrorObject;
@@ -87,10 +85,10 @@ protected void replay(Operation existing) {
8785

8886
private CompletableFuture<Void> pollReadyAndExecuteStepLogic(Operation existing, int attempt) {
8987
var nextAttemptInstant = existing.stepDetails().nextAttemptTimestamp();
90-
return pollForOperationUpdates(Duration.between(Instant.now(), nextAttemptInstant))
88+
return pollForOperationUpdates(nextAttemptInstant)
9189
.thenCompose(op -> op.status() == OperationStatus.READY
9290
? CompletableFuture.completedFuture(op)
93-
: pollForOperationUpdates())
91+
: pollForOperationUpdates(nextAttemptInstant))
9492
.thenRun(() -> executeStepLogic(attempt));
9593
}
9694

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,14 @@ protected void replay(Operation existing) {
6060
}
6161

6262
private void pollForWaitExpiration() {
63-
// Always calculate remaining time from scheduledEndTimestamp if scheduledEndTimestamp exists
64-
var remainingWaitTime = duration;
63+
var scheduledEndTimestamp = Instant.now().plusMillis(duration.toMillis());
6564
var existing = getOperation();
6665
if (existing != null
6766
&& existing.waitDetails() != null
6867
&& existing.waitDetails().scheduledEndTimestamp() != null) {
69-
remainingWaitTime =
70-
Duration.between(Instant.now(), existing.waitDetails().scheduledEndTimestamp());
71-
// If the wait has already elapsed, poll immediately with a minimal positive interval
72-
if (remainingWaitTime.isNegative() || remainingWaitTime.isZero()) {
73-
remainingWaitTime = Duration.ofMillis(1);
74-
}
68+
scheduledEndTimestamp = existing.waitDetails().scheduledEndTimestamp();
7569
}
76-
logger.debug("Remaining wait time: {} ms", remainingWaitTime.toMillis());
77-
pollForOperationUpdates(remainingWaitTime);
70+
pollForOperationUpdates(scheduledEndTimestamp);
7871
}
7972

8073
@Override

sdk/src/main/java/software/amazon/lambda/durable/retry/PollingStrategies.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package software.amazon.lambda.durable.retry;
44

55
import java.time.Duration;
6+
import java.time.Instant;
67
import java.util.Objects;
78

89
/** Factory class for creating common polling strategies. */
@@ -67,4 +68,22 @@ public static PollingStrategy fixedDelay(Duration interval) {
6768
}
6869
return (attempt) -> interval;
6970
}
71+
72+
/**
73+
* Creates a polling strategy that polls at a specific instant in time.
74+
*
75+
* @param instant The instant to poll at
76+
* @return PollingStrategy that calculates delay until the specified instant
77+
*/
78+
public static PollingStrategy at(Instant instant) {
79+
Objects.requireNonNull(instant, "instant must not be null");
80+
return (attempt) -> {
81+
var duration = Duration.between(Instant.now(), instant);
82+
if (duration.isNegative()) {
83+
// as soon as possible
84+
return Duration.ZERO;
85+
}
86+
return duration;
87+
};
88+
}
7089
}

sdk/src/test/java/software/amazon/lambda/durable/execution/CheckpointManagerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static org.mockito.Mockito.*;
88

99
import java.time.Duration;
10+
import java.time.Instant;
1011
import java.util.ArrayList;
1112
import java.util.List;
1213
import java.util.concurrent.TimeUnit;
@@ -276,7 +277,7 @@ void pollForUpdate_withCustomDelay() throws Exception {
276277
.build())
277278
.build());
278279

279-
var future = batcher.pollForUpdate("op-1", Duration.ofMillis(100));
280+
var future = batcher.pollForUpdate("op-1", Instant.now().plusMillis(100));
280281

281282
var result = future.get(300, TimeUnit.MILLISECONDS);
282283

@@ -410,7 +411,7 @@ void pollForUpdate_withCustomDelay_ignoresBackoffConfig() throws Exception {
410411
});
411412

412413
// Use explicit delay (fixed interval) — should NOT apply backoff
413-
var future = backoffBatcher.pollForUpdate("op-1", Duration.ofMillis(20));
414+
var future = backoffBatcher.pollForUpdate("op-1", Instant.now().plusMillis(20));
414415
var result = future.get(1000, TimeUnit.MILLISECONDS);
415416

416417
assertEquals(operation, result);

0 commit comments

Comments
 (0)