Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,14 @@ public void setUp() throws Exception {
tmpDir = new File(Files.createTempDirectory("load").toUri());
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL);
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:1");
EnvFactory.getEnv()
.getConfig()
.getDataNodeConfig()
.setConnectionTimeoutInMS(connectionTimeoutInMS)
.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes);

EnvFactory.getEnv().initClusterEnvironment();
}

Expand Down Expand Up @@ -224,7 +227,7 @@ public void testLoad() throws Exception {
generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL / 10_000, false);
generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL / 10_000, false);
generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL / 10_000, true);
for (int i = 0; i < 10000; i++) {
for (int i = 0; i < 1000; i++) {
generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 10, true);
}
writtenPoint2 = generator.getTotalNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,8 @@ public class IoTDBConfig {

private long loadMeasurementIdCacheSizeInBytes = 2 * 1024 * 1024L; // 2MB

private int loadTsFileSpiltPartitionMaxSize = 10;

private String[] loadActiveListeningDirs =
new String[] {
IoTDBConstant.EXT_FOLDER_NAME
Expand Down Expand Up @@ -4030,6 +4032,27 @@ public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
}

public int getLoadTsFileSpiltPartitionMaxSize() {
return loadTsFileSpiltPartitionMaxSize;
}

public void setLoadTsFileSpiltPartitionMaxSize(int loadTsFileSpiltPartitionMaxSize) {
if (loadTsFileSpiltPartitionMaxSize <= 0) {
throw new IllegalArgumentException(
"loadTsFileSpiltPartitionMaxSize should be greater than or equal to 0");
}

if (this.loadTsFileSpiltPartitionMaxSize == loadTsFileSpiltPartitionMaxSize) {
return;
}

logger.info(
"Set loadTsFileSpiltPartitionMaxSize from {} to {}",
this.loadTsFileSpiltPartitionMaxSize,
loadTsFileSpiltPartitionMaxSize);
this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize;
}

public String[] getPipeReceiverFileDirs() {
return (Objects.isNull(this.pipeReceiverFileDirs) || this.pipeReceiverFileDirs.length == 0)
? new String[] {systemDir + File.separator + "pipe" + File.separator + "receiver"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2404,6 +2404,12 @@ private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOE
properties.getProperty(
"load_active_listening_fail_dir",
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));

conf.setLoadTsFileSpiltPartitionMaxSize(
Integer.parseInt(
properties.getProperty(
"load_tsfile_split_partition_max_size",
Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize()))));
}

private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.tsfile.read.reader.IChunkReader;
import org.apache.tsfile.read.reader.chunk.TableChunkReader;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
Expand Down Expand Up @@ -394,6 +395,13 @@ private void fillMeasurementValueColumns(
for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
if (primitiveType == null) {
switch (dataTypeList.get(i)) {
case TEXT:
case BLOB:
case STRING:
tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues());
}
tablet.getBitMaps()[i].mark(rowIndex);
continue;
}

Expand All @@ -420,7 +428,11 @@ private void fillMeasurementValueColumns(
case TEXT:
case BLOB:
case STRING:
tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues());
Binary binary = primitiveType.getBinary();
tablet.addValue(
rowIndex,
i,
binary.getValues() == null ? Binary.EMPTY_VALUE.getValues() : binary.getValues());
break;
default:
throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType());
Expand All @@ -431,11 +443,20 @@ private void fillMeasurementValueColumns(
private void fillDeviceIdColumns(
final IDeviceID deviceID, final Tablet tablet, final int rowIndex) {
final String[] deviceIdSegments = (String[]) deviceID.getSegments();
for (int i = 1, totalColumns = deviceIdSegments.length; i < totalColumns; i++) {
int i = 1;
for (int totalColumns = deviceIdSegments.length; i < totalColumns; i++) {
if (deviceIdSegments[i] == null) {
tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
tablet.getBitMaps()[i - 1].mark(rowIndex);
continue;
}
tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]);
}

while (i <= deviceIdSize) {
tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
tablet.getBitMaps()[i - 1].mark(rowIndex);
i++;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private Map<IDeviceID, PartitionSplitInfo> collectSplitRanges() {

Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = new LinkedHashMap<>();

for (int i = 1; i < times.length; i++) { // times are sorted in session API.
for (int i = 1; i < rowCount; i++) { // times are sorted in session API.
IDeviceID nextDeviceId = getDeviceID(i);
if (times[i] >= upperBoundOfTimePartition || !currDeviceId.equals(nextDeviceId)) {
final PartitionSplitInfo splitInfo =
Expand All @@ -253,7 +253,7 @@ private Map<IDeviceID, PartitionSplitInfo> collectSplitRanges() {
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId, deviceID1 -> new PartitionSplitInfo());
// the final range
splitInfo.ranges.add(startLoc); // included
splitInfo.ranges.add(times.length); // excluded
splitInfo.ranges.add(rowCount); // excluded
splitInfo.timePartitionSlots.add(timePartitionSlot);

return deviceIDSplitInfoMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,18 @@ public InsertTabletStatement(
}

private Object convertTableColumn(final Object input) {
return input instanceof LocalDate[]
? Arrays.stream(((LocalDate[]) input))
.map(date -> Objects.nonNull(date) ? DateUtils.parseDateExpressionToInt(date) : 0)
.mapToInt(Integer::intValue)
.toArray()
: input;
if (input instanceof LocalDate[]) {
return Arrays.stream(((LocalDate[]) input))
.map(date -> Objects.nonNull(date) ? DateUtils.parseDateExpressionToInt(date) : 0)
.mapToInt(Integer::intValue)
.toArray();
} else if (input instanceof Binary[]) {
return Arrays.stream(((Binary[]) input))
.map(binary -> Objects.nonNull(binary) ? binary : Binary.EMPTY_VALUE)
.toArray(Binary[]::new);
}

return input;
}

public InsertTabletStatement(InsertTabletNode node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,19 @@ public class LoadTsFileDataTypeConverter {

private final SqlParser relationalSqlParser = new SqlParser();
private final LoadTableStatementDataTypeConvertExecutionVisitor
tableStatementDataTypeConvertExecutionVisitor =
new LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel);
tableStatementDataTypeConvertExecutionVisitor;
private final LoadTreeStatementDataTypeConvertExecutionVisitor
treeStatementDataTypeConvertExecutionVisitor =
new LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
treeStatementDataTypeConvertExecutionVisitor;

public LoadTsFileDataTypeConverter(
final MPPQueryContext context, final boolean isGeneratedByPipe) {
this.context = context;
this.isGeneratedByPipe = isGeneratedByPipe;

tableStatementDataTypeConvertExecutionVisitor =
new LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel);
treeStatementDataTypeConvertExecutionVisitor =
new LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
}

public Optional<TSStatus> convertForTableModel(final LoadTsFile loadTsFileTableStatement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
Expand Down Expand Up @@ -62,6 +64,8 @@
public class TsFileSplitter {
private static final Logger logger = LoggerFactory.getLogger(TsFileSplitter.class);

private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();

private final File tsFile;
private final TsFileDataConsumer consumer;
private Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>();
Expand All @@ -72,6 +76,7 @@ public class TsFileSplitter {
private IDeviceID curDevice = null;
private boolean isAligned;
private int timeChunkIndexOfCurrentValueColumn = 0;
private Set<TTimePartitionSlot> timePartitionSlots = new HashSet<>();

// Maintain the number of times the chunk of each measurement appears.
private Map<String, Integer> valueColumn2TimeChunkIndex = new HashMap<>();
Expand Down Expand Up @@ -445,6 +450,14 @@ private void consumeAllAlignedChunkData(
}
}
for (AlignedChunkData chunkData : chunkDataMap.keySet()) {
timePartitionSlots.add(chunkData.getTimePartitionSlot());
if (deletions.isEmpty()
&& timePartitionSlots.size() > CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
throw new LoadFileException(
String.format(
"Time partition slots size is greater than %s",
CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
}
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
throw new IllegalStateException(
String.format(
Expand All @@ -457,6 +470,14 @@ private void consumeAllAlignedChunkData(

private void consumeChunkData(String measurement, long offset, ChunkData chunkData)
throws LoadFileException {
timePartitionSlots.add(chunkData.getTimePartitionSlot());
if (deletions.isEmpty()
&& timePartitionSlots.size() > CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
throw new LoadFileException(
String.format(
"Time partition slots size is greater than %s",
CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
}
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public void testSplitInsertTablet() throws IllegalPathException {
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(
new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
insertTabletNode.setRowCount(insertTabletNode.getTimes().length);

DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
dataPartitionQueryParam.setDeviceID(
Expand Down Expand Up @@ -314,6 +315,7 @@ public void testInsertMultiTablets() throws IllegalPathException {
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(
new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);

insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 3"));
Expand All @@ -322,6 +324,7 @@ public void testInsertMultiTablets() throws IllegalPathException {
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
insertTabletNode.setColumns(
new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);
}

Expand Down
Loading