Skip to content

Commit 9793441

Browse files
authored
Fix load-tsfile and pipe-parser possibly skipping time-only aligned chunks (#17625)
* Fix load-tsfile possibly skipping time-only aligned chunks
* Fix LoadTsFileAnalyzer possibly counting the time column in the write point count
* Fix tablet parsing
* Add tests
1 parent bfc7056 commit 9793441

9 files changed

Lines changed: 577 additions & 61 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ public static Collection<Object[]> data() {
8686
new Object[][] {
8787
{LastCacheLoadStrategy.CLEAN_ALL},
8888
{LastCacheLoadStrategy.UPDATE},
89-
{LastCacheLoadStrategy.UPDATE_NO_BLOB},
9089
{LastCacheLoadStrategy.CLEAN_DEVICE}
9190
});
9291
}

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
3737
import org.apache.iotdb.it.framework.IoTDBTestRunner;
3838
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
39+
import org.apache.iotdb.itbase.env.BaseEnv;
3940
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
4041
import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
4142
import org.apache.iotdb.rpc.TSStatusCode;
@@ -49,7 +50,9 @@
4950

5051
import java.io.IOException;
5152
import java.util.ArrayList;
53+
import java.util.Arrays;
5254
import java.util.HashMap;
55+
import java.util.HashSet;
5356
import java.util.List;
5457
import java.util.Map;
5558
import java.util.concurrent.TimeUnit;
@@ -1001,4 +1004,34 @@ public void testNegativeTimestamp() throws Exception {
10011004
TableModelUtils.assertData("test", "test", -200, 100, receiverEnv, handleFailure);
10021005
}
10031006
}
1007+
1008+
@Test
1009+
public void testHistoryDataWithEmptyField() {
1010+
TestUtils.executeNonQueries(
1011+
senderEnv,
1012+
Arrays.asList(
1013+
"CREATE DATABASE iot_table_stream_attr",
1014+
"USE iot_table_stream_attr",
1015+
"CREATE TABLE table1 (region STRING TAG, device_id STRING TAG, model_id STRING ATTRIBUTE, maintenance STRING ATTRIBUTE COMMENT 'maintenance', temperature FLOAT FIELD COMMENT 'temperature', humidity STRING ATTRIBUTE COMMENT 'humidity', plant_id STRING TAG) COMMENT 'table1'",
1016+
String.format(
1017+
"create pipe test with source ('inclusion'='all') with sink('node-urls'='%s')",
1018+
receiverEnv.getDataNodeWrapper(0).getIpAndPortString()),
1019+
"select * from table1 order by time",
1020+
"INSERT INTO table1(region, plant_id, device_id, model_id, maintenance, time, temperature, humidity) VALUES ('north', null, 'd101', 'red', null, '2025-11-26 13:38:00', 91.0, null), (null, '1003', null, null, 'maint-a', '2025-11-26 13:39:00', null, '36.2'), (null, null, null, 'green', 'maint-b', '2025-11-26 13:40:00', 88.8, '34.9')",
1021+
"INSERT INTO table1(region, plant_id, device_id, model_id, maintenance, time, temperature, humidity) VALUES ('south', '1005', 'd105', null, null, '2025-11-26 13:41:00', 87.5, null)",
1022+
"INSERT INTO table1(region, plant_id, device_id, model_id, maintenance, time, temperature, humidity) VALUES ('west', '1006', 'd106', 'blue', 'maint-c', '2025-11-26 13:42:00', null, '36.8')"),
1023+
BaseEnv.TABLE_SQL_DIALECT);
1024+
TestUtils.assertDataEventuallyOnEnv(
1025+
receiverEnv,
1026+
"select * from iot_table_stream_attr.table1 order by time",
1027+
"time,region,device_id,model_id,maintenance,temperature,humidity,plant_id,",
1028+
new HashSet<>(
1029+
Arrays.asList(
1030+
"2025-11-26T13:38:00.000Z,north,d101,red,null,91.0,null,null,",
1031+
"2025-11-26T13:39:00.000Z,null,null,null,maint-a,null,36.2,1003,",
1032+
"2025-11-26T13:40:00.000Z,null,null,green,maint-b,88.8,34.9,null,",
1033+
"2025-11-26T13:41:00.000Z,south,d105,null,null,87.5,null,1005,",
1034+
"2025-11-26T13:42:00.000Z,west,d106,blue,maint-c,null,36.8,1006,")),
1035+
(String) null);
1036+
}
10041037
}

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828

2929
import org.apache.tsfile.enums.ColumnCategory;
3030
import org.apache.tsfile.enums.TSDataType;
31+
import org.apache.tsfile.file.metadata.TableSchema;
3132
import org.apache.tsfile.utils.Pair;
33+
import org.apache.tsfile.write.TsFileWriter;
34+
import org.apache.tsfile.write.record.Tablet;
3235
import org.apache.tsfile.write.schema.IMeasurementSchema;
3336
import org.apache.tsfile.write.schema.MeasurementSchema;
3437
import org.junit.After;
@@ -486,6 +489,76 @@ private String convert2TableSQL(
486489
return tableCreation;
487490
}
488491

492+
@Test
493+
public void testLoadWithAllFieldsNullRows() throws Exception {
494+
final List<IMeasurementSchema> schemas =
495+
Arrays.asList(
496+
new MeasurementSchema("f1", TSDataType.INT32),
497+
new MeasurementSchema("f2", TSDataType.INT64));
498+
final List<ColumnCategory> columnCategories =
499+
Arrays.asList(ColumnCategory.FIELD, ColumnCategory.FIELD);
500+
501+
final File file = new File(tmpDir, "1-0-0-0.tsfile");
502+
final int totalRows = 20;
503+
504+
try (final TsFileWriter tsFileWriter = new TsFileWriter(file)) {
505+
tsFileWriter.registerTableSchema(
506+
new TableSchema(SchemaConfig.TABLE_0, schemas, columnCategories));
507+
508+
final List<String> columnNames = Arrays.asList("f1", "f2");
509+
final List<TSDataType> dataTypes = Arrays.asList(TSDataType.INT32, TSDataType.INT64);
510+
final Tablet tablet =
511+
new Tablet(SchemaConfig.TABLE_0, columnNames, dataTypes, columnCategories);
512+
513+
for (int r = 0; r < totalRows; r++) {
514+
final int row = tablet.getRowSize();
515+
tablet.addTimestamp(row, (r + 1) * 1000L);
516+
}
517+
tsFileWriter.writeTable(tablet);
518+
}
519+
520+
try (final Connection connection =
521+
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
522+
final Statement statement = connection.createStatement()) {
523+
statement.execute(String.format("create database if not exists %s", SchemaConfig.DATABASE_0));
524+
statement.execute(String.format("use %s", SchemaConfig.DATABASE_0));
525+
statement.execute(
526+
String.format(
527+
"load '%s' with ('database'='%s')", file.getAbsolutePath(), SchemaConfig.DATABASE_0));
528+
529+
try (final ResultSet resultSet =
530+
statement.executeQuery(String.format("select count(*) from %s", SchemaConfig.TABLE_0))) {
531+
Assert.assertTrue(resultSet.next());
532+
Assert.assertEquals(totalRows, resultSet.getLong(1));
533+
}
534+
535+
try (final ResultSet resultSet =
536+
statement.executeQuery(
537+
String.format("select count(f1), count(f2) from %s", SchemaConfig.TABLE_0))) {
538+
Assert.assertTrue(resultSet.next());
539+
Assert.assertEquals(0, resultSet.getLong(1));
540+
Assert.assertEquals(0, resultSet.getLong(2));
541+
}
542+
543+
try (final ResultSet resultSet =
544+
statement.executeQuery(
545+
String.format("select time, f1, f2 from %s order by time", SchemaConfig.TABLE_0))) {
546+
int count = 0;
547+
while (resultSet.next()) {
548+
final long time = resultSet.getLong("time");
549+
final int expectedTime = (count + 1) * 1000;
550+
Assert.assertEquals(expectedTime, time);
551+
resultSet.getInt("f1");
552+
Assert.assertTrue(resultSet.wasNull());
553+
resultSet.getLong("f2");
554+
Assert.assertTrue(resultSet.wasNull());
555+
count++;
556+
}
557+
Assert.assertEquals(totalRows, count);
558+
}
559+
}
560+
}
561+
489562
private static class SchemaConfig {
490563
private static final String DATABASE_0 = "root";
491564
private static final String TABLE_0 = "test";

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java

Lines changed: 95 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T
101101
private List<ColumnCategory> columnTypes;
102102
private List<String> measurementList;
103103
private List<TSDataType> dataTypeList;
104+
private List<IMeasurementSchema> fieldSchemaList;
104105
private int deviceIdSize;
105106

106107
private List<ModsOperationUtil.ModsInfo> modsInfoList;
@@ -194,7 +195,7 @@ public boolean hasNext() {
194195

195196
long size = 0;
196197
List<AbstractAlignedChunkMetadata> iChunkMetadataList =
197-
reader.getAlignedChunkMetadata(pair.left, true);
198+
reader.getAlignedChunkMetadata(pair.left, false);
198199

199200
Iterator<AbstractAlignedChunkMetadata> chunkMetadataIterator =
200201
iChunkMetadataList.iterator();
@@ -213,27 +214,7 @@ public boolean hasNext() {
213214
continue;
214215
}
215216

216-
Iterator<IChunkMetadata> iChunkMetadataIterator =
217-
alignedChunkMetadata.getValueChunkMetadataList().iterator();
218-
while (iChunkMetadataIterator.hasNext()) {
219-
IChunkMetadata iChunkMetadata = iChunkMetadataIterator.next();
220-
if (iChunkMetadata == null) {
221-
iChunkMetadataIterator.remove();
222-
continue;
223-
}
224-
225-
if (!modifications.isEmpty()
226-
&& ModsOperationUtil.isAllDeletedByMods(
227-
pair.getLeft(),
228-
iChunkMetadata.getMeasurementUid(),
229-
alignedChunkMetadata.getStartTime(),
230-
alignedChunkMetadata.getEndTime(),
231-
modifications)) {
232-
iChunkMetadataIterator.remove();
233-
}
234-
}
235-
236-
if (alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) {
217+
if (areAllFieldsDeletedByMods(pair.getLeft(), alignedChunkMetadata)) {
237218
chunkMetadataIterator.remove();
238219
continue;
239220
}
@@ -267,6 +248,7 @@ public boolean hasNext() {
267248
dataTypeList = new ArrayList<>();
268249
columnTypes = new ArrayList<>();
269250
measurementList = new ArrayList<>();
251+
fieldSchemaList = new ArrayList<>();
270252

271253
for (int i = 0; i < columnSchemaSize; i++) {
272254
final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i);
@@ -280,6 +262,9 @@ public boolean hasNext() {
280262
measurementList.add(measurementName);
281263
dataTypeList.add(schema.getType());
282264
}
265+
if (ColumnCategory.FIELD.equals(columnCategory)) {
266+
fieldSchemaList.add(schema);
267+
}
283268
}
284269
}
285270
deviceIdSize = dataTypeList.size();
@@ -331,9 +316,9 @@ private Tablet buildNextTablet() {
331316
tablet =
332317
new Tablet(
333318
tableName,
334-
measurementList,
335-
dataTypeList,
336-
columnTypes,
319+
new ArrayList<>(measurementList),
320+
new ArrayList<>(dataTypeList),
321+
new ArrayList<>(columnTypes),
337322
rowCountAndMemorySize.getLeft());
338323
tablet.initBitMaps();
339324
isFirstRow = false;
@@ -376,6 +361,20 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta
376361
long size = timeChunkSize;
377362

378363
final List<Chunk> valueChunkList = new ArrayList<>();
364+
final Map<String, IChunkMetadata> valueChunkMetadataMap =
365+
alignedChunkMetadata.getValueChunkMetadataList().stream()
366+
.filter(Objects::nonNull)
367+
.filter(
368+
metadata ->
369+
!isFieldDeletedByMods(
370+
metadata.getMeasurementUid(),
371+
alignedChunkMetadata.getStartTime(),
372+
alignedChunkMetadata.getEndTime()))
373+
.collect(
374+
Collectors.toMap(
375+
IChunkMetadata::getMeasurementUid,
376+
metadata -> metadata,
377+
(left, right) -> left));
379378

380379
// To ensure that the Tablet has the same alignedChunk column as the current one,
381380
// you need to create a new Tablet to fill in the data.
@@ -392,50 +391,98 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta
392391
measurementList.subList(deviceIdSize, measurementList.size()).clear();
393392
dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear();
394393

395-
for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size(); ++offset) {
396-
final IChunkMetadata metadata = alignedChunkMetadata.getValueChunkMetadataList().get(offset);
394+
boolean hasSelectedField = fieldSchemaList.isEmpty();
395+
boolean hasSelectedNonNullChunk = false;
396+
for (; offset < fieldSchemaList.size(); ++offset) {
397+
final IMeasurementSchema schema = fieldSchemaList.get(offset);
398+
if (isFieldDeletedByMods(
399+
schema.getMeasurementName(),
400+
alignedChunkMetadata.getStartTime(),
401+
alignedChunkMetadata.getEndTime())) {
402+
continue;
403+
}
404+
405+
final IChunkMetadata metadata = valueChunkMetadataMap.get(schema.getMeasurementName());
406+
Chunk chunk = null;
397407
if (metadata != null) {
398-
final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
399-
size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
400-
if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
401-
if (valueChunkList.isEmpty()) {
408+
chunk = reader.readMemChunk((ChunkMetadata) metadata);
409+
final long newSize = size + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
410+
if (newSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
411+
if (!hasSelectedNonNullChunk) {
402412
// If the first chunk exceeds the memory limit, we need to allocate more memory
413+
size = newSize;
403414
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, size);
404-
columnTypes.add(ColumnCategory.FIELD);
405-
measurementList.add(metadata.getMeasurementUid());
406-
dataTypeList.add(metadata.getDataType());
407-
valueChunkList.add(chunk);
408-
++offset;
415+
} else {
416+
break;
409417
}
410-
break;
411418
} else {
412-
// Record the column information corresponding to Meta to fill in Tablet
413-
columnTypes.add(ColumnCategory.FIELD);
414-
measurementList.add(metadata.getMeasurementUid());
415-
dataTypeList.add(metadata.getDataType());
416-
valueChunkList.add(chunk);
419+
size = newSize;
417420
}
421+
hasSelectedNonNullChunk = true;
418422
}
423+
columnTypes.add(ColumnCategory.FIELD);
424+
measurementList.add(schema.getMeasurementName());
425+
dataTypeList.add(schema.getType());
426+
valueChunkList.add(chunk);
427+
hasSelectedField = true;
419428
}
420429

421-
if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) {
430+
if (offset >= fieldSchemaList.size()) {
422431
currentChunkMetadata = null;
423432
}
424433

434+
if (!hasSelectedField) {
435+
this.chunkReader = null;
436+
this.batchData = null;
437+
return;
438+
}
439+
425440
this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
426441
this.modsInfoList =
427442
ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList, modifications);
428443
}
429444

445+
private boolean areAllFieldsDeletedByMods(
446+
final IDeviceID currentDeviceID, final AbstractAlignedChunkMetadata alignedChunkMetadata) {
447+
if (modifications.isEmpty() || fieldSchemaList.isEmpty()) {
448+
return false;
449+
}
450+
451+
for (final IMeasurementSchema schema : fieldSchemaList) {
452+
if (!ModsOperationUtil.isAllDeletedByMods(
453+
currentDeviceID,
454+
schema.getMeasurementName(),
455+
alignedChunkMetadata.getStartTime(),
456+
alignedChunkMetadata.getEndTime(),
457+
modifications)) {
458+
return false;
459+
}
460+
}
461+
return true;
462+
}
463+
464+
private boolean isFieldDeletedByMods(
465+
final String measurementID, final long startTime, final long endTime) {
466+
return !modifications.isEmpty()
467+
&& ModsOperationUtil.isAllDeletedByMods(
468+
deviceID, measurementID, startTime, endTime, modifications);
469+
}
470+
430471
private boolean fillMeasurementValueColumns(
431472
final BatchData data, final Tablet tablet, final int rowIndex) {
432-
final TsPrimitiveType[] primitiveTypes = data.getVector();
473+
final TsPrimitiveType[] primitiveTypes =
474+
Objects.nonNull(data.getVector()) ? data.getVector() : new TsPrimitiveType[0];
433475
boolean needFillTime = false;
476+
boolean hasNonDeletedField = dataTypeList.size() == deviceIdSize;
434477

435478
for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
436-
final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
437-
if (primitiveType == null
438-
|| ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i))) {
479+
final TsPrimitiveType primitiveType =
480+
i - deviceIdSize < primitiveTypes.length ? primitiveTypes[i - deviceIdSize] : null;
481+
final boolean isDeleted = ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i));
482+
if (!isDeleted) {
483+
hasNonDeletedField = true;
484+
}
485+
if (primitiveType == null || isDeleted) {
439486
switch (dataTypeList.get(i)) {
440487
case TEXT:
441488
case BLOB:
@@ -480,7 +527,7 @@ private boolean fillMeasurementValueColumns(
480527
throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType());
481528
}
482529
}
483-
return needFillTime;
530+
return needFillTime || hasNonDeletedField;
484531
}
485532

486533
private void fillDeviceIdColumns(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,7 @@ private static long getWritePointCount(
632632
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata) {
633633
return device2TimeseriesMetadata.values().stream()
634634
.flatMap(List::stream)
635+
.filter(timeseriesMetadata -> !timeseriesMetadata.getMeasurementId().isEmpty())
635636
.mapToLong(t -> t.getStatistics().getCount())
636637
.sum();
637638
}

0 commit comments

Comments
 (0)