Skip to content

Commit c08dfae

Browse files
committed
fix
1 parent dab45fb commit c08dfae

6 files changed

Lines changed: 129 additions & 83 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.tsfile.write.schema.MeasurementSchema;
3232

3333
import java.time.LocalDate;
34+
import java.util.Arrays;
3435
import java.util.HashMap;
3536
import java.util.List;
3637
import java.util.Map;
@@ -158,6 +159,13 @@ public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount
158159
return hasMarkedBitMap ? bitMaps : null;
159160
}
160161

162+
public static BitMap[] copyBitMapsOrCreateEmpty(final Tablet tablet) {
163+
final BitMap[] bitMaps = tablet.getBitMaps();
164+
return Objects.nonNull(bitMaps)
165+
? Arrays.copyOf(bitMaps, bitMaps.length)
166+
: new BitMap[getColumnCount(tablet)];
167+
}
168+
161169
public static void markNullValue(final Tablet tablet, final int rowIndex, final int columnIndex) {
162170
final BitMap[] bitMaps = ensureBitMaps(tablet, columnIndex + 1);
163171
if (Objects.isNull(bitMaps[columnIndex])) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.sink.util.builder;
2121

2222
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
23+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
2324
import org.apache.iotdb.pipe.api.exception.PipeException;
2425

2526
import org.apache.tsfile.enums.ColumnCategory;
@@ -243,7 +244,8 @@ private <T extends Pair<Tablet, List<Pair<IDeviceID, Integer>>>> T tryBestToAggr
243244
aggregatedSchemas.addAll(tablet.getSchemas());
244245
aggregatedColumnCategories.addAll(tablet.getColumnTypes());
245246
aggregatedValues.addAll(Arrays.asList(tablet.getValues()));
246-
aggregatedBitMaps.addAll(Arrays.asList(tablet.getBitMaps()));
247+
aggregatedBitMaps.addAll(
248+
Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet)));
247249
// Remove the aggregated tablet
248250
tablets.pollFirst();
249251
} else {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilderV2.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
2424
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
2525
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
26+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
2627
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
2728
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
2829
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
@@ -202,7 +203,7 @@ private void writeTabletsIntoOneFile(
202203
.map(schema -> (MeasurementSchema) schema)
203204
.toArray(MeasurementSchema[]::new);
204205
Object[] values = Arrays.copyOf(tablet.getValues(), tablet.getValues().length);
205-
BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(), tablet.getBitMaps().length);
206+
BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet);
206207
ColumnCategory[] columnCategory = tablet.getColumnTypes().toArray(new ColumnCategory[0]);
207208

208209
// convert date value to int refer to

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.sink.util.builder;
2121

2222
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
23+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
2324

2425
import org.apache.tsfile.exception.write.WriteProcessException;
2526
import org.apache.tsfile.external.commons.io.FileUtils;
@@ -230,7 +231,8 @@ private Tablet tryBestToAggregateTablets(
230231
// Aggregate the current tablet's data
231232
aggregatedSchemas.addAll(tablet.getSchemas());
232233
aggregatedValues.addAll(Arrays.asList(tablet.getValues()));
233-
aggregatedBitMaps.addAll(Arrays.asList(tablet.getBitMaps()));
234+
aggregatedBitMaps.addAll(
235+
Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet)));
234236
// Remove the aggregated tablet
235237
tablets.pollFirst();
236238
} else {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilderV2.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.commons.path.PartialPath;
2323
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
2424
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
25+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
2526
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
2627
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
2728
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
@@ -146,7 +147,7 @@ private void writeTabletsIntoOneFile(
146147
.map(schema -> (MeasurementSchema) schema)
147148
.toArray(MeasurementSchema[]::new);
148149
Object[] values = Arrays.copyOf(tablet.getValues(), tablet.getValues().length);
149-
BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(), tablet.getBitMaps().length);
150+
BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet);
150151

151152
// convert date value to int refer to
152153
// org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java

