Skip to content

Commit 805a846

Browse files
committed
add async batcher to call checkpoint API with a delay
1 parent 15e5859 commit 805a846

4 files changed

Lines changed: 305 additions & 89 deletions

File tree

sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public final class DurableConfig {
6969
private final ExecutorService executorService;
7070
private final LoggerConfig loggerConfig;
7171
private final Duration pollingInterval;
72+
private final Duration checkpointDelay;
7273

7374
private DurableConfig(Builder builder) {
7475
this.durableExecutionClient = builder.durableExecutionClient != null
@@ -78,6 +79,7 @@ private DurableConfig(Builder builder) {
7879
this.executorService = builder.executorService != null ? builder.executorService : createDefaultExecutor();
7980
this.loggerConfig = builder.loggerConfig != null ? builder.loggerConfig : LoggerConfig.defaults();
8081
this.pollingInterval = builder.pollingInterval != null ? builder.pollingInterval : Duration.ofSeconds(1);
82+
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);
8183
}
8284

8385
/**
@@ -143,6 +145,15 @@ public Duration getPollingInterval() {
143145
return pollingInterval;
144146
}
145147

148+
/**
149+
* Gets the configured checkpoint delay.
150+
*
151+
* @return check point in Duration.
152+
*/
153+
public Duration getCheckpointDelay() {
154+
return pollingInterval;
155+
}
156+
146157
/**
147158
* Creates a default DurableExecutionClient with production LambdaClient. Uses
148159
* EnvironmentVariableCredentialsProvider and region from AWS_REGION. If AWS_REGION is not set, defaults to
@@ -207,7 +218,8 @@ public static final class Builder {
207218
private SerDes serDes;
208219
private ExecutorService executorService;
209220
private LoggerConfig loggerConfig;
210-
public Duration pollingInterval;
221+
private Duration pollingInterval;
222+
private Duration checkpointDelay;
211223

212224
private Builder() {}
213225

@@ -304,6 +316,18 @@ public Builder withPollingInterval(Duration duration) {
304316
return this;
305317
}
306318

319+
/**
320+
* Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, which disables checkpoint
321+
* batching.
322+
*
323+
* @param duration the checkpoint delay in Duration
324+
* @return This builder
325+
*/
326+
public Builder withCheckpointDelay(Duration duration) {
327+
this.checkpointDelay = duration;
328+
return this;
329+
}
330+
307331
/**
308332
* Builds the DurableConfig instance.
309333
*
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.execution;
4+
5+
import com.amazonaws.lambda.durable.util.ExceptionHelper;
6+
import java.time.Duration;
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.function.Function;
12+
import java.util.stream.Collectors;
13+
14+
/**
15+
* This class simplifies automatic batching of async requests. Your code deals with individual items, the service you
16+
* are calling asynchronously has a cheaper batch API. You are willing to trade some latency by waiting for more calls
17+
* to arrive to group them in a single batch call into the service. The batch call will be made when either a full batch
18+
* is built, too much time has passed, or size limits are reached. This class builds a single batch at a time with
19+
* thread-safe synchronization: - There is no batch yet. - First call arrives. Create a batch with one item in it, start
20+
* a timer. No call to service is made yet. - More calls arrive. They get added to the same batch if size limits allow.
21+
* - Either the batch is full, the timer has elapsed, or size limits are reached. Send the batch request. Now a new
22+
* batch can now be built. - If entire batch call fails, each call will fail. - If batch call succeeded, outcome is
23+
* analyzed one by one to complete results of each call. When you extend this class, you are expected to implement the
24+
* actual batch operation and to expose a public method to perform a single action. The batcher includes comprehensive
25+
* metrics tracking for performance monitoring.
26+
*
27+
* @param <T> Input of every call
28+
*/
29+
public class AsyncBatcher<T> {
30+
31+
/** Maximum time to wait before flushing a batch */
32+
private final Duration maxDelay;
33+
/** Maximum number of items per batch */
34+
private final int maxBatchSize;
35+
/** Maximum total size in bytes for all items in a batch */
36+
private final int maxBatchBinarySizeInBytes;
37+
/** Function to calculate the size in bytes of each item */
38+
private final Function<T, Integer> itemSizeInBytesProvider;
39+
40+
private final Function<List<T>, CompletableFuture<Void>> doBatchAction;
41+
42+
private record BatchItem<T>(T input, CompletableFuture<Void> outputFuture) {}
43+
44+
/** Represents a collection of items to be processed together as a batch. */
45+
private class Batch {
46+
/** List of items in this batch */
47+
private final List<BatchItem<T>> batchItems;
48+
/** Total size in bytes of all items in this batch */
49+
private int batchSizeInBytes;
50+
51+
Batch() {
52+
this.batchItems = new ArrayList<>();
53+
}
54+
55+
/**
56+
* Adds an item to this batch and returns a future for the result.
57+
*
58+
* @param input The item to add to the batch
59+
* @return A CompletableFuture that will be completed with the result
60+
*/
61+
CompletableFuture<Void> addItem(T input) {
62+
final int itemSize = itemSizeInBytesProvider.apply(input);
63+
batchSizeInBytes += itemSize;
64+
65+
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
66+
batchItems.add(new BatchItem<>(input, resultFuture));
67+
return resultFuture;
68+
}
69+
70+
/** Checks if this batch can accept the given item without exceeding size limits. */
71+
boolean canAcceptItem(T input) {
72+
return batchSizeInBytes + itemSizeInBytesProvider.apply(input) <= maxBatchBinarySizeInBytes;
73+
}
74+
75+
/** Checks if this batch can accept more items without exceeding count limits. */
76+
boolean canAcceptMore() {
77+
return batchItems.size() < maxBatchSize;
78+
}
79+
80+
/** Processes this batch by executing the batch action and handling results. */
81+
void processBatch() {
82+
List<T> inputs = extractInputs();
83+
84+
CompletableFuture<Void> batchFuture = doBatchAction.apply(inputs);
85+
86+
batchFuture.thenAccept(this::completeItems).exceptionally(this::failAllItems);
87+
}
88+
89+
private List<T> extractInputs() {
90+
return batchItems.stream().map(BatchItem::input).collect(Collectors.toList());
91+
}
92+
93+
/** Completes individual item futures with their corresponding results */
94+
private void completeItems(Void v) {
95+
for (BatchItem<T> batchItem : batchItems) {
96+
batchItem.outputFuture().complete(null);
97+
}
98+
}
99+
100+
/** Fails all item futures with the given exception */
101+
private Void failAllItems(Throwable wrappedCause) {
102+
Throwable cause = ExceptionHelper.unwrapCompletableFuture(wrappedCause);
103+
for (BatchItem<T> batchItem : batchItems) {
104+
batchItem.outputFuture().completeExceptionally(cause);
105+
}
106+
return null;
107+
}
108+
}
109+
110+
/** Lock for synchronizing access to current batch state */
111+
private final Object currentBatchLock = new Object();
112+
/** The current batch being filled with items */
113+
private Batch currentBatch;
114+
/** Future that completes when the current batch should be flushed due to timeout */
115+
private CompletableFuture<Void> batchFlushFuture;
116+
117+
/**
118+
* Creates a new AsyncBatcher with the specified configuration.
119+
*
120+
* @param maxDelay Maximum time to wait before flushing a batch
121+
* @param maxBatchSize Maximum number of items per batch
122+
* @param maxBatchBinarySizeInBytes Maximum total size in bytes for all items in a batch
123+
* @param itemSizeInBytesProvider Function to calculate the size in bytes of each item
124+
*/
125+
public AsyncBatcher(
126+
Duration maxDelay,
127+
int maxBatchSize,
128+
int maxBatchBinarySizeInBytes,
129+
Function<T, Integer> itemSizeInBytesProvider,
130+
Function<List<T>, CompletableFuture<Void>> doBatchAction) {
131+
this.maxDelay = maxDelay;
132+
this.maxBatchSize = maxBatchSize;
133+
this.maxBatchBinarySizeInBytes = maxBatchBinarySizeInBytes;
134+
this.itemSizeInBytesProvider = itemSizeInBytesProvider;
135+
this.doBatchAction = doBatchAction;
136+
this.currentBatch = null;
137+
this.batchFlushFuture = null;
138+
}
139+
140+
/**
141+
* Submits an item for batch processing. The item will be added to the current batch or trigger batch processing if
142+
* limits are reached.
143+
*
144+
* @param input The item to process
145+
* @return A CompletableFuture that will be completed with the processing result
146+
*/
147+
public CompletableFuture<Void> doAction(T input) {
148+
CompletableFuture<Void> outputFuture;
149+
Batch previousBatchToProcess = null;
150+
Batch newBatchToProcess = null;
151+
152+
synchronized (currentBatchLock) {
153+
// If current batch can't fit this item, flush it first
154+
if (currentBatch != null && !currentBatch.canAcceptItem(input)) {
155+
previousBatchToProcess = getAndClearCurrentBatch();
156+
}
157+
158+
// Create new batch if needed
159+
if (currentBatch == null) {
160+
currentBatch = new Batch();
161+
if (batchFlushFuture != null) {
162+
cancelAndClearCurrentFlusher();
163+
}
164+
}
165+
166+
outputFuture = currentBatch.addItem(input);
167+
168+
// If batch is full, process it immediately
169+
if (!currentBatch.canAcceptMore()) {
170+
newBatchToProcess = getAndClearCurrentBatch();
171+
}
172+
173+
// Set up timeout-based flushing for non-full batches
174+
if (currentBatch != null && batchFlushFuture == null) {
175+
batchFlushFuture = new CompletableFuture<>();
176+
batchFlushFuture
177+
.completeOnTimeout(null, maxDelay.toMillis(), TimeUnit.MILLISECONDS)
178+
.thenRun(() -> {
179+
Batch toFlush;
180+
synchronized (currentBatchLock) {
181+
if (currentBatch != null) {
182+
toFlush = getAndClearCurrentBatch();
183+
} else {
184+
return;
185+
}
186+
}
187+
toFlush.processBatch();
188+
});
189+
}
190+
191+
// Clean up flush future if no current batch
192+
if (currentBatch == null && batchFlushFuture != null) {
193+
cancelAndClearCurrentFlusher();
194+
}
195+
}
196+
197+
// Process batches outside of synchronized block to avoid blocking
198+
if (previousBatchToProcess != null) {
199+
previousBatchToProcess.processBatch();
200+
}
201+
202+
if (newBatchToProcess != null) {
203+
newBatchToProcess.processBatch();
204+
}
205+
206+
return outputFuture;
207+
}
208+
209+
/** Gets the current batch and clears it, ensuring it's not null */
210+
private Batch getAndClearCurrentBatch() {
211+
if (currentBatch == null) {
212+
throw new IllegalStateException("currentBatch must not be null");
213+
}
214+
final Batch batchToProcess = currentBatch;
215+
currentBatch = null;
216+
return batchToProcess;
217+
}
218+
219+
/** Cancels the current flush future and clears it, ensuring it's not null */
220+
private void cancelAndClearCurrentFlusher() {
221+
if (batchFlushFuture == null) {
222+
throw new IllegalStateException("batchFlushFuture must not be null");
223+
}
224+
batchFlushFuture.cancel(false);
225+
batchFlushFuture = null;
226+
}
227+
}

0 commit comments

Comments
 (0)