Skip to content

Commit c4f440f

Browse files
authored
[improvement] Optimize Poller (#69)
* optimize pollers * add async batcher to call checkpoint API with a delay * update the comments * add tests for batcher * rename methods/fields and update comments * remove locker from batcher * fix race condition * fix exception handling * fix flush now issue * allow operation specific poll time * poll again if the previous poll doesn't receive an update * improve shutdown of checkpointBatcher * add latencies to logs * update some comments * fix the checkpoint delay * update some comments
1 parent dafcd09 commit c4f440f

19 files changed

Lines changed: 563 additions & 251 deletions

examples/src/main/resources/log4j2.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
</Appenders>
88

99
<Loggers>
10+
<Logger name="com.amazonaws.lambda.durable" level="INFO"/>
11+
1012
<!-- AWS SDK logs at WARN -->
1113
<Logger name="software.amazon.awssdk" level="WARN" />
1214

sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.amazonaws.lambda.durable.logging.LoggerConfig;
88
import com.amazonaws.lambda.durable.serde.JacksonSerDes;
99
import com.amazonaws.lambda.durable.serde.SerDes;
10+
import java.time.Duration;
1011
import java.util.Objects;
1112
import java.util.concurrent.ExecutorService;
1213
import java.util.concurrent.Executors;
@@ -67,6 +68,8 @@ public final class DurableConfig {
6768
private final SerDes serDes;
6869
private final ExecutorService executorService;
6970
private final LoggerConfig loggerConfig;
71+
private final Duration pollingInterval;
72+
private final Duration checkpointDelay;
7073

7174
private DurableConfig(Builder builder) {
7275
this.durableExecutionClient = builder.durableExecutionClient != null
@@ -75,6 +78,8 @@ private DurableConfig(Builder builder) {
7578
this.serDes = builder.serDes != null ? builder.serDes : new JacksonSerDes();
7679
this.executorService = builder.executorService != null ? builder.executorService : createDefaultExecutor();
7780
this.loggerConfig = builder.loggerConfig != null ? builder.loggerConfig : LoggerConfig.defaults();
81+
this.pollingInterval = builder.pollingInterval != null ? builder.pollingInterval : Duration.ofMillis(1000);
82+
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);
7883
}
7984

8085
/**
@@ -131,6 +136,24 @@ public LoggerConfig getLoggerConfig() {
131136
return loggerConfig;
132137
}
133138

139+
/**
140+
* Gets the configured polling interval.
141+
*
142+
* @return polling interval in Duration.
143+
*/
144+
public Duration getPollingInterval() {
145+
return pollingInterval;
146+
}
147+
148+
/**
149+
* Gets the configured checkpoint delay.
150+
*
151+
* @return check point in Duration.
152+
*/
153+
public Duration getCheckpointDelay() {
154+
return checkpointDelay;
155+
}
156+
134157
/**
135158
* Creates a default DurableExecutionClient with production LambdaClient. Uses
136159
* EnvironmentVariableCredentialsProvider and region from AWS_REGION. If AWS_REGION is not set, defaults to
@@ -195,6 +218,8 @@ public static final class Builder {
195218
private SerDes serDes;
196219
private ExecutorService executorService;
197220
private LoggerConfig loggerConfig;
221+
private Duration pollingInterval;
222+
private Duration checkpointDelay;
198223

199224
private Builder() {}
200225

@@ -280,6 +305,29 @@ public Builder withLoggerConfig(LoggerConfig loggerConfig) {
280305
return this;
281306
}
282307

308+
/**
309+
* Sets how often the SDK polls updates from backend.
310+
*
311+
* @param duration the polling interval in Duration
312+
* @return This builder
313+
*/
314+
public Builder withPollingInterval(Duration duration) {
315+
this.pollingInterval = duration;
316+
return this;
317+
}
318+
319+
/**
320+
* Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, which disables checkpoint
321+
* batching.
322+
*
323+
* @param duration the checkpoint delay in Duration
324+
* @return This builder
325+
*/
326+
public Builder withCheckpointDelay(Duration duration) {
327+
this.checkpointDelay = duration;
328+
return this;
329+
}
330+
283331
/**
284332
* Builds the DurableConfig instance.
285333
*

sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,7 @@ public static <I, O> DurableExecutionOutput execute(
5454
}
5555

5656
var executionManager = new ExecutionManager(
57-
input.durableExecutionArn(),
58-
input.checkpointToken(),
59-
input.initialExecutionState(),
60-
config.getDurableExecutionClient());
57+
input.durableExecutionArn(), input.checkpointToken(), input.initialExecutionState(), config);
6158

6259
logger.debug(
6360
"EXECUTION operation found: {}",

sdk/src/main/java/com/amazonaws/lambda/durable/DurableHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public final void handleRequest(InputStream inputStream, OutputStream outputStre
170170
*
171171
* <p>Customer-facing serialization uses SerDes from DurableConfig.
172172
*
173-
* @return Configured ObjectMapper for DAR backend communication
173+
* @return Configured ObjectMapper for durable backend communication
174174
*/
175175
public static ObjectMapper createObjectMapper() {
176176
var dateModule = new SimpleModule();

sdk/src/main/java/com/amazonaws/lambda/durable/client/LambdaDurableFunctionsClient.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
import java.util.List;
66
import java.util.Objects;
7-
import org.slf4j.Logger;
8-
import org.slf4j.LoggerFactory;
97
import software.amazon.awssdk.services.lambda.LambdaClient;
108
import software.amazon.awssdk.services.lambda.model.CheckpointDurableExecutionRequest;
119
import software.amazon.awssdk.services.lambda.model.CheckpointDurableExecutionResponse;
@@ -15,7 +13,6 @@
1513

1614
public class LambdaDurableFunctionsClient implements DurableExecutionClient {
1715

18-
private static final Logger logger = LoggerFactory.getLogger(LambdaDurableFunctionsClient.class);
1916
private final LambdaClient lambdaClient;
2017

2118
/**
@@ -35,7 +32,6 @@ public CheckpointDurableExecutionResponse checkpoint(String arn, String token, L
3532
.checkpointToken(token)
3633
.updates(updates)
3734
.build();
38-
logger.debug("Calling DAR backend with {} updates: {}", updates.size(), request);
3935

4036
return lambdaClient.checkpointDurableExecution(request);
4137
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.execution;
4+
5+
import java.time.Duration;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.function.Consumer;
11+
import java.util.function.Function;
12+
13+
/**
14+
* Batches API requests to optimize throughput by grouping individual calls into batch operations. Batches are flushed
15+
* when full, when size limits are reached, or after a timeout.
16+
*
17+
* <p>Uses a dedicated SDK thread pool for internal coordination, keeping checkpoint processing separate from
18+
* customer-configured executors used for user-defined operations.
19+
*
20+
* @see InternalExecutor
21+
* @param <T> Request type
22+
*/
23+
public class ApiRequestBatcher<T> {
24+
private static final Duration MAX_DELAY = Duration.ofMinutes(60);
25+
26+
/** Maximum items allowed in a single batch */
27+
private final int maxItemCount;
28+
/** Maximum bytes allowed in a single batch */
29+
private final int maxBatchBytes;
30+
/** Calculates byte size of each request */
31+
private final Function<T, Integer> calculateItemSize;
32+
/** Executes the batch operation */
33+
private final Consumer<List<T>> executeBatch;
34+
35+
/** Accumulated requests */
36+
private final List<Item<T>> items;
37+
38+
/** Current batch size in bytes */
39+
private int totalBytes;
40+
41+
/** Time when the current batch must be flushed */
42+
private long expireTime;
43+
44+
/** Timer to auto-flush incomplete batch */
45+
private CompletableFuture<Void> flushTimer;
46+
47+
/** Future of flushing previous batch */
48+
private CompletableFuture<Void> previousBatchFuture;
49+
50+
private record Item<T>(T request, CompletableFuture<Void> result) {}
51+
52+
/**
53+
* Creates a new ApiRequestBatcher with the specified configuration.
54+
*
55+
* @param maxItemCount Maximum number of items per batch
56+
* @param maxBatchBytes Maximum total size in bytes for all items in a batch
57+
* @param calculateItemSize Function to calculate the size in bytes of each item
58+
* @param executeBatch Function to execute the batch action
59+
*/
60+
public ApiRequestBatcher(
61+
int maxItemCount,
62+
int maxBatchBytes,
63+
Function<T, Integer> calculateItemSize,
64+
Consumer<List<T>> executeBatch) {
65+
this.maxItemCount = maxItemCount;
66+
this.maxBatchBytes = maxBatchBytes;
67+
this.calculateItemSize = calculateItemSize;
68+
this.executeBatch = executeBatch;
69+
this.previousBatchFuture = CompletableFuture.allOf();
70+
this.items = new ArrayList<>();
71+
72+
initializeBatch();
73+
}
74+
75+
/**
76+
* Submits request for batched execution.
77+
*
78+
* @param request Request to batch
79+
* @return Future completed when batch executes
80+
*/
81+
CompletableFuture<Void> submit(T request, Duration flushDelay) {
82+
synchronized (items) {
83+
// Flush the current batch if request doesn't fit
84+
if (isFull() || !canFit(request)) {
85+
flushNow();
86+
}
87+
88+
// add the request to the current batch
89+
CompletableFuture<Void> future = new CompletableFuture<>();
90+
totalBytes += calculateItemSize.apply(request);
91+
items.add(new Item<>(request, future));
92+
93+
// create or update the flush timer
94+
long newExpireTime = System.nanoTime() + flushDelay.toNanos();
95+
if (expireTime > newExpireTime) {
96+
// the batch needs to be completed earlier than previously scheduled
97+
expireTime = newExpireTime;
98+
flushAfterDelay(flushDelay.toNanos());
99+
}
100+
101+
if (isFull()) {
102+
// Flush early if batch is full
103+
flushNow();
104+
}
105+
return future;
106+
}
107+
}
108+
109+
/** Flushes pending batch and waits for completion */
110+
void shutdown() {
111+
synchronized (items) {
112+
flushNow();
113+
}
114+
115+
// wait for previous batches to be flushed
116+
previousBatchFuture.join();
117+
}
118+
119+
/** clear the current batch and creates a new batch */
120+
private void initializeBatch() {
121+
this.items.clear();
122+
this.totalBytes = 0;
123+
// MAX_DELAY is longer than a single Lambda invocation
124+
this.expireTime = System.nanoTime() + MAX_DELAY.toNanos();
125+
126+
// the timer future is created initially without a timeout until an item is added to the batch
127+
this.flushTimer = new CompletableFuture<>();
128+
this.flushTimer.thenRun(() -> {
129+
synchronized (items) {
130+
execute();
131+
}
132+
});
133+
}
134+
135+
/** Returns true if request fits within byte limit */
136+
private boolean canFit(T request) {
137+
return totalBytes + calculateItemSize.apply(request) <= maxBatchBytes;
138+
}
139+
140+
/** Returns true if batch has reached item count limit */
141+
private boolean isFull() {
142+
return items.size() >= maxItemCount;
143+
}
144+
145+
private void flushAfterDelay(long delayInNanos) {
146+
flushTimer.completeOnTimeout(null, delayInNanos, TimeUnit.NANOSECONDS);
147+
}
148+
149+
private void flushNow() {
150+
// cancel the flush timer if it has not been triggered
151+
this.flushTimer.cancel(false);
152+
// execute the current batch now
153+
execute();
154+
}
155+
156+
/** Executes batch and completes all item futures */
157+
private void execute() {
158+
var copyItems = new ArrayList<>(items);
159+
initializeBatch();
160+
if (copyItems.isEmpty()) {
161+
return;
162+
}
163+
164+
// append the current batch to the previous one so that the batches can run sequentially
165+
previousBatchFuture = previousBatchFuture.thenRunAsync(
166+
() -> {
167+
try {
168+
var requests = copyItems.stream().map(Item::request).toList();
169+
executeBatch.accept(requests);
170+
for (Item<T> item : copyItems) {
171+
item.result().complete(null);
172+
}
173+
} catch (Throwable ex) {
174+
for (Item<T> item : copyItems) {
175+
item.result().completeExceptionally(ex);
176+
}
177+
}
178+
},
179+
InternalExecutor.INSTANCE);
180+
}
181+
}

0 commit comments

Comments
 (0)