Skip to content

Commit 3627e75

Browse files
committed
Return on LA queue thread interrupts
1 parent a76b6e8 commit 3627e75

1 file changed

Lines changed: 14 additions & 6 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,11 @@ private void processQueue() {
8383
SlotPermit slotPermit = null;
8484
QueuedLARequest request = null;
8585
try {
86-
request = requestQueue.take();
86+
// Use poll with timeout instead of take() to avoid being blocked indefinitely
87+
request = requestQueue.poll(100, TimeUnit.MILLISECONDS);
88+
if (request == null) {
89+
continue;
90+
}
8791

8892
SlotSupplierFuture future = slotSupplier.reserveSlot(request.data);
8993
try {
@@ -107,6 +111,7 @@ private void processQueue() {
107111
afterReservedCallback.apply(request.task);
108112
} catch (InterruptedException e) {
109113
Thread.currentThread().interrupt();
114+
return;
110115
} catch (Throwable e) {
111116
// Fail the workflow task if something went wrong executing the local activity (at the
112117
// executor level, otherwise, the LA handler itself should be handling errors)
@@ -120,6 +125,10 @@ private void processQueue() {
120125
LocalActivityResult.processingFailed(
121126
executionContext.getActivityId(), request.task.getAttemptTask().getAttempt(), e));
122127
}
128+
if (e.getCause() instanceof InterruptedException) {
129+
Thread.currentThread().interrupt();
130+
return;
131+
}
123132
}
124133
}
125134
}
@@ -170,11 +179,9 @@ public boolean isTerminated() {
170179
@Override
171180
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
172181
running = false;
173-
if (requestQueue.isEmpty()) {
174-
// Just interrupt the thread, so that if we're waiting on blocking take the thread will
175-
// be interrupted and exit. Otherwise the loop will exit once the queue is empty.
176-
queueThreadService.shutdownNow();
177-
}
182+
// Always interrupt. This won't cause any *tasks* to be interrupted, since the queue thread is
183+
// only responsible for handing them out.
184+
queueThreadService.shutdownNow();
178185

179186
return interruptTasks
180187
? shutdownManager.shutdownExecutorNowUntimed(
@@ -190,6 +197,7 @@ public void awaitTermination(long timeout, TimeUnit unit) {
190197
// timeout duration if no task was ever submitted.
191198
return;
192199
}
200+
193201
ShutdownManager.awaitTermination(queueThreadService, unit.toMillis(timeout));
194202
}
195203
}

0 commit comments

Comments
 (0)