Skip to content

Commit d983de4

Browse files
committed
update
1 parent 64c6900 commit d983de4

6 files changed

Lines changed: 128 additions & 14 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,8 @@ public class IoTDBConfig {
11171117

11181118
private long loadMeasurementIdCacheSizeInBytes = 2 * 1024 * 1024L; // 2MB
11191119

1120+
private int loadTsFileSpiltPartitionMaxSize = 10;
1121+
11201122
private String[] loadActiveListeningDirs =
11211123
new String[] {
11221124
IoTDBConstant.EXT_FOLDER_NAME
@@ -4030,6 +4032,27 @@ public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
40304032
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
40314033
}
40324034

4035+
public int getLoadTsFileSpiltPartitionMaxSize() {
4036+
return loadTsFileSpiltPartitionMaxSize;
4037+
}
4038+
4039+
public void setLoadTsFileSpiltPartitionMaxSize(int loadTsFileSpiltPartitionMaxSize) {
4040+
if (loadTsFileSpiltPartitionMaxSize <= 0) {
4041+
throw new IllegalArgumentException(
4042+
"loadTsFileSpiltPartitionMaxSize should be greater than or equal to 0");
4043+
}
4044+
4045+
if (this.loadTsFileSpiltPartitionMaxSize == loadTsFileSpiltPartitionMaxSize) {
4046+
return;
4047+
}
4048+
4049+
logger.info(
4050+
"Set loadTsFileSpiltPartitionMaxSize from {} to {}",
4051+
this.loadTsFileSpiltPartitionMaxSize,
4052+
loadTsFileSpiltPartitionMaxSize);
4053+
this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize;
4054+
}
4055+
40334056
public String[] getPipeReceiverFileDirs() {
40344057
return (Objects.isNull(this.pipeReceiverFileDirs) || this.pipeReceiverFileDirs.length == 0)
40354058
? new String[] {systemDir + File.separator + "pipe" + File.separator + "receiver"}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2404,6 +2404,12 @@ private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOE
24042404
properties.getProperty(
24052405
"load_active_listening_fail_dir",
24062406
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
2407+
2408+
conf.setLoadTsFileSpiltPartitionMaxSize(
2409+
Integer.parseInt(
2410+
properties.getProperty(
2411+
"load_tsfile_split_partition_max_size",
2412+
Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize()))));
24072413
}
24082414

24092415
private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException {
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.exception.load;
21+
22+
import org.apache.iotdb.commons.exception.IoTDBException;
23+
import org.apache.iotdb.rpc.TSStatusCode;
24+
25+
public class LoadPartitionExceededException extends IoTDBException {
26+
27+
public LoadPartitionExceededException(String message) {
28+
super(message, TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
29+
}
30+
31+
public LoadPartitionExceededException(Exception exception) {
32+
super(exception, TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
33+
}
34+
35+
public LoadPartitionExceededException(String message, Exception exception) {
36+
super(message, exception, TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
37+
}
38+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
4141
import org.apache.tsfile.read.reader.IChunkReader;
4242
import org.apache.tsfile.read.reader.chunk.TableChunkReader;
43+
import org.apache.tsfile.utils.Binary;
4344
import org.apache.tsfile.utils.DateUtils;
4445
import org.apache.tsfile.utils.Pair;
4546
import org.apache.tsfile.utils.TsPrimitiveType;
@@ -394,6 +395,13 @@ private void fillMeasurementValueColumns(
394395
for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
395396
final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
396397
if (primitiveType == null) {
398+
switch (dataTypeList.get(i)) {
399+
case TEXT:
400+
case BLOB:
401+
case STRING:
402+
tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues());
403+
}
404+
tablet.getBitMaps()[i].mark(rowIndex);
397405
continue;
398406
}
399407

@@ -420,7 +428,11 @@ private void fillMeasurementValueColumns(
420428
case TEXT:
421429
case BLOB:
422430
case STRING:
423-
tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues());
431+
Binary binary = primitiveType.getBinary();
432+
tablet.addValue(
433+
rowIndex,
434+
i,
435+
binary.getValues() == null ? Binary.EMPTY_VALUE.getValues() : binary.getValues());
424436
break;
425437
default:
426438
throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType());
@@ -431,11 +443,20 @@ private void fillMeasurementValueColumns(
431443
private void fillDeviceIdColumns(
432444
final IDeviceID deviceID, final Tablet tablet, final int rowIndex) {
433445
final String[] deviceIdSegments = (String[]) deviceID.getSegments();
434-
for (int i = 1, totalColumns = deviceIdSegments.length; i < totalColumns; i++) {
446+
int i = 1;
447+
for (int totalColumns = deviceIdSegments.length; i < totalColumns; i++) {
435448
if (deviceIdSegments[i] == null) {
449+
tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
450+
tablet.getBitMaps()[i - 1].mark(rowIndex);
436451
continue;
437452
}
438453
tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]);
439454
}
455+
456+
while (i <= deviceIdSize) {
457+
tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
458+
tablet.getBitMaps()[i - 1].mark(rowIndex);
459+
i++;
460+
}
440461
}
441462
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2323
import org.apache.iotdb.commons.utils.TimePartitionUtils;
24+
import org.apache.iotdb.db.conf.IoTDBConfig;
25+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2426
import org.apache.iotdb.db.exception.load.LoadFileException;
27+
import org.apache.iotdb.db.exception.load.LoadPartitionExceededException;
2528
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
2629
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
2730

@@ -62,6 +65,8 @@
6265
public class TsFileSplitter {
6366
private static final Logger logger = LoggerFactory.getLogger(TsFileSplitter.class);
6467

68+
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
69+
6570
private final File tsFile;
6671
private final TsFileDataConsumer consumer;
6772
private Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>();
@@ -72,6 +77,7 @@ public class TsFileSplitter {
7277
private IDeviceID curDevice = null;
7378
private boolean isAligned;
7479
private int timeChunkIndexOfCurrentValueColumn = 0;
80+
private Set<TTimePartitionSlot> timePartitionSlots = new HashSet<>();
7581

7682
// Maintain the number of times the chunk of each measurement appears.
7783
private Map<String, Integer> valueColumn2TimeChunkIndex = new HashMap<>();
@@ -89,7 +95,7 @@ public TsFileSplitter(File tsFile, TsFileDataConsumer consumer) {
8995

9096
@SuppressWarnings({"squid:S3776", "squid:S6541"})
9197
public void splitTsFileByDataPartition()
92-
throws IOException, LoadFileException, IllegalStateException {
98+
throws IOException, LoadFileException, LoadPartitionExceededException, IllegalStateException {
9399
try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
94100
getAllModification(deletions);
95101

@@ -144,7 +150,7 @@ public void splitTsFileByDataPartition()
144150
}
145151

146152
private void processTimeChunkOrNonAlignedChunk(TsFileSequenceReader reader, byte marker)
147-
throws IOException, LoadFileException {
153+
throws IOException, LoadFileException, LoadPartitionExceededException {
148154
long chunkOffset = reader.position();
149155
timeChunkIndexOfCurrentValueColumn = pageIndex2TimesList.size();
150156
consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData);
@@ -197,7 +203,7 @@ private void decodeAndWriteTimeChunkOrNonAlignedChunk(
197203
IChunkMetadata chunkMetadata,
198204
long chunkOffset,
199205
ChunkData chunkData)
200-
throws IOException, LoadFileException {
206+
throws IOException, LoadFileException, LoadPartitionExceededException {
201207
String measurementId = header.getMeasurementID();
202208
TTimePartitionSlot timePartitionSlot = chunkData.getTimePartitionSlot();
203209
Decoder defaultTimeDecoder =
@@ -294,7 +300,7 @@ private void decodeAndWriteTimeChunkOrNonAlignedChunk(
294300
}
295301

296302
private void processValueChunk(TsFileSequenceReader reader, byte marker)
297-
throws IOException, LoadFileException {
303+
throws IOException, LoadFileException, LoadPartitionExceededException {
298304
long chunkOffset = reader.position();
299305
IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
300306
ChunkHeader header = reader.readChunkHeader(marker);
@@ -361,7 +367,8 @@ private void storeTimeChunkContext() {
361367
}
362368

363369
private void switchToTimeChunkContextOfCurrentMeasurement(
364-
TsFileSequenceReader reader, String measurement) throws IOException, LoadFileException {
370+
TsFileSequenceReader reader, String measurement)
371+
throws IOException, LoadFileException, LoadPartitionExceededException {
365372
int index = valueColumn2TimeChunkIndex.getOrDefault(measurement, 0);
366373
if (index != timeChunkIndexOfCurrentValueColumn) {
367374
consumeAllAlignedChunkData(reader.position(), pageIndex2ChunkData);
@@ -429,7 +436,7 @@ private void handleModification(List<ModEntry> deletions) throws LoadFileExcepti
429436

430437
private void consumeAllAlignedChunkData(
431438
long offset, Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData)
432-
throws LoadFileException {
439+
throws LoadFileException, LoadPartitionExceededException {
433440
if (pageIndex2ChunkData.isEmpty()) {
434441
return;
435442
}
@@ -445,6 +452,13 @@ private void consumeAllAlignedChunkData(
445452
}
446453
}
447454
for (AlignedChunkData chunkData : chunkDataMap.keySet()) {
455+
timePartitionSlots.add(chunkData.getTimePartitionSlot());
456+
if (timePartitionSlots.size() > CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
457+
throw new LoadPartitionExceededException(
458+
String.format(
459+
"Time partition slots size is greater than %s",
460+
CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
461+
}
448462
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
449463
throw new IllegalStateException(
450464
String.format(
@@ -456,7 +470,14 @@ private void consumeAllAlignedChunkData(
456470
}
457471

458472
private void consumeChunkData(String measurement, long offset, ChunkData chunkData)
459-
throws LoadFileException {
473+
throws LoadFileException, LoadPartitionExceededException {
474+
timePartitionSlots.add(chunkData.getTimePartitionSlot());
475+
if (timePartitionSlots.size() > CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
476+
throw new LoadPartitionExceededException(
477+
String.format(
478+
"Time partition slots size is greater than %s",
479+
CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
480+
}
460481
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
461482
throw new IllegalStateException(
462483
String.format(

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2626
import org.apache.iotdb.db.exception.StorageEngineException;
2727
import org.apache.iotdb.db.exception.load.LoadFileException;
28+
import org.apache.iotdb.db.exception.load.LoadPartitionExceededException;
2829
import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
2930
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
3031
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
@@ -89,7 +90,8 @@ public void testCompactionFlushChunk()
8990
InterruptedException,
9091
MetadataException,
9192
PageException,
92-
LoadFileException {
93+
LoadFileException,
94+
LoadPartitionExceededException {
9395
TsFileResource seqResource1 =
9496
generateSingleAlignedSeriesFile(
9597
"d0",
@@ -124,7 +126,8 @@ public void testCompactionFlushChunkAndSplitByTimePartition()
124126
InterruptedException,
125127
MetadataException,
126128
PageException,
127-
LoadFileException {
129+
LoadFileException,
130+
LoadPartitionExceededException {
128131
TsFileResource seqResource1 =
129132
generateSingleAlignedSeriesFile(
130133
"d0",
@@ -159,7 +162,8 @@ public void testCompactionFlushPage()
159162
InterruptedException,
160163
MetadataException,
161164
PageException,
162-
LoadFileException {
165+
LoadFileException,
166+
LoadPartitionExceededException {
163167
TsFileResource seqResource1 =
164168
generateSingleAlignedSeriesFile(
165169
"d0",
@@ -198,7 +202,8 @@ public void testCompactionFlushPageAndSplitByTimePartition()
198202
InterruptedException,
199203
MetadataException,
200204
PageException,
201-
LoadFileException {
205+
LoadFileException,
206+
LoadPartitionExceededException {
202207
TsFileResource seqResource1 =
203208
generateSingleAlignedSeriesFile(
204209
"d0",
@@ -264,7 +269,7 @@ private TsFileResource performCompaction()
264269
}
265270

266271
private void consumeChunkDataAndValidate(TsFileResource resource)
267-
throws IOException, IllegalPathException, LoadFileException {
272+
throws IOException, IllegalPathException, LoadFileException, LoadPartitionExceededException {
268273
Map<TTimePartitionSlot, TestLoadTsFileIOWriter> writerMap = new HashMap<>();
269274

270275
TsFileSplitter splitter =

0 commit comments

Comments
 (0)