Lines changed: 111 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public class TsFileInsertionEventParserTest {
105105
"iotdb.query.parser.performance.enabled";
106106
private static final String MANUAL_TABLE_PARSER_PERFORMANCE_TEST =
107107
"iotdb.table.parser.performance.enabled";
108+
private static final long BITMAP_TEST_PIPE_MAX_READER_CHUNK_SIZE = 1024 * 1024L;
108109

109110
private File alignedTsFile;
110111
private File nonalignedTsFile;
@@ -290,52 +291,65 @@ public void testScanParserSkipsUnnecessaryBitMaps() throws Exception {
290291

291292
@Test
292293
public void testTableParserSkipsUnnecessaryBitMaps() throws Exception {
293-
alignedTsFile = new File("table-parser-bitmap.tsfile");
294-
if (alignedTsFile.exists()) {
295-
Assert.assertTrue(alignedTsFile.delete());
296-
}
294+
final long originalPipeMaxReaderChunkSize =
295+
PipeConfig.getInstance().getPipeMaxReaderChunkSize();
296+
CommonDescriptor.getInstance()
297+
.getConfig()
298+
.setPipeMaxReaderChunkSize(BITMAP_TEST_PIPE_MAX_READER_CHUNK_SIZE);
297299

298-
final List<IMeasurementSchema> schemaList =
299-
Arrays.asList(
300-
new MeasurementSchema("tag0", TSDataType.STRING),
301-
new MeasurementSchema("dense", TSDataType.INT64),
302-
new MeasurementSchema("sparse", TSDataType.INT64));
303-
final List<String> columnNameList = Arrays.asList("tag0", "dense", "sparse");
304-
final List<TSDataType> dataTypeList =
305-
Arrays.asList(TSDataType.STRING, TSDataType.INT64, TSDataType.INT64);
306-
final List<ColumnCategory> columnCategoryList =
307-
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD);
308-
309-
final Tablet tablet =
310-
new Tablet("bitmap_table", columnNameList, dataTypeList, columnCategoryList, 2);
311-
for (int rowIndex = 0; rowIndex < 2; ++rowIndex) {
312-
tablet.addTimestamp(rowIndex, rowIndex);
313-
tablet.addValue(rowIndex, 0, "tag-value");
314-
tablet.addValue(rowIndex, 1, (long) rowIndex);
315-
tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null);
316-
}
300+
try {
301+
alignedTsFile = new File("table-parser-bitmap.tsfile");
302+
if (alignedTsFile.exists()) {
303+
Assert.assertTrue(alignedTsFile.delete());
304+
}
317305

318-
try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
319-
writer.registerTableSchema(new TableSchema("bitmap_table", schemaList, columnCategoryList));
320-
writer.writeTable(tablet);
321-
}
306+
final List<IMeasurementSchema> schemaList =
307+
Arrays.asList(
308+
new MeasurementSchema("tag0", TSDataType.STRING),
309+
new MeasurementSchema("dense", TSDataType.INT64),
310+
new MeasurementSchema("sparse", TSDataType.INT64));
311+
final List<String> columnNameList = Arrays.asList("tag0", "dense", "sparse");
312+
final List<TSDataType> dataTypeList =
313+
Arrays.asList(TSDataType.STRING, TSDataType.INT64, TSDataType.INT64);
314+
final List<ColumnCategory> columnCategoryList =
315+
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD);
316+
317+
final Tablet tablet =
318+
new Tablet("bitmap_table", columnNameList, dataTypeList, columnCategoryList, 2);
319+
for (int rowIndex = 0; rowIndex < 2; ++rowIndex) {
320+
tablet.addTimestamp(rowIndex, rowIndex);
321+
tablet.addValue(rowIndex, 0, "tag-value");
322+
tablet.addValue(rowIndex, 1, (long) rowIndex);
323+
tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null);
324+
}
322325

323-
try (final TsFileInsertionEventTableParser parser =
324-
new TsFileInsertionEventTableParser(
325-
alignedTsFile,
326-
new TablePattern(true, null, null),
327-
Long.MIN_VALUE,
328-
Long.MAX_VALUE,
329-
null,
330-
null,
331-
null,
332-
false)) {
333-
final Iterator<TabletInsertionEvent> iterator = parser.toTabletInsertionEvents().iterator();
334-
Assert.assertTrue(iterator.hasNext());
335-
final Tablet parsedTablet = ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet();
336-
assertBitMapExistence(parsedTablet, false, false, true);
337-
Assert.assertTrue(parsedTablet.isNull(1, 2));
338-
Assert.assertFalse(iterator.hasNext());
326+
try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
327+
writer.registerTableSchema(new TableSchema("bitmap_table", schemaList, columnCategoryList));
328+
writer.writeTable(tablet);
329+
}
330+
331+
try (final TsFileInsertionEventTableParser parser =
332+
new TsFileInsertionEventTableParser(
333+
alignedTsFile,
334+
new TablePattern(true, null, null),
335+
Long.MIN_VALUE,
336+
Long.MAX_VALUE,
337+
null,
338+
null,
339+
null,
340+
false)) {
341+
final Iterator<TabletInsertionEvent> iterator = parser.toTabletInsertionEvents().iterator();
342+
Assert.assertTrue(iterator.hasNext());
343+
final Tablet parsedTablet =
344+
((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet();
345+
assertBitMapExistence(parsedTablet, false, false, true);
346+
Assert.assertTrue(parsedTablet.isNull(1, 2));
347+
Assert.assertFalse(iterator.hasNext());
348+
}
349+
} finally {
350+
CommonDescriptor.getInstance()
351+
.getConfig()
352+
.setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
339353
}
340354
}
341355

@@ -1034,45 +1048,63 @@ private void testPartialNullValue(final boolean isQuery) throws Exception {
10341048
}
10351049

10361050
private void testTreeParserSkipsUnnecessaryBitMaps(final boolean isQuery) throws Exception {
1037-
alignedTsFile = new File(isQuery ? "query-parser-bitmap.tsfile" : "scan-parser-bitmap.tsfile");
1038-
if (alignedTsFile.exists()) {
1039-
Assert.assertTrue(alignedTsFile.delete());
1040-
}
1051+
final long originalPipeMaxReaderChunkSize =
1052+
PipeConfig.getInstance().getPipeMaxReaderChunkSize();
1053+
CommonDescriptor.getInstance()
1054+
.getConfig()
1055+
.setPipeMaxReaderChunkSize(BITMAP_TEST_PIPE_MAX_READER_CHUNK_SIZE);
10411056

1042-
final List<IMeasurementSchema> schemaList =
1043-
Arrays.asList(
1044-
new MeasurementSchema("dense", TSDataType.INT64),
1045-
new MeasurementSchema("sparse", TSDataType.INT64));
1046-
final Tablet tablet = new Tablet("root.sg.d", schemaList, 2);
1047-
for (int rowIndex = 0; rowIndex < 2; ++rowIndex) {
1048-
tablet.addTimestamp(rowIndex, rowIndex);
1049-
tablet.addValue("dense", rowIndex, (long) rowIndex);
1050-
tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null);
1051-
}
1057+
try {
1058+
alignedTsFile =
1059+
new File(isQuery ? "query-parser-bitmap.tsfile" : "scan-parser-bitmap.tsfile");
1060+
if (alignedTsFile.exists()) {
1061+
Assert.assertTrue(alignedTsFile.delete());
1062+
}
10521063

1053-
try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
1054-
writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList);
1055-
writer.writeAligned(tablet);
1056-
}
1064+
final List<IMeasurementSchema> schemaList =
1065+
Arrays.asList(
1066+
new MeasurementSchema("dense", TSDataType.INT64),
1067+
new MeasurementSchema("sparse", TSDataType.INT64));
1068+
final Tablet tablet = new Tablet("root.sg.d", schemaList, 2);
1069+
for (int rowIndex = 0; rowIndex < 2; ++rowIndex) {
1070+
tablet.addTimestamp(rowIndex, rowIndex);
1071+
tablet.addValue("dense", rowIndex, (long) rowIndex);
1072+
tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null);
1073+
}
10571074

1058-
try (final TsFileInsertionEventParser parser =
1059-
isQuery
1060-
? new TsFileInsertionEventQueryParser(
1061-
alignedTsFile, new PrefixTreePattern("root"), Long.MIN_VALUE, Long.MAX_VALUE, null)
1062-
: new TsFileInsertionEventScanParser(
1063-
alignedTsFile,
1064-
new PrefixTreePattern("root"),
1065-
Long.MIN_VALUE,
1066-
Long.MAX_VALUE,
1067-
null,
1068-
null,
1069-
false)) {
1070-
final Iterator<TabletInsertionEvent> iterator = parser.toTabletInsertionEvents().iterator();
1071-
Assert.assertTrue(iterator.hasNext());
1072-
final Tablet parsedTablet = ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet();
1073-
assertBitMapExistence(parsedTablet, false, true);
1074-
Assert.assertTrue(parsedTablet.isNull(1, 1));
1075-
Assert.assertFalse(iterator.hasNext());
1075+
try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
1076+
writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList);
1077+
writer.writeAligned(tablet);
1078+
}
1079+
1080+
try (final TsFileInsertionEventParser parser =
1081+
isQuery
1082+
? new TsFileInsertionEventQueryParser(
1083+
alignedTsFile,
1084+
new PrefixTreePattern("root"),
1085+
Long.MIN_VALUE,
1086+
Long.MAX_VALUE,
1087+
null)
1088+
: new TsFileInsertionEventScanParser(
1089+
alignedTsFile,
1090+
new PrefixTreePattern("root"),
1091+
Long.MIN_VALUE,
1092+
Long.MAX_VALUE,
1093+
null,
1094+
null,
1095+
false)) {
1096+
final Iterator<TabletInsertionEvent> iterator = parser.toTabletInsertionEvents().iterator();
1097+
Assert.assertTrue(iterator.hasNext());
1098+
final Tablet parsedTablet =
1099+
((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet();
1100+
assertBitMapExistence(parsedTablet, false, true);
1101+
Assert.assertTrue(parsedTablet.isNull(1, 1));
1102+
Assert.assertFalse(iterator.hasNext());
1103+
}
1104+
} finally {
1105+
CommonDescriptor.getInstance()
1106+
.getConfig()
1107+
.setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
10761108
}
10771109
}
10781110

0 commit comments

Comments
 (0)