Skip to content

Commit 76fe964

Browse files
committed
fix thread pool leak
1 parent d1cd716 commit 76fe964

7 files changed

Lines changed: 49 additions & 14 deletions

File tree

examples/src/main/java/software/amazon/lambda/durable/examples/callback/WaitForCallbackFailedExample.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import software.amazon.lambda.durable.config.WaitForCallbackConfig;
1010
import software.amazon.lambda.durable.examples.types.ApprovalRequest;
1111
import software.amazon.lambda.durable.exception.SerDesException;
12+
import software.amazon.lambda.durable.execution.SuspendExecutionException;
1213
import software.amazon.lambda.durable.serde.JacksonSerDes;
1314

1415
public class WaitForCallbackFailedExample extends DurableHandler<ApprovalRequest, String> {
@@ -32,6 +33,10 @@ public String handleRequest(ApprovalRequest input, DurableContext context) {
3233
.build())
3334
.build());
3435
} catch (Exception ex) {
36+
// not to swallow the SuspendExecutionException
37+
if (ex instanceof SuspendExecutionException suspendExecutionException) {
38+
throw suspendExecutionException;
39+
}
3540
return ex.getClass().getSimpleName() + ":" + ex.getMessage();
3641
}
3742

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,20 @@ public final class DurableConfig {
7474
private static final String PROJECT_VERSION = getProjectVersion(VERSION_FILE);
7575
private static final String USER_AGENT_SUFFIX = "@aws/durable-execution-sdk-java/" + PROJECT_VERSION;
7676

77+
/**
78+
* A default ExecutorService for running user-defined operations. Uses a cached thread pool with daemon threads by
79+
* default.
80+
*
81+
* <p>This executor is used exclusively for user operations. Internal SDK coordination uses the
82+
* InternalExecutor::INSTANCE
83+
*/
84+
private static final ExecutorService DEFAULT_USER_THREAD_POOL = Executors.newCachedThreadPool(r -> {
85+
Thread t = new Thread(r);
86+
t.setName("durable-exec-" + t.getId());
87+
t.setDaemon(true);
88+
return t;
89+
});
90+
7791
private final DurableExecutionClient durableExecutionClient;
7892
private final SerDes serDes;
7993
private final ExecutorService executorService;
@@ -250,12 +264,7 @@ private static String getProjectVersion(String versionFile) {
250264
*/
251265
private static ExecutorService createDefaultExecutor() {
252266
logger.debug("Creating default ExecutorService");
253-
return Executors.newCachedThreadPool(r -> {
254-
Thread t = new Thread(r);
255-
t.setName("durable-exec-" + t.getId());
256-
t.setDaemon(true);
257-
return t;
258-
});
267+
return DEFAULT_USER_THREAD_POOL;
259268
}
260269

261270
/** Builder for DurableConfig. Provides fluent API for configuring SDK components. */

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,17 +280,23 @@ public static boolean isTerminalStatus(OperationStatus status) {
280280
* @param exception the unrecoverable exception that caused termination
281281
*/
282282
public void terminateExecution(UnrecoverableDurableExecutionException exception) {
283+
stopAllOperations(exception);
283284
executionExceptionFuture.completeExceptionally(exception);
284285
throw exception;
285286
}
286287

287288
/** Suspends the execution by completing the execution exception future with a {@link SuspendExecutionException}. */
288289
public void suspendExecution() {
289290
var ex = new SuspendExecutionException();
291+
stopAllOperations(ex);
290292
executionExceptionFuture.completeExceptionally(ex);
291293
throw ex;
292294
}
293295

296+
private void stopAllOperations(Exception cause) {
297+
registeredOperations.values().forEach(op -> op.getCompletionFuture().completeExceptionally(cause));
298+
}
299+
294300
/**
295301
* return a future that completes when userFuture completes successfully or the execution is terminated or
296302
* suspended.

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import software.amazon.lambda.durable.execution.ThreadType;
2323
import software.amazon.lambda.durable.model.OperationIdentifier;
2424
import software.amazon.lambda.durable.model.OperationSubType;
25+
import software.amazon.lambda.durable.util.ExceptionHelper;
2526

2627
/**
2728
* Base class for all durable operations (STEP, WAIT, etc.).
@@ -208,7 +209,11 @@ protected Operation waitForOperationCompletion() {
208209
}
209210

210211
// Block until operation completes. No-op if the future is already completed.
211-
completionFuture.join();
212+
try {
213+
completionFuture.join();
214+
} catch (Throwable throwable) {
215+
ExceptionHelper.sneakyThrow(ExceptionHelper.unwrapCompletableFuture(throwable));
216+
}
212217

213218
// Get result based on status
214219
var op = getOperation();

sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
import software.amazon.lambda.durable.config.CompletionConfig;
1515
import software.amazon.lambda.durable.config.MapConfig;
1616
import software.amazon.lambda.durable.context.DurableContextImpl;
17+
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
18+
import software.amazon.lambda.durable.execution.SuspendExecutionException;
1719
import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus;
1820
import software.amazon.lambda.durable.model.MapResult;
1921
import software.amazon.lambda.durable.model.OperationIdentifier;
2022
import software.amazon.lambda.durable.model.OperationSubType;
2123
import software.amazon.lambda.durable.serde.SerDes;
24+
import software.amazon.lambda.durable.util.ExceptionHelper;
2225

2326
/**
2427
* Executes a map operation: applies a function to each item in a collection concurrently, with each item running in its
@@ -153,8 +156,18 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio
153156
} else {
154157
try {
155158
resultItems.set(i, MapResult.MapResultItem.succeeded(branch.get()));
156-
} catch (Exception e) {
157-
resultItems.set(i, MapResult.MapResultItem.failed(MapResult.MapError.of(e)));
159+
} catch (Throwable exception) {
160+
Throwable throwable = ExceptionHelper.unwrapCompletableFuture(exception);
161+
if (throwable instanceof SuspendExecutionException suspendExecutionException) {
162+
// Rethrow Error immediately — do not checkpoint
163+
throw suspendExecutionException;
164+
}
165+
if (throwable
166+
instanceof UnrecoverableDurableExecutionException unrecoverableDurableExecutionException) {
167+
// terminate the execution and throw the exception if it's not recoverable
168+
throw terminateExecution(unrecoverableDurableExecutionException);
169+
}
170+
resultItems.set(i, MapResult.MapResultItem.failed(MapResult.MapError.of(throwable)));
158171
}
159172
}
160173
}

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import java.time.Duration;
66
import java.util.concurrent.CompletableFuture;
7-
import java.util.concurrent.ExecutorService;
87
import java.util.function.BiFunction;
98
import software.amazon.awssdk.services.lambda.model.Operation;
109
import software.amazon.awssdk.services.lambda.model.OperationAction;
@@ -40,7 +39,6 @@ public class WaitForConditionOperation<T> extends SerializableDurableOperation<T
4039

4140
private final BiFunction<T, StepContext, WaitForConditionResult<T>> checkFunc;
4241
private final WaitForConditionConfig<T> config;
43-
private final ExecutorService userExecutor;
4442

4543
public WaitForConditionOperation(
4644
String operationId,
@@ -57,7 +55,6 @@ public WaitForConditionOperation(
5755

5856
this.checkFunc = checkFunc;
5957
this.config = config;
60-
this.userExecutor = durableContext.getDurableConfig().getExecutorService();
6158
}
6259

6360
@Override

sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ void testBuilder_MultipleBuilds_CreateIndependentInstances() {
190190
assertEquals(config1.getDurableExecutionClient(), config2.getDurableExecutionClient());
191191

192192
// ExecutorService should be different instances (each gets its own)
193-
assertNotSame(config1.getExecutorService(), config2.getExecutorService());
193+
assertSame(config1.getExecutorService(), config2.getExecutorService());
194194
}
195195

196196
@Test
@@ -210,7 +210,7 @@ void testDefaultConfig_CreatesNewInstancesEachTime() {
210210
var config2 = DurableConfig.defaultConfig();
211211

212212
assertNotSame(config1, config2);
213-
assertNotSame(config1.getExecutorService(), config2.getExecutorService());
213+
assertSame(config1.getExecutorService(), config2.getExecutorService());
214214
}
215215

216216
@Test

0 commit comments

Comments
 (0)