Skip to content

Commit 622ef31

Browse files
committed
update some comments
1 parent 997675f commit 622ef31

2 files changed

Lines changed: 47 additions & 48 deletions

File tree

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ApiRequestBatcher.java

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
* Batches API requests to optimize throughput by grouping individual calls into batch operations. Batches are flushed
1515
* when full, when size limits are reached, or after a timeout.
1616
*
17+
* <p>Uses a dedicated SDK thread pool for internal coordination, keeping checkpoint processing separate from
18+
* customer-configured executors used for user-defined operations.
19+
*
20+
* @see InternalExecutor
1721
* @param <T> Request type
1822
*/
1923
public class ApiRequestBatcher<T> {
@@ -32,10 +36,10 @@ public class ApiRequestBatcher<T> {
3236
private final List<Item<T>> items;
3337

3438
/** Current batch size in bytes */
35-
private volatile int totalBytes;
39+
private int totalBytes;
3640

3741
/** Time when the current batch must be flushed */
38-
private volatile long expireTime;
42+
private long expireTime;
3943

4044
/** Timer to auto-flush incomplete batch */
4145
private CompletableFuture<Void> flushTimer;
@@ -68,32 +72,31 @@ public ApiRequestBatcher(
6872
initializeBatch();
6973
}
7074

71-
private void initializeBatch() {
72-
this.items.clear();
73-
this.totalBytes = 0;
74-
this.expireTime = System.nanoTime() + MAX_DELAY.toNanos();
75-
this.flushTimer = new CompletableFuture<>();
76-
this.flushTimer.thenRun(() -> {
77-
synchronized (items) {
78-
execute();
79-
}
80-
});
81-
}
82-
8375
/**
8476
* Submits request for batched execution.
8577
*
8678
* @param request Request to batch
8779
* @return Future completed when batch executes
8880
*/
89-
public CompletableFuture<Void> submit(T request, Duration flushDelay) {
90-
// Flush the current batch if request doesn't fit
81+
CompletableFuture<Void> submit(T request, Duration flushDelay) {
9182
synchronized (items) {
83+
// Flush the current batch if request doesn't fit
9284
if (isFull() || !canFit(request)) {
9385
flushNow();
9486
}
9587

96-
var future = add(request, flushDelay);
88+
// add the request to the current batch
89+
CompletableFuture<Void> future = new CompletableFuture<>();
90+
totalBytes += calculateItemSize.apply(request);
91+
items.add(new Item<>(request, future));
92+
93+
// create or update the flush timer
94+
long newExpireTime = System.nanoTime() + flushDelay.toNanos();
95+
if (expireTime > newExpireTime) {
96+
// the batch needs to be completed earlier than previously scheduled
97+
expireTime = newExpireTime;
98+
flushAfterDelay(flushDelay.toNanos());
99+
}
97100

98101
if (isFull()) {
99102
// Flush early if batch is full
@@ -103,20 +106,30 @@ public CompletableFuture<Void> submit(T request, Duration flushDelay) {
103106
}
104107
}
105108

106-
/** Adds request to batch and returns its result future */
107-
CompletableFuture<Void> add(T request, Duration delay) {
109+
/** Flushes pending batch and waits for completion */
110+
void shutdown() {
108111
synchronized (items) {
109-
totalBytes += calculateItemSize.apply(request);
110-
CompletableFuture<Void> result = new CompletableFuture<>();
111-
items.add(new Item<>(request, result));
112-
long newExpireTime = System.nanoTime() + delay.toNanos();
113-
if (expireTime > newExpireTime) {
114-
// the batch needs to be completed earlier than previously scheduled
115-
expireTime = newExpireTime;
116-
flushAfterDelay(delay.toNanos());
117-
}
118-
return result;
112+
flushNow();
119113
}
114+
115+
// wait for previous batches to be flushed
116+
previousBatchFuture.join();
117+
}
118+
119+
/** clear the current batch and creates a new batch */
120+
private void initializeBatch() {
121+
this.items.clear();
122+
this.totalBytes = 0;
123+
// MAX_DELAY is longer than a single Lambda invocation
124+
this.expireTime = System.nanoTime() + MAX_DELAY.toNanos();
125+
126+
// the timer future is created initially without a timeout until an item is added to the batch
127+
this.flushTimer = new CompletableFuture<>();
128+
this.flushTimer.thenRun(() -> {
129+
synchronized (items) {
130+
execute();
131+
}
132+
});
120133
}
121134

122135
/** Returns true if request fits within byte limit */
@@ -134,21 +147,12 @@ private void flushAfterDelay(long delayInNanos) {
134147
}
135148

136149
private void flushNow() {
150+
// cancel the flush timer if it has not been triggered
137151
this.flushTimer.cancel(false);
138-
// wait for new batch to be ready
152+
// execute the current batch now
139153
execute();
140154
}
141155

142-
/** Flushes pending batch and waits for completion */
143-
public void shutdown() {
144-
synchronized (items) {
145-
flushNow();
146-
}
147-
148-
// wait for previous batches to be flushed
149-
previousBatchFuture.join();
150-
}
151-
152156
/** Executes batch and completes all item futures */
153157
private void execute() {
154158
var copyItems = new ArrayList<>(items);
@@ -157,7 +161,7 @@ private void execute() {
157161
return;
158162
}
159163

160-
// append the current batch to the previous one
164+
// append the current batch to the previous one so that the batches can run sequentially
161165
previousBatchFuture = previousBatchFuture.thenRunAsync(
162166
() -> {
163167
try {

sdk/src/main/java/com/amazonaws/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,6 @@
2323
*
2424
* <p>Single responsibility: Queue and batch checkpoint requests efficiently. Uses a Consumer to notify when checkpoints
2525
* complete, avoiding cyclic dependency.
26-
*
27-
* <p>Uses a dedicated SDK thread pool for internal coordination, keeping checkpoint processing separate from
28-
* customer-configured executors used for user-defined operations.
29-
*
30-
* @see InternalExecutor
3126
*/
3227
class CheckpointBatcher {
3328
private static final int MAX_BATCH_SIZE_BYTES = 750 * 1024; // 750KB
@@ -97,15 +92,15 @@ void shutdown() {
9792
futures.forEach(f -> f.completeExceptionally(new IllegalStateException("CheckpointManager shutdown")));
9893
}
9994

100-
// wait for all checkpoint requests to complete
95+
// wait for all non-polling checkpoint requests to complete
10196
checkpointApiRequestBatcher.shutdown();
10297
}
10398

10499
/**
105100
* Calling GetExecutionState API to get all pages of operations given CheckpointUpdatedExecutionState(operations,
106101
* nextMarker)
107102
*/
108-
public List<Operation> fetchAllPages(CheckpointUpdatedExecutionState checkpointUpdatedExecutionState) {
103+
List<Operation> fetchAllPages(CheckpointUpdatedExecutionState checkpointUpdatedExecutionState) {
109104
List<Operation> operations = new ArrayList<>();
110105
if (checkpointUpdatedExecutionState == null) {
111106
return operations;

0 commit comments

Comments
 (0)