Skip to content

Commit e7af492

Browse files
CaideyipiJackieTien97
authored andcommitted
Pipe: Totally banned the receiver conversion (#16086)
* logger * ci-fix * partial * Delete client-go * fix * rename * fix-ci (cherry picked from commit 07117e0)
1 parent 8de4357 commit e7af492

9 files changed

Lines changed: 43 additions & 8 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,15 +204,13 @@ public void testReceiverNotLoadWhenIdColumnMismatch() throws Exception {
204204
}
205205

206206
Set<String> expectedResSet = new java.util.HashSet<>();
207-
expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,null,null,null,null,");
208-
expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,null,null,null,null,");
209-
expectedResSet.add("1970-01-01T00:00:00.002Z,null,null,null,null,d1,d2,blue,2,");
210-
expectedResSet.add("1970-01-01T00:00:00.001Z,null,null,null,null,d1,d2,red,1,");
207+
expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,");
208+
expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,");
211209
// make sure data are not transferred
212210
TestUtils.assertDataEventuallyOnEnv(
213211
receiverEnv,
214212
"select * from t1",
215-
"time,tag3,tag4,s3,s4,tag1,tag2,s1,s2,",
213+
"time,tag3,tag4,s3,s4,",
216214
expectedResSet,
217215
"db",
218216
handleFailure);

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeTypeConversionISessionIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ private void prepareTypeConversionTest(
128128
createDataPipe(true);
129129
} else {
130130
// Send Tablet data to receiver
131+
// Write once to create data regions, guarantee that no any tsFiles will be sent
132+
executeDataWriteOperation.accept(senderSession, receiverSession, tablet);
131133
createDataPipe(false);
132134
// The actual implementation logic of inserting data
133135
executeDataWriteOperation.accept(senderSession, receiverSession, tablet);
@@ -194,10 +196,11 @@ private void createDataPipe(boolean isTSFile) {
194196
String sql =
195197
String.format(
196198
"create pipe test"
197-
+ " with source ('source'='iotdb-source','realtime.mode'='%s')"
199+
+ " with source ('source'='iotdb-source','realtime.mode'='%s','history.enable'='%s')"
198200
+ " with processor ('processor'='do-nothing-processor')"
199201
+ " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
200202
isTSFile ? "file" : "forced-log",
203+
isTSFile,
201204
receiverEnv.getIP(),
202205
receiverEnv.getPort(),
203206
isTSFile ? "tsfile" : "tablet");

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTypeConversionISessionIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,8 @@ private void prepareTypeConversionTest(
346346
senderSession.executeNonQueryStatement("flush");
347347
} else {
348348
// Send Tablet data to receiver
349+
// Write once to create data regions, guarantee that no any tsFiles will be sent
350+
consumer.accept(senderSession, receiverSession, tablet);
349351
createDataPipe(uuid, false);
350352
Thread.sleep(2000);
351353
// The actual implementation logic of inserting data
@@ -394,12 +396,13 @@ private void createDataPipe(String diff, boolean isTSFile) {
394396
String sql =
395397
String.format(
396398
"create pipe test%s"
397-
+ " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')"
399+
+ " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
398400
+ " with processor ('processor'='do-nothing-processor')"
399401
+ " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
400402
diff,
401403
isTSFile ? "file" : "forced-log",
402404
!isTSFile,
405+
isTSFile,
403406
receiverEnv.getIP(),
404407
receiverEnv.getPort(),
405408
isTSFile ? "tsfile" : "tablet");

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.receiver.visitor;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2324
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
2425
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2526
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
@@ -108,6 +109,10 @@ public Optional<TSStatus> visitLoadFile(
108109
return Optional.empty();
109110
}
110111

112+
if (!PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
113+
return Optional.empty();
114+
}
115+
111116
if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
112117
// Ignore the error if it is caused by insufficient memory
113118
|| (status.getMessage() != null && status.getMessage().contains("memory"))) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.receiver.visitor;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2324
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
2425
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
2526
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
@@ -88,7 +89,8 @@ public Optional<TSStatus> visitLoadFile(
8889
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
8990
if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
9091
// Ignore the error if it is caused by insufficient memory
91-
|| (status.getMessage() != null && status.getMessage().contains("memory"))) {
92+
|| (status.getMessage() != null && status.getMessage().contains("memory"))
93+
|| !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
9294
return Optional.empty();
9395
}
9496

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ public class CommonConfig {
294294
private double pipeReceiverActualToEstimatedMemoryRatio = 3;
295295

296296
private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
297+
private boolean pipeReceiverLoadConversionEnabled = false;
297298

298299
private double pipeMetaReportMaxLogNumPerRound = 0.1;
299300
private int pipeMetaReportMaxLogIntervalRounds = 360;
@@ -1491,6 +1492,18 @@ public void setPipeReceiverReqDecompressedMaxLengthInBytes(
14911492
pipeReceiverReqDecompressedMaxLengthInBytes);
14921493
}
14931494

1495+
public boolean isPipeReceiverLoadConversionEnabled() {
1496+
return pipeReceiverLoadConversionEnabled;
1497+
}
1498+
1499+
public void setPipeReceiverLoadConversionEnabled(boolean pipeReceiverLoadConversionEnabled) {
1500+
if (this.pipeReceiverLoadConversionEnabled == pipeReceiverLoadConversionEnabled) {
1501+
return;
1502+
}
1503+
this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled;
1504+
logger.info("pipeReceiverConversionEnabled is set to {}.", pipeReceiverLoadConversionEnabled);
1505+
}
1506+
14941507
public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
14951508
return pipeReceiverReqDecompressedMaxLengthInBytes;
14961509
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
343343
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
344344
}
345345

346+
public boolean isPipeReceiverLoadConversionEnabled() {
347+
return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
348+
}
349+
346350
/////////////////////////////// Logger ///////////////////////////////
347351

348352
public double getPipeMetaReportMaxLogNumPerRound() {
@@ -573,6 +577,7 @@ public void printAllConfigs() {
573577
LOGGER.info(
574578
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
575579
getPipeReceiverReqDecompressedMaxLengthInBytes());
580+
LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled());
576581

577582
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound());
578583
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
432432
properties.getProperty(
433433
"pipe_receiver_req_decompressed_max_length_in_bytes",
434434
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
435+
config.setPipeReceiverLoadConversionEnabled(
436+
Boolean.parseBoolean(
437+
properties.getProperty(
438+
"pipe_receiver_load_conversion_enabled",
439+
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));
435440

436441
config.setPipeMemoryAllocateMaxRetries(
437442
Integer.parseInt(

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
<module>distribution</module>
4848
<module>example</module>
4949
<module>library-udf</module>
50+
<module>integration-test</module>
5051
</modules>
5152
<properties>
5253
<!-- This was the last version to support Java 8 -->

0 commit comments

Comments
 (0)