Skip to content

Commit 5fe2153

Browse files
committed
fix race condition
1 parent 9eea0c7 commit 5fe2153

4 files changed

Lines changed: 115 additions & 133 deletions

File tree

sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ private DurableConfig(Builder builder) {
7878
this.serDes = builder.serDes != null ? builder.serDes : new JacksonSerDes();
7979
this.executorService = builder.executorService != null ? builder.executorService : createDefaultExecutor();
8080
this.loggerConfig = builder.loggerConfig != null ? builder.loggerConfig : LoggerConfig.defaults();
81-
this.pollingInterval = builder.pollingInterval != null ? builder.pollingInterval : Duration.ofSeconds(1);
81+
this.pollingInterval = builder.pollingInterval != null ? builder.pollingInterval : Duration.ofMillis(1000);
8282
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);
8383
}
8484

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ApiRequestBatcher.java

Lines changed: 97 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import java.util.List;
99
import java.util.concurrent.CompletableFuture;
1010
import java.util.concurrent.TimeUnit;
11-
import java.util.concurrent.atomic.AtomicReference;
1211
import java.util.function.Consumer;
1312
import java.util.function.Function;
1413

@@ -30,91 +29,22 @@ public class ApiRequestBatcher<T> {
3029
/** Executes the batch operation */
3130
private final Consumer<List<T>> executeBatch;
3231

33-
private record Item<T>(T request, CompletableFuture<Void> result) {}
34-
35-
/** Batch accumulator */
36-
private class Batch {
37-
/** Accumulated requests */
38-
private final List<Item<T>> items;
39-
/** Current batch size in bytes */
40-
private int totalBytes;
41-
42-
long expireTime;
43-
/** Timer to auto-flush incomplete batch */
44-
private final CompletableFuture<Void> flushTimer;
45-
46-
Batch() {
47-
this.items = new ArrayList<>();
48-
this.totalBytes = 0;
49-
this.expireTime = System.nanoTime() + MAX_DELAY.toNanos();
50-
this.flushTimer = new CompletableFuture<>();
51-
this.flushTimer.thenRunAsync(this::execute, InternalExecutor.INSTANCE);
52-
}
53-
54-
/** Adds request to batch and returns its result future */
55-
CompletableFuture<Void> add(T request, Duration delay) {
56-
totalBytes += calculateItemSize.apply(request);
57-
CompletableFuture<Void> result = new CompletableFuture<>();
58-
items.add(new Item<>(request, result));
59-
long newExpireTime = System.nanoTime() + delay.toNanos();
60-
if (expireTime > newExpireTime) {
61-
// the batch needs to be completed earlier than previously scheduled
62-
expireTime = newExpireTime;
63-
flushAfterDelay(delay.toNanos());
64-
}
65-
return result;
66-
}
32+
/** Accumulated requests */
33+
private final List<Item<T>> items;
6734

68-
/** Returns true if request fits within byte limit */
69-
boolean canFit(T request) {
70-
return totalBytes + calculateItemSize.apply(request) <= maxBatchBytes;
71-
}
72-
73-
/** Returns true if batch has reached item count limit */
74-
boolean isFull() {
75-
return items.size() >= maxItemCount;
76-
}
35+
/** Current batch size in bytes */
36+
private volatile int totalBytes;
7737

78-
void flushAfterDelay(long delayInNanos) {
79-
flushTimer.completeOnTimeout(null, delayInNanos, TimeUnit.NANOSECONDS);
80-
}
38+
/** Time when the current batch must be flushed */
39+
private volatile long expireTime;
8140

82-
void flushNow() {
83-
flushAfterDelay(0);
84-
}
41+
/** Timer to auto-flush incomplete batch */
42+
private CompletableFuture<Void> flushTimer;
8543

86-
void cancel() {
87-
var ex = new IllegalDurableOperationException("Batch cancelled");
88-
for (Item<T> item : items) {
89-
item.result().completeExceptionally(ex);
90-
}
91-
}
92-
93-
/** Executes batch and completes all item futures */
94-
private void execute() {
95-
// detach this from active batch if it's still active
96-
detachActiveBatchAndCreateNew(this);
97-
98-
List<T> requests = new ArrayList<>(items.size());
99-
for (Item<T> item : items) {
100-
requests.add(item.request());
101-
}
102-
103-
try {
104-
executeBatch.accept(requests);
105-
for (Item<T> item : items) {
106-
item.result().complete(null);
107-
}
108-
} catch (Throwable ex) {
109-
for (Item<T> item : items) {
110-
item.result().completeExceptionally(ex);
111-
}
112-
}
113-
}
114-
}
44+
/** Future of flushing previous batch */
45+
private CompletableFuture<Void> previousBatchFuture;
11546

116-
/** Current batch accepting requests */
117-
private final AtomicReference<Batch> activeBatchAtom;
47+
private record Item<T>(T request, CompletableFuture<Void> result) {}
11848

11949
/**
12050
* Creates a new ApiRequestBatcher with the specified configuration.
@@ -133,7 +63,22 @@ public ApiRequestBatcher(
13363
this.maxBatchBytes = maxBatchBytes;
13464
this.calculateItemSize = calculateItemSize;
13565
this.executeBatch = executeBatch;
136-
this.activeBatchAtom = new AtomicReference<>(new Batch());
66+
this.previousBatchFuture = CompletableFuture.allOf();
67+
this.items = new ArrayList<>();
68+
69+
initializeBatch();
70+
}
71+
72+
private void initializeBatch() {
73+
this.items.clear();
74+
this.totalBytes = 0;
75+
this.expireTime = System.nanoTime() + MAX_DELAY.toNanos();
76+
this.flushTimer = new CompletableFuture<>();
77+
this.flushTimer.thenRun(() -> {
78+
synchronized (items) {
79+
execute();
80+
}
81+
});
13782
}
13883

13984
/**
@@ -144,49 +89,90 @@ public ApiRequestBatcher(
14489
*/
14590
public CompletableFuture<Void> submit(T request, Duration flushDelay) {
14691
// Flush the current batch if request doesn't fit
147-
while (true) {
148-
Batch activeBatch = activeBatchAtom.get();
149-
150-
if (activeBatch.isFull() || !activeBatch.canFit(request)) {
151-
if (!flushActiveBatchAndCreateNew(activeBatch)) {
152-
// failed to flush due to a race condition.
153-
continue;
154-
}
92+
synchronized (items) {
93+
if (isFull() || !canFit(request)) {
94+
flushNow();
15595
}
15696

157-
var result = activeBatch.add(request, flushDelay);
97+
var future = add(request, flushDelay);
15898

159-
// Flush early if batch is full
160-
if (activeBatch.isFull()) {
161-
flushActiveBatchAndCreateNew(activeBatch);
99+
if (isFull()) {
100+
// Flush early if batch is full
101+
flushNow();
162102
}
163-
return result;
103+
return future;
164104
}
165105
}
166106

167-
private Batch detachActiveBatchAndCreateNew(Batch oldBatch) {
168-
if (activeBatchAtom.compareAndSet(oldBatch, new Batch())) {
169-
return oldBatch;
107+
/** Adds request to batch and returns its result future */
108+
CompletableFuture<Void> add(T request, Duration delay) {
109+
synchronized (items) {
110+
totalBytes += calculateItemSize.apply(request);
111+
CompletableFuture<Void> result = new CompletableFuture<>();
112+
items.add(new Item<>(request, result));
113+
long newExpireTime = System.nanoTime() + delay.toNanos();
114+
if (expireTime > newExpireTime) {
115+
// the batch needs to be completed earlier than previously scheduled
116+
expireTime = newExpireTime;
117+
flushAfterDelay(delay.toNanos());
118+
}
119+
return result;
170120
}
121+
}
171122

172-
return null;
123+
/** Returns true if request fits within byte limit */
124+
private boolean canFit(T request) {
125+
return totalBytes + calculateItemSize.apply(request) <= maxBatchBytes;
173126
}
174127

175-
/** flushes active batch and crate a new batch. Return true if successful */
176-
private boolean flushActiveBatchAndCreateNew(Batch oldBatch) {
177-
Batch activeBatch = detachActiveBatchAndCreateNew(oldBatch);
178-
if (activeBatch != null) {
179-
activeBatch.flushNow();
180-
}
181-
return activeBatch != null;
128+
/** Returns true if batch has reached item count limit */
129+
private boolean isFull() {
130+
return items.size() >= maxItemCount;
131+
}
132+
133+
private void flushAfterDelay(long delayInNanos) {
134+
flushTimer.completeOnTimeout(null, delayInNanos, TimeUnit.NANOSECONDS);
135+
}
136+
137+
private void flushNow() {
138+
this.flushTimer.cancel(false);
139+
// wait for new batch to be ready
140+
execute();
182141
}
183142

184143
public void shutdown() {
185-
Batch activeBatch = activeBatchAtom.get();
186-
while (!activeBatchAtom.compareAndSet(activeBatch, new Batch())) {
187-
// try again
188-
activeBatch = activeBatchAtom.get();
144+
var ex = new IllegalDurableOperationException("Batch cancelled");
145+
synchronized (items) {
146+
for (Item<T> item : items) {
147+
item.result().completeExceptionally(ex);
148+
}
149+
initializeBatch();
150+
}
151+
}
152+
153+
/** Executes batch and completes all item futures */
154+
private void execute() {
155+
if (items.isEmpty()) {
156+
return;
189157
}
190-
activeBatchAtom.get().cancel();
158+
var copyItems = new ArrayList<>(items);
159+
initializeBatch();
160+
161+
// append the current batch to the previous one
162+
previousBatchFuture = previousBatchFuture.thenRunAsync(
163+
() -> {
164+
try {
165+
var requests = copyItems.stream().map(Item::request).toList();
166+
executeBatch.accept(requests);
167+
for (Item<T> item : copyItems) {
168+
item.result().complete(null);
169+
}
170+
} catch (Throwable ex) {
171+
for (Item<T> item : copyItems) {
172+
item.result().completeExceptionally(ex);
173+
}
174+
}
175+
},
176+
InternalExecutor.INSTANCE);
191177
}
192178
}

sdk/src/main/java/com/amazonaws/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,12 @@ protected ErrorObject serializeException(Throwable throwable) {
242242
}
243243

244244
protected Throwable deserializeException(ErrorObject errorObject) {
245+
Throwable original = null;
246+
if (errorObject == null) {
247+
return original;
248+
}
245249
var errorType = errorObject.errorType();
246250
var errorData = errorObject.errorData();
247-
Throwable original = null;
248251

249252
if (errorType == null) {
250253
return original;

sdk/src/test/java/com/amazonaws/lambda/durable/execution/ApiRequestBatcherTest.java

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,22 @@
44

55
import static org.junit.jupiter.api.Assertions.assertEquals;
66
import static org.junit.jupiter.api.Assertions.assertFalse;
7+
import static org.junit.jupiter.api.Assertions.assertThrows;
78
import static org.junit.jupiter.api.Assertions.assertTrue;
89
import static org.mockito.ArgumentMatchers.any;
910
import static org.mockito.Mockito.doThrow;
1011
import static org.mockito.Mockito.mock;
1112
import static org.mockito.Mockito.never;
13+
import static org.mockito.Mockito.times;
1214
import static org.mockito.Mockito.verify;
1315

14-
import java.time.Clock;
1516
import java.time.Duration;
16-
import java.time.Instant;
17-
import java.time.ZoneOffset;
1817
import java.util.ArrayList;
1918
import java.util.List;
2019
import java.util.concurrent.CompletableFuture;
2120
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
2223
import java.util.function.Consumer;
2324
import org.junit.jupiter.api.BeforeEach;
2425
import org.junit.jupiter.api.Test;
@@ -31,46 +32,47 @@ class ApiRequestBatcherTest {
3132
private static class Input {}
3233

3334
private Input input;
34-
private Clock fixedClock;
3535
private ApiRequestBatcher<Input> cut;
3636
private Consumer<List<Input>> doBatchAction;
3737

3838
@BeforeEach
3939
void setUp() {
4040
input = mock(Input.class);
4141
doBatchAction = mock();
42-
fixedClock = Clock.fixed(Instant.now(), ZoneOffset.UTC);
4342
cut = new ApiRequestBatcher<>(MAX_BATCH_SIZE, MAX_BATCH_BINARY_SIZE_IN_BYTES, item -> 0, doBatchAction);
4443
}
4544

4645
@Test
4746
void whenSingleActionPerformed_anUncompletedFutureIsReturned() {
4847
CompletableFuture<Void> resultFuture = cut.submit(input, MAX_DELAY_MILLIS);
48+
assertThrows(TimeoutException.class, () -> resultFuture.get(50, TimeUnit.MILLISECONDS));
4949

5050
verify(doBatchAction, never()).accept(any());
5151
assertFalse(resultFuture.isDone());
5252
}
5353

5454
@Test
55-
void whenMultipleActionsPerformedBelowMaxBatchSize_anUncompletedFutureIsReturnedEachTime() {
55+
void whenMultipleActionsPerformedBelowMaxBatchSize_anUncompletedFutureIsReturnedEachTime()
56+
throws ExecutionException, InterruptedException, TimeoutException {
5657
List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
5758
for (int i = 0; i < MAX_BATCH_SIZE - 1; i++) {
5859
resultFutures.add(cut.submit(input, MAX_DELAY_MILLIS));
5960
}
6061

62+
assertThrows(TimeoutException.class, () -> resultFutures.get(0).get(50, TimeUnit.MILLISECONDS));
63+
6164
verify(doBatchAction, never()).accept(any());
62-
assertTrue(resultFutures.stream().noneMatch(CompletableFuture::isDone));
6365
}
6466

6567
@Test
6668
void whenMultipleActionsPerformedMatchingMaxBatchSize_batchInvokeIsPerformed() {
6769
List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
68-
for (int i = 0; i < MAX_BATCH_SIZE; i++) {
70+
for (int i = 0; i < MAX_BATCH_SIZE * 2; i++) {
6971
resultFutures.add(cut.submit(input, MAX_DELAY_MILLIS));
7072
}
7173

7274
CompletableFuture.allOf(resultFutures.toArray(CompletableFuture[]::new)).join();
73-
verify(doBatchAction).accept(any());
75+
verify(doBatchAction, times(2)).accept(any());
7476
}
7577

7678
@Test
@@ -98,7 +100,7 @@ void whenBatchInvokeThrows_allFuturesCompleteWithThatException() {
98100
}
99101

100102
@Test
101-
void whenBatchInvokeReturnsOutcome_allFuturesCompleteSuccessfully() {
103+
void whenBatchInvokeReturns_allFuturesCompleteSuccessfully() {
102104
Input input1 = mock(Input.class);
103105
Input input2 = mock(Input.class);
104106
Input input3 = mock(Input.class);
@@ -118,7 +120,7 @@ void testSubmit_whenCannotAddItemDueToBinarySizeConstraint_thenFlushCurrentBatch
118120
var future1 = cut.submit(input, MAX_DELAY_MILLIS);
119121
var future2 = cut.submit(input, MAX_DELAY_MILLIS);
120122
CompletableFuture.allOf(future1, future2)
121-
.thenAccept(v -> verify(doBatchAction).accept(any()))
123+
.thenAccept(v -> verify(doBatchAction, times(2)).accept(any()))
122124
.join();
123125
}
124126

@@ -159,13 +161,4 @@ void whenBatchInvokeThrowsCompletionException_allFuturesCompleteWithUnwrappedCau
159161
});
160162
CompletableFuture.allOf(resultFuture1, resultFuture2, resultFuture3).join();
161163
}
162-
163-
private Throwable getFutureCause(CompletableFuture<?> failedFuture) throws InterruptedException {
164-
try {
165-
failedFuture.get();
166-
return null;
167-
} catch (ExecutionException cause) {
168-
return cause.getCause();
169-
}
170-
}
171164
}

0 commit comments

Comments
 (0)