Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.amazonaws.lambda.durable.logging.LoggerConfig;
import com.amazonaws.lambda.durable.serde.JacksonSerDes;
import com.amazonaws.lambda.durable.serde.SerDes;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -67,6 +68,8 @@ public final class DurableConfig {
private final SerDes serDes;
private final ExecutorService executorService;
private final LoggerConfig loggerConfig;
private final Duration pollingInterval;
private final Duration checkpointDelay;

private DurableConfig(Builder builder) {
this.durableExecutionClient = builder.durableExecutionClient != null
Expand All @@ -75,6 +78,8 @@ private DurableConfig(Builder builder) {
this.serDes = builder.serDes != null ? builder.serDes : new JacksonSerDes();
this.executorService = builder.executorService != null ? builder.executorService : createDefaultExecutor();
this.loggerConfig = builder.loggerConfig != null ? builder.loggerConfig : LoggerConfig.defaults();
this.pollingInterval = builder.pollingInterval != null ? builder.pollingInterval : Duration.ofSeconds(1);
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);
}

/**
Expand Down Expand Up @@ -131,6 +136,24 @@ public LoggerConfig getLoggerConfig() {
return loggerConfig;
}

/**
* Gets the configured polling interval.
*
* @return polling interval in Duration.
*/
public Duration getPollingInterval() {
return pollingInterval;
}

/**
* Gets the configured checkpoint delay.
*
* @return check point in Duration.
*/
public Duration getCheckpointDelay() {
return pollingInterval;
}

/**
* Creates a default DurableExecutionClient with production LambdaClient. Uses
* EnvironmentVariableCredentialsProvider and region from AWS_REGION. If AWS_REGION is not set, defaults to
Expand Down Expand Up @@ -195,6 +218,8 @@ public static final class Builder {
private SerDes serDes;
private ExecutorService executorService;
private LoggerConfig loggerConfig;
private Duration pollingInterval;
private Duration checkpointDelay;

private Builder() {}

Expand Down Expand Up @@ -280,6 +305,29 @@ public Builder withLoggerConfig(LoggerConfig loggerConfig) {
return this;
}

/**
* Sets how often the SDK polls updates from backend.
*
* @param duration the polling interval in Duration
* @return This builder
*/
public Builder withPollingInterval(Duration duration) {
this.pollingInterval = duration;
return this;
}

/**
* Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, which disables checkpoint
* batching.
*
* @param duration the checkpoint delay in Duration
* @return This builder
*/
public Builder withCheckpointDelay(Duration duration) {
this.checkpointDelay = duration;
return this;
}

/**
* Builds the DurableConfig instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ public static <I, O> DurableExecutionOutput execute(
}

var executionManager = new ExecutionManager(
input.durableExecutionArn(),
input.checkpointToken(),
input.initialExecutionState(),
config.getDurableExecutionClient());
input.durableExecutionArn(), input.checkpointToken(), input.initialExecutionState(), config);

logger.debug(
"EXECUTION operation found: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public final void handleRequest(InputStream inputStream, OutputStream outputStre
*
* <p>Customer-facing serialization uses SerDes from DurableConfig.
*
* @return Configured ObjectMapper for DAR backend communication
* @return Configured ObjectMapper for durable backend communication
*/
public static ObjectMapper createObjectMapper() {
var dateModule = new SimpleModule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public CheckpointDurableExecutionResponse checkpoint(String arn, String token, L
.checkpointToken(token)
.updates(updates)
.build();
logger.debug("Calling DAR backend with {} updates: {}", updates.size(), request);
logger.debug("Calling durable backend with {} updates: {}", updates.size(), request);

return lambdaClient.checkpointDurableExecution(request);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazonaws.lambda.durable.execution;

import com.amazonaws.lambda.durable.util.ExceptionHelper;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* This class simplifies automatic batching of async requests. Your code deals with individual items, the service you
* are calling asynchronously has a cheaper batch API. You are willing to trade some latency by waiting for more calls
* to arrive to group them in a single batch call into the service. The batch call will be made when either a full batch
* is built, too much time has passed, or size limits are reached. This class builds a single batch at a time with
* thread-safe synchronization: - There is no batch yet. - First call arrives. Create a batch with one item in it, start
* a timer. No call to service is made yet. - More calls arrive. They get added to the same batch if size limits allow.
* - Either the batch is full, the timer has elapsed, or size limits are reached. Send the batch request. Now a new
* batch can now be built. - If entire batch call fails, each call will fail. - If batch call succeeded, outcome is
* analyzed one by one to complete results of each call. When you extend this class, you are expected to implement the
* actual batch operation and to expose a public method to perform a single action. The batcher includes comprehensive
* metrics tracking for performance monitoring.
*
* @param <T> Input of every call
*/
public class AsyncBatcher<T> {

/** Maximum time to wait before flushing a batch */
private final Duration maxDelay;
/** Maximum number of items per batch */
private final int maxBatchSize;
/** Maximum total size in bytes for all items in a batch */
private final int maxBatchBinarySizeInBytes;
/** Function to calculate the size in bytes of each item */
private final Function<T, Integer> itemSizeInBytesProvider;

private final Function<List<T>, CompletableFuture<Void>> doBatchAction;

private record BatchItem<T>(T input, CompletableFuture<Void> outputFuture) {}

/** Represents a collection of items to be processed together as a batch. */
private class Batch {
/** List of items in this batch */
private final List<BatchItem<T>> batchItems;
/** Total size in bytes of all items in this batch */
private int batchSizeInBytes;

Batch() {
this.batchItems = new ArrayList<>();
}

/**
* Adds an item to this batch and returns a future for the result.
*
* @param input The item to add to the batch
* @return A CompletableFuture that will be completed with the result
*/
CompletableFuture<Void> addItem(T input) {
final int itemSize = itemSizeInBytesProvider.apply(input);
batchSizeInBytes += itemSize;

CompletableFuture<Void> resultFuture = new CompletableFuture<>();
batchItems.add(new BatchItem<>(input, resultFuture));
return resultFuture;
}

/** Checks if this batch can accept the given item without exceeding size limits. */
boolean canAcceptItem(T input) {
return batchSizeInBytes + itemSizeInBytesProvider.apply(input) <= maxBatchBinarySizeInBytes;
}

/** Checks if this batch can accept more items without exceeding count limits. */
boolean canAcceptMore() {
return batchItems.size() < maxBatchSize;
}

/** Processes this batch by executing the batch action and handling results. */
void processBatch() {
List<T> inputs = extractInputs();

CompletableFuture<Void> batchFuture = doBatchAction.apply(inputs);

batchFuture.thenAccept(this::completeItems).exceptionally(this::failAllItems);
}

private List<T> extractInputs() {
return batchItems.stream().map(BatchItem::input).collect(Collectors.toList());
}

/** Completes individual item futures with their corresponding results */
private void completeItems(Void v) {
for (BatchItem<T> batchItem : batchItems) {
batchItem.outputFuture().complete(null);
}
}

/** Fails all item futures with the given exception */
private Void failAllItems(Throwable wrappedCause) {
Throwable cause = ExceptionHelper.unwrapCompletableFuture(wrappedCause);
for (BatchItem<T> batchItem : batchItems) {
batchItem.outputFuture().completeExceptionally(cause);
}
return null;
}
}

/** Lock for synchronizing access to current batch state */
private final Object currentBatchLock = new Object();
/** The current batch being filled with items */
private Batch currentBatch;
/** Future that completes when the current batch should be flushed due to timeout */
private CompletableFuture<Void> batchFlushFuture;

/**
* Creates a new AsyncBatcher with the specified configuration.
*
* @param maxDelay Maximum time to wait before flushing a batch
* @param maxBatchSize Maximum number of items per batch
* @param maxBatchBinarySizeInBytes Maximum total size in bytes for all items in a batch
* @param itemSizeInBytesProvider Function to calculate the size in bytes of each item
*/
public AsyncBatcher(
Duration maxDelay,
int maxBatchSize,
int maxBatchBinarySizeInBytes,
Function<T, Integer> itemSizeInBytesProvider,
Function<List<T>, CompletableFuture<Void>> doBatchAction) {
this.maxDelay = maxDelay;
this.maxBatchSize = maxBatchSize;
this.maxBatchBinarySizeInBytes = maxBatchBinarySizeInBytes;
this.itemSizeInBytesProvider = itemSizeInBytesProvider;
this.doBatchAction = doBatchAction;
this.currentBatch = null;
this.batchFlushFuture = null;
}

/**
* Submits an item for batch processing. The item will be added to the current batch or trigger batch processing if
* limits are reached.
*
* @param input The item to process
* @return A CompletableFuture that will be completed with the processing result
*/
public CompletableFuture<Void> doAction(T input) {
CompletableFuture<Void> outputFuture;
Batch previousBatchToProcess = null;
Batch newBatchToProcess = null;

synchronized (currentBatchLock) {
// If current batch can't fit this item, flush it first
if (currentBatch != null && !currentBatch.canAcceptItem(input)) {
previousBatchToProcess = getAndClearCurrentBatch();
}

// Create new batch if needed
if (currentBatch == null) {
currentBatch = new Batch();
if (batchFlushFuture != null) {
cancelAndClearCurrentFlusher();
}
}

outputFuture = currentBatch.addItem(input);

// If batch is full, process it immediately
if (!currentBatch.canAcceptMore()) {
newBatchToProcess = getAndClearCurrentBatch();
}

// Set up timeout-based flushing for non-full batches
if (currentBatch != null && batchFlushFuture == null) {
batchFlushFuture = new CompletableFuture<>();
batchFlushFuture
.completeOnTimeout(null, maxDelay.toMillis(), TimeUnit.MILLISECONDS)
.thenRun(() -> {
Batch toFlush;
synchronized (currentBatchLock) {
if (currentBatch != null) {
toFlush = getAndClearCurrentBatch();
} else {
return;
}
}
toFlush.processBatch();
});
}

// Clean up flush future if no current batch
if (currentBatch == null && batchFlushFuture != null) {
cancelAndClearCurrentFlusher();
}
}

// Process batches outside of synchronized block to avoid blocking
if (previousBatchToProcess != null) {
previousBatchToProcess.processBatch();
}

if (newBatchToProcess != null) {
newBatchToProcess.processBatch();
}

return outputFuture;
}

/** Gets the current batch and clears it, ensuring it's not null */
private Batch getAndClearCurrentBatch() {
if (currentBatch == null) {
throw new IllegalStateException("currentBatch must not be null");
}
final Batch batchToProcess = currentBatch;
currentBatch = null;
return batchToProcess;
}

/** Cancels the current flush future and clears it, ensuring it's not null */
private void cancelAndClearCurrentFlusher() {
if (batchFlushFuture == null) {
throw new IllegalStateException("batchFlushFuture must not be null");
}
batchFlushFuture.cancel(false);
batchFlushFuture = null;
}
}
Loading
Loading