Skip to content

Commit 4ee9ae8

Browse files
authored
[bugfix]: add presuspend sanity check and fix a bug in retry (#275)
* add presuspend sanity check * update cloud runner to return error when invocation fail * fix runUserHandler in BaseDurableOperation
1 parent c609ed4 commit 4ee9ae8

8 files changed

Lines changed: 48 additions & 18 deletions

File tree

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/HistoryEventProcessor.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,21 @@ public <O> TestResult<O> processEvents(List<Event> events, TypeToken<O> outputTy
5353
}
5454

5555
switch (eventType) {
56-
case EXECUTION_STARTED, INVOCATION_COMPLETED -> {
56+
case EXECUTION_STARTED -> {
5757
// Execution started - no action needed, just track the event
5858
}
59+
case INVOCATION_COMPLETED -> {
60+
var details = event.invocationCompletedDetails();
61+
if (details != null
62+
&& details.error() != null
63+
&& details.error().payload() != null) {
64+
// This will get overridden by the execution events but
65+
// the test cases will still be able to see the error
66+
// if the execution succeeds.
67+
status = ExecutionStatus.FAILED;
68+
error = details.error().payload();
69+
}
70+
}
5971
case EXECUTION_SUCCEEDED -> {
6072
status = ExecutionStatus.SUCCEEDED;
6173
var details = event.executionSucceededDetails();

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestResult.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public class TestResult<O> {
5151

5252
/** Returns the execution status (SUCCEEDED, FAILED, or PENDING). */
5353
public ExecutionStatus getStatus() {
54+
if (status == ExecutionStatus.SUCCEEDED && error != null) {
55+
throw new IllegalStateException(
56+
"Execution succeeded while invocation failed with: " + error.errorMessage());
57+
}
5458
return status;
5559
}
5660

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,25 @@ public void deregisterActiveThread(String threadId) {
223223

224224
if (activeThreads.isEmpty()) {
225225
logger.info("No active threads remaining - suspending execution");
226+
preSuspendCheck();
226227
suspendExecution();
227228
}
228229
}
229230
}
230231

232+
private void preSuspendCheck() {
233+
var hasAnyPendingOperation = operationStorage.values().stream().anyMatch(o -> switch (o.type()) {
234+
case STEP -> o.status() == OperationStatus.PENDING;
235+
case WAIT, CALLBACK -> o.status() == OperationStatus.STARTED;
236+
case CHAINED_INVOKE -> o.status() == OperationStatus.PENDING || o.status() == OperationStatus.STARTED;
237+
default -> false;
238+
});
239+
240+
if (!hasAnyPendingOperation) {
241+
logger.warn("Invalid suspension. No operation is pending");
242+
}
243+
}
244+
231245
// ===== Checkpointing =====
232246

233247
// This method will checkpoint the operation updates to the durable backend and return a future which completes

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,11 @@ protected Operation waitForOperationCompletion() {
224224
return op;
225225
}
226226

227-
protected void runUserHandler(Runnable runnable, String contextId, ThreadType threadType) {
227+
protected void runUserHandler(Runnable runnable, ThreadType threadType) {
228+
String operationId = getOperationId();
229+
logger.debug("Starting user handler for operation {} ({})", operationId, threadType);
228230
Runnable wrapped = () -> {
229-
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
231+
executionManager.setCurrentThreadContext(new ThreadContext(operationId, threadType));
230232
try {
231233
runnable.run();
232234
} catch (Throwable throwable) {
@@ -239,11 +241,11 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
239241
"An unhandled exception is thrown from user function: " + throwable);
240242
}
241243
} finally {
242-
if (contextId != null) {
244+
if (operationId != null) {
243245
try {
244246
// if this is a child context or a step context, we need to
245247
// deregister the context's thread from the execution manager
246-
executionManager.deregisterActiveThread(contextId);
248+
executionManager.deregisterActiveThread(operationId);
247249
} catch (SuspendExecutionException e) {
248250
// Expected when this is the last active thread. Must catch here because:
249251
// 1/ This runs in a worker thread detached from handlerFuture
@@ -257,8 +259,10 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
257259
};
258260

259261
// runUserHandler is used to ensure that only one user handler is running at a time
260-
if (runningUserHandler.get() != null) {
261-
throw new IllegalStateException("User handler already running");
262+
if (runningUserHandler.get() != null && !runningUserHandler.get().isDone()) {
263+
logger.error("User handler already running for operation {} ({})", getOperationId(), threadType);
264+
throw terminateExecutionWithIllegalDurableOperationException(
265+
"User handler already running: " + getOperationId());
262266
}
263267

264268
// Thread registration is intentionally split across two threads:
@@ -267,14 +271,10 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
267271
// 2. setCurrentContext on the CHILD thread — sets the ThreadLocal so operations inside
268272
// the child context know which context they belong to.
269273
// registerActiveThread is idempotent (no-op if already registered).
270-
registerActiveThread(contextId);
274+
registerActiveThread(operationId);
271275

272-
if (!runningUserHandler.compareAndSet(
273-
null,
274-
CompletableFuture.runAsync(
275-
wrapped, getContext().getDurableConfig().getExecutorService()))) {
276-
throw new IllegalStateException("User handler already running");
277-
}
276+
runningUserHandler.set(CompletableFuture.runAsync(
277+
wrapped, getContext().getDurableConfig().getExecutorService()));
278278
}
279279

280280
/**

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private void executeChildContext() {
129129
};
130130

131131
// Execute user provided child context code in user-configured executor
132-
runUserHandler(userHandler, contextId, ThreadType.CONTEXT);
132+
runUserHandler(userHandler, ThreadType.CONTEXT);
133133
}
134134

135135
private void handleChildContextSuccess(T result) {

sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ protected void executeItems() {
210210
}
211211
};
212212
// run consumer in the user thread pool, although it's not a real user thread
213-
runUserHandler(consumer, getOperationId(), ThreadType.CONTEXT);
213+
runUserHandler(consumer, ThreadType.CONTEXT);
214214
}
215215

216216
private void handleException(Throwable ex) {

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ private void executeStepLogic(int attempt) {
112112
};
113113

114114
// Execute user provided step code in user-configured executor
115-
runUserHandler(userHandler, getOperationId(), ThreadType.STEP);
115+
runUserHandler(userHandler, ThreadType.STEP);
116116
}
117117

118118
private void checkpointStarted() {

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ private void executeCheckLogic(T currentState, int attempt) {
167167
}
168168
};
169169

170-
runUserHandler(userHandler, getOperationId(), ThreadType.STEP);
170+
runUserHandler(userHandler, ThreadType.STEP);
171171
}
172172

173173
private void handleCheckFailure(Throwable exception) {

0 commit comments

Comments
 (0)