Skip to content

Commit f51a15f

Browse files
authored
Load: Optimized the partial path split logic in modifications coverage judgment (#16212)
* optimize * shit * test * revert * opti * fix
1 parent 04b7e23 commit f51a15f

4 files changed

Lines changed: 85 additions & 42 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.it.utils;
2121

22+
import org.apache.iotdb.commons.conf.IoTDBConstant;
2223
import org.apache.iotdb.commons.exception.IllegalPathException;
2324
import org.apache.iotdb.commons.path.PartialPath;
2425
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
@@ -248,6 +249,36 @@ private void generateTEXT(final Object obj, final int row) {
248249
new Binary(String.format("test point %d", random.nextInt()), TSFileConfig.STRING_CHARSET);
249250
}
250251

252+
public void generateDeletion(final String device) throws IOException, IllegalPathException {
253+
try (final ModificationFile modificationFile =
254+
new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) {
255+
modificationFile.write(
256+
new Deletion(
257+
new PartialPath(
258+
device + TsFileConstant.PATH_SEPARATOR + IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
259+
tsFile.length(),
260+
Long.MIN_VALUE,
261+
Long.MAX_VALUE));
262+
device2TimeSet.remove(device);
263+
device2MeasurementSchema.remove(device);
264+
}
265+
}
266+
267+
public void generateDeletion(final String device, final MeasurementSchema measurement)
268+
throws IOException, IllegalPathException {
269+
try (final ModificationFile modificationFile =
270+
new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) {
271+
modificationFile.write(
272+
new Deletion(
273+
new PartialPath(
274+
device + TsFileConstant.PATH_SEPARATOR + measurement.getMeasurementId()),
275+
tsFile.length(),
276+
Long.MIN_VALUE,
277+
Long.MAX_VALUE));
278+
device2MeasurementSchema.get(device).remove(measurement);
279+
}
280+
}
281+
251282
public void generateDeletion(final String device, final int number)
252283
throws IOException, IllegalPathException {
253284
try (final ModificationFile modificationFile =

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.it;
2121

2222
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
23+
import org.apache.iotdb.db.it.utils.TestUtils;
2324
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
2425
import org.apache.iotdb.it.env.EnvFactory;
2526
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -781,16 +782,19 @@ public void testLoadWithMods() throws Exception {
781782
generator.registerTimeseries(
782783
SchemaConfig.DEVICE_3, Collections.singletonList(SchemaConfig.MEASUREMENT_30));
783784
generator.registerAlignedTimeseries(
784-
SchemaConfig.DEVICE_4, Collections.singletonList(SchemaConfig.MEASUREMENT_40));
785+
SchemaConfig.DEVICE_4,
786+
new ArrayList<>(Arrays.asList(SchemaConfig.MEASUREMENT_30, SchemaConfig.MEASUREMENT_40)));
785787
generator.generateData(SchemaConfig.DEVICE_2, 100, PARTITION_INTERVAL / 10_000, false);
786788
generator.generateData(SchemaConfig.DEVICE_3, 100, PARTITION_INTERVAL / 10_000, false);
789+
generator.generateDeletion(SchemaConfig.DEVICE_3);
787790
generator.generateData(SchemaConfig.DEVICE_4, 100, PARTITION_INTERVAL / 10_000, true);
788791
generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
789792
generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
790793
generator.generateData(SchemaConfig.DEVICE_2, 100, PARTITION_INTERVAL / 10_000, false);
791794
generator.generateData(SchemaConfig.DEVICE_4, 100, PARTITION_INTERVAL / 10_000, true);
792795
generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
793796
generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
797+
generator.generateDeletion(SchemaConfig.DEVICE_4, SchemaConfig.MEASUREMENT_30);
794798
writtenPoint2 = generator.getTotalNumber();
795799
}
796800

@@ -810,6 +814,10 @@ public void testLoadWithMods() throws Exception {
810814
Assert.fail("This ResultSet is empty.");
811815
}
812816
}
817+
818+
TestUtils.assertSingleResultSetEqual(
819+
TestUtils.executeQueryWithRetry(statement, "count timeSeries"),
820+
Collections.singletonMap("count(timeseries)", "18"));
813821
}
814822
}
815823

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iotdb.commons.consensus.ConfigRegionId;
2929
import org.apache.iotdb.commons.exception.IllegalPathException;
3030
import org.apache.iotdb.commons.path.PartialPath;
31+
import org.apache.iotdb.commons.path.PatternTreeMap;
3132
import org.apache.iotdb.commons.schema.SchemaConstant;
3233
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
3334
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
@@ -58,7 +59,6 @@
5859
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
5960
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
6061
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
61-
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
6262
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
6363
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
6464
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -73,6 +73,7 @@
7373
import org.apache.iotdb.db.utils.ModificationUtils;
7474
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
7575
import org.apache.iotdb.db.utils.constant.SqlConstant;
76+
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
7677
import org.apache.iotdb.rpc.RpcUtils;
7778
import org.apache.iotdb.rpc.TSStatusCode;
7879

@@ -97,7 +98,6 @@
9798
import java.io.IOException;
9899
import java.util.ArrayList;
99100
import java.util.Arrays;
100-
import java.util.Collection;
101101
import java.util.Collections;
102102
import java.util.HashMap;
103103
import java.util.HashSet;
@@ -527,10 +527,10 @@ public void setCurrentTimeIndex(final ITimeIndex timeIndex) {
527527

528528
public void autoCreateAndVerify(
529529
TsFileSequenceReader reader,
530-
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
530+
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeSeriesMetadataList)
531531
throws IOException, AuthException, LoadAnalyzeTypeMismatchException {
532532
for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
533-
device2TimeseriesMetadataList.entrySet()) {
533+
device2TimeSeriesMetadataList.entrySet()) {
534534
final IDeviceID device = entry.getKey();
535535

536536
try {
@@ -546,15 +546,15 @@ public void autoCreateAndVerify(
546546

547547
for (final TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
548548
try {
549-
if (schemaCache.isTimeseriesDeletedByMods(device, timeseriesMetadata)) {
549+
if (schemaCache.isTimeSeriesDeletedByMods(device, timeseriesMetadata)) {
550550
continue;
551551
}
552552
} catch (IllegalPathException e) {
553553
// In aligned devices, there may be empty measurements which will cause
554554
// IllegalPathException.
555555
if (!timeseriesMetadata.getMeasurementId().isEmpty()) {
556556
LOGGER.warn(
557-
"Failed to check if device {}, timeseries {} is deleted by mods. Will see it as not deleted.",
557+
"Failed to check if device {}, timeSeries {} is deleted by mods. Will see it as not deleted.",
558558
device,
559559
timeseriesMetadata.getMeasurementId(),
560560
e);
@@ -926,7 +926,7 @@ private static class LoadTsFileAnalyzeSchemaCache {
926926
private Map<IDeviceID, Boolean> tsFileDevice2IsAligned;
927927
private Set<PartialPath> alreadySetDatabases;
928928

929-
private Collection<Modification> currentModifications;
929+
private PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> currentModifications;
930930
private ITimeIndex currentTimeIndex;
931931

932932
private long batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0;
@@ -944,7 +944,7 @@ public LoadTsFileAnalyzeSchemaCache() throws LoadRuntimeOutOfMemoryException {
944944
this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
945945
this.tsFileDevice2IsAligned = new HashMap<>();
946946
this.alreadySetDatabases = new HashSet<>();
947-
this.currentModifications = new ArrayList<>();
947+
this.currentModifications = PatternTreeMapFactory.getModsPatternTreeMap();
948948
}
949949

950950
public Map<IDeviceID, Set<MeasurementSchema>> getDevice2TimeSeries() {
@@ -1003,10 +1003,13 @@ public void addIsAlignedCache(IDeviceID device, boolean isAligned, boolean addIf
10031003
public void setCurrentModificationsAndTimeIndex(TsFileResource resource) throws IOException {
10041004
clearModificationsAndTimeIndex();
10051005

1006-
currentModifications = resource.getModFile().getModifications();
1007-
for (final Modification modification : currentModifications) {
1008-
currentModificationsMemoryUsageSizeInBytes += ((Deletion) modification).getSerializedSize();
1009-
}
1006+
resource
1007+
.getModFile()
1008+
.getModifications()
1009+
.forEach(
1010+
modification -> currentModifications.append(modification.getPath(), modification));
1011+
1012+
currentModificationsMemoryUsageSizeInBytes = currentModifications.ramBytesUsed();
10101013
block.addMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
10111014

10121015
if (resource.resourceFileExists()) {
@@ -1028,9 +1031,9 @@ public boolean isDeviceDeletedByMods(IDeviceID device) throws IllegalPathExcepti
10281031
currentModifications, currentTimeIndex, device);
10291032
}
10301033

1031-
public boolean isTimeseriesDeletedByMods(
1034+
public boolean isTimeSeriesDeletedByMods(
10321035
IDeviceID device, TimeseriesMetadata timeseriesMetadata) throws IllegalPathException {
1033-
return ModificationUtils.isTimeseriesDeletedByMods(
1036+
return ModificationUtils.isTimeSeriesDeletedByMods(
10341037
currentModifications,
10351038
device,
10361039
timeseriesMetadata.getMeasurementId(),
@@ -1068,7 +1071,7 @@ public void clearTimeSeries() {
10681071
}
10691072

10701073
public void clearModificationsAndTimeIndex() {
1071-
currentModifications.clear();
1074+
currentModifications = PatternTreeMapFactory.getModsPatternTreeMap();
10721075
currentTimeIndex = null;
10731076
block.reduceMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
10741077
block.reduceMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,18 @@
1919

2020
package org.apache.iotdb.db.utils;
2121

22-
import org.apache.iotdb.commons.conf.IoTDBConstant;
2322
import org.apache.iotdb.commons.exception.IllegalPathException;
2423
import org.apache.iotdb.commons.path.AlignedPath;
2524
import org.apache.iotdb.commons.path.MeasurementPath;
2625
import org.apache.iotdb.commons.path.PartialPath;
26+
import org.apache.iotdb.commons.path.PatternTreeMap;
27+
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
2728
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl;
2829
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
2930
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
3031
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
3132
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
33+
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
3234

3335
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
3436
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -37,7 +39,6 @@
3739
import org.apache.tsfile.utils.Pair;
3840

3941
import java.util.ArrayList;
40-
import java.util.Collection;
4142
import java.util.List;
4243
import java.util.Objects;
4344

@@ -195,33 +196,31 @@ public static boolean isPointDeleted(long timestamp, List<TimeRange> deletionLis
195196
* There are some slight differences from that in {@link SettleSelectorImpl}.
196197
*/
197198
public static boolean isDeviceDeletedByMods(
198-
Collection<Modification> modifications, IDeviceID device, long startTime, long endTime)
199+
final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications,
200+
final IDeviceID device,
201+
final long startTime,
202+
final long endTime)
199203
throws IllegalPathException {
200-
for (Modification modification : modifications) {
201-
PartialPath path = modification.getPath();
202-
if (path.include(new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD))
203-
&& ((Deletion) modification).getTimeRange().contains(startTime, endTime)) {
204-
return true;
205-
}
206-
}
207-
return false;
204+
final List<Modification> mods =
205+
modifications.getOverlapped(
206+
CompactionPathUtils.getPath(device, AlignedPath.VECTOR_PLACEHOLDER));
207+
return mods.stream()
208+
.anyMatch(
209+
modification -> ((Deletion) modification).getTimeRange().contains(startTime, endTime));
208210
}
209211

210-
public static boolean isTimeseriesDeletedByMods(
211-
Collection<Modification> modifications,
212-
IDeviceID device,
213-
String timeseriesId,
214-
long startTime,
215-
long endTime)
212+
public static boolean isTimeSeriesDeletedByMods(
213+
final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications,
214+
final IDeviceID device,
215+
final String timeSeriesId,
216+
final long startTime,
217+
final long endTime)
216218
throws IllegalPathException {
217-
for (Modification modification : modifications) {
218-
PartialPath path = modification.getPath();
219-
if (path.include(new PartialPath(device, timeseriesId))
220-
&& ((Deletion) modification).getTimeRange().contains(startTime, endTime)) {
221-
return true;
222-
}
223-
}
224-
return false;
219+
final List<Modification> mods =
220+
modifications.getOverlapped(CompactionPathUtils.getPath(device, timeSeriesId));
221+
return mods.stream()
222+
.anyMatch(
223+
modification -> ((Deletion) modification).getTimeRange().contains(startTime, endTime));
225224
}
226225

227226
private static void doModifyChunkMetaData(Modification modification, IChunkMetadata metaData) {
@@ -298,7 +297,9 @@ private static List<Modification> getModificationsForMemtable(
298297
}
299298

300299
public static boolean isDeviceDeletedByMods(
301-
Collection<Modification> currentModifications, ITimeIndex currentTimeIndex, IDeviceID device)
300+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> currentModifications,
301+
ITimeIndex currentTimeIndex,
302+
IDeviceID device)
302303
throws IllegalPathException {
303304
return isDeviceDeletedByMods(
304305
currentModifications,

0 commit comments

Comments
 (0)