Skip to content

Commit a2a801b

Browse files
authored
[improvement]: add logic to detect thread pool leak (#308)
* add logic to detect thread pool leak * add validation logic to step * fix the validator for anyOf case
1 parent 4ee9ae8 commit a2a801b

5 files changed

Lines changed: 68 additions & 3 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/CallbackIntegrationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import software.amazon.lambda.durable.config.CallbackConfig;
1414
import software.amazon.lambda.durable.exception.CallbackFailedException;
1515
import software.amazon.lambda.durable.exception.CallbackTimeoutException;
16+
import software.amazon.lambda.durable.execution.SuspendExecutionException;
1617
import software.amazon.lambda.durable.model.ExecutionStatus;
1718
import software.amazon.lambda.durable.serde.JacksonSerDes;
1819
import software.amazon.lambda.durable.serde.SerDes;
@@ -271,6 +272,9 @@ void waitForCallbackCallbackFailed() {
271272
fail();
272273
return "should not reach here";
273274
} catch (Exception e) {
275+
if (e instanceof SuspendExecutionException) {
276+
throw e;
277+
}
274278
assertInstanceOf(CallbackFailedException.class, e);
275279
throw e;
276280
}
@@ -306,6 +310,9 @@ void waitForCallbackCallbackTimeout() {
306310
fail();
307311
return "should not reach here";
308312
} catch (Exception e) {
313+
if (e instanceof SuspendExecutionException) {
314+
throw e;
315+
}
309316
assertInstanceOf(CallbackTimeoutException.class, e);
310317
throw e;
311318
}
@@ -333,6 +340,9 @@ void waitForCallbackCallbackFailedWithUserException() {
333340
throw new IllegalArgumentException(errorMessage);
334341
});
335342
} catch (Exception e) {
343+
if (e instanceof SuspendExecutionException) {
344+
throw e;
345+
}
336346
assertInstanceOf(IllegalArgumentException.class, e);
337347
assertEquals(errorMessage, e.getMessage());
338348
throw e;

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalMemoryExecutionClient.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,12 +249,20 @@ private StepDetails buildStepDetails(OperationUpdate update) {
249249
var existing = existingOp != null ? existingOp.stepDetails() : null;
250250

251251
var detailsBuilder = existing != null ? existing.toBuilder() : StepDetails.builder();
252+
var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1;
252253

253-
if (update.action() == OperationAction.RETRY || update.action() == OperationAction.FAIL) {
254-
var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1;
254+
if (update.action() == OperationAction.FAIL) {
255255
detailsBuilder.attempt(attempt).error(update.error());
256256
}
257257

258+
if (update.action() == OperationAction.RETRY) {
259+
detailsBuilder
260+
.attempt(attempt)
261+
.error(update.error())
262+
.nextAttemptTimestamp(
263+
Instant.now().plusSeconds(update.stepOptions().nextAttemptDelaySeconds()));
264+
}
265+
258266
if (update.payload() != null) {
259267
detailsBuilder.result(update.payload());
260268
}

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import java.util.Map;
1212
import java.util.Objects;
1313
import java.util.Set;
14+
import java.util.concurrent.CancellationException;
1415
import java.util.concurrent.CompletableFuture;
16+
import java.util.concurrent.ThreadPoolExecutor;
1517
import java.util.concurrent.atomic.AtomicReference;
1618
import java.util.stream.Collectors;
1719
import org.slf4j.Logger;
@@ -54,6 +56,7 @@ public class ExecutionManager implements AutoCloseable {
5456
private final Operation executionOp;
5557
private final String durableExecutionArn;
5658
private final AtomicReference<ExecutionMode> executionMode;
59+
private final DurableConfig durableConfig;
5760

5861
// ===== Thread Coordination =====
5962
private final Map<String, BaseDurableOperation> registeredOperations = Collections.synchronizedMap(new HashMap<>());
@@ -65,6 +68,7 @@ public class ExecutionManager implements AutoCloseable {
6568
private final CheckpointManager checkpointManager;
6669

6770
public ExecutionManager(DurableExecutionInput input, DurableConfig config) {
71+
durableConfig = config;
6872
this.durableExecutionArn = input.durableExecutionArn();
6973

7074
// Create checkpoint batcher for internal coordination
@@ -276,9 +280,41 @@ public CompletableFuture<Operation> pollForOperationUpdates(String operationId,
276280
/** Shutdown the checkpoint batcher. */
277281
@Override
278282
public void close() {
283+
validateRunningThreads();
284+
279285
checkpointManager.shutdown();
280286
}
281287

288+
private void validateRunningThreads() {
289+
// This will detect stuck user thread and thread leaks in the thread pool
290+
for (BaseDurableOperation op : registeredOperations.values()) {
291+
var userHandlerFuture = op.getRunningUserHandler();
292+
if (userHandlerFuture != null && !userHandlerFuture.isDone()) {
293+
// Some user threads can still be running because
294+
// the operations that run them have never been waiting for and the execution has completed.
295+
logger.info("Waiting for operation to complete before shutting down: {}", op.getOperationId());
296+
try {
297+
userHandlerFuture.get();
298+
} catch (InterruptedException | CancellationException e) {
299+
// if the user handler is stuck
300+
throw new IllegalStateException(
301+
"Stuck running user handler when shutting down: " + op.getOperationId());
302+
} catch (Exception e) {
303+
// ok if the future completed exceptionally
304+
}
305+
}
306+
}
307+
308+
// double check if the thread pool is empty
309+
if (durableConfig.getExecutorService() instanceof ThreadPoolExecutor threadPoolExecutor) {
310+
var threadCount = threadPoolExecutor.getActiveCount();
311+
// This may or may not be a problem because getActiveCount doesn't return an accurate number
312+
if (threadCount > 0) {
313+
logger.warn("{} active threads in user executor pool when shutting down", threadCount);
314+
}
315+
}
316+
}
317+
282318
/** Returns {@code true} if the given status represents a terminal (final) operation state. */
283319
public static boolean isTerminalStatus(OperationStatus status) {
284320
return status == OperationStatus.SUCCEEDED

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,4 +405,8 @@ protected void validateReplay(Operation checkpointed) {
405405
getOperationId(), checkpointed.subType(), getSubType())));
406406
}
407407
}
408+
409+
public CompletableFuture<Void> getRunningUserHandler() {
410+
return runningUserHandler.get();
411+
}
408412
}

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,14 @@ protected void replay(Operation existing) {
7575
}
7676
}
7777
// Step is pending retry - Start polling for PENDING -> READY transition
78-
case PENDING -> pollReadyAndExecuteStepLogic(existing.stepDetails().nextAttemptTimestamp(), attempt);
78+
case PENDING -> {
79+
if (existing.stepDetails() != null && existing.stepDetails().nextAttemptTimestamp() != null) {
80+
pollReadyAndExecuteStepLogic(existing.stepDetails().nextAttemptTimestamp(), attempt);
81+
} else {
82+
throw terminateExecutionWithIllegalDurableOperationException(
83+
"Unexpected PENDING step without nextAttemptTimestamp: " + getOperationId());
84+
}
85+
}
7986
// Execute with current attempt
8087
case READY -> executeStepLogic(attempt);
8188
default ->

0 commit comments

Comments
 (0)