Skip to content

Commit f0b42ac

Browse files
author
Alex Wang
committed
improvement: improve polling mechanism
- Refactor jitter - Add jitter, polling interval, backoff rate and max interval - unit tests - example
1 parent d9b8cef commit f0b42ac

13 files changed

Lines changed: 978 additions & 32 deletions

File tree

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples;
4+
5+
import java.time.Duration;
6+
import software.amazon.lambda.durable.DurableConfig;
7+
import software.amazon.lambda.durable.DurableContext;
8+
import software.amazon.lambda.durable.DurableHandler;
9+
import software.amazon.lambda.durable.InvokeConfig;
10+
import software.amazon.lambda.durable.retry.JitterStrategy;
11+
import software.amazon.lambda.durable.retry.PollingStrategies;
12+
13+
/**
14+
* Example demonstrating custom polling strategy configuration.
15+
*
16+
* <p>The polling strategy controls how the SDK polls for async operation results. By default, the SDK uses exponential
17+
* backoff (1s base, 2x rate, full jitter). This example shows how to customize the polling behavior.
18+
*
19+
* <p>This example configures:
20+
*
21+
* <ul>
22+
* <li>Exponential backoff with 500ms base interval
23+
* <li>1.5x backoff rate for gentler growth
24+
* <li>Half jitter to balance between consistency and thundering herd avoidance
25+
* </ul>
26+
*/
27+
public class CustomPollingExample extends DurableHandler<GreetingRequest, String> {
28+
29+
@Override
30+
protected DurableConfig createConfiguration() {
31+
return DurableConfig.builder()
32+
.withPollingStrategy(PollingStrategies.exponentialBackoff(
33+
Duration.ofMillis(500), 1.5, JitterStrategy.HALF, Duration.ofSeconds(5)))
34+
.build();
35+
}
36+
37+
@Override
38+
public String handleRequest(GreetingRequest input, DurableContext context) {
39+
context.getLogger().info("Starting workflow with input: {}", input);
40+
41+
// Step 1: lower case the input
42+
var lowered = context.stepAsync("validate", String.class, () -> {
43+
try {
44+
// prevent the execution from suspension
45+
Thread.sleep(5000);
46+
} catch (InterruptedException e) {
47+
e.printStackTrace();
48+
}
49+
return input.getName().toLowerCase();
50+
});
51+
52+
// Step 2: Invoke async
53+
var future = context.invokeAsync(
54+
"call-greeting",
55+
"simple-step-example" + input.getName() + ":$LATEST",
56+
input,
57+
String.class,
58+
InvokeConfig.builder().build());
59+
60+
return future.get() + lowered.get();
61+
}
62+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import org.junit.jupiter.api.Test;
8+
import software.amazon.lambda.durable.model.ExecutionStatus;
9+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
10+
11+
class CustomPollingExampleTest {
12+
13+
@Test
14+
void testCustomPollingExample() {
15+
var handler = new CustomPollingExample();
16+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
17+
18+
// First run: executes validate step, then pending at wait
19+
var input = new GreetingRequest("world");
20+
var output1 = runner.run(input);
21+
22+
assertEquals(ExecutionStatus.PENDING, output1.getStatus());
23+
24+
// Second run:
25+
runner.completeChainedInvoke("call-greeting", "\"hello\"");
26+
var output2 = runner.run(input);
27+
28+
assertEquals(ExecutionStatus.SUCCEEDED, output2.getStatus());
29+
assertEquals("helloworld", output2.getResult(String.class));
30+
}
31+
}

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ public TestResult<O> run(I input) {
155155
.withDurableExecutionClient(storage)
156156
.withSerDes(customerConfig.getSerDes())
157157
.withExecutorService(customerConfig.getExecutorService())
158+
.withPollingStrategy(customerConfig.getPollingStrategy())
159+
.withCheckpointDelay(customerConfig.getCheckpointDelay())
160+
.withLoggerConfig(customerConfig.getLoggerConfig())
158161
.build();
159162
} else {
160163
// Fallback to default config with in-memory client

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import software.amazon.lambda.durable.client.DurableExecutionClient;
2222
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
2323
import software.amazon.lambda.durable.logging.LoggerConfig;
24+
import software.amazon.lambda.durable.retry.PollingStrategies;
25+
import software.amazon.lambda.durable.retry.PollingStrategy;
2426
import software.amazon.lambda.durable.serde.JacksonSerDes;
2527
import software.amazon.lambda.durable.serde.SerDes;
2628

@@ -77,7 +79,7 @@ public final class DurableConfig {
7779
private final SerDes serDes;
7880
private final ExecutorService executorService;
7981
private final LoggerConfig loggerConfig;
80-
private final Duration pollingInterval;
82+
private final PollingStrategy pollingStrategy;
8183
private final Duration checkpointDelay;
8284

8385
private DurableConfig(Builder builder) {
@@ -87,7 +89,8 @@ private DurableConfig(Builder builder) {
8789
this.serDes = builder.serDes != null ? builder.serDes : new JacksonSerDes();
8890
this.executorService = builder.executorService != null ? builder.executorService : createDefaultExecutor();
8991
this.loggerConfig = builder.loggerConfig != null ? builder.loggerConfig : LoggerConfig.defaults();
90-
this.pollingInterval = builder.pollingInterval != null ? builder.pollingInterval : Duration.ofMillis(1000);
92+
this.pollingStrategy =
93+
builder.pollingStrategy != null ? builder.pollingStrategy : PollingStrategies.Presets.DEFAULT;
9194
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);
9295
}
9396

@@ -146,12 +149,12 @@ public LoggerConfig getLoggerConfig() {
146149
}
147150

148151
/**
149-
* Gets the configured polling interval.
152+
* Gets the polling strategy.
150153
*
151-
* @return polling interval in Duration.
154+
* @return PollingStrategy instance (never null)
152155
*/
153-
public Duration getPollingInterval() {
154-
return pollingInterval;
156+
public PollingStrategy getPollingStrategy() {
157+
return pollingStrategy;
155158
}
156159

157160
/**
@@ -248,7 +251,7 @@ public static final class Builder {
248251
private SerDes serDes;
249252
private ExecutorService executorService;
250253
private LoggerConfig loggerConfig;
251-
private Duration pollingInterval;
254+
private PollingStrategy pollingStrategy;
252255
private Duration checkpointDelay;
253256

254257
private Builder() {}
@@ -336,14 +339,14 @@ public Builder withLoggerConfig(LoggerConfig loggerConfig) {
336339
}
337340

338341
/**
339-
* Sets how often the SDK polls updates from backend.
342+
* Sets the polling strategy. If not set, defaults to exponential backoff with a 1 second base interval, 2x backoff rate, full jitter and a 10 second max interval.
340343
*
341-
* @param duration the polling interval in Duration
344+
* @param pollingStrategy Custom PollingStrategy instance
342345
* @return This builder
343346
*/
344-
public Builder withPollingInterval(Duration duration) {
347+
public Builder withPollingStrategy(PollingStrategy pollingStrategy) {
345348
// No validation here - a null strategy is replaced by PollingStrategies.Presets.DEFAULT when the config is built
346-
this.pollingInterval = duration;
349+
this.pollingStrategy = pollingStrategy;
347350
return this;
348351
}
349352

sdk/src/main/java/software/amazon/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import software.amazon.awssdk.services.lambda.model.Operation;
1818
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
1919
import software.amazon.lambda.durable.DurableConfig;
20+
import software.amazon.lambda.durable.retry.PollingStrategies;
21+
import software.amazon.lambda.durable.retry.PollingStrategy;
2022

2123
/**
2224
* Package-private checkpoint manager for batching and queueing checkpoint API calls.
@@ -57,11 +59,16 @@ CompletableFuture<Void> checkpoint(OperationUpdate update) {
5759

5860
/** Polls for updates of the specified operation with preconfigured intervals */
5961
CompletableFuture<Operation> pollForUpdate(String operationId) {
60-
return pollForUpdate(operationId, config.getPollingInterval());
62+
return pollForUpdate(operationId, config.getPollingStrategy());
6163
}
6264

6365
/** Polls for updates of the specified operation with specified delay */
6466
CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
67+
return pollForUpdate(operationId, PollingStrategies.fixedDelay(delay));
68+
}
69+
70+
/** Polls for updates of the specified operation with specified polling strategy */
71+
CompletableFuture<Operation> pollForUpdate(String operationId, PollingStrategy pollingStrategy) {
6572
logger.debug("Polling request received: operation id {}", operationId);
6673
var future = new CompletableFuture<Operation>();
6774
synchronized (pollingFutures) {
@@ -70,17 +77,20 @@ CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
7077
.computeIfAbsent(operationId, k -> Collections.synchronizedList(new ArrayList<>()))
7178
.add(future);
7279
}
73-
pollForUpdateInternal(future, delay);
80+
pollForUpdateInternal(future, 0, pollingStrategy);
7481
return future;
7582
}
7683

77-
private CompletableFuture<Void> pollForUpdateInternal(CompletableFuture<Operation> future, Duration delay) {
78-
return checkpointApiRequestBatcher.submit(null, delay).thenCompose(v -> {
79-
if (future.isDone()) {
80-
return CompletableFuture.completedFuture(null);
81-
}
82-
return pollForUpdateInternal(future, delay);
83-
});
84+
private CompletableFuture<Void> pollForUpdateInternal(
85+
CompletableFuture<Operation> future, int attempt, PollingStrategy pollingStrategy) {
86+
return checkpointApiRequestBatcher
87+
.submit(null, pollingStrategy.computeDelay(attempt))
88+
.thenCompose(v -> {
89+
if (future.isDone()) {
90+
return CompletableFuture.completedFuture(null);
91+
}
92+
return pollForUpdateInternal(future, attempt + 1, pollingStrategy);
93+
});
8494
}
8595

8696
/** Cancels all polling futures and waits for all pending checkpoint requests to complete */

sdk/src/main/java/software/amazon/lambda/durable/retry/JitterStrategy.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,32 @@ public enum JitterStrategy {
1313
/**
1414
* No jitter - use exact calculated delay. This provides predictable timing but may cause thundering herd issues.
1515
*/
16-
NONE,
17-
16+
NONE {
17+
@Override
18+
public double apply(double baseDelay) {
19+
return baseDelay;
20+
}
21+
},
1822
/**
1923
* Full jitter - random delay between 0 and calculated delay. This provides maximum spread but may result in very
2024
* short delays.
2125
*/
22-
FULL,
23-
26+
FULL {
27+
@Override
28+
public double apply(double baseDelay) {
29+
return Math.random() * baseDelay;
30+
}
31+
},
2432
/**
2533
* Half jitter - random delay between 50% and 100% of calculated delay. This provides good spread while maintaining
2634
* reasonable minimum delays.
2735
*/
28-
HALF
36+
HALF {
37+
@Override
38+
public double apply(double baseDelay) {
39+
return baseDelay / 2 + Math.random() * (baseDelay / 2);
40+
}
41+
};
42+
43+
public abstract double apply(double baseDelay);
2944
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.retry;
4+
5+
import java.time.Duration;
6+
import java.util.Objects;
7+
8+
/** Factory class for creating common polling strategies. */
9+
public class PollingStrategies {
10+
11+
/** Preset polling strategies for common use cases. */
12+
public static class Presets {
13+
14+
/**
15+
* Default polling strategy: - Base interval: 1 second - Backoff rate: 2x - Jitter: FULL - Max interval 10
16+
* second
17+
*/
18+
public static final PollingStrategy DEFAULT =
19+
exponentialBackoff(Duration.ofMillis(1000), 2.0, JitterStrategy.FULL, Duration.ofSeconds(10));
20+
}
21+
22+
/**
23+
* Creates an exponential backoff polling strategy.
24+
*
25+
* <p>The delay calculation follows the formula: delay = jitter(baseInterval × backoffRate^attempt)
26+
*
27+
* @param baseInterval Base delay before first poll
28+
* @param backoffRate Multiplier for exponential backoff (must be positive)
29+
* @param jitter Jitter strategy to apply to delays
30+
* @param maxInterval Maximum delay between polls
31+
* @return PollingStrategy implementing exponential backoff with jitter
32+
*/
33+
public static PollingStrategy exponentialBackoff(
34+
Duration baseInterval, double backoffRate, JitterStrategy jitter, Duration maxInterval) {
35+
Objects.requireNonNull(jitter, "jitter must not be null");
36+
Objects.requireNonNull(baseInterval, "base interval must not be null");
37+
Objects.requireNonNull(maxInterval, "max interval must not be null");
38+
if (backoffRate <= 0) {
39+
throw new IllegalArgumentException("backoffRate must be positive");
40+
}
41+
42+
if (baseInterval.isNegative() || baseInterval.isZero()) {
43+
throw new IllegalArgumentException("baseInterval must be positive");
44+
}
45+
46+
if (maxInterval.isNegative() || maxInterval.isZero()) {
47+
throw new IllegalArgumentException("maxInterval must be positive");
48+
}
49+
50+
return (attempt) -> {
51+
double delayMs = baseInterval.toMillis() * Math.pow(backoffRate, attempt);
52+
delayMs = Math.min(jitter.apply(delayMs), maxInterval.toMillis());
53+
return Duration.ofMillis(Math.round(delayMs));
54+
};
55+
}
56+
57+
/**
58+
* Creates a fixed-delay polling strategy that uses the same interval for every attempt.
59+
*
60+
* @param interval Fixed delay between polls
61+
* @return PollingStrategy with fixed delay
62+
*/
63+
public static PollingStrategy fixedDelay(Duration interval) {
64+
Objects.requireNonNull(interval, "interval must not be null");
65+
if (interval.isNegative() || interval.isZero()) {
66+
throw new IllegalArgumentException("interval must be positive");
67+
}
68+
return (attempt) -> interval;
69+
}
70+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.retry;
4+
5+
import java.time.Duration;
6+
7+
/**
 * Strategy that decides how long to wait between successive polling attempts.
 *
 * <p>Declares a single abstract method, so an implementation can be supplied as a lambda or method reference.
 */
@FunctionalInterface
public interface PollingStrategy {

    /**
     * Computes the delay before the next polling attempt.
     *
     * @param attempt the current attempt number (0-based)
     * @return the duration to wait before the next poll
     */
    Duration computeDelay(int attempt);
}

sdk/src/main/java/software/amazon/lambda/durable/retry/RetryStrategies.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,7 @@ public static RetryStrategy exponentialBackoff(
6969
double baseDelay = Math.min(initialDelaySeconds * Math.pow(backoffRate, attemptNumber), maxDelaySeconds);
7070

7171
// Apply jitter
72-
double delayWithJitter =
73-
switch (jitter) {
74-
case NONE -> baseDelay;
75-
case FULL -> Math.random() * baseDelay;
76-
case HALF -> baseDelay / 2 + Math.random() * (baseDelay / 2);
77-
};
72+
double delayWithJitter = jitter.apply(baseDelay);
7873

7974
// Round to nearest second, minimum 1
8075
// Same rounding logic as TS SDK: https://tinyurl.com/4ntxsefu

0 commit comments

Comments
 (0)