diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index b77d8916b205b..2c1059272da87 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -429,6 +429,12 @@ public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) { return this; } + @Override + public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) { + setProperty("pipe_memory_management_enabled", String.valueOf(pipeMemoryManagementEnabled)); + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { setProperty("pipe_enable_memory_checked", String.valueOf(isPipeEnableMemoryCheck)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 2461c1e6ba2d4..d131bf862c099 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -438,6 +438,13 @@ public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) { return this; } + @Override + public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) { + dnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled); + cnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled); + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index e1de42382b6a6..32061709b271f 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -308,6 +308,11 @@ public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) { return this; } + @Override + public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) { + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 09a4adce9440b..24e23a31e5f9a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -138,6 +138,8 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus( CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode); + CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled); + CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck); CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java index 59478fc36e67f..6d3f2e85d8bd5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java @@ -49,6 +49,7 @@ protected void setupConfig() { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -56,6 +57,7 @@ protected void setupConfig() { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java index 9da5591bb2f76..b89483840cb58 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java @@ -60,6 +60,7 @@ public void setUp() { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -68,6 +69,7 @@ public void setUp() { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java index c8d5bc0b47174..3a4ca6fcada14 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java @@ -75,6 +75,7 @@ public void setUp() { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv @@ -86,6 +87,7 @@ public void setUp() { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java index 771d50c97c68a..4e1270577d814 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeIdempotentIT.java @@ -66,6 +66,7 @@ public void setUp() { .setDefaultSchemaRegionGroupNumPerDatabase(1) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -73,6 +74,7 @@ public void setUp() { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java index 13a63a585a8dc..bbf4f206b59f5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java @@ -60,6 +60,7 @@ public void setUp() { .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -67,6 +68,7 @@ public void setUp() { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java index 1f64676852579..ed68cc23ebada 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java @@ -75,6 +75,7 @@ private void innerSetUp( .setDataRegionConsensusProtocolClass(dataRegionConsensus) .setSchemaReplicationFactor(schemaRegionReplicationFactor) .setDataReplicationFactor(dataRegionReplicationFactor) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -85,6 +86,7 @@ private void innerSetUp( .setDataRegionConsensusProtocolClass(dataRegionConsensus) .setSchemaReplicationFactor(schemaRegionReplicationFactor) .setDataReplicationFactor(dataRegionReplicationFactor) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java index 9a39b0b2f4ada..59cfa4321e4b0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java @@ -67,6 +67,7 @@ public void setUp() { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv @@ -76,6 +77,7 @@ public void setUp() { .setPipeAirGapReceiverEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java index 117d7396ec174..fe3c550334436 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java @@ -72,6 +72,7 @@ public void setUp() { .setEnableSeqSpaceCompaction(false) .setEnableUnseqSpaceCompaction(false) .setEnableCrossSpaceCompaction(false) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -79,6 +80,7 @@ public void setUp() { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java index c71e212b629a3..d87aa3b5fae1c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeWithLoadIT.java @@ -62,6 +62,7 @@ public void setUp() { .setEnableSeqSpaceCompaction(false) .setEnableUnseqSpaceCompaction(false) .setEnableCrossSpaceCompaction(false) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -69,6 +70,7 @@ public void setUp() { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java index b7091a1db3299..a13e8dc152dbe 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java @@ -49,6 +49,7 @@ protected void setupConfig() { .setAutoCreateSchemaEnabled(false) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -56,6 +57,7 @@ protected void setupConfig() { .setAutoCreateSchemaEnabled(false) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java index 8156139b84ceb..21490c2687668 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java @@ -83,6 +83,7 @@ public void setUp() throws Exception { .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) .setSchemaReplicationFactor(3) .setDataReplicationFactor(2) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); EnvFactory.getEnv().initClusterEnvironment(3, 3); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java index 7b5640234980c..fe667480c86bf 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java @@ -37,6 +37,7 @@ public void setUp() throws Exception { .getConfig() .getCommonConfig() .setSubscriptionEnabled(true) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); EnvFactory.getEnv().initClusterEnvironment(); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java index e758bcb5a1719..6b125222e6135 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java @@ -74,6 +74,7 @@ protected void setUpConfig() { sender .getConfig() .getCommonConfig() + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false) .setSubscriptionPrefetchTsFileBatchMaxDelayInMs(500) .setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(32 * 1024); diff --git a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java index 115dafbc5dc22..88b426d9a4af9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java @@ -57,6 +57,7 @@ public static void setUp() throws Exception { .getConfig() .getCommonConfig() .setSubscriptionEnabled(true) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); EnvFactory.getEnv().initClusterEnvironment(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 2fc0e6b02a8a2..43b6d4e3b5f91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -30,6 +30,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.PipeTask; import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; @@ -159,18 +160,17 @@ protected void createPipeTask( final PipeTaskMeta pipeTaskMeta) throws IllegalPathException { if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) { - final PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters(); + final PipeParameters sourceParameters = pipeStaticMeta.getExtractorParameters(); final DataRegionId dataRegionId = new DataRegionId(consensusGroupId); final boolean needConstructDataRegionTask = StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId) && DataRegionListeningFilter.shouldDataRegionBeListened( - extractorParameters, dataRegionId); + sourceParameters, dataRegionId); final boolean needConstructSchemaRegionTask = SchemaEngine.getInstance() .getAllSchemaRegionIds() .contains(new SchemaRegionId(consensusGroupId)) - && !SchemaRegionListeningFilter.parseListeningPlanTypeSet(extractorParameters) - .isEmpty(); + && !SchemaRegionListeningFilter.parseListeningPlanTypeSet(sourceParameters).isEmpty(); // Advance the extractor parameters parsing logic to avoid creating un-relevant pipeTasks if (needConstructDataRegionTask || needConstructSchemaRegionTask) { @@ -410,7 +410,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro || pipeTaskMap.entrySet().stream() .filter(entry -> dataRegionIds.contains(entry.getKey())) .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted()); - final String extractorModeValue = + final String sourceModeValue = pipeMeta .getStaticMeta() .getExtractorParameters() @@ -422,9 +422,8 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( pipeMeta.getStaticMeta().getExtractorParameters()) .getLeft() - && (extractorModeValue.equalsIgnoreCase( - PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE) - || extractorModeValue.equalsIgnoreCase( + && (sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE) + || sourceModeValue.equalsIgnoreCase( PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE)); final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop; @@ -665,24 +664,24 @@ public Map getAllConsensusPipe() { @Override protected void calculateMemoryUsage( - final PipeParameters extractorParameters, + final PipeStaticMeta staticMeta, + final PipeParameters sourceParameters, final PipeParameters processorParameters, - final PipeParameters connectorParameters) { - if (!PipeConfig.getInstance().isPipeEnableMemoryCheck()) { + final PipeParameters sinkParameters) { + if (!PipeConfig.getInstance().isPipeEnableMemoryCheck() + || !isInnerSource(sourceParameters) + || !PipeType.USER.equals(staticMeta.getPipeType())) { return; } - calculateInsertNodeQueueMemory(extractorParameters, processorParameters, connectorParameters); + calculateInsertNodeQueueMemory(sourceParameters); long needMemory = 0; - needMemory += - calculateTsFileParserMemory(extractorParameters, processorParameters, connectorParameters); - needMemory += - calculateSinkBatchMemory(extractorParameters, processorParameters, connectorParameters); - needMemory += - calculateSendTsFileReadBufferMemory( - extractorParameters, processorParameters, connectorParameters); + needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters); + needMemory += calculateSinkBatchMemory(sinkParameters); + needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters); + needMemory += calculateAssignerMemory(sourceParameters); PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory(); final long freeMemorySizeInBytes = pipeMemoryManager.getFreeMemorySizeInBytes(); @@ -703,13 +702,22 @@ protected void calculateMemoryUsage( } } - private void calculateInsertNodeQueueMemory( - final PipeParameters extractorParameters, - final PipeParameters processorParameters, - final PipeParameters connectorParameters) { + private boolean isInnerSource(final PipeParameters sourceParameters) { + final String pluginName = + sourceParameters + .getStringOrDefault( + Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, PipeSourceConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase(); - // Realtime extractor is enabled by default, so we only need to check the source realtime - if (!extractorParameters.getBooleanOrDefault( + return pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + || pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName()); + } + + private void calculateInsertNodeQueueMemory(final PipeParameters sourceParameters) { + + // Realtime source is enabled by default, so we only need to check the source realtime + if (!sourceParameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { return; @@ -717,7 +725,7 @@ private void calculateInsertNodeQueueMemory( // If the realtime mode is batch or file, we do not need to allocate memory final String realtimeMode = - extractorParameters.getStringByKeys( + sourceParameters.getStringByKeys( PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY, PipeSourceConstant.SOURCE_REALTIME_MODE_KEY); if (PipeSourceConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE.equals(realtimeMode) @@ -739,53 +747,50 @@ private void calculateInsertNodeQueueMemory( } private long calculateTsFileParserMemory( - final PipeParameters extractorParameters, - final PipeParameters processorParameters, - final PipeParameters connectorParameters) { + final PipeParameters sourceParameters, final PipeParameters sinkParameters) { - // If the extractor is not history, we do not need to allocate memory + // If the source is not history, we do not need to allocate memory boolean isExtractorHistory = - extractorParameters.getBooleanOrDefault( + sourceParameters.getBooleanOrDefault( SystemConstant.RESTART_KEY, SystemConstant.RESTART_DEFAULT_VALUE) - || extractorParameters.getBooleanOrDefault( + || sourceParameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); - // If the extractor is history, and has start/end time, we need to allocate memory + // If the source is history, and has start/end time, we need to allocate memory boolean isTSFileParser = isExtractorHistory - && extractorParameters.hasAnyAttributes( + && sourceParameters.hasAnyAttributes( EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY); isTSFileParser = isTSFileParser || (isExtractorHistory - && extractorParameters.hasAnyAttributes( + && sourceParameters.hasAnyAttributes( EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY)); - // if the extractor has start/end time, we need to allocate memory + // if the source has start/end time, we need to allocate memory isTSFileParser = isTSFileParser - || extractorParameters.hasAnyAttributes( - SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY); + || sourceParameters.hasAnyAttributes(SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY); isTSFileParser = isTSFileParser - || extractorParameters.hasAnyAttributes(SOURCE_END_TIME_KEY, EXTRACTOR_END_TIME_KEY); + || sourceParameters.hasAnyAttributes(SOURCE_END_TIME_KEY, EXTRACTOR_END_TIME_KEY); - // If the extractor has pattern or path, we need to allocate memory + // If the source has pattern or path, we need to allocate memory isTSFileParser = isTSFileParser - || extractorParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY); + || sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY); isTSFileParser = - isTSFileParser || extractorParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY); + isTSFileParser || sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY); - // If the extractor is not hybrid, we do need to allocate memory + // If the source is not hybrid, we do need to allocate memory isTSFileParser = isTSFileParser || !PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals( - connectorParameters.getStringOrDefault( + sinkParameters.getStringOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE)); @@ -797,15 +802,12 @@ private long calculateTsFileParserMemory( return PipeConfig.getInstance().getTsFileParserMemory(); } - private long calculateSinkBatchMemory( - final PipeParameters extractorParameters, - final PipeParameters processorParameters, - final PipeParameters connectorParameters) { + private long calculateSinkBatchMemory(final PipeParameters sinkParameters) { - // If the connector format is tsfile , we need to use batch + // If the sink format is tsfile , we need to use batch boolean needUseBatch = PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals( - connectorParameters.getStringOrDefault( + sinkParameters.getStringOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE)); @@ -814,9 +816,9 @@ private long calculateSinkBatchMemory( return PipeConfig.getInstance().getSinkBatchMemoryTsFile(); } - // If the connector is batch mode, we need to use batch + // If the sink is batch mode, we need to use batch needUseBatch = - connectorParameters.getBooleanOrDefault( + sinkParameters.getBooleanOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY), @@ -830,23 +832,21 @@ private long calculateSinkBatchMemory( } private long calculateSendTsFileReadBufferMemory( - final PipeParameters extractorParameters, - final PipeParameters processorParameters, - final PipeParameters connectorParameters) { - // If the extractor is history enable, we need to transfer tsfile + final PipeParameters sourceParameters, final PipeParameters sinkParameters) { + // If the source is history enable, we need to transfer tsfile boolean needTransferTsFile = - extractorParameters.getBooleanOrDefault( + sourceParameters.getBooleanOrDefault( SystemConstant.RESTART_KEY, SystemConstant.RESTART_DEFAULT_VALUE) - || extractorParameters.getBooleanOrDefault( + || sourceParameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); String format = - connectorParameters.getStringOrDefault( + sinkParameters.getStringOrDefault( Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE); - // If the connector format is tsfile and hybrid, we need to transfer tsfile + // If the sink format is tsfile and hybrid, we need to transfer tsfile needTransferTsFile = needTransferTsFile || PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals(format) @@ -858,4 +858,19 @@ private long calculateSendTsFileReadBufferMemory( return PipeConfig.getInstance().getSendTsFileReadBuffer(); } + + private long calculateAssignerMemory(final PipeParameters sourceParameters) { + try { + if (!PipeInsertionDataNodeListener.getInstance().isEmpty() + || !DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(sourceParameters) + .getLeft()) { + return 0; + } + return PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize() + * PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() + * Math.min(StorageEngine.getInstance().getDataRegionNumber(), 10); + } catch (final IllegalPathException e) { + return 0; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index aa32a6bf7f3fa..d6cfa6f6abc6f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -139,6 +139,10 @@ public void listenToDeleteData(DeleteDataNode node) { (key, value) -> value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node))); } + public boolean isEmpty() { + return dataRegionId2Assigner.isEmpty(); + } + /////////////////////////////// singleton /////////////////////////////// private PipeInsertionDataNodeListener() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 1dd3bea411c2e..f0e0bb52e46e7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -255,8 +255,8 @@ public class CommonConfig { private long pipeMaxWaitFinishTime = 10 * 1000; - private int pipeExtractorAssignerDisruptorRingBufferSize = 65536; - private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; // 50B + private int pipeExtractorAssignerDisruptorRingBufferSize = 128; + private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB; private long pipeExtractorMatcherCacheSize = 1024; private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 6eca59a3865f1..c7758526028aa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -476,6 +476,7 @@ private boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws Illega final long creationTime = pipeMetaFromCoordinator.getStaticMeta().getCreationTime(); calculateMemoryUsage( + pipeMetaFromCoordinator.getStaticMeta(), pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters(), pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(), pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters()); @@ -521,6 +522,7 @@ private boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws Illega } protected void calculateMemoryUsage( + final PipeStaticMeta staticMeta, final PipeParameters extractorParameters, final PipeParameters processorParameters, final PipeParameters connectorParameters) {