Skip to content

Commit f1517ca

Browse files
CaideyipiJackieTien97
authored andcommitted
Pipe: Fixed the default param of single entry of disruptor queue & Banned memory checks from some missing pipe ITs & Do not check non-user pipes (#16069)
* Update CommonConfig.java * refactor * try-fix * test * revert-pom * fix * fix (cherry picked from commit 19810e5)
1 parent c44d51d commit f1517ca

27 files changed

Lines changed: 177 additions & 97 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,12 @@ public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) {
453453
return this;
454454
}
455455

456+
@Override
457+
public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) {
458+
setProperty("pipe_memory_management_enabled", String.valueOf(pipeMemoryManagementEnabled));
459+
return this;
460+
}
461+
456462
@Override
457463
public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) {
458464
setProperty("pipe_enable_memory_checked", String.valueOf(isPipeEnableMemoryCheck));

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,13 @@ public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) {
466466
return this;
467467
}
468468

469+
@Override
470+
public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) {
471+
dnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled);
472+
cnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled);
473+
return this;
474+
}
475+
469476
@Override
470477
public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) {
471478
dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck);

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,11 @@ public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) {
328328
return this;
329329
}
330330

331+
@Override
332+
public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) {
333+
return this;
334+
}
335+
331336
@Override
332337
public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) {
333338
return this;

integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus(
146146

147147
CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode);
148148

149+
CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled);
150+
149151
CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck);
150152

