Skip to content

Commit a8b7098

Browse files
committed
Merge branch 'main' into chore/cloud-testing
2 parents 77862e0 + 475c38e commit a8b7098

15 files changed

Lines changed: 172 additions & 93 deletions

File tree

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,4 @@ jobs:
5858
cache: maven
5959

6060
- name: Build and test
61-
run: mvn -B -q install --file pom.xml
61+
run: mvn -B install --file pom.xml

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,11 @@ public class OrderProcessor extends DurableHandler<Order, OrderResult> {
312312
|--------|-------------|---------|
313313
| `withLambdaClient()` | Custom AWS Lambda client | Auto-configured Lambda client |
314314
| `withSerDes()` | Serializer for step results | Jackson with default settings |
315-
| `withExecutorService()` | Thread pool for async step execution | Cached daemon thread pool |
315+
| `withExecutorService()` | Thread pool for user-defined operations | Cached daemon thread pool |
316316
| `withLoggerConfig()` | Logger behavior configuration | Suppress logs during replay |
317317

318+
The `withExecutorService()` option configures the thread pool used for running user-defined operations. Internal SDK coordination (checkpoint batching, polling) runs on an SDK-managed thread pool.
319+
318320
## Logging
319321

320322
The SDK provides a `DurableLogger` via `ctx.getLogger()` that automatically includes execution metadata in log entries and suppresses duplicate logs during replay.

docs/design.md

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,52 @@ public class MyHandler extends DurableHandler<Input, Output> {
8686
|--------|---------|
8787
| `lambdaClient` | Auto-created `LambdaClient` for current region, primed for performance (see [`DurableConfig.java`](../sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java)) |
8888
| `serDes` | `JacksonSerDes` |
89-
| `executor` | `Executors.newCachedThreadPool()` |
89+
| `executorService` | `Executors.newCachedThreadPool()` (for user-defined operations only) |
9090
| `loggerConfig` | `LoggerConfig.defaults()` (suppress replay logs) |
9191

92+
### Thread Pool Architecture
93+
94+
The SDK uses two separate thread pools with distinct responsibilities:
95+
96+
**User Executor (`DurableConfig.executorService`):**
97+
- Runs user-defined operations (the code passed to `ctx.step()` and `ctx.stepAsync()`)
98+
- Configurable via `DurableConfig.builder().withExecutorService()`
99+
- Default: cached daemon thread pool
100+
101+
**Internal Executor (`InternalExecutor.INSTANCE`):**
102+
- Runs SDK coordination tasks: checkpoint batching, polling for wait completion, phaser management
103+
- Dedicated cached thread pool with daemon threads named `durable-sdk-internal-*`
104+
- Not configurable by users
105+
106+
**Benefits of this separation:**
107+
108+
| Benefit | Description |
109+
|---------|-------------|
110+
| **Isolation** | User operations can't starve SDK internals, and vice versa |
111+
| **No shutdown management** | Internal pool uses daemon threads; SDK coordination continues even if the user's executor is shut down |
112+
| **Efficient resource usage** | Cached thread pool creates threads on demand and reuses idle threads (60s timeout) |
113+
| **Daemon threads** | Internal threads won't prevent JVM shutdown |
114+
| **Single configuration point** | Changing `InternalExecutor.INSTANCE` in one place affects all SDK coordination |
115+
116+
**Example: Custom thread pool for user operations:**
117+
```java
118+
@Override
119+
protected DurableConfig createConfiguration() {
120+
var executor = new ThreadPoolExecutor(
121+
4, 10, // core/max threads
122+
60L, TimeUnit.SECONDS, // idle timeout
123+
new LinkedBlockingQueue<>(100), // bounded queue
124+
new ThreadFactoryBuilder()
125+
.setNameFormat("order-processor-%d")
126+
.setDaemon(true)
127+
.build());
128+
129+
return DurableConfig.builder()
130+
.withExecutorService(executor)
131+
.build();
132+
}
133+
```
134+
92135
### Step Configuration
93136

94137
```java

sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,10 @@ private static DurableExecutionClient createDefaultDurableExecutionClient() {
172172
}
173173

174174
/**
175-
* Creates a default ExecutorService for durable execution. Uses a cached thread pool with daemon threads.
175+
* Creates a default ExecutorService for running user-defined operations. Uses a cached thread pool with daemon
176+
* threads by default.
177+
*
178+
* <p>This executor is used exclusively for user operations. Internal SDK coordination uses the common ForkJoinPool.
176179
*
177180
* @return Default ExecutorService instance
178181
*/
@@ -252,7 +255,11 @@ public Builder withSerDes(SerDes serDes) {
252255
}
253256

254257
/**
255-
* Sets a custom ExecutorService. If not set, a default cached thread pool will be created.
258+
* Sets a custom ExecutorService for running user-defined operations. If not set, a default cached thread pool
259+
* will be created.
260+
*
261+
* <p>This executor is used exclusively for running user-defined operations. Internal SDK coordination (polling,
262+
* checkpointing) uses the common ForkJoinPool and is not affected by this setting.
256263
*
257264
* @param executorService Custom ExecutorService instance
258265
* @return This builder

sdk/src/main/java/com/amazonaws/lambda/durable/DurableContext.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@
66
import com.amazonaws.lambda.durable.execution.ExecutionManager;
77
import com.amazonaws.lambda.durable.execution.ThreadType;
88
import com.amazonaws.lambda.durable.logging.DurableLogger;
9-
import com.amazonaws.lambda.durable.logging.LoggerConfig;
109
import com.amazonaws.lambda.durable.operation.CallbackOperation;
1110
import com.amazonaws.lambda.durable.operation.InvokeOperation;
1211
import com.amazonaws.lambda.durable.operation.StepOperation;
1312
import com.amazonaws.lambda.durable.operation.WaitOperation;
1413
import com.amazonaws.lambda.durable.retry.RetryStrategies;
15-
import com.amazonaws.lambda.durable.serde.SerDes;
1614
import com.amazonaws.services.lambda.runtime.Context;
1715
import java.time.Duration;
1816
import java.util.Objects;
@@ -24,20 +22,16 @@
2422

2523
public class DurableContext {
2624
private final ExecutionManager executionManager;
27-
private final SerDes serDes;
25+
private final DurableConfig durableConfig;
2826
private final Context lambdaContext;
2927
private final AtomicInteger operationCounter;
3028
private final DurableLogger logger;
3129
private final ExecutionContext executionContext;
3230

3331
DurableContext(
34-
ExecutionManager executionManager,
35-
SerDes serDes,
36-
Context lambdaContext,
37-
LoggerConfig loggerConfig,
38-
String contextId) {
32+
ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) {
3933
this.executionManager = executionManager;
40-
this.serDes = serDes;
34+
this.durableConfig = durableConfig;
4135
this.lambdaContext = lambdaContext;
4236
this.operationCounter = new AtomicInteger(0);
4337
this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn());
@@ -47,14 +41,14 @@ public class DurableContext {
4741
LoggerFactory.getLogger(DurableContext.class),
4842
executionManager,
4943
requestId,
50-
loggerConfig.suppressReplayLogs());
44+
durableConfig.getLoggerConfig().suppressReplayLogs());
5145

5246
// Register root context thread as active
5347
executionManager.registerActiveThreadWithContext(contextId, ThreadType.CONTEXT);
5448
}
5549

56-
DurableContext(ExecutionManager executionManager, SerDes serDes, Context lambdaContext, LoggerConfig loggerConfig) {
57-
this(executionManager, serDes, lambdaContext, loggerConfig, "Root");
50+
DurableContext(ExecutionManager executionManager, DurableConfig config, Context lambdaContext) {
51+
this(executionManager, config, lambdaContext, "Root");
5852
}
5953

6054
public <T> T step(String name, Class<T> resultType, Supplier<T> func) {
@@ -121,8 +115,16 @@ public <T> DurableFuture<T> stepAsync(String name, TypeToken<T> typeToken, Suppl
121115
}
122116

123117
// Create and start step operation with TypeToken
124-
StepOperation<T> operation =
125-
new StepOperation<>(operationId, name, func, typeToken, config, executionManager, logger, serDes);
118+
var operation = new StepOperation<>(
119+
operationId,
120+
name,
121+
func,
122+
typeToken,
123+
config,
124+
executionManager,
125+
logger,
126+
durableConfig.getSerDes(),
127+
durableConfig.getExecutorService());
126128

127129
operation.execute(); // Start the step (returns immediately)
128130

@@ -209,7 +211,14 @@ public <T, U> DurableFuture<T> invokeAsync(
209211

210212
// Create and start invoke operation
211213
var operation = new InvokeOperation<>(
212-
operationId, name, functionName, payload, typeToken, config, executionManager, serDes);
214+
operationId,
215+
name,
216+
functionName,
217+
payload,
218+
typeToken,
219+
config,
220+
executionManager,
221+
durableConfig.getSerDes());
213222

214223
operation.execute(); // checkpoint the invoke operation
215224
return new DurableFuture<>(operation); // Block (will throw SuspendExecutionException if needed)
@@ -246,7 +255,8 @@ public <T> DurableCallbackFuture<T> createCallback(String name, Class<T> resultT
246255
validateReplay(operationId, OperationType.CALLBACK, name, existing);
247256
}
248257

249-
var operation = new CallbackOperation<>(operationId, name, resultType, config, executionManager, serDes);
258+
var operation = new CallbackOperation<>(
259+
operationId, name, resultType, config, executionManager, durableConfig.getSerDes());
250260
operation.execute();
251261

252262
return new DurableCallbackFuture<>(operation.getCallbackId(), operation);
@@ -264,7 +274,8 @@ public <T> DurableCallbackFuture<T> createCallback(String name, TypeToken<T> typ
264274
validateReplay(operationId, OperationType.CALLBACK, name, existing);
265275
}
266276

267-
var operation = new CallbackOperation<>(operationId, name, typeToken, config, executionManager, serDes);
277+
var operation = new CallbackOperation<>(
278+
operationId, name, typeToken, config, executionManager, durableConfig.getSerDes());
268279
operation.execute();
269280

270281
return new DurableCallbackFuture<>(operation.getCallbackId(), operation);

sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,14 @@ public static <I, O> DurableExecutionOutput execute(
5252
throw new IllegalStateException("First operation must be EXECUTION");
5353
}
5454

55-
// Get executor from config (always non-null)
56-
var executor = config.getExecutorService();
55+
// Get executor from config for running user step functions
56+
var userExecutor = config.getExecutorService();
5757

58-
// TODO: Should we pass the whole input instead?
5958
var executionManager = new ExecutionManager(
6059
input.durableExecutionArn(),
6160
input.checkpointToken(),
6261
input.initialExecutionState(),
63-
config.getDurableExecutionClient(),
64-
executor);
62+
config.getDurableExecutionClient());
6563

6664
var executionOp = executionManager.getExecutionOperation();
6765
logger.debug("EXECUTION operation found: {}", executionOp.id());
@@ -74,11 +72,10 @@ public static <I, O> DurableExecutionOutput execute(
7472
var handlerFuture = CompletableFuture.supplyAsync(
7573
() -> {
7674
// Create context in the executor thread so it detects the correct thread name
77-
var context =
78-
new DurableContext(executionManager, serDes, lambdaContext, config.getLoggerConfig());
75+
var context = new DurableContext(executionManager, config, lambdaContext);
7976
return handler.apply(userInput, context);
8077
},
81-
executor);
78+
userExecutor);
8279

8380
// Get suspend future from ExecutionManager. If this future completes, it
8481
// indicates
@@ -145,7 +142,7 @@ public static <I, O> DurableExecutionOutput execute(
145142

146143
// We DO NOT shutdown the executor since it should stay warm for re-invokes against a warm Lambda runtime.
147144
// For example, a re-invoke after a wait should re-use the same executor instance from DurableConfig.
148-
// executor.shutdown();
145+
// userExecutor.shutdown();
149146
}
150147
}
151148

sdk/src/main/java/com/amazonaws/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import java.util.List;
88
import java.util.concurrent.BlockingQueue;
99
import java.util.concurrent.CompletableFuture;
10-
import java.util.concurrent.ExecutorService;
1110
import java.util.concurrent.LinkedBlockingQueue;
1211
import java.util.concurrent.atomic.AtomicBoolean;
1312
import java.util.function.Supplier;
@@ -20,6 +19,11 @@
2019
*
2120
* <p>Single responsibility: Queue and batch checkpoint requests efficiently. Uses CheckpointCallback to notify when
2221
* checkpoints complete, avoiding cyclic dependency.
22+
*
23+
* <p>Uses a dedicated SDK thread pool for internal coordination, keeping checkpoint processing separate from
24+
* customer-configured executors used for user-defined operations.
25+
*
26+
* @see InternalExecutor
2327
*/
2428
class CheckpointBatcher {
2529
private static final int MAX_BATCH_SIZE_BYTES = 750 * 1024; // 750KB
@@ -30,19 +34,16 @@ class CheckpointBatcher {
3034
private final String durableExecutionArn;
3135
private final DurableExecutionClient client;
3236
private final BlockingQueue<CheckpointRequest> queue = new LinkedBlockingQueue<>();
33-
private final ExecutorService executor;
3437
private final AtomicBoolean isProcessing = new AtomicBoolean(false);
3538

3639
record CheckpointRequest(OperationUpdate update, CompletableFuture<Void> completion) {}
3740

3841
CheckpointBatcher(
3942
DurableExecutionClient client,
40-
ExecutorService executor,
4143
String durableExecutionArn,
4244
Supplier<String> tokenSupplier,
4345
CheckpointCallback callback) {
4446
this.client = client;
45-
this.executor = executor;
4647
this.durableExecutionArn = durableExecutionArn;
4748
this.tokenSupplier = tokenSupplier;
4849
this.callback = callback;
@@ -56,7 +57,7 @@ CompletableFuture<Void> checkpoint(OperationUpdate update) {
5657
queue.offer(new CheckpointRequest(update, future));
5758

5859
if (isProcessing.compareAndSet(false, true)) {
59-
executor.submit(this::processQueue);
60+
InternalExecutor.INSTANCE.execute(this::processQueue);
6061
}
6162

6263
return future;
@@ -107,7 +108,7 @@ private void processQueue() {
107108
isProcessing.set(false);
108109

109110
if (!queue.isEmpty() && isProcessing.compareAndSet(false, true)) {
110-
executor.submit(this::processQueue);
111+
InternalExecutor.INSTANCE.execute(this::processQueue);
111112
}
112113
}
113114
}

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
import java.util.Map;
1313
import java.util.concurrent.CompletableFuture;
1414
import java.util.concurrent.ConcurrentHashMap;
15-
import java.util.concurrent.Executor;
16-
import java.util.concurrent.Executors;
1715
import java.util.concurrent.Phaser;
1816
import java.util.concurrent.atomic.AtomicReference;
1917
import org.slf4j.Logger;
@@ -35,7 +33,10 @@
3533
* <li>Polling (for waits and retries)
3634
* </ul>
3735
*
38-
* <p>This is the single entry point for all execution coordination.
36+
* <p>This is the single entry point for all execution coordination. Internal coordination (polling, checkpointing) uses
37+
* a dedicated SDK thread pool, while user-defined operations run on a customer-configured executor.
38+
*
39+
* @see InternalExecutor
3940
*/
4041
public class ExecutionManager {
4142

@@ -54,9 +55,6 @@ public class ExecutionManager {
5455
private final Map<String, Phaser> openPhasers = Collections.synchronizedMap(new HashMap<>());
5556
private final CompletableFuture<Void> suspendExecutionFuture = new CompletableFuture<>();
5657

57-
// ===== Executors =====
58-
private final Executor managedExecutor;
59-
6058
// ===== Checkpoint Batching =====
6159
private final CheckpointBatcher checkpointBatcher;
6260
private final DurableExecutionClient client;
@@ -65,8 +63,7 @@ public ExecutionManager(
6563
String durableExecutionArn,
6664
String checkpointToken,
6765
InitialExecutionState initialExecutionState,
68-
DurableExecutionClient client,
69-
Executor executor) {
66+
DurableExecutionClient client) {
7067
this.durableExecutionArn = durableExecutionArn;
7168
this.checkpointToken = checkpointToken;
7269
this.client = client;
@@ -77,13 +74,9 @@ public ExecutionManager(
7774
this.executionMode =
7875
new AtomicReference<>(operations.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION);
7976

80-
this.managedExecutor = executor;
81-
82-
// Create checkpoint manager (package-private)
83-
// Pass method references to avoid cyclic dependency
84-
var checkpointExecutor = Executors.newSingleThreadExecutor();
77+
// Create checkpoint manager using common pool for internal coordination
8578
this.checkpointBatcher = new CheckpointBatcher(
86-
client, checkpointExecutor, durableExecutionArn, this::getCheckpointToken, this::onCheckpointComplete);
79+
client, durableExecutionArn, this::getCheckpointToken, this::onCheckpointComplete);
8780
}
8881

8982
private void loadAllOperations(InitialExecutionState initialExecutionState) {
@@ -260,7 +253,7 @@ public CompletableFuture<Void> sendOperationUpdate(OperationUpdate update) {
260253
// wait while another thread is still running and we therefore are not
261254
// re-invoked because we never suspended.
262255
public void pollForOperationUpdates(String operationId, Instant firstPollTime, Duration period) {
263-
managedExecutor.execute(() -> {
256+
InternalExecutor.INSTANCE.execute(() -> {
264257
// Sleep until the start
265258
try {
266259
var sleepDuration = Duration.between(Instant.now(), firstPollTime);
@@ -300,7 +293,7 @@ public void pollForOperationUpdates(String operationId, Instant firstPollTime, D
300293
// re-invoked because we never suspended.
301294
public void pollUntilReady(
302295
String operationId, CompletableFuture<Void> future, Instant firstPollTime, Duration period) {
303-
managedExecutor.execute(() -> {
296+
InternalExecutor.INSTANCE.execute(() -> {
304297
// Sleep until first poll time
305298
try {
306299
Duration sleepDuration = Duration.between(Instant.now(), firstPollTime);
@@ -342,10 +335,6 @@ public void pollUntilReady(
342335

343336
// ===== Utilities =====
344337

345-
public Executor getManagedExecutor() {
346-
return managedExecutor;
347-
}
348-
349338
public CompletableFuture<Void> getSuspendExecutionFuture() {
350339
return suspendExecutionFuture;
351340
}

0 commit comments

Comments
 (0)