|
14 | 14 | import java.util.concurrent.CancellationException; |
15 | 15 | import java.util.concurrent.CompletableFuture; |
16 | 16 | import java.util.concurrent.ThreadPoolExecutor; |
17 | | -import java.util.concurrent.TimeUnit; |
18 | | -import java.util.concurrent.TimeoutException; |
19 | 17 | import java.util.concurrent.atomic.AtomicReference; |
20 | 18 | import java.util.stream.Collectors; |
21 | 19 | import org.slf4j.Logger; |
@@ -282,35 +280,38 @@ public CompletableFuture<Operation> pollForOperationUpdates(String operationId, |
282 | 280 | /** Shutdown the checkpoint batcher. */ |
283 | 281 | @Override |
284 | 282 | public void close() { |
285 | | - checkpointManager.shutdown(); |
286 | | - |
287 | 283 | validateRunningThreads(); |
| 284 | + |
| 285 | + checkpointManager.shutdown(); |
288 | 286 | } |
289 | 287 |
|
290 | 288 | private void validateRunningThreads() { |
291 | 289 | // This will detect stuck user thread and thread leaks in the thread pool |
292 | 290 | for (BaseDurableOperation op : registeredOperations.values()) { |
293 | 291 | var userHandlerFuture = op.getRunningUserHandler(); |
294 | 292 | if (userHandlerFuture != null && !userHandlerFuture.isDone()) { |
295 | | - // We wait a few more milliseconds for the last user thread to complete because |
296 | | - // it may have deregistered itself but haven't released the thread yet. |
| 293 | + // Some user threads can still be running because |
| 294 | + // the operations that run them have never been waiting for and the execution has completed. |
297 | 295 | logger.info("Waiting for operation to complete before shutting down: {}", op.getOperationId()); |
298 | 296 | try { |
299 | 297 | userHandlerFuture.get(); |
300 | 298 | } catch (InterruptedException | CancellationException e) { |
301 | 299 | // if the user handler is stuck |
302 | | - throw new IllegalStateException("Stuck running user handler when shutting down: " + op.getOperationId()); |
| 300 | + throw new IllegalStateException( |
| 301 | + "Stuck running user handler when shutting down: " + op.getOperationId()); |
303 | 302 | } catch (Exception e) { |
304 | 303 | // ok if the future completed exceptionally |
305 | 304 | } |
306 | 305 | } |
307 | 306 | } |
308 | 307 |
|
| 308 | + // double check if the thread pool is empty |
309 | 309 | if (durableConfig.getExecutorService() instanceof ThreadPoolExecutor threadPoolExecutor) { |
310 | 310 | var threadCount = threadPoolExecutor.getActiveCount(); |
311 | 311 | if (threadCount > 0) { |
312 | | - // this may or may not be a problem because getActiveCount doesn't return an accurate number |
313 | 312 | logger.warn("{} active threads in user executor pool when shutting down", threadCount); |
| 313 | + throw new IllegalStateException( |
| 314 | + "Threads still running on user executor pool when shutting down: " + threadCount); |
314 | 315 | } |
315 | 316 | } |
316 | 317 | } |
|
0 commit comments