151153
CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled);

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,15 @@ protected void setupConfig() {
4949
.setAutoCreateSchemaEnabled(true)
5050
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
5151
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
52+
.setPipeMemoryManagementEnabled(false)
5253
.setIsPipeEnableMemoryCheck(false);
5354
receiverEnv
5455
.getConfig()
5556
.getCommonConfig()
5657
.setAutoCreateSchemaEnabled(true)
5758
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
5859
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
60+
.setPipeMemoryManagementEnabled(false)
5961
.setIsPipeEnableMemoryCheck(false);
6062

6163
// 10 min, assert that the operations will not time out

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ public void setUp() {
6868
.setDefaultSchemaRegionGroupNumPerDatabase(1)
6969
.setTimestampPrecision("ms")
7070
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
71-
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
71+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
72+
.setPipeMemoryManagementEnabled(false)
73+
.setIsPipeEnableMemoryCheck(false);
7274
receiverEnv
7375
.getConfig()
7476
.getCommonConfig()
@@ -77,6 +79,8 @@ public void setUp() {
7779
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7880
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7981
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
82+
.setPipeMemoryManagementEnabled(false)
83+
.setIsPipeEnableMemoryCheck(false)
8084
.setSchemaReplicationFactor(3)
8185
.setDataReplicationFactor(2);
8286

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,10 @@ private void innerSetUp(
7676
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7777
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
7878
.setSchemaReplicationFactor(schemaRegionReplicationFactor)
79-
.setDataReplicationFactor(dataRegionReplicationFactor);
79+
.setDataReplicationFactor(dataRegionReplicationFactor)
80+
.setDnConnectionTimeoutMs(600000)
81+
.setPipeMemoryManagementEnabled(false)
82+
.setIsPipeEnableMemoryCheck(false);
8083
receiverEnv
8184
.getConfig()
8285
.getCommonConfig()
@@ -85,11 +88,10 @@ private void innerSetUp(
8588
.setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
8689
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
8790
.setSchemaReplicationFactor(schemaRegionReplicationFactor)
88-
.setDataReplicationFactor(dataRegionReplicationFactor);
89-
90-
// 10 min, assert that the operations will not time out
91-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
92-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
91+
.setDataReplicationFactor(dataRegionReplicationFactor)
92+
.setDnConnectionTimeoutMs(600000)
93+
.setPipeMemoryManagementEnabled(false)
94+
.setIsPipeEnableMemoryCheck(false);
9395

9496
senderEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
9597
receiverEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
@@ -170,7 +172,10 @@ public void testPipeOnBothSenderAndReceiver() throws Exception {
170172
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
171173
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
172174
.setSchemaReplicationFactor(3)
173-
.setDataReplicationFactor(2);
175+
.setDataReplicationFactor(2)
176+
.setDnConnectionTimeoutMs(600000)
177+
.setPipeMemoryManagementEnabled(false)
178+
.setIsPipeEnableMemoryCheck(false);
174179
receiverEnv
175180
.getConfig()
176181
.getCommonConfig()
@@ -179,11 +184,10 @@ public void testPipeOnBothSenderAndReceiver() throws Exception {
179184
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
180185
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
181186
.setSchemaReplicationFactor(1)
182-
.setDataReplicationFactor(1);
183-
184-
// 10 min, assert that the operations will not time out
185-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
186-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
187+
.setDataReplicationFactor(1)
188+
.setDnConnectionTimeoutMs(600000)
189+
.setPipeMemoryManagementEnabled(false)
190+
.setIsPipeEnableMemoryCheck(false);
187191

188192
senderEnv.initClusterEnvironment(3, 3);
189193
receiverEnv.initClusterEnvironment(1, 1);
@@ -379,7 +383,10 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
379383
.setDataReplicationFactor(1)
380384
.setEnableSeqSpaceCompaction(false)
381385
.setEnableUnseqSpaceCompaction(false)
382-
.setEnableCrossSpaceCompaction(false);
386+
.setEnableCrossSpaceCompaction(false)
387+
.setDnConnectionTimeoutMs(600000)
388+
.setPipeMemoryManagementEnabled(false)
389+
.setIsPipeEnableMemoryCheck(false);
383390
receiverEnv
384391
.getConfig()
385392
.getCommonConfig()
@@ -389,11 +396,10 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
389396
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
390397
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
391398
.setSchemaReplicationFactor(3)
392-
.setDataReplicationFactor(2);
393-
394-
// 10 min, assert that the operations will not time out
395-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
396-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
399+
.setDataReplicationFactor(2)
400+
.setDnConnectionTimeoutMs(600000)
401+
.setPipeMemoryManagementEnabled(false)
402+
.setIsPipeEnableMemoryCheck(false);
397403

398404
senderEnv.initClusterEnvironment(1, 1);
399405
receiverEnv.initClusterEnvironment(1, 3);

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,20 @@ public void setUp() {
7171
// Disable sender compaction for tsfile determination in loose range test
7272
.setEnableSeqSpaceCompaction(false)
7373
.setEnableUnseqSpaceCompaction(false)
74-
.setEnableCrossSpaceCompaction(false);
74+
.setEnableCrossSpaceCompaction(false)
75+
.setDnConnectionTimeoutMs(600000)
76+
.setPipeMemoryManagementEnabled(false)
77+
.setIsPipeEnableMemoryCheck(false);
78+
7579
receiverEnv
7680
.getConfig()
7781
.getCommonConfig()
7882
.setAutoCreateSchemaEnabled(true)
7983
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
80-
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
81-
82-
// 10 min, assert that the operations will not time out
83-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
84-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
84+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
85+
.setDnConnectionTimeoutMs(600000)
86+
.setPipeMemoryManagementEnabled(false)
87+
.setIsPipeEnableMemoryCheck(false);
8588

8689
senderEnv.initClusterEnvironment();
8790
receiverEnv.initClusterEnvironment();

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ public void setUp() {
7474
.setAutoCreateSchemaEnabled(true)
7575
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7676
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
77-
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
77+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
78+
.setDnConnectionTimeoutMs(600000)
79+
.setPipeMemoryManagementEnabled(false)
80+
.setIsPipeEnableMemoryCheck(false);
7881

7982
receiverEnv
8083
.getConfig()
@@ -84,11 +87,10 @@ public void setUp() {
8487
.setSchemaReplicationFactor(3)
8588
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
8689
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
87-
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
88-
89-
// 10 min, assert that the operations will not time out
90-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
91-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
90+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
91+
.setDnConnectionTimeoutMs(600000)
92+
.setPipeMemoryManagementEnabled(false)
93+
.setIsPipeEnableMemoryCheck(false);
9294

9395
senderEnv.initClusterEnvironment(3, 3, 180);
9496
receiverEnv.initClusterEnvironment(3, 3, 180);

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,15 @@ public void setUp() {
6262
.setTimestampPrecision("ms")
6363
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
6464
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
65+
.setPipeMemoryManagementEnabled(false)
6566
.setIsPipeEnableMemoryCheck(false);
6667
receiverEnv
6768
.getConfig()
6869
.getCommonConfig()
6970
.setAutoCreateSchemaEnabled(true)
7071
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7172
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
73+
.setPipeMemoryManagementEnabled(false)
7274
.setIsPipeEnableMemoryCheck(false);
7375

7476
// 10 min, assert that the operations will not time out

0 commit comments

Comments
 (0)