Skip to content

Commit 2f0113f

Browse files
committed
Refactor to InternalExecutor with cached thread pool.
1 parent 48ce2b4 commit 2f0113f

6 files changed

Lines changed: 54 additions & 24 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public class OrderProcessor extends DurableHandler<Order, OrderResult> {
315315
| `withExecutorService()` | Thread pool for user-defined operations | Cached daemon thread pool |
316316
| `withLoggerConfig()` | Logger behavior configuration | Suppress logs during replay |
317317

318-
The `withExecutorService()` option configures the thread pool used for running user-defined operations. Internal SDK coordination (checkpoint batching, polling) runs on the JVM's common `ForkJoinPool`.
318+
The `withExecutorService()` option configures the thread pool used for running user-defined operations. Internal SDK coordination (checkpoint batching, polling) runs on an SDK-managed thread pool.
319319

320320
## Logging
321321

docs/design.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,20 @@ The SDK uses two separate thread pools with distinct responsibilities:
9898
- Configurable via `DurableConfig.builder().withExecutorService()`
9999
- Default: cached daemon thread pool
100100

101-
**Internal Executor (`ForkJoinPool.commonPool()`):**
101+
**Internal Executor (`InternalExecutor.INSTANCE`):**
102102
- Runs SDK coordination tasks: checkpoint batching, polling for wait completion, phaser management
103+
- Dedicated cached thread pool with daemon threads named `durable-sdk-internal-*`
103104
- Not configurable by users
104105

105106
**Benefits of this separation:**
106107

107108
| Benefit | Description |
108109
|---------|-------------|
109110
| **Isolation** | User operations can't starve SDK internals, and vice versa |
110-
| **No shutdown management** | The common pool is JVM-managed; SDK coordination continues even if the user's executor is shut down |
111-
| **Efficient resource usage** | Common pool uses work-stealing and scales with available processors |
112-
| **Daemon threads** | Common pool threads won't prevent JVM shutdown |
113-
| **Single configuration point** | Changing `INTERNAL_EXECUTOR` in one place affects all SDK coordination |
111+
| **No shutdown management** | Internal pool uses daemon threads; SDK coordination continues even if the user's executor is shut down |
112+
| **Efficient resource usage** | Cached thread pool creates threads on demand, reuses idle threads, and terminates threads that stay idle for 60 seconds |
113+
| **Daemon threads** | Internal threads won't prevent JVM shutdown |
114+
| **Single configuration point** | Changing `InternalExecutor.INSTANCE` in one place affects all SDK coordination |
114115

115116
**Example: Custom thread pool for user operations:**
116117
```java

sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public static <I, O> DurableExecutionOutput execute(
142142

143143
// We DO NOT shut down the executor since it should stay warm for re-invokes against a warm Lambda runtime.
144144
// For example, a re-invoke after a wait should re-use the same executor instance from DurableConfig.
145-
// executor.shutdown();
145+
// userExecutor.shutdown();
146146
}
147147
}
148148

sdk/src/main/java/com/amazonaws/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import java.util.List;
88
import java.util.concurrent.BlockingQueue;
99
import java.util.concurrent.CompletableFuture;
10-
import java.util.concurrent.Executor;
11-
import java.util.concurrent.ForkJoinPool;
1210
import java.util.concurrent.LinkedBlockingQueue;
1311
import java.util.concurrent.atomic.AtomicBoolean;
1412
import java.util.function.Supplier;
@@ -22,16 +20,15 @@
2220
* <p>Single responsibility: Queue and batch checkpoint requests efficiently. Uses CheckpointCallback to notify when
2321
* checkpoints complete, avoiding cyclic dependency.
2422
*
25-
* <p>Uses the common ForkJoinPool for internal coordination, keeping checkpoint processing separate from
26-
* customer-configured executors.
23+
* <p>Uses a dedicated SDK thread pool for internal coordination, keeping checkpoint processing separate from
24+
* customer-configured executors used for user-defined operations.
25+
*
26+
* @see InternalExecutor
2727
*/
2828
class CheckpointBatcher {
2929
private static final int MAX_BATCH_SIZE_BYTES = 750 * 1024; // 750KB
3030
private static final Logger logger = LoggerFactory.getLogger(CheckpointBatcher.class);
3131

32-
/** Internal executor for SDK coordination tasks (checkpoint queue processing). */
33-
private static final Executor INTERNAL_EXECUTOR = ForkJoinPool.commonPool();
34-
3532
private final CheckpointCallback callback;
3633
private final Supplier<String> tokenSupplier;
3734
private final String durableExecutionArn;
@@ -60,7 +57,7 @@ CompletableFuture<Void> checkpoint(OperationUpdate update) {
6057
queue.offer(new CheckpointRequest(update, future));
6158

6259
if (isProcessing.compareAndSet(false, true)) {
63-
INTERNAL_EXECUTOR.execute(this::processQueue);
60+
InternalExecutor.INSTANCE.execute(this::processQueue);
6461
}
6562

6663
return future;
@@ -111,7 +108,7 @@ private void processQueue() {
111108
isProcessing.set(false);
112109

113110
if (!queue.isEmpty() && isProcessing.compareAndSet(false, true)) {
114-
INTERNAL_EXECUTOR.execute(this::processQueue);
111+
InternalExecutor.INSTANCE.execute(this::processQueue);
115112
}
116113
}
117114
}

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
import java.util.Map;
1313
import java.util.concurrent.CompletableFuture;
1414
import java.util.concurrent.ConcurrentHashMap;
15-
import java.util.concurrent.Executor;
16-
import java.util.concurrent.ForkJoinPool;
1715
import java.util.concurrent.Phaser;
1816
import java.util.concurrent.atomic.AtomicReference;
1917
import org.slf4j.Logger;
@@ -36,15 +34,14 @@
3634
* </ul>
3735
*
3836
* <p>This is the single entry point for all execution coordination. Internal coordination (polling, checkpointing) uses
39-
* the common ForkJoinPool, while user operations run on a customer-configured executor.
37+
* a dedicated SDK thread pool, while user-defined operations run on a customer-configured executor.
38+
*
39+
* @see InternalExecutor
4040
*/
4141
public class ExecutionManager {
4242

4343
private static final Logger logger = LoggerFactory.getLogger(ExecutionManager.class);
4444

45-
/** Internal executor for SDK coordination tasks (polling, phaser management). */
46-
private static final Executor INTERNAL_EXECUTOR = ForkJoinPool.commonPool();
47-
4845
// ===== Execution State =====
4946
private final Map<String, Operation> operations = new ConcurrentHashMap<>();
5047
private final String executionOperationId;
@@ -256,7 +253,7 @@ public CompletableFuture<Void> sendOperationUpdate(OperationUpdate update) {
256253
// wait while another thread is still running and we therefore are not
257254
// re-invoked because we never suspended.
258255
public void pollForOperationUpdates(String operationId, Instant firstPollTime, Duration period) {
259-
INTERNAL_EXECUTOR.execute(() -> {
256+
InternalExecutor.INSTANCE.execute(() -> {
260257
// Sleep until the start
261258
try {
262259
var sleepDuration = Duration.between(Instant.now(), firstPollTime);
@@ -296,7 +293,7 @@ public void pollForOperationUpdates(String operationId, Instant firstPollTime, D
296293
// re-invoked because we never suspended.
297294
public void pollUntilReady(
298295
String operationId, CompletableFuture<Void> future, Instant firstPollTime, Duration period) {
299-
INTERNAL_EXECUTOR.execute(() -> {
296+
InternalExecutor.INSTANCE.execute(() -> {
300297
// Sleep until first poll time
301298
try {
302299
Duration sleepDuration = Duration.between(Instant.now(), firstPollTime);
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.execution;
4+
5+
import java.util.concurrent.Executor;
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
/**
10+
* Shared executor for internal SDK coordination tasks.
11+
*
12+
* <p>This executor is used for SDK-internal operations such as checkpoint batching, polling for wait completion, and
13+
* phaser coordination. It is separate from the user-configured executor (via {@code DurableConfig}) which runs
14+
* user-defined operations.
15+
*
16+
* <p>Using a dedicated thread pool ensures SDK coordination tasks are isolated from user code.
17+
*/
18+
final class InternalExecutor {
19+
20+
private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(0);
21+
22+
/**
23+
* Shared executor for all SDK-internal coordination tasks. Uses a cached thread pool that creates threads on
24+
* demand, reuses idle threads, and terminates threads after 60 seconds of inactivity by default.
25+
*/
26+
static final Executor INSTANCE = Executors.newCachedThreadPool(runnable -> {
27+
var thread = new Thread(runnable, "durable-sdk-internal-" + THREAD_COUNTER.getAndIncrement());
28+
thread.setDaemon(true);
29+
return thread;
30+
});
31+
32+
private InternalExecutor() {
33+
// Utility class
34+
}
35+
}

0 commit comments

Comments
 (0)