Skip to content

Commit ee2deec

Browse files
committed
fix thread pool leak
1 parent d1cd716 commit ee2deec

2 files changed

Lines changed: 21 additions & 6 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,20 @@ public final class DurableConfig {
7474
private static final String PROJECT_VERSION = getProjectVersion(VERSION_FILE);
7575
private static final String USER_AGENT_SUFFIX = "@aws/durable-execution-sdk-java/" + PROJECT_VERSION;
7676

77+
/**
78+
* A default ExecutorService for running user-defined operations. Uses a cached thread pool with daemon threads by
79+
* default.
80+
*
81+
* <p>This executor is used exclusively for user operations. Internal SDK coordination uses the
82+
* InternalExecutor::INSTANCE
83+
*/
84+
private static final ExecutorService DEFAULT_USER_THREAD_POOL = Executors.newCachedThreadPool(r -> {
85+
Thread t = new Thread(r);
86+
t.setName("durable-exec-" + t.getId());
87+
t.setDaemon(true);
88+
return t;
89+
});
90+
7791
private final DurableExecutionClient durableExecutionClient;
7892
private final SerDes serDes;
7993
private final ExecutorService executorService;
@@ -250,12 +264,7 @@ private static String getProjectVersion(String versionFile) {
250264
*/
251265
private static ExecutorService createDefaultExecutor() {
252266
logger.debug("Creating default ExecutorService");
253-
return Executors.newCachedThreadPool(r -> {
254-
Thread t = new Thread(r);
255-
t.setName("durable-exec-" + t.getId());
256-
t.setDaemon(true);
257-
return t;
258-
});
267+
return DEFAULT_USER_THREAD_POOL;
259268
}
260269

261270
/** Builder for DurableConfig. Provides fluent API for configuring SDK components. */

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,17 +280,23 @@ public static boolean isTerminalStatus(OperationStatus status) {
280280
* @param exception the unrecoverable exception that caused termination
281281
*/
282282
public void terminateExecution(UnrecoverableDurableExecutionException exception) {
283+
stopAllOperations(exception);
283284
executionExceptionFuture.completeExceptionally(exception);
284285
throw exception;
285286
}
286287

287288
/** Suspends the execution by completing the execution exception future with a {@link SuspendExecutionException}. */
288289
public void suspendExecution() {
289290
var ex = new SuspendExecutionException();
291+
stopAllOperations(ex);
290292
executionExceptionFuture.completeExceptionally(ex);
291293
throw ex;
292294
}
293295

296+
private void stopAllOperations(Exception cause) {
297+
registeredOperations.values().forEach(op -> op.getCompletionFuture().completeExceptionally(cause));
298+
}
299+
294300
/**
295301
* return a future that completes when userFuture completes successfully or the execution is terminated or
296302
* suspended.

0 commit comments

Comments
 (0)