Skip to content

Commit cb46d4b

Browse files
committed
Return on LA queue thread interrupts
1 parent a76b6e8 commit cb46d4b

1 file changed

Lines changed: 10 additions & 5 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ private void processQueue() {
107107
afterReservedCallback.apply(request.task);
108108
} catch (InterruptedException e) {
109109
Thread.currentThread().interrupt();
110+
return;
110111
} catch (Throwable e) {
111112
// Fail the workflow task if something went wrong executing the local activity (at the
112113
// executor level, otherwise, the LA handler itself should be handling errors)
@@ -120,6 +121,11 @@ private void processQueue() {
120121
LocalActivityResult.processingFailed(
121122
executionContext.getActivityId(), request.task.getAttemptTask().getAttempt(), e));
122123
}
124+
if (e.getCause() instanceof InterruptedException) {
125+
// It's possible the interrupt happens inside the callback, so check that as well.
126+
Thread.currentThread().interrupt();
127+
return;
128+
}
123129
}
124130
}
125131
}
@@ -170,11 +176,9 @@ public boolean isTerminated() {
170176
@Override
171177
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
172178
running = false;
173-
if (requestQueue.isEmpty()) {
174-
// Just interrupt the thread, so that if we're waiting on blocking take the thread will
175-
// be interrupted and exit. Otherwise the loop will exit once the queue is empty.
176-
queueThreadService.shutdownNow();
177-
}
179+
// Always interrupt. This won't cause any *tasks* to be interrupted, since the queue thread is
180+
// only responsible for handing them out.
181+
queueThreadService.shutdownNow();
178182

179183
return interruptTasks
180184
? shutdownManager.shutdownExecutorNowUntimed(
@@ -190,6 +194,7 @@ public void awaitTermination(long timeout, TimeUnit unit) {
190194
// timeout duration if no task was ever submitted.
191195
return;
192196
}
197+
193198
ShutdownManager.awaitTermination(queueThreadService, unit.toMillis(timeout));
194199
}
195200
}

0 commit comments

Comments
 (0)