@@ -107,6 +107,7 @@ private void processQueue() {
107107 afterReservedCallback .apply (request .task );
108108 } catch (InterruptedException e ) {
109109 Thread .currentThread ().interrupt ();
110+ return ;
110111 } catch (Throwable e ) {
111112 // Fail the workflow task if something went wrong executing the local activity (at the
112113 // executor level, otherwise, the LA handler itself should be handling errors)
@@ -120,6 +121,11 @@ private void processQueue() {
120121 LocalActivityResult .processingFailed (
121122 executionContext .getActivityId (), request .task .getAttemptTask ().getAttempt (), e ));
122123 }
124+ if (e .getCause () instanceof InterruptedException ) {
125+ // It's possible the interrupt happens inside the callback, so check that as well.
126+ Thread .currentThread ().interrupt ();
127+ return ;
128+ }
123129 }
124130 }
125131 }
@@ -170,11 +176,9 @@ public boolean isTerminated() {
170176 @ Override
171177 public CompletableFuture <Void > shutdown (ShutdownManager shutdownManager , boolean interruptTasks ) {
172178 running = false ;
173- if (requestQueue .isEmpty ()) {
174- // Just interrupt the thread, so that if we're waiting on blocking take the thread will
175- // be interrupted and exit. Otherwise the loop will exit once the queue is empty.
176- queueThreadService .shutdownNow ();
177- }
179+ // Always interrupt. This won't cause any *tasks* to be interrupted, since the queue thread is
180+ // only responsible for handing them out.
181+ queueThreadService .shutdownNow ();
178182
179183 return interruptTasks
180184 ? shutdownManager .shutdownExecutorNowUntimed (
@@ -190,6 +194,7 @@ public void awaitTermination(long timeout, TimeUnit unit) {
190194 // timeout duration if no task was ever submitted.
191195 return ;
192196 }
197+
193198 ShutdownManager .awaitTermination (queueThreadService , unit .toMillis (timeout ));
194199 }
195200}
0 commit comments