Skip to content

Commit 49dfe74

Browse files
committed
Merge remote-tracking branch 'origin/main' into exception
2 parents 7888631 + 114f749 commit 49dfe74

2 files changed

Lines changed: 18 additions & 17 deletions

File tree

sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import com.amazonaws.lambda.durable.exception.DurableOperationException;
66
import com.amazonaws.lambda.durable.execution.ExecutionManager;
7+
import com.amazonaws.lambda.durable.execution.SuspendExecutionException;
78
import com.amazonaws.lambda.durable.model.DurableExecutionInput;
89
import com.amazonaws.lambda.durable.model.DurableExecutionOutput;
910
import com.amazonaws.lambda.durable.serde.SerDes;
@@ -81,11 +82,7 @@ public static <I, O> DurableExecutionOutput execute(
8182
userExecutor);
8283

8384
// Get suspend future from ExecutionManager. If this future completes, it
84-
// indicates
85-
// that no threads are active and we can safely suspend. This is useful for
86-
// async scenarios where multiple operations are scheduled concurrently and
87-
// awaited
88-
// at a later point.
85+
// indicates that no threads are active and we can safely suspend.
8986
var suspendFuture = executionManager.getSuspendExecutionFuture();
9087

9188
// Wait for either handler to complete or suspension to occur
@@ -96,16 +93,6 @@ public static <I, O> DurableExecutionOutput execute(
9693
return DurableExecutionOutput.pending();
9794
}
9895

99-
if (handlerFuture.isCompletedExceptionally()) {
100-
try {
101-
handlerFuture.join(); // Will throw the exception
102-
} catch (Exception e) {
103-
Throwable cause = ExceptionHelper.unwrapCompletableFuture(e);
104-
logger.debug("Execution failed: {}", cause.getMessage());
105-
return DurableExecutionOutput.failure(buildErrorObject(cause, serDes));
106-
}
107-
}
108-
10996
var result = handlerFuture.get();
11097
var outputPayload = serDes.serialize(result);
11198

@@ -137,7 +124,13 @@ public static <I, O> DurableExecutionOutput execute(
137124
logger.debug("Execution completed");
138125
return DurableExecutionOutput.success(outputPayload);
139126
} catch (Exception e) {
140-
return DurableExecutionOutput.failure(buildErrorObject(ExceptionHelper.unwrapCompletableFuture(e), serDes));
127+
Throwable cause = ExceptionHelper.unwrapCompletableFuture(e);
128+
if (cause instanceof SuspendExecutionException) {
129+
logger.debug("Execution suspended");
130+
return DurableExecutionOutput.pending();
131+
}
132+
logger.debug("Execution failed: {}", cause.getMessage());
133+
return DurableExecutionOutput.failure(buildErrorObject(cause, serDes));
141134
} finally {
142135
// We shutdown the execution to make sure remaining checkpoint calls in the queue are drained
143136
executionManager.shutdown();

sdk/src/main/java/com/amazonaws/lambda/durable/operation/StepOperation.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.amazonaws.lambda.durable.exception.StepInterruptedException;
1111
import com.amazonaws.lambda.durable.execution.ExecutionManager;
1212
import com.amazonaws.lambda.durable.execution.ExecutionPhase;
13+
import com.amazonaws.lambda.durable.execution.SuspendExecutionException;
1314
import com.amazonaws.lambda.durable.execution.ThreadType;
1415
import com.amazonaws.lambda.durable.logging.DurableLogger;
1516
import com.amazonaws.lambda.durable.serde.SerDes;
@@ -190,7 +191,14 @@ private void executeStepLogic(int attempt) {
190191
} catch (Throwable e) {
191192
handleStepError(e, attempt);
192193
} finally {
193-
executionManager.deregisterActiveThread(stepThreadId);
194+
try {
195+
executionManager.deregisterActiveThread(stepThreadId);
196+
} catch (SuspendExecutionException e) {
197+
// Expected when this is the last active thread. Must catch here because:
198+
// 1/ This runs in a worker thread detached from handlerFuture
199+
// 2/ Uncaught exception would prevent phaser from advancing, blocking stepAsync().get()
200+
// Suspension is already signaled via suspendExecutionFuture before the throw.
201+
}
194202
durableLogger.clearOperationContext();
195203
}
196204
});

0 commit comments

Comments
 (0)