|
9 | 9 | import java.util.concurrent.CompletableFuture; |
10 | 10 | import java.util.concurrent.TimeUnit; |
11 | 11 | import java.util.function.Function; |
12 | | -import java.util.stream.Collectors; |
13 | 12 |
|
14 | 13 | /** |
15 | | - * This class simplifies automatic batching of api requests. The individual request items will be grouped if the service |
16 | | - * has a cheaper batch API, and we want to trade some latency by waiting for more calls to arrive. The batch call will |
17 | | - * be made when either a full batch is built, too much time has passed, or size limits are reached. This class builds a |
18 | | - * single batch at a time with thread-safe synchronization: - There is no batch yet. - First call arrives. Create a |
19 | | - * batch with one item in it, start a timer. No call to service is made yet. - More calls arrive. They get added to the |
20 | | - * same batch if size limits allow. - Either the batch is full, the timer has elapsed, or size limits are reached. Send |
21 | | - * the batch request. Now a new batch can now be built. - If entire batch call fails, each call will fail. - If batch |
22 | | - * call succeeded, outcome is analyzed one by one to complete results of each call. When you extend this class, you are |
23 | | - * expected to implement the actual batch operation and to expose a public method to perform a single action. The |
24 | | - * batcher includes comprehensive metrics tracking for performance monitoring. |
| 14 | + * Batches API requests to optimize throughput by grouping individual calls into batch operations. |
| 15 | + * Batches are flushed when full, when size limits are reached, or after a timeout. |
25 | 16 | * |
26 | | - * @param <T> Input of every call |
| 17 | + * @param <T> Request type |
27 | 18 | */ |
28 | 19 | public class ApiRequestBatcher<T> { |
29 | 20 |
|
30 | | - /** Maximum time to wait before flushing a batch */ |
31 | | - private final Duration maxDelay; |
32 | | - /** Maximum number of items per batch */ |
33 | | - private final int maxBatchSize; |
34 | | - /** Maximum total size in bytes for all items in a batch */ |
35 | | - private final int maxBatchBinarySizeInBytes; |
36 | | - /** Function to calculate the size in bytes of each item */ |
37 | | - private final Function<T, Integer> itemSizeInBytesProvider; |
| 21 | + /** Timeout before auto-flushing incomplete batch */ |
| 22 | + private final Duration flushDelay; |
| 23 | + /** Maximum items allowed in a single batch */ |
| 24 | + private final int maxItemCount; |
| 25 | + /** Maximum bytes allowed in a single batch */ |
| 26 | + private final int maxBatchBytes; |
| 27 | + /** Calculates byte size of each request */ |
| 28 | + private final Function<T, Integer> calculateItemSize; |
| 29 | + /** Executes the batch operation */ |
| 30 | + private final Function<List<T>, CompletableFuture<Void>> executeBatch; |
38 | 31 |
|
39 | | - private final Function<List<T>, CompletableFuture<Void>> doBatchAction; |
| 32 | + private record Item<T>(T request, CompletableFuture<Void> result) {} |
40 | 33 |
|
41 | | - private record BatchItem<T>(T input, CompletableFuture<Void> outputFuture) {} |
42 | | - |
43 | | - /** Represents a collection of items to be processed together as a batch. */ |
| 34 | + /** Batch accumulator */ |
44 | 35 | private class Batch { |
45 | | - /** List of items in this batch */ |
46 | | - private final List<BatchItem<T>> batchItems; |
47 | | - /** Total size in bytes of all items in this batch */ |
48 | | - private int batchSizeInBytes; |
| 36 | + /** Accumulated requests */ |
| 37 | + private final List<Item<T>> items; |
| 38 | + /** Current batch size in bytes */ |
| 39 | + private int totalBytes; |
49 | 40 |
|
50 | 41 | Batch() { |
51 | | - this.batchItems = new ArrayList<>(); |
52 | | - } |
53 | | - |
54 | | - /** |
55 | | - * Adds an item to this batch and returns a future for the result. |
56 | | - * |
57 | | - * @param input The item to add to the batch |
58 | | - * @return A CompletableFuture that will be completed with the result |
59 | | - */ |
60 | | - CompletableFuture<Void> addItem(T input) { |
61 | | - final int itemSize = itemSizeInBytesProvider.apply(input); |
62 | | - batchSizeInBytes += itemSize; |
63 | | - |
64 | | - CompletableFuture<Void> resultFuture = new CompletableFuture<>(); |
65 | | - batchItems.add(new BatchItem<>(input, resultFuture)); |
66 | | - return resultFuture; |
67 | | - } |
68 | | - |
69 | | - /** Checks if this batch can accept the given item without exceeding size limits. */ |
70 | | - boolean canAcceptItem(T input) { |
71 | | - return batchSizeInBytes + itemSizeInBytesProvider.apply(input) <= maxBatchBinarySizeInBytes; |
| 42 | + this.items = new ArrayList<>(); |
72 | 43 | } |
73 | 44 |
|
74 | | - /** Checks if this batch can accept more items without exceeding count limits. */ |
75 | | - boolean canAcceptMore() { |
76 | | - return batchItems.size() < maxBatchSize; |
| 45 | + /** Adds request to batch and returns its result future */ |
| 46 | + CompletableFuture<Void> add(T request) { |
| 47 | + totalBytes += calculateItemSize.apply(request); |
| 48 | + CompletableFuture<Void> result = new CompletableFuture<>(); |
| 49 | + items.add(new Item<>(request, result)); |
| 50 | + return result; |
77 | 51 | } |
78 | 52 |
|
79 | | - /** Processes this batch by executing the batch action and handling results. */ |
80 | | - void processBatch() { |
81 | | - List<T> inputs = extractInputs(); |
82 | | - |
83 | | - CompletableFuture<Void> batchFuture = doBatchAction.apply(inputs); |
84 | | - |
85 | | - batchFuture.thenAccept(this::completeItems).exceptionally(this::failAllItems); |
| 53 | + /** Returns true if request fits within byte limit */ |
| 54 | + boolean canFit(T request) { |
| 55 | + return totalBytes + calculateItemSize.apply(request) <= maxBatchBytes; |
86 | 56 | } |
87 | 57 |
|
88 | | - private List<T> extractInputs() { |
89 | | - return batchItems.stream().map(BatchItem::input).collect(Collectors.toList()); |
| 58 | + /** Returns true if batch has reached item count limit */ |
| 59 | + boolean isFull() { |
| 60 | + return items.size() >= maxItemCount; |
90 | 61 | } |
91 | 62 |
|
92 | | - /** Completes individual item futures with their corresponding results */ |
93 | | - private void completeItems(Void v) { |
94 | | - for (BatchItem<T> batchItem : batchItems) { |
95 | | - batchItem.outputFuture().complete(null); |
| 63 | + /** Executes batch and completes all item futures */ |
| 64 | + void execute() { |
| 65 | + List<T> requests = new ArrayList<>(items.size()); |
| 66 | + for (Item<T> item : items) { |
| 67 | + requests.add(item.request()); |
96 | 68 | } |
97 | | - } |
98 | 69 |
|
99 | | - /** Fails all item futures with the given exception */ |
100 | | - private Void failAllItems(Throwable wrappedCause) { |
101 | | - Throwable cause = ExceptionHelper.unwrapCompletableFuture(wrappedCause); |
102 | | - for (BatchItem<T> batchItem : batchItems) { |
103 | | - batchItem.outputFuture().completeExceptionally(cause); |
104 | | - } |
105 | | - return null; |
| 70 | + executeBatch.apply(requests).whenComplete((v, ex) -> { |
| 71 | + if (ex == null) { |
| 72 | + for (Item<T> item : items) { |
| 73 | + item.result().complete(null); |
| 74 | + } |
| 75 | + } else { |
| 76 | + Throwable cause = ExceptionHelper.unwrapCompletableFuture(ex); |
| 77 | + for (Item<T> item : items) { |
| 78 | + item.result().completeExceptionally(cause); |
| 79 | + } |
| 80 | + } |
| 81 | + }); |
106 | 82 | } |
107 | 83 | } |
108 | 84 |
|
109 | | - /** Lock for synchronizing access to current batch state */ |
110 | | - private final Object currentBatchLock = new Object(); |
111 | | - /** The current batch being filled with items */ |
112 | | - private Batch currentBatch; |
113 | | - /** Future that completes when the current batch should be flushed due to timeout */ |
114 | | - private CompletableFuture<Void> batchFlushFuture; |
| 85 | + /** Synchronizes batch state access */ |
| 86 | + private final Object lock = new Object(); |
| 87 | + /** Current batch accepting requests */ |
| 88 | + private Batch activeBatch; |
| 89 | + /** Timer to auto-flush incomplete batch */ |
| 90 | + private CompletableFuture<Void> flushTimer; |
115 | 91 |
|
116 | 92 | /** |
117 | 93 | * Creates a new ApiRequestBatcher with the specified configuration. |
118 | 94 | * |
119 | | - * @param maxDelay Maximum time to wait before flushing a batch |
120 | | - * @param maxBatchSize Maximum number of items per batch |
121 | | - * @param maxBatchBinarySizeInBytes Maximum total size in bytes for all items in a batch |
122 | | - * @param itemSizeInBytesProvider Function to calculate the size in bytes of each item |
| 95 | + * @param flushDelay Maximum time to wait before flushing a batch |
| 96 | + * @param maxItemCount Maximum number of items per batch |
| 97 | + * @param maxBatchBytes Maximum total size in bytes for all items in a batch |
| 98 | + * @param calculateItemSize Function to calculate the size in bytes of each item |
| 99 | + * @param executeBatch Function to execute the batch action |
123 | 100 | */ |
124 | 101 | public ApiRequestBatcher( |
125 | | - Duration maxDelay, |
126 | | - int maxBatchSize, |
127 | | - int maxBatchBinarySizeInBytes, |
128 | | - Function<T, Integer> itemSizeInBytesProvider, |
129 | | - Function<List<T>, CompletableFuture<Void>> doBatchAction) { |
130 | | - this.maxDelay = maxDelay; |
131 | | - this.maxBatchSize = maxBatchSize; |
132 | | - this.maxBatchBinarySizeInBytes = maxBatchBinarySizeInBytes; |
133 | | - this.itemSizeInBytesProvider = itemSizeInBytesProvider; |
134 | | - this.doBatchAction = doBatchAction; |
135 | | - this.currentBatch = null; |
136 | | - this.batchFlushFuture = null; |
| 102 | + Duration flushDelay, |
| 103 | + int maxItemCount, |
| 104 | + int maxBatchBytes, |
| 105 | + Function<T, Integer> calculateItemSize, |
| 106 | + Function<List<T>, CompletableFuture<Void>> executeBatch) { |
| 107 | + this.flushDelay = flushDelay; |
| 108 | + this.maxItemCount = maxItemCount; |
| 109 | + this.maxBatchBytes = maxBatchBytes; |
| 110 | + this.calculateItemSize = calculateItemSize; |
| 111 | + this.executeBatch = executeBatch; |
137 | 112 | } |
138 | 113 |
|
139 | 114 | /** |
140 | | - * Submits an item for batch processing. The item will be added to the current batch or trigger batch processing if |
141 | | - * limits are reached. |
| 115 | + * Submits request for batched execution. |
142 | 116 | * |
143 | | - * @param input The item to process |
144 | | - * @return A CompletableFuture that will be completed with the processing result |
| 117 | + * @param request Request to batch |
| 118 | + * @return Future completed when batch executes |
145 | 119 | */ |
146 | | - public CompletableFuture<Void> doAction(T input) { |
147 | | - CompletableFuture<Void> outputFuture; |
148 | | - Batch previousBatchToProcess = null; |
149 | | - Batch newBatchToProcess = null; |
150 | | - |
151 | | - synchronized (currentBatchLock) { |
152 | | - // If current batch can't fit this item, flush it first |
153 | | - if (currentBatch != null && !currentBatch.canAcceptItem(input)) { |
154 | | - previousBatchToProcess = getAndClearCurrentBatch(); |
| 120 | + public CompletableFuture<Void> submit(T request) { |
| 121 | + CompletableFuture<Void> result; |
| 122 | + Batch batchToFlush = null; |
| 123 | + Batch fullBatch = null; |
| 124 | + |
| 125 | + synchronized (lock) { |
| 126 | + // Flush if request doesn't fit |
| 127 | + if (activeBatch != null && !activeBatch.canFit(request)) { |
| 128 | + batchToFlush = detachBatch(); |
155 | 129 | } |
156 | 130 |
|
157 | | - // Create new batch if needed |
158 | | - if (currentBatch == null) { |
159 | | - currentBatch = new Batch(); |
160 | | - if (batchFlushFuture != null) { |
161 | | - cancelAndClearCurrentFlusher(); |
| 131 | + // Create batch and start timer |
| 132 | + if (activeBatch == null) { |
| 133 | + activeBatch = new Batch(); |
| 134 | + if (flushTimer != null) { |
| 135 | + cancelTimer(); |
162 | 136 | } |
163 | 137 | } |
164 | 138 |
|
165 | | - outputFuture = currentBatch.addItem(input); |
| 139 | + result = activeBatch.add(request); |
166 | 140 |
|
167 | | - // If batch is full, process it immediately |
168 | | - if (!currentBatch.canAcceptMore()) { |
169 | | - newBatchToProcess = getAndClearCurrentBatch(); |
| 141 | + // Flush if batch full |
| 142 | + if (activeBatch.isFull()) { |
| 143 | + fullBatch = detachBatch(); |
170 | 144 | } |
171 | 145 |
|
172 | | - // Set up timeout-based flushing for non-full batches |
173 | | - if (currentBatch != null && batchFlushFuture == null) { |
174 | | - batchFlushFuture = new CompletableFuture<>(); |
175 | | - batchFlushFuture |
176 | | - .completeOnTimeout(null, maxDelay.toMillis(), TimeUnit.MILLISECONDS) |
| 146 | + // Start flush timer for new batch |
| 147 | + if (activeBatch != null && flushTimer == null) { |
| 148 | + flushTimer = new CompletableFuture<>(); |
| 149 | + flushTimer |
| 150 | + .completeOnTimeout(null, flushDelay.toMillis(), TimeUnit.MILLISECONDS) |
177 | 151 | .thenRun(() -> { |
178 | 152 | Batch toFlush; |
179 | | - synchronized (currentBatchLock) { |
180 | | - if (currentBatch != null) { |
181 | | - toFlush = getAndClearCurrentBatch(); |
| 153 | + synchronized (lock) { |
| 154 | + if (activeBatch != null) { |
| 155 | + toFlush = detachBatch(); |
182 | 156 | } else { |
183 | 157 | return; |
184 | 158 | } |
185 | 159 | } |
186 | | - toFlush.processBatch(); |
| 160 | + toFlush.execute(); |
187 | 161 | }); |
188 | 162 | } |
189 | 163 |
|
190 | | - // Clean up flush future if no current batch |
191 | | - if (currentBatch == null && batchFlushFuture != null) { |
192 | | - cancelAndClearCurrentFlusher(); |
| 164 | + // Cancel timer if batch was flushed |
| 165 | + if (activeBatch == null && flushTimer != null) { |
| 166 | + cancelTimer(); |
193 | 167 | } |
194 | 168 | } |
195 | 169 |
|
196 | | - // Process batches outside of synchronized block to avoid blocking |
197 | | - if (previousBatchToProcess != null) { |
198 | | - previousBatchToProcess.processBatch(); |
| 170 | + // Execute outside lock to avoid blocking |
| 171 | + if (batchToFlush != null) { |
| 172 | + batchToFlush.execute(); |
199 | 173 | } |
200 | 174 |
|
201 | | - if (newBatchToProcess != null) { |
202 | | - newBatchToProcess.processBatch(); |
| 175 | + if (fullBatch != null) { |
| 176 | + fullBatch.execute(); |
203 | 177 | } |
204 | 178 |
|
205 | | - return outputFuture; |
| 179 | + return result; |
206 | 180 | } |
207 | 181 |
|
208 | | - /** Gets the current batch and clears it, ensuring it's not null */ |
209 | | - private Batch getAndClearCurrentBatch() { |
210 | | - if (currentBatch == null) { |
211 | | - throw new IllegalStateException("currentBatch must not be null"); |
| 182 | + /** Detaches and returns active batch */ |
| 183 | + private Batch detachBatch() { |
| 184 | + if (activeBatch == null) { |
| 185 | + throw new IllegalStateException("activeBatch must not be null"); |
212 | 186 | } |
213 | | - final Batch batchToProcess = currentBatch; |
214 | | - currentBatch = null; |
215 | | - return batchToProcess; |
| 187 | + Batch batch = activeBatch; |
| 188 | + activeBatch = null; |
| 189 | + return batch; |
216 | 190 | } |
217 | 191 |
|
218 | | - /** Cancels the current flush future and clears it, ensuring it's not null */ |
219 | | - private void cancelAndClearCurrentFlusher() { |
220 | | - if (batchFlushFuture == null) { |
221 | | - throw new IllegalStateException("batchFlushFuture must not be null"); |
| 192 | + /** Cancels and clears flush timer */ |
| 193 | + private void cancelTimer() { |
| 194 | + if (flushTimer == null) { |
| 195 | + throw new IllegalStateException("flushTimer must not be null"); |
222 | 196 | } |
223 | | - batchFlushFuture.cancel(false); |
224 | | - batchFlushFuture = null; |
| 197 | + flushTimer.cancel(false); |
| 198 | + flushTimer = null; |
225 | 199 | } |
226 | 200 | } |
0 commit comments