Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ private void prepareTypeConversionTest(
senderSession.executeNonQueryStatement("flush");
} else {
// Send Tablet data to receiver
// Write once to create data regions, guarantee that no any tsFiles will be sent
consumer.accept(senderSession, receiverSession, tablet);
createDataPipe(uuid, false);
Thread.sleep(2000);
// The actual implementation logic of inserting data
Expand Down Expand Up @@ -390,12 +392,13 @@ private void createDataPipe(String diff, boolean isTSFile) {
String sql =
String.format(
"create pipe test%s"
+ " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')"
+ " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
+ " with processor ('processor'='do-nothing-processor')"
+ " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
diff,
isTSFile ? "file" : "forced-log",
!isTSFile,
isTSFile,
receiverEnv.getIP(),
receiverEnv.getPort(),
isTSFile ? "tsfile" : "tablet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.visitor;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
Expand Down Expand Up @@ -87,7 +88,8 @@ public Optional<TSStatus> visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
// Ignore the error if it is caused by insufficient memory
|| (status.getMessage() != null && status.getMessage().contains("memory"))) {
|| (status.getMessage() != null && status.getMessage().contains("memory"))
|| !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ public class CommonConfig {
private double pipeReceiverActualToEstimatedMemoryRatio = 3;

private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
private boolean pipeReceiverLoadConversionEnabled = false;

private double pipeMetaReportMaxLogNumPerRound = 0.1;
private int pipeMetaReportMaxLogIntervalRounds = 360;
Expand Down Expand Up @@ -1503,6 +1504,18 @@ public void setPipeReceiverReqDecompressedMaxLengthInBytes(
pipeReceiverReqDecompressedMaxLengthInBytes);
}

public boolean isPipeReceiverLoadConversionEnabled() {
return pipeReceiverLoadConversionEnabled;
}

public void setPipeReceiverLoadConversionEnabled(boolean pipeReceiverLoadConversionEnabled) {
if (this.pipeReceiverLoadConversionEnabled == pipeReceiverLoadConversionEnabled) {
return;
}
this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled;
logger.info("pipeReceiverConversionEnabled is set to {}.", pipeReceiverLoadConversionEnabled);
}

public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return pipeReceiverReqDecompressedMaxLengthInBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
}

public boolean isPipeReceiverLoadConversionEnabled() {
return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
}

/////////////////////////////// Logger ///////////////////////////////

public double getPipeMetaReportMaxLogNumPerRound() {
Expand Down Expand Up @@ -574,6 +578,7 @@ public void printAllConfigs() {
LOGGER.info(
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
getPipeReceiverReqDecompressedMaxLengthInBytes());
LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled());

LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
properties.getProperty(
"pipe_receiver_req_decompressed_max_length_in_bytes",
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
config.setPipeReceiverLoadConversionEnabled(
Boolean.parseBoolean(
properties.getProperty(
"pipe_receiver_load_conversion_enabled",
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));

config.setPipeMemoryAllocateMaxRetries(
Integer.parseInt(
Expand Down
Loading