From d82d3dfd940e3f94ddcce4094b9a11eb9c498c09 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 1 Aug 2025 19:27:52 +0800 Subject: [PATCH 1/7] logger --- iotdb-client/client-go | 2 +- ...bleStatementDataTypeConvertExecutionVisitor.java | 4 +++- ...reeStatementDataTypeConvertExecutionVisitor.java | 4 +++- .../org/apache/iotdb/commons/conf/CommonConfig.java | 13 +++++++++++++ .../iotdb/commons/pipe/config/PipeConfig.java | 5 +++++ .../iotdb/commons/pipe/config/PipeDescriptor.java | 5 +++++ 6 files changed, 30 insertions(+), 3 deletions(-) diff --git a/iotdb-client/client-go b/iotdb-client/client-go index 8faa354ced450..dc64b1a7648d3 160000 --- a/iotdb-client/client-go +++ b/iotdb-client/client-go @@ -1 +1 @@ -Subproject commit 8faa354ced45031748721da2eed2267062c8d8cc +Subproject commit dc64b1a7648d3c505c10eed5419f422bb49f1def diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java index 4e7530f05fe0b..78ac9ddcc9c66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; @@ -74,7 +75,8 @@ public PipeTableStatementDataTypeConvertExecutionVisitor( private Optional tryExecute(final Statement statement, final String databaseName) { try { - if (Objects.isNull(databaseName)) { + if (Objects.isNull(databaseName) + || !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) { LOGGER.warn( "Database name is unexpectedly null for statement: {}. Skip data type conversion.", statement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java index 9aa352385ed6c..77bbf49094c3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; @@ -88,7 +89,8 @@ public Optional visitLoadFile( final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode() // Ignore the error if it is caused by insufficient memory - || (status.getMessage() != null && status.getMessage().contains("memory"))) { + || (status.getMessage() != null && status.getMessage().contains("memory")) + || !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) { return Optional.empty(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 345f468006942..d1dcf8d993f9c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -294,6 +294,7 @@ public class CommonConfig { private double pipeReceiverActualToEstimatedMemoryRatio = 3; private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB + private boolean pipeReceiverLoadConversionEnabled = false; private double pipeMetaReportMaxLogNumPerRound = 0.1; private int pipeMetaReportMaxLogIntervalRounds = 360; @@ -1496,6 +1497,18 @@ public void setPipeReceiverReqDecompressedMaxLengthInBytes( pipeReceiverReqDecompressedMaxLengthInBytes); } + public boolean isPipeReceiverLoadConversionEnabled() { + return pipeReceiverLoadConversionEnabled; + } + + public void setPipeReceiverLoadConversionEnabled(boolean pipeReceiverLoadConversionEnabled) { + if (this.pipeReceiverLoadConversionEnabled == pipeReceiverLoadConversionEnabled) { + return; + } + this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled; + logger.info("pipeReceiverConversionEnabled is set to {}.", pipeReceiverLoadConversionEnabled); + } + public int getPipeReceiverReqDecompressedMaxLengthInBytes() { return pipeReceiverReqDecompressedMaxLengthInBytes; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 770e0e959b29f..a646ed90dbf8b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -343,6 +343,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() { return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes(); } + public boolean isPipeReceiverLoadConversionEnabled() { + return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled(); + } + /////////////////////////////// Logger /////////////////////////////// public double getPipeMetaReportMaxLogNumPerRound() { @@ -573,6 +577,7 @@ public void printAllConfigs() { LOGGER.info( "PipeReceiverReqDecompressedMaxLengthInBytes: {}", getPipeReceiverReqDecompressedMaxLengthInBytes()); + LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled()); LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound()); LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 7e27838618b0a..7fb13c46707b4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -432,6 +432,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr properties.getProperty( "pipe_receiver_req_decompressed_max_length_in_bytes", String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes())))); + config.setPipeReceiverLoadConversionEnabled( + Boolean.parseBoolean( + properties.getProperty( + "pipe_receiver_conversion_enabled", + String.valueOf(config.isPipeReceiverLoadConversionEnabled())))); config.setPipeMemoryAllocateMaxRetries( Integer.parseInt( From dc8cf304542792b2e67b089583ae6a410a255558 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 4 Aug 2025 11:08:40 +0800 Subject: [PATCH 2/7] ci-fix --- .../manual/enhanced/IoTDBPipeTypeConversionISessionIT.java | 5 ++++- .../treemodel/manual/IoTDBPipeTypeConversionISessionIT.java | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java index f4dfca32c4cc2..1684ca1f7f5df 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java @@ -128,6 +128,8 @@ private void prepareTypeConversionTest( createDataPipe(true); } else { // Send Tablet data to receiver + // Write once to create data regions, guarantee that no any tsFiles will be sent + executeDataWriteOperation.accept(senderSession, receiverSession, tablet); createDataPipe(false); // The actual implementation logic of inserting data executeDataWriteOperation.accept(senderSession, receiverSession, tablet); @@ -195,10 +197,11 @@ private void createDataPipe(boolean isTSFile) { String sql = String.format( "create pipe test" - + " with source ('source'='iotdb-source','realtime.mode'='%s')" + + " with source ('source'='iotdb-source','realtime.mode'='%s','history.enable'='%s')" + " with processor ('processor'='do-nothing-processor')" + " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')", isTSFile ? "file" : "forced-log", + isTSFile, receiverEnv.getIP(), receiverEnv.getPort(), isTSFile ? "tsfile" : "tablet"); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java index 2d89285d72a7f..cefc3c7df7139 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java @@ -346,6 +346,8 @@ private void prepareTypeConversionTest( senderSession.executeNonQueryStatement("flush"); } else { // Send Tablet data to receiver + // Write once to create data regions, guarantee that no any tsFiles will be sent + consumer.accept(senderSession, receiverSession, tablet); createDataPipe(uuid, false); Thread.sleep(2000); // The actual implementation logic of inserting data @@ -395,12 +397,13 @@ private void createDataPipe(String diff, boolean isTSFile) { String sql = String.format( "create pipe test%s" - + " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')" + + " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')" + " with processor ('processor'='do-nothing-processor')" + " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')", diff, isTSFile ? "file" : "forced-log", !isTSFile, + isTSFile, receiverEnv.getIP(), receiverEnv.getPort(), isTSFile ? "tsfile" : "tablet"); From b0c3c8899525fb7f6e448f854310989b2c8c5925 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 5 Aug 2025 12:02:22 +0800 Subject: [PATCH 3/7] partial --- .../PipeTableStatementDataTypeConvertExecutionVisitor.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java index 78ac9ddcc9c66..5fb87e550dfc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java @@ -75,8 +75,7 @@ public PipeTableStatementDataTypeConvertExecutionVisitor( private Optional tryExecute(final Statement statement, final String databaseName) { try { - if (Objects.isNull(databaseName) - || !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) { + if (Objects.isNull(databaseName)) { LOGGER.warn( "Database name is unexpectedly null for statement: {}. Skip data type conversion.", statement); @@ -110,6 +109,10 @@ public Optional visitLoadFile( return Optional.empty(); } + if (!PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) { + return Optional.empty(); + } + if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode() // Ignore the error if it is caused by insufficient memory || (status.getMessage() != null && status.getMessage().contains("memory"))) { From 791b6c347978fcb144ba5da87bca0f527dd3d2cb Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 5 Aug 2025 12:10:10 +0800 Subject: [PATCH 4/7] Delete client-go --- iotdb-client/client-go | 1 - 1 file changed, 1 deletion(-) delete mode 160000 iotdb-client/client-go diff --git a/iotdb-client/client-go b/iotdb-client/client-go deleted file mode 160000 index dc64b1a7648d3..0000000000000 --- a/iotdb-client/client-go +++ /dev/null @@ -1 +0,0 @@ -Subproject commit dc64b1a7648d3c505c10eed5419f422bb49f1def From be5d9cd44d7be52a41cb1a3b79ac4735e074cb6d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 5 Aug 2025 12:10:48 +0800 Subject: [PATCH 5/7] fix --- iotdb-client/client-go | 1 + 1 file changed, 1 insertion(+) create mode 160000 iotdb-client/client-go diff --git a/iotdb-client/client-go b/iotdb-client/client-go new file mode 160000 index 0000000000000..dc64b1a7648d3 --- /dev/null +++ b/iotdb-client/client-go @@ -0,0 +1 @@ +Subproject commit dc64b1a7648d3c505c10eed5419f422bb49f1def From 34e96ab14565ef12df855ca5abf68102ae570a69 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 5 Aug 2025 14:27:59 +0800 Subject: [PATCH 6/7] rename --- .../org/apache/iotdb/commons/pipe/config/PipeDescriptor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 7fb13c46707b4..7bdfab83efa1f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -435,7 +435,7 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr config.setPipeReceiverLoadConversionEnabled( Boolean.parseBoolean( properties.getProperty( - "pipe_receiver_conversion_enabled", + "pipe_receiver_load_conversion_enabled", String.valueOf(config.isPipeReceiverLoadConversionEnabled())))); config.setPipeMemoryAllocateMaxRetries( From c3fae05827e88d447ef1ebb64895e1b17e27d5e4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 5 Aug 2025 16:25:42 +0800 Subject: [PATCH 7/7] fix-ci --- .../dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java | 8 +++----- pom.xml | 1 + 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java index 50e96dc3e28e3..b714fe79ca728 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java @@ -208,15 +208,13 @@ public void testReceiverNotLoadWhenIdColumnMismatch() throws Exception { } Set expectedResSet = new java.util.HashSet<>(); - expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,null,null,null,null,"); - expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,null,null,null,null,"); - expectedResSet.add("1970-01-01T00:00:00.002Z,null,null,null,null,d1,d2,blue,2,"); - expectedResSet.add("1970-01-01T00:00:00.001Z,null,null,null,null,d1,d2,red,1,"); + expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,"); + expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,"); // make sure data are not transferred TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from t1", - "time,tag3,tag4,s3,s4,tag1,tag2,s1,s2,", + "time,tag3,tag4,s3,s4,", expectedResSet, "db", handleFailure); diff --git a/pom.xml b/pom.xml index d3cb5b5677573..ac876a8d925c3 100644 --- a/pom.xml +++ b/pom.xml @@ -47,6 +47,7 @@ distribution example library-udf + integration-test