@@ -48,17 +48,28 @@ public class PipeTerminateEvent extends EnrichedEvent {
4848
4949 private final boolean shouldMark ;
5050
51+ private static final int TERMINATE_EXECUTOR_THREAD_COUNT =
52+ IoTDBDescriptor .getInstance ().getConfig ().getPipeTaskThreadCount ();
53+
54+ private static final int TERMINATE_EXECUTOR_QUEUE_SIZE =
55+ Math .max (1024 , TERMINATE_EXECUTOR_THREAD_COUNT * 64 );
56+
5157 // Do not use call run policy to avoid deadlock
52- private static final ExecutorService terminateExecutor =
53- new WrappedThreadPoolExecutor (
54- 0 ,
55- IoTDBDescriptor .getInstance ().getConfig ().getPipeTaskThreadCount (),
56- 0L ,
57- TimeUnit .SECONDS ,
58- new ArrayBlockingQueue <>(
59- IoTDBDescriptor .getInstance ().getConfig ().getPipeTaskThreadCount ()),
60- new IoTThreadFactory (ThreadName .PIPE_TERMINATE_EXECUTION_POOL .getName ()),
61- ThreadName .PIPE_TERMINATE_EXECUTION_POOL .getName ());
58+ private static final ExecutorService terminateExecutor = createTerminateExecutor ();
59+
60+ private static ExecutorService createTerminateExecutor () {
61+ final WrappedThreadPoolExecutor executor =
62+ new WrappedThreadPoolExecutor (
63+ TERMINATE_EXECUTOR_THREAD_COUNT ,
64+ TERMINATE_EXECUTOR_THREAD_COUNT ,
65+ 60L ,
66+ TimeUnit .SECONDS ,
67+ new ArrayBlockingQueue <>(TERMINATE_EXECUTOR_QUEUE_SIZE ),
68+ new IoTThreadFactory (ThreadName .PIPE_TERMINATE_EXECUTION_POOL .getName ()),
69+ ThreadName .PIPE_TERMINATE_EXECUTION_POOL .getName ());
70+ executor .allowCoreThreadTimeOut (true );
71+ return executor ;
72+ }
6273
6374 public PipeTerminateEvent (
6475 final String pipeName ,
0 commit comments