22// SPDX-License-Identifier: Apache-2.0
33package com .amazonaws .lambda .durable .execution ;
44
5- import com .amazonaws .lambda .durable .util . ExceptionHelper ;
5+ import com .amazonaws .lambda .durable .exception . IllegalDurableOperationException ;
66import java .time .Duration ;
77import java .util .ArrayList ;
88import java .util .List ;
99import java .util .concurrent .CompletableFuture ;
1010import java .util .concurrent .TimeUnit ;
11+ import java .util .concurrent .atomic .AtomicReference ;
12+ import java .util .function .Consumer ;
1113import java .util .function .Function ;
1214
1315/**
14- * Batches API requests to optimize throughput by grouping individual calls into batch operations.
15- * Batches are flushed when full, when size limits are reached, or after a timeout.
16+ * Batches API requests to optimize throughput by grouping individual calls into batch operations. Batches are flushed
17+ * when full, when size limits are reached, or after a timeout.
1618 *
1719 * @param <T> Request type
1820 */
1921public class ApiRequestBatcher <T > {
22+ private static final Duration MAX_DELAY = Duration .ofMinutes (60 );
2023
21- /** Timeout before auto-flushing incomplete batch */
22- private final Duration flushDelay ;
2324 /** Maximum items allowed in a single batch */
2425 private final int maxItemCount ;
2526 /** Maximum bytes allowed in a single batch */
2627 private final int maxBatchBytes ;
2728 /** Calculates byte size of each request */
2829 private final Function <T , Integer > calculateItemSize ;
2930 /** Executes the batch operation */
30- private final Function <List <T >, CompletableFuture < Void >> executeBatch ;
31+ private final Consumer <List <T >> executeBatch ;
3132
3233 private record Item <T >(T request , CompletableFuture <Void > result ) {}
3334
@@ -38,15 +39,29 @@ private class Batch {
3839 /** Current batch size in bytes */
3940 private int totalBytes ;
4041
42+ long expireTime ;
43+ /** Timer to auto-flush incomplete batch */
44+ private final CompletableFuture <Void > flushTimer ;
45+
4146 Batch () {
4247 this .items = new ArrayList <>();
48+ this .totalBytes = 0 ;
49+ this .expireTime = System .nanoTime () + MAX_DELAY .toNanos ();
50+ this .flushTimer = new CompletableFuture <>();
51+ this .flushTimer .thenRunAsync (this ::execute , InternalExecutor .INSTANCE );
4352 }
4453
4554 /** Adds request to batch and returns its result future */
46- CompletableFuture <Void > add (T request ) {
55+ CompletableFuture <Void > add (T request , Duration delay ) {
4756 totalBytes += calculateItemSize .apply (request );
4857 CompletableFuture <Void > result = new CompletableFuture <>();
4958 items .add (new Item <>(request , result ));
59+ long newExpireTime = System .nanoTime () + delay .toNanos ();
60+ if (expireTime > newExpireTime ) {
61+ // the batch needs to be completed earlier than previously scheduled
62+ expireTime = newExpireTime ;
63+ flushAfterDelay (delay .toNanos ());
64+ }
5065 return result ;
5166 }
5267
@@ -60,55 +75,65 @@ boolean isFull() {
6075 return items .size () >= maxItemCount ;
6176 }
6277
78+ void flushAfterDelay (long delayInNanos ) {
79+ flushTimer .completeOnTimeout (null , delayInNanos , TimeUnit .NANOSECONDS );
80+ }
81+
82+ void flushNow () {
83+ flushAfterDelay (0 );
84+ }
85+
86+ void cancel () {
87+ var ex = new IllegalDurableOperationException ("Batch cancelled" );
88+ for (Item <T > item : items ) {
89+ item .result ().completeExceptionally (ex );
90+ }
91+ }
92+
6393 /** Executes batch and completes all item futures */
64- void execute () {
94+ private void execute () {
95+ // detach this from active batch if it's still active
96+ detachActiveBatchAndCreateNew (this );
97+
6598 List <T > requests = new ArrayList <>(items .size ());
6699 for (Item <T > item : items ) {
67100 requests .add (item .request ());
68101 }
69102
70- executeBatch .apply (requests ).whenComplete ((v , ex ) -> {
71- if (ex == null ) {
72- for (Item <T > item : items ) {
73- item .result ().complete (null );
74- }
75- } else {
76- Throwable cause = ExceptionHelper .unwrapCompletableFuture (ex );
77- for (Item <T > item : items ) {
78- item .result ().completeExceptionally (cause );
79- }
103+ try {
104+ executeBatch .accept (requests );
105+ for (Item <T > item : items ) {
106+ item .result ().complete (null );
107+ }
108+ } catch (Throwable ex ) {
109+ for (Item <T > item : items ) {
110+ item .result ().completeExceptionally (ex );
80111 }
81- });
112+ }
82113 }
83114 }
84115
85- /** Synchronizes batch state access */
86- private final Object lock = new Object ();
87116 /** Current batch accepting requests */
88- private Batch activeBatch ;
89- /** Timer to auto-flush incomplete batch */
90- private CompletableFuture <Void > flushTimer ;
117+ private final AtomicReference <Batch > activeBatchAtom ;
91118
92119 /**
93120 * Creates a new ApiRequestBatcher with the specified configuration.
94121 *
95- * @param flushDelay Maximum time to wait before flushing a batch
96122 * @param maxItemCount Maximum number of items per batch
97123 * @param maxBatchBytes Maximum total size in bytes for all items in a batch
98124 * @param calculateItemSize Function to calculate the size in bytes of each item
99125 * @param executeBatch Function to execute the batch action
100126 */
101127 public ApiRequestBatcher (
102- Duration flushDelay ,
103128 int maxItemCount ,
104129 int maxBatchBytes ,
105130 Function <T , Integer > calculateItemSize ,
106- Function <List <T >, CompletableFuture <Void >> executeBatch ) {
107- this .flushDelay = flushDelay ;
131+ Consumer <List <T >> executeBatch ) {
108132 this .maxItemCount = maxItemCount ;
109133 this .maxBatchBytes = maxBatchBytes ;
110134 this .calculateItemSize = calculateItemSize ;
111135 this .executeBatch = executeBatch ;
136+ this .activeBatchAtom = new AtomicReference <>(new Batch ());
112137 }
113138
114139 /**
@@ -117,84 +142,51 @@ public ApiRequestBatcher(
117142 * @param request Request to batch
118143 * @return Future completed when batch executes
119144 */
120- public CompletableFuture <Void > submit (T request ) {
121- CompletableFuture <Void > result ;
122- Batch batchToFlush = null ;
123- Batch fullBatch = null ;
124-
125- synchronized (lock ) {
126- // Flush if request doesn't fit
127- if (activeBatch != null && !activeBatch .canFit (request )) {
128- batchToFlush = detachBatch ();
129- }
130-
131- // Create batch and start timer
132- if (activeBatch == null ) {
133- activeBatch = new Batch ();
134- if (flushTimer != null ) {
135- cancelTimer ();
145+ public CompletableFuture <Void > submit (T request , Duration flushDelay ) {
146+ // Flush the current batch if request doesn't fit
147+ while (true ) {
148+ Batch activeBatch = activeBatchAtom .get ();
149+
150+ if (activeBatch .isFull () || !activeBatch .canFit (request )) {
151+ if (!flushActiveBatchAndCreateNew (activeBatch )) {
152+ // failed to flush due to a race condition.
153+ continue ;
136154 }
137155 }
138156
139- result = activeBatch .add (request );
157+ var result = activeBatch .add (request , flushDelay );
140158
141- // Flush if batch full
159+ // Flush early if batch is full
142160 if (activeBatch .isFull ()) {
143- fullBatch = detachBatch ( );
161+ flushActiveBatchAndCreateNew ( activeBatch );
144162 }
145-
146- // Start flush timer for new batch
147- if (activeBatch != null && flushTimer == null ) {
148- flushTimer = new CompletableFuture <>();
149- flushTimer
150- .completeOnTimeout (null , flushDelay .toMillis (), TimeUnit .MILLISECONDS )
151- .thenRun (() -> {
152- Batch toFlush ;
153- synchronized (lock ) {
154- if (activeBatch != null ) {
155- toFlush = detachBatch ();
156- } else {
157- return ;
158- }
159- }
160- toFlush .execute ();
161- });
162- }
163-
164- // Cancel timer if batch was flushed
165- if (activeBatch == null && flushTimer != null ) {
166- cancelTimer ();
167- }
168- }
169-
170- // Execute outside lock to avoid blocking
171- if (batchToFlush != null ) {
172- batchToFlush .execute ();
163+ return result ;
173164 }
165+ }
174166
175- if (fullBatch != null ) {
176- fullBatch .execute ();
167+ private Batch detachActiveBatchAndCreateNew (Batch oldBatch ) {
168+ if (activeBatchAtom .compareAndSet (oldBatch , new Batch ())) {
169+ return oldBatch ;
177170 }
178171
179- return result ;
172+ return null ;
180173 }
181174
182- /** Detaches and returns active batch */
183- private Batch detachBatch () {
184- if (activeBatch == null ) {
185- throw new IllegalStateException ("activeBatch must not be null" );
175+ /** flushes active batch and crate a new batch. Return true if successful */
176+ private boolean flushActiveBatchAndCreateNew (Batch oldBatch ) {
177+ Batch activeBatch = detachActiveBatchAndCreateNew (oldBatch );
178+ if (activeBatch != null ) {
179+ activeBatch .flushNow ();
186180 }
187- Batch batch = activeBatch ;
188- activeBatch = null ;
189- return batch ;
181+ return activeBatch != null ;
190182 }
191183
192- /** Cancels and clears flush timer */
193- private void cancelTimer () {
194- if (flushTimer == null ) {
195- throw new IllegalStateException ("flushTimer must not be null" );
184+ public void shutdown () {
185+ Batch activeBatch = activeBatchAtom .get ();
186+ while (!activeBatchAtom .compareAndSet (activeBatch , new Batch ())) {
187+ // try again
188+ activeBatch = activeBatchAtom .get ();
196189 }
197- flushTimer .cancel (false );
198- flushTimer = null ;
190+ activeBatchAtom .get ().cancel ();
199191 }
200192}
0 commit comments