Skip to content

Commit e4fc758

Browse files
authored
Pipe: Optimized the thread executor of the terminate event (#17638)
1 parent 5029a0a commit e4fc758

1 file changed

Lines changed: 21 additions & 10 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,28 @@ public class PipeTerminateEvent extends EnrichedEvent {
4949

5050
private final boolean shouldMark;
5151

52+
private static final int TERMINATE_EXECUTOR_THREAD_COUNT =
53+
IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount();
54+
55+
private static final int TERMINATE_EXECUTOR_QUEUE_SIZE =
56+
Math.max(1024, TERMINATE_EXECUTOR_THREAD_COUNT * 64);
57+
5258
// Do not use call run policy to avoid deadlock
53-
private static final ExecutorService terminateExecutor =
54-
new WrappedThreadPoolExecutor(
55-
0,
56-
IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(),
57-
0L,
58-
TimeUnit.SECONDS,
59-
new ArrayBlockingQueue<>(
60-
IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
61-
new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
62-
ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
59+
private static final ExecutorService terminateExecutor = createTerminateExecutor();
60+
61+
private static ExecutorService createTerminateExecutor() {
62+
final WrappedThreadPoolExecutor executor =
63+
new WrappedThreadPoolExecutor(
64+
TERMINATE_EXECUTOR_THREAD_COUNT,
65+
TERMINATE_EXECUTOR_THREAD_COUNT,
66+
60L,
67+
TimeUnit.SECONDS,
68+
new ArrayBlockingQueue<>(TERMINATE_EXECUTOR_QUEUE_SIZE),
69+
new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
70+
ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
71+
executor.allowCoreThreadTimeOut(true);
72+
return executor;
73+
}
6374

6475
public PipeTerminateEvent(
6576
final String pipeName,

0 commit comments

Comments
 (0)