Skip to content

Commit 87f4a17

Browse files
author
Alex Wang
committed
improvement: Improve polling mechanism
1 parent d9b8cef commit 87f4a17

7 files changed

Lines changed: 589 additions & 16 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import software.amazon.lambda.durable.client.DurableExecutionClient;
2222
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
2323
import software.amazon.lambda.durable.logging.LoggerConfig;
24+
import software.amazon.lambda.durable.retry.JitterStrategy;
2425
import software.amazon.lambda.durable.serde.JacksonSerDes;
2526
import software.amazon.lambda.durable.serde.SerDes;
2627

@@ -78,6 +79,8 @@ public final class DurableConfig {
7879
private final ExecutorService executorService;
7980
private final LoggerConfig loggerConfig;
8081
private final Duration pollingInterval;
82+
private final JitterStrategy pollingJitter;
83+
private final double pollingBackoffRate;
8184
private final Duration checkpointDelay;
8285

8386
private DurableConfig(Builder builder) {
@@ -88,6 +91,8 @@ private DurableConfig(Builder builder) {
8891
this.executorService = builder.executorService != null ? builder.executorService : createDefaultExecutor();
8992
this.loggerConfig = builder.loggerConfig != null ? builder.loggerConfig : LoggerConfig.defaults();
9093
this.pollingInterval = builder.pollingInterval != null ? builder.pollingInterval : Duration.ofMillis(1000);
94+
this.pollingJitter = builder.pollingJitter != null ? builder.pollingJitter : JitterStrategy.FULL;
95+
this.pollingBackoffRate = builder.pollingBackoffRate != 0.0 ? builder.pollingBackoffRate : 2.0;
9196
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);
9297
}
9398

@@ -154,6 +159,22 @@ public Duration getPollingInterval() {
154159
return pollingInterval;
155160
}
156161

162+
/**
163+
* Gets the polling backoff rate.
164+
* @return polling backoff rate.
165+
*/
166+
public double getPollingBackoffRate() {
167+
return pollingBackoffRate;
168+
}
169+
170+
/**
171+
* Gets the polling jitter.
172+
* @return polling jitter.
173+
*/
174+
public JitterStrategy getPollingJitter() {
175+
return pollingJitter;
176+
}
177+
157178
/**
158179
* Gets the configured checkpoint delay.
159180
*
@@ -249,6 +270,8 @@ public static final class Builder {
249270
private ExecutorService executorService;
250271
private LoggerConfig loggerConfig;
251272
private Duration pollingInterval;
273+
private JitterStrategy pollingJitter;
274+
private double pollingBackoffRate;
252275
private Duration checkpointDelay;
253276

254277
private Builder() {}
@@ -347,6 +370,31 @@ public Builder withPollingInterval(Duration duration) {
347370
return this;
348371
}
349372

373+
/**
374+
* Sets the polling backoff rate.
375+
*
376+
* @param backoffRate the backoff rate (must be positive)
377+
* @return This builder
378+
*/
379+
public Builder withPollingBackoffRate(double backoffRate) {
380+
if (backoffRate <= 0) {
381+
throw new IllegalArgumentException("backoffRate must be positive");
382+
}
383+
this.pollingBackoffRate = backoffRate;
384+
return this;
385+
}
386+
387+
/**
388+
* Sets the polling jitter strategy.
389+
*
390+
* @param jitter the jitter strategy to use
391+
* @return This builder
392+
*/
393+
public Builder withPollingJitter(JitterStrategy jitter) {
394+
this.pollingJitter = jitter;
395+
return this;
396+
}
397+
350398
/**
351399
* Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, which disables checkpoint
352400
* batching.

sdk/src/main/java/software/amazon/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,18 @@ CompletableFuture<Void> checkpoint(OperationUpdate update) {
5757

5858
/** Polls for updates of the specified operation with preconfigured intervals */
5959
CompletableFuture<Operation> pollForUpdate(String operationId) {
60-
return pollForUpdate(operationId, config.getPollingInterval());
60+
return pollForUpdate(operationId, config.getPollingInterval(), false);
6161
}
6262

6363
/** Polls for updates of the specified operation with specified delay */
6464
CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
65+
return pollForUpdate(operationId, delay, true);
66+
}
67+
68+
/**
69+
* Polls for updates of the specified operation with custom delay and interval behavior.
70+
*/
71+
CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay, boolean isFixedInterval) {
6572
logger.debug("Polling request received: operation id {}", operationId);
6673
var future = new CompletableFuture<Operation>();
6774
synchronized (pollingFutures) {
@@ -70,16 +77,23 @@ CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
7077
.computeIfAbsent(operationId, k -> Collections.synchronizedList(new ArrayList<>()))
7178
.add(future);
7279
}
73-
pollForUpdateInternal(future, delay);
80+
pollForUpdateInternal(future, delay, isFixedInterval, 0);
7481
return future;
7582
}
7683

77-
private CompletableFuture<Void> pollForUpdateInternal(CompletableFuture<Operation> future, Duration delay) {
78-
return checkpointApiRequestBatcher.submit(null, delay).thenCompose(v -> {
84+
private CompletableFuture<Void> pollForUpdateInternal(CompletableFuture<Operation> future, Duration initialDelay, boolean isFixedInterval, int attempt) {
85+
Duration actualDelay = initialDelay;
86+
if (!isFixedInterval) {
87+
double delayMilli = initialDelay.toMillis();
88+
delayMilli = delayMilli * Math.pow(config.getPollingBackoffRate(), attempt);
89+
delayMilli = config.getPollingJitter().apply(delayMilli);
90+
actualDelay = Duration.ofMillis(Math.round(delayMilli));
91+
}
92+
return checkpointApiRequestBatcher.submit(null, actualDelay).thenCompose(v -> {
7993
if (future.isDone()) {
8094
return CompletableFuture.completedFuture(null);
8195
}
82-
return pollForUpdateInternal(future, delay);
96+
return pollForUpdateInternal(future, initialDelay, isFixedInterval, attempt + 1);
8397
});
8498
}
8599

sdk/src/main/java/software/amazon/lambda/durable/retry/JitterStrategy.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,32 @@ public enum JitterStrategy {
1313
/**
1414
* No jitter - use exact calculated delay. This provides predictable timing but may cause thundering herd issues.
1515
*/
16-
NONE,
17-
16+
NONE {
17+
@Override
18+
public double apply(double baseDelay) {
19+
return baseDelay;
20+
}
21+
},
1822
/**
1923
* Full jitter - random delay between 0 and calculated delay. This provides maximum spread but may result in very
2024
* short delays.
2125
*/
22-
FULL,
23-
26+
FULL {
27+
@Override
28+
public double apply(double baseDelay) {
29+
return Math.random() * baseDelay;
30+
}
31+
},
2432
/**
2533
* Half jitter - random delay between 50% and 100% of calculated delay. This provides good spread while maintaining
2634
* reasonable minimum delays.
2735
*/
28-
HALF
36+
HALF {
37+
@Override
38+
public double apply(double baseDelay) {
39+
return baseDelay / 2 + Math.random() * (baseDelay / 2);
40+
}
41+
};
42+
43+
public abstract double apply(double baseDelay);
2944
}

sdk/src/main/java/software/amazon/lambda/durable/retry/RetryStrategies.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,7 @@ public static RetryStrategy exponentialBackoff(
6969
double baseDelay = Math.min(initialDelaySeconds * Math.pow(backoffRate, attemptNumber), maxDelaySeconds);
7070

7171
// Apply jitter
72-
double delayWithJitter =
73-
switch (jitter) {
74-
case NONE -> baseDelay;
75-
case FULL -> Math.random() * baseDelay;
76-
case HALF -> baseDelay / 2 + Math.random() * (baseDelay / 2);
77-
};
72+
double delayWithJitter = jitter.apply(baseDelay);
7873

7974
// Round to nearest second, minimum 1
8075
// Same rounding logic as TS SDK: https://tinyurl.com/4ntxsefu

sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import static org.junit.jupiter.api.Assertions.assertTrue;
1313
import static org.mockito.Mockito.mock;
1414

15+
import java.time.Duration;
1516
import java.util.concurrent.ExecutorService;
1617
import org.junit.jupiter.api.BeforeEach;
1718
import org.junit.jupiter.api.Test;
1819
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
1920
import software.amazon.awssdk.services.lambda.LambdaClient;
2021
import software.amazon.lambda.durable.client.DurableExecutionClient;
2122
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
23+
import software.amazon.lambda.durable.retry.JitterStrategy;
2224
import software.amazon.lambda.durable.serde.JacksonSerDes;
2325
import software.amazon.lambda.durable.serde.SerDes;
2426

@@ -294,4 +296,149 @@ void testAddUserAgentSuffix_ReturnsSameBuilderInstance() {
294296

295297
assertSame(lambdaClientBuilder, result);
296298
}
299+
300+
// --- Polling interval tests ---
301+
302+
@Test
303+
void testDefaultConfig_PollingIntervalDefaults() {
304+
var config = DurableConfig.defaultConfig();
305+
306+
assertEquals(Duration.ofMillis(1000), config.getPollingInterval());
307+
assertEquals(JitterStrategy.FULL, config.getPollingJitter());
308+
assertEquals(2.0, config.getPollingBackoffRate());
309+
assertEquals(Duration.ofSeconds(0), config.getCheckpointDelay());
310+
}
311+
312+
@Test
313+
void testBuilder_WithCustomPollingInterval() {
314+
var config = DurableConfig.builder()
315+
.withDurableExecutionClient(mockClient)
316+
.withPollingInterval(Duration.ofMillis(500))
317+
.build();
318+
319+
assertEquals(Duration.ofMillis(500), config.getPollingInterval());
320+
}
321+
322+
@Test
323+
void testBuilder_WithCustomPollingBackoffRate() {
324+
var config = DurableConfig.builder()
325+
.withDurableExecutionClient(mockClient)
326+
.withPollingBackoffRate(3.0)
327+
.build();
328+
329+
assertEquals(3.0, config.getPollingBackoffRate());
330+
}
331+
332+
@Test
333+
void testBuilder_WithPollingBackoffRateNotSet_UsesDefault() {
334+
var config = DurableConfig.builder()
335+
.withDurableExecutionClient(mockClient)
336+
.build();
337+
338+
assertEquals(2.0, config.getPollingBackoffRate());
339+
}
340+
341+
@Test
342+
void testBuilder_WithZeroPollingBackoffRate_ThrowsException() {
343+
var builder = DurableConfig.builder();
344+
345+
var exception = assertThrows(IllegalArgumentException.class, () -> builder.withPollingBackoffRate(0.0));
346+
347+
assertEquals("backoffRate must be positive", exception.getMessage());
348+
}
349+
350+
@Test
351+
void testBuilder_WithNegativePollingBackoffRate_ThrowsException() {
352+
var builder = DurableConfig.builder();
353+
354+
var exception = assertThrows(IllegalArgumentException.class, () -> builder.withPollingBackoffRate(-1.0));
355+
356+
assertEquals("backoffRate must be positive", exception.getMessage());
357+
}
358+
359+
@Test
360+
void testBuilder_WithCustomPollingJitter() {
361+
var config = DurableConfig.builder()
362+
.withDurableExecutionClient(mockClient)
363+
.withPollingJitter(JitterStrategy.NONE)
364+
.build();
365+
366+
assertEquals(JitterStrategy.NONE, config.getPollingJitter());
367+
}
368+
369+
@Test
370+
void testBuilder_WithPollingJitterHalf() {
371+
var config = DurableConfig.builder()
372+
.withDurableExecutionClient(mockClient)
373+
.withPollingJitter(JitterStrategy.HALF)
374+
.build();
375+
376+
assertEquals(JitterStrategy.HALF, config.getPollingJitter());
377+
}
378+
379+
@Test
380+
void testBuilder_WithPollingJitterNull_UsesDefault() {
381+
var config = DurableConfig.builder()
382+
.withDurableExecutionClient(mockClient)
383+
.withPollingJitter(null)
384+
.build();
385+
386+
assertEquals(JitterStrategy.FULL, config.getPollingJitter());
387+
}
388+
389+
@Test
390+
void testBuilder_WithCustomCheckpointDelay() {
391+
var config = DurableConfig.builder()
392+
.withDurableExecutionClient(mockClient)
393+
.withCheckpointDelay(Duration.ofSeconds(5))
394+
.build();
395+
396+
assertEquals(Duration.ofSeconds(5), config.getCheckpointDelay());
397+
}
398+
399+
@Test
400+
void testBuilder_WithAllPollingSettings() {
401+
var config = DurableConfig.builder()
402+
.withDurableExecutionClient(mockClient)
403+
.withPollingInterval(Duration.ofMillis(200))
404+
.withPollingBackoffRate(1.5)
405+
.withPollingJitter(JitterStrategy.HALF)
406+
.withCheckpointDelay(Duration.ofSeconds(2))
407+
.build();
408+
409+
assertEquals(Duration.ofMillis(200), config.getPollingInterval());
410+
assertEquals(1.5, config.getPollingBackoffRate());
411+
assertEquals(JitterStrategy.HALF, config.getPollingJitter());
412+
assertEquals(Duration.ofSeconds(2), config.getCheckpointDelay());
413+
}
414+
415+
@Test
416+
void testBuilder_FluentAPI_PollingMethods() {
417+
var builder = DurableConfig.builder();
418+
419+
assertSame(builder, builder.withPollingInterval(Duration.ofMillis(500)));
420+
assertSame(builder, builder.withPollingBackoffRate(2.0));
421+
assertSame(builder, builder.withPollingJitter(JitterStrategy.FULL));
422+
assertSame(builder, builder.withCheckpointDelay(Duration.ofSeconds(1)));
423+
}
424+
425+
@Test
426+
void testBuilder_PollingIntervalNull_UsesDefault() {
427+
var config = DurableConfig.builder()
428+
.withDurableExecutionClient(mockClient)
429+
.withPollingInterval(null)
430+
.build();
431+
432+
assertEquals(Duration.ofMillis(1000), config.getPollingInterval());
433+
}
434+
435+
@Test
436+
void testBuilder_CheckpointDelayNull_UsesDefault() {
437+
var config = DurableConfig.builder()
438+
.withDurableExecutionClient(mockClient)
439+
.withCheckpointDelay(null)
440+
.build();
441+
442+
assertEquals(Duration.ofSeconds(0), config.getCheckpointDelay());
443+
}
297444
}

0 commit comments

Comments
 (0)