Skip to content

Commit a6e717f

Browse files
authored
Clean up dead RPC thread config and use node-specific selectorNum (#17551)
1 parent f5fbaa2 commit a6e717f

32 files changed

Lines changed: 324 additions & 171 deletions

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -163,22 +163,7 @@ public static Map<Integer, Long> listPortOccupationWindows(final List<Integer> p
163163
*/
164164
public static Map<Integer, Long> listPortOccupationUnix(final List<Integer> ports)
165165
throws IOException {
166-
return listPortOccupation(ports, "lsof -iTCP -sTCP:LISTEN -P -n", 10, 9, 1);
167-
}
168-
169-
private static String getSearchAvailablePortCmd(final List<Integer> ports) {
170-
return SystemUtils.IS_OS_WINDOWS ? getWindowsSearchPortCmd(ports) : getUnixSearchPortCmd(ports);
171-
}
172-
173-
private static String getWindowsSearchPortCmd(final List<Integer> ports) {
174-
return "netstat -aon -p tcp | findStr "
175-
+ ports.stream().map(v -> "/C:\"127.0.0.1:" + v + "\"").collect(Collectors.joining(" "));
176-
}
177-
178-
private static String getUnixSearchPortCmd(final List<Integer> ports) {
179-
return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E "
180-
+ ports.stream().map(String::valueOf).collect(Collectors.joining("|"))
181-
+ "\"";
166+
return listPortOccupation(ports, "lsof -iTCP -sTCP:LISTEN,TIME_WAIT -P -n", 10, 9, 1);
182167
}
183168

184169
private static Pair<Integer, Integer> getClusterNodesNum(final int index) {

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public abstract class MppBaseConfig {
4040
/** Create an empty MppPersistentConfig. */
4141
protected MppBaseConfig() {
4242
this.properties = new Properties();
43+
this.properties.setProperty("cn_selector_thread_nums_of_client_manager", "1");
44+
this.properties.setProperty("dn_selector_thread_nums_of_client_manager", "1");
4345
}
4446

4547
/**

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.commons.client.IClientManager;
2626
import org.apache.iotdb.commons.client.async.AsyncAINodeInternalServiceClient;
2727
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler;
28+
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
2829

2930
/** Asynchronously send RPC requests to AINodes. */
3031
public class AsyncAINodeHeartbeatClientPool {
@@ -35,7 +36,8 @@ private AsyncAINodeHeartbeatClientPool() {
3536
clientManager =
3637
new IClientManager.Factory<TEndPoint, AsyncAINodeInternalServiceClient>()
3738
.createClientManager(
38-
new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory());
39+
new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory(
40+
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
3941
}
4042

4143
/**

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.client.IClientManager;
2525
import org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
2626
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
27+
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
2728
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
2829

2930
public class AsyncConfigNodeHeartbeatClientPool {
@@ -34,7 +35,8 @@ private AsyncConfigNodeHeartbeatClientPool() {
3435
clientManager =
3536
new IClientManager.Factory<TEndPoint, AsyncConfigNodeInternalServiceClient>()
3637
.createClientManager(
37-
new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory());
38+
new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory(
39+
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
3840
}
3941

4042
/**

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
2626
import org.apache.iotdb.confignode.client.async.handlers.audit.DataNodeWriteAuditLogHandler;
2727
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
28+
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
2829
import org.apache.iotdb.mpp.rpc.thrift.TAuditLogReq;
2930
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
3031

@@ -37,7 +38,8 @@ private AsyncDataNodeHeartbeatClientPool() {
3738
clientManager =
3839
new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
3940
.createClientManager(
40-
new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
41+
new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory(
42+
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
4143
}
4244

4345
/**

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeAsyncRequestRPCHandler;
3131
import org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeTSStatusRPCHandler;
3232
import org.apache.iotdb.confignode.client.async.handlers.rpc.SubmitTestConnectionTaskToConfigNodeRPCHandler;
33+
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
3334

3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
@@ -40,6 +41,10 @@ public class CnToCnInternalServiceAsyncRequestManager
4041
private static final Logger LOGGER =
4142
LoggerFactory.getLogger(CnToCnInternalServiceAsyncRequestManager.class);
4243

44+
public CnToCnInternalServiceAsyncRequestManager(int selectorNumOfAsyncClientManager) {
45+
super(selectorNumOfAsyncClientManager);
46+
}
47+
4348
@Override
4449
protected void initActionMapBuilder() {
4550
actionMapBuilder.put(
@@ -71,7 +76,8 @@ protected void adjustClientTimeoutIfNecessary(
7176

7277
private static class ClientPoolHolder {
7378
private static final CnToCnInternalServiceAsyncRequestManager INSTANCE =
74-
new CnToCnInternalServiceAsyncRequestManager();
79+
new CnToCnInternalServiceAsyncRequestManager(
80+
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());
7581

7682
private ClientPoolHolder() {
7783
// Empty constructor

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
4949
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
5050
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
51+
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
5152
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
5253
import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq;
5354
import org.apache.iotdb.mpp.rpc.thrift.TAlterTimeSeriesReq;
@@ -120,6 +121,10 @@ public class CnToDnInternalServiceAsyncRequestManager
120121
private static final Logger LOGGER =
121122
LoggerFactory.getLogger(CnToDnInternalServiceAsyncRequestManager.class);
122123

124+
private CnToDnInternalServiceAsyncRequestManager() {
125+
super(ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());
126+
}
127+
123128
@SuppressWarnings("unchecked")
124129
@Override
125130
protected void initActionMapBuilder() {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,17 @@ public class ConfigNodeConfig {
145145
*/
146146
private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
147147

148+
private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;
149+
150+
/**
151+
* ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
152+
* clients.
153+
*/
154+
private int selectorNumOfClientManager =
155+
Runtime.getRuntime().availableProcessors() / 4 > 0
156+
? Runtime.getRuntime().availableProcessors() / 4
157+
: 1;
158+
148159
/** System directory, including version file for each database and metadata. */
149160
private String systemDir =
150161
IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator + IoTDBConstant.SYSTEM_FOLDER_NAME;
@@ -460,6 +471,23 @@ public ConfigNodeConfig setMaxClientNumForEachNode(int maxClientNumForEachNode)
460471
return this;
461472
}
462473

474+
public int getMaxIdleClientNumForEachNode() {
475+
return maxIdleClientNumForEachNode;
476+
}
477+
478+
public ConfigNodeConfig setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) {
479+
this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
480+
return this;
481+
}
482+
483+
public int getSelectorNumOfClientManager() {
484+
return selectorNumOfClientManager;
485+
}
486+
487+
public void setSelectorNumOfClientManager(int selectorNumOfClientManager) {
488+
this.selectorNumOfClientManager = selectorNumOfClientManager;
489+
}
490+
463491
public String getConsensusDir() {
464492
return consensusDir;
465493
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,24 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio
271271
"cn_max_client_count_for_each_node_in_client_manager",
272272
String.valueOf(conf.getMaxClientNumForEachNode()))));
273273

274+
int cnMaxIdleClientNumForEachNode =
275+
Integer.parseInt(
276+
properties.getProperty(
277+
"cn_max_idle_client_count_for_each_node_in_client_manager",
278+
String.valueOf(conf.getMaxIdleClientNumForEachNode())));
279+
if (cnMaxIdleClientNumForEachNode >= 0) {
280+
conf.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode);
281+
}
282+
283+
int cnSelectorNumOfClientManager =
284+
Integer.parseInt(
285+
properties.getProperty(
286+
"cn_selector_thread_nums_of_client_manager",
287+
String.valueOf(conf.getSelectorNumOfClientManager())));
288+
if (cnSelectorNumOfClientManager > 0) {
289+
conf.setSelectorNumOfClientManager(cnSelectorNumOfClientManager);
290+
}
291+
274292
conf.setSystemDir(properties.getProperty("cn_system_dir", conf.getSystemDir()));
275293

276294
conf.setConsensusDir(properties.getProperty("cn_consensus_dir", conf.getConsensusDir()));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,6 @@ public class IoTDBConfig {
137137
/** Port which the JDBC server listens to. */
138138
private int rpcPort = 6667;
139139

140-
/** Rpc Selector thread num */
141-
private int rpcSelectorThreadCount = 1;
142-
143140
/** Max concurrent client number */
144141
private int rpcMaxConcurrentClientNum = 1000;
145142

@@ -953,6 +950,8 @@ public class IoTDBConfig {
953950
*/
954951
private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
955952

953+
private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;
954+
956955
/**
957956
* Cache size of partition cache in {@link
958957
* org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher}
@@ -989,9 +988,6 @@ public class IoTDBConfig {
989988
/** ThreadPool size for read operation in coordinator */
990989
private int coordinatorReadExecutorSize = 20;
991990

992-
/** ThreadPool size for write operation in coordinator */
993-
private int coordinatorWriteExecutorSize = 50;
994-
995991
/** Policy of DataNodeSchemaCache eviction */
996992
private String dataNodeSchemaCacheEvictionPolicy = "FIFO";
997993

@@ -1823,14 +1819,6 @@ public void setUnSeqTsFileSize(long unSeqTsFileSize) {
18231819
this.unSeqTsFileSize = unSeqTsFileSize;
18241820
}
18251821

1826-
public int getRpcSelectorThreadCount() {
1827-
return rpcSelectorThreadCount;
1828-
}
1829-
1830-
public void setRpcSelectorThreadCount(int rpcSelectorThreadCount) {
1831-
this.rpcSelectorThreadCount = rpcSelectorThreadCount;
1832-
}
1833-
18341822
public int getRpcMaxConcurrentClientNum() {
18351823
return rpcMaxConcurrentClientNum;
18361824
}
@@ -3202,6 +3190,14 @@ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) {
32023190
this.maxClientNumForEachNode = maxClientNumForEachNode;
32033191
}
32043192

3193+
public int getMaxIdleClientNumForEachNode() {
3194+
return maxIdleClientNumForEachNode;
3195+
}
3196+
3197+
public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) {
3198+
this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
3199+
}
3200+
32053201
public int getSelectorNumOfClientManager() {
32063202
return selectorNumOfClientManager;
32073203
}
@@ -3349,14 +3345,6 @@ public void setCoordinatorReadExecutorSize(int coordinatorReadExecutorSize) {
33493345
this.coordinatorReadExecutorSize = coordinatorReadExecutorSize;
33503346
}
33513347

3352-
public int getCoordinatorWriteExecutorSize() {
3353-
return coordinatorWriteExecutorSize;
3354-
}
3355-
3356-
public void setCoordinatorWriteExecutorSize(int coordinatorWriteExecutorSize) {
3357-
this.coordinatorWriteExecutorSize = coordinatorWriteExecutorSize;
3358-
}
3359-
33603348
public TEndPoint getAddressAndPort() {
33613349
return new TEndPoint(rpcAddress, rpcPort);
33623350
}

0 commit comments

Comments
 (0)