Skip to content

Commit 7710c90

Browse files
committed
fix runUserHandler in BaseDurableOperation
1 parent 04c9060 commit 7710c90

5 files changed

Lines changed: 17 additions & 17 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,11 @@ protected Operation waitForOperationCompletion() {
224224
return op;
225225
}
226226

227-
protected void runUserHandler(Runnable runnable, String contextId, ThreadType threadType) {
227+
protected void runUserHandler(Runnable runnable, ThreadType threadType) {
228+
String operationId = getOperationId();
229+
logger.debug("Starting user handler for operation {} ({})", operationId, threadType);
228230
Runnable wrapped = () -> {
229-
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
231+
executionManager.setCurrentThreadContext(new ThreadContext(operationId, threadType));
230232
try {
231233
runnable.run();
232234
} catch (Throwable throwable) {
@@ -239,11 +241,11 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
239241
"An unhandled exception is thrown from user function: " + throwable);
240242
}
241243
} finally {
242-
if (contextId != null) {
244+
if (operationId != null) {
243245
try {
244246
// if this is a child context or a step context, we need to
245247
// deregister the context's thread from the execution manager
246-
executionManager.deregisterActiveThread(contextId);
248+
executionManager.deregisterActiveThread(operationId);
247249
} catch (SuspendExecutionException e) {
248250
// Expected when this is the last active thread. Must catch here because:
249251
// 1/ This runs in a worker thread detached from handlerFuture
@@ -257,8 +259,10 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
257259
};
258260

259261
// runUserHandler is used to ensure that only one user handler is running at a time
260-
if (runningUserHandler.get() != null) {
261-
throw new IllegalStateException("User handler already running");
262+
if (runningUserHandler.get() != null && !runningUserHandler.get().isDone()) {
263+
logger.error("User handler already running for operation {} ({})", getOperationId(), threadType);
264+
throw terminateExecutionWithIllegalDurableOperationException(
265+
"User handler already running: " + getOperationId());
262266
}
263267

264268
// Thread registration is intentionally split across two threads:
@@ -267,14 +271,10 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
267271
// 2. setCurrentContext on the CHILD thread — sets the ThreadLocal so operations inside
268272
// the child context know which context they belong to.
269273
// registerActiveThread is idempotent (no-op if already registered).
270-
registerActiveThread(contextId);
274+
registerActiveThread(operationId);
271275

272-
if (!runningUserHandler.compareAndSet(
273-
null,
274-
CompletableFuture.runAsync(
275-
wrapped, getContext().getDurableConfig().getExecutorService()))) {
276-
throw new IllegalStateException("User handler already running");
277-
}
276+
runningUserHandler.set(CompletableFuture.runAsync(
277+
wrapped, getContext().getDurableConfig().getExecutorService()));
278278
}
279279

280280
/**

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private void executeChildContext() {
129129
};
130130

131131
// Execute user provided child context code in user-configured executor
132-
runUserHandler(userHandler, contextId, ThreadType.CONTEXT);
132+
runUserHandler(userHandler, ThreadType.CONTEXT);
133133
}
134134

135135
private void handleChildContextSuccess(T result) {

sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ protected void executeItems() {
210210
}
211211
};
212212
// run consumer in the user thread pool, although it's not a real user thread
213-
runUserHandler(consumer, getOperationId(), ThreadType.CONTEXT);
213+
runUserHandler(consumer, ThreadType.CONTEXT);
214214
}
215215

216216
private void handleException(Throwable ex) {

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ private void executeStepLogic(int attempt) {
112112
};
113113

114114
// Execute user provided step code in user-configured executor
115-
runUserHandler(userHandler, getOperationId(), ThreadType.STEP);
115+
runUserHandler(userHandler, ThreadType.STEP);
116116
}
117117

118118
private void checkpointStarted() {

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ private void executeCheckLogic(T currentState, int attempt) {
167167
}
168168
};
169169

170-
runUserHandler(userHandler, getOperationId(), ThreadType.STEP);
170+
runUserHandler(userHandler, ThreadType.STEP);
171171
}
172172

173173
private void handleCheckFailure(Throwable exception) {

0 commit comments

Comments
 (0)