Skip to content

Commit ca91f4c

Browse files
CaideyipiJackieTien97
authored andcommitted
Load: Optimized the partial path split logic in modifications coverage judgment (#16212)
* optimize * shit * test * revert * opti * fix
1 parent c0d7088 commit ca91f4c

4 files changed

Lines changed: 85 additions & 42 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.it.utils;
2121

22+
import org.apache.iotdb.commons.conf.IoTDBConstant;
2223
import org.apache.iotdb.commons.exception.IllegalPathException;
2324
import org.apache.iotdb.commons.path.PartialPath;
2425
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
@@ -248,6 +249,36 @@ private void generateTEXT(final Object obj, final int row) {
248249
new Binary(String.format("test point %d", random.nextInt()), TSFileConfig.STRING_CHARSET);
249250
}
250251

252+
public void generateDeletion(final String device) throws IOException, IllegalPathException {
253+
try (final ModificationFile modificationFile =
254+
new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) {
255+
modificationFile.write(
256+
new Deletion(
257+
new PartialPath(
258+
device + TsFileConstant.PATH_SEPARATOR + IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
259+
tsFile.length(),
260+
Long.MIN_VALUE,
261+
Long.MAX_VALUE));
262+
device2TimeSet.remove(device);
263+
device2MeasurementSchema.remove(device);
264+
}
265+
}
266+
267+
public void generateDeletion(final String device, final MeasurementSchema measurement)
268+
throws IOException, IllegalPathException {
269+
try (final ModificationFile modificationFile =
270+
new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) {
271+
modificationFile.write(
272+
new Deletion(
273+
new PartialPath(
274+
device + TsFileConstant.PATH_SEPARATOR + measurement.getMeasurementId()),
275+
tsFile.length(),
276+
Long.MIN_VALUE,
277+
Long.MAX_VALUE));
278+
device2MeasurementSchema.get(device).remove(measurement);
279+
}
280+
}
281+
251282
public void generateDeletion(final String device, final int number)
252283
throws IOException, IllegalPathException {
253284
try (final ModificationFile modificationFile =

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.it;
2121

2222
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
23+
import org.apache.iotdb.db.it.utils.TestUtils;
2324
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
2425
import org.apache.iotdb.it.env.EnvFactory;
2526
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -781,16 +782,19 @@ public void testLoadWithMods() throws Exception {
781782
generator.registerTimeseries(
782783
SchemaConfig.DEVICE_3, Collections.singletonList(SchemaConfig.MEASUREMENT_30));
783784
generator.registerAlignedTimeseries(
784-
SchemaConfig.DEVICE_4, Collections.singletonList(SchemaConfig.MEASUREMENT_40));
785+
SchemaConfig.DEVICE_4,
786+
new ArrayList<>(Arrays.asList(SchemaConfig.MEASUREMENT_30, SchemaConfig.MEASUREMENT_40)));
785787
generator.generateData(SchemaConfig.DEVICE_2, 100, PARTITION_INTERVAL / 10_000, false);
786788
generator.generateData(SchemaConfig.DEVICE_3, 100, PARTITION_INTERVAL / 10_000, false);
789+
generator.generateDeletion(SchemaConfig.DEVICE_3);
787790
generator.generateData(SchemaConfig.DEVICE_4, 100, PARTITION_INTERVAL / 10_000, true);
788791
generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
789792
generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
790793
generator.generateData(SchemaConfig.DEVICE_2, 100, PARTITION_INTERVAL / 10_000, false);
791794
generator.generateData(SchemaConfig.DEVICE_4, 100, PARTITION_INTERVAL / 10_000, true);
792795
generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
793796
generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
797+
generator.generateDeletion(SchemaConfig.DEVICE_4, SchemaConfig.MEASUREMENT_30);
794798
writtenPoint2 = generator.getTotalNumber();
795799
}
796800

@@ -810,6 +814,10 @@ public void testLoadWithMods() throws Exception {
810814
Assert.fail("This ResultSet is empty.");
811815
}
812816
}
817+
818+
TestUtils.assertSingleResultSetEqual(
819+
TestUtils.executeQueryWithRetry(statement, "count timeSeries"),
820+
Collections.singletonMap("count(timeseries)", "18"));
813821
}
814822
}
815823

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iotdb.commons.consensus.ConfigRegionId;
2929
import org.apache.iotdb.commons.exception.IllegalPathException;
3030
import org.apache.iotdb.commons.path.PartialPath;
31+
import org.apache.iotdb.commons.path.PatternTreeMap;
3132
import org.apache.iotdb.commons.schema.SchemaConstant;
3233
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
3334
import org.apache.iotdb.commons.utils.RetryUtils;
@@ -58,7 +59,6 @@
5859
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
5960
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
6061
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
61-
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
6262
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
6363
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
6464
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -72,6 +72,7 @@
7272
import org.apache.iotdb.db.utils.ModificationUtils;
7373
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
7474
import org.apache.iotdb.db.utils.constant.SqlConstant;
75+
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
7576
import org.apache.iotdb.rpc.RpcUtils;
7677
import org.apache.iotdb.rpc.TSStatusCode;
7778

@@ -96,7 +97,6 @@
9697
import java.io.IOException;
9798
import java.util.ArrayList;
9899
import java.util.Arrays;
99-
import java.util.Collection;
100100
import java.util.Collections;
101101
import java.util.HashMap;
102102
import java.util.HashSet;
@@ -582,10 +582,10 @@ public void setCurrentTimeIndex(final ITimeIndex timeIndex) {
582582

583583
public void autoCreateAndVerify(
584584
TsFileSequenceReader reader,
585-
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
585+
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeSeriesMetadataList)
586586
throws IOException, AuthException, LoadAnalyzeTypeMismatchException {
587587
for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
588-
device2TimeseriesMetadataList.entrySet()) {
588+
device2TimeSeriesMetadataList.entrySet()) {
589589
final IDeviceID device = entry.getKey();
590590

591591
try {
@@ -601,15 +601,15 @@ public void autoCreateAndVerify(
601601

602602
for (final TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
603603
try {
604-
if (schemaCache.isTimeseriesDeletedByMods(device, timeseriesMetadata)) {
604+
if (schemaCache.isTimeSeriesDeletedByMods(device, timeseriesMetadata)) {
605605
continue;
606606
}
607607
} catch (IllegalPathException e) {
608608
// In aligned devices, there may be empty measurements which will cause
609609
// IllegalPathException.
610610
if (!timeseriesMetadata.getMeasurementId().isEmpty()) {
611611
LOGGER.warn(
612-
"Failed to check if device {}, timeseries {} is deleted by mods. Will see it as not deleted.",
612+
"Failed to check if device {}, timeSeries {} is deleted by mods. Will see it as not deleted.",
613613
device,
614614
timeseriesMetadata.getMeasurementId(),
615615
e);
@@ -974,7 +974,7 @@ private static class LoadTsFileAnalyzeSchemaCache {
974974
private Map<IDeviceID, Boolean> tsFileDevice2IsAligned;
975975
private Set<PartialPath> alreadySetDatabases;
976976

977-
private Collection<Modification> currentModifications;
977+
private PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> currentModifications;
978978
private ITimeIndex currentTimeIndex;
979979

980980
private long batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0;
@@ -992,7 +992,7 @@ public LoadTsFileAnalyzeSchemaCache() throws LoadRuntimeOutOfMemoryException {
992992
this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
993993
this.tsFileDevice2IsAligned = new HashMap<>();
994994
this.alreadySetDatabases = new HashSet<>();
995-
this.currentModifications = new ArrayList<>();
995+
this.currentModifications = PatternTreeMapFactory.getModsPatternTreeMap();
996996
}
997997

998998
public Map<IDeviceID, Set<MeasurementSchema>> getDevice2TimeSeries() {
@@ -1051,10 +1051,13 @@ public void addIsAlignedCache(IDeviceID device, boolean isAligned, boolean addIf
10511051
public void setCurrentModificationsAndTimeIndex(TsFileResource resource) throws IOException {
10521052
clearModificationsAndTimeIndex();
10531053

1054-
currentModifications = resource.getModFile().getModifications();
1055-
for (final Modification modification : currentModifications) {
1056-
currentModificationsMemoryUsageSizeInBytes += ((Deletion) modification).getSerializedSize();
1057-
}
1054+
resource
1055+
.getModFile()
1056+
.getModifications()
1057+
.forEach(
1058+
modification -> currentModifications.append(modification.getPath(), modification));
1059+
1060+
currentModificationsMemoryUsageSizeInBytes = currentModifications.ramBytesUsed();
10581061
block.addMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
10591062

10601063
if (resource.resourceFileExists()) {
@@ -1076,9 +1079,9 @@ public boolean isDeviceDeletedByMods(IDeviceID device) throws IllegalPathExcepti
10761079
currentModifications, currentTimeIndex, device);
10771080
}
10781081

1079-
public boolean isTimeseriesDeletedByMods(
1082+
public boolean isTimeSeriesDeletedByMods(
10801083
IDeviceID device, TimeseriesMetadata timeseriesMetadata) throws IllegalPathException {
1081-
return ModificationUtils.isTimeseriesDeletedByMods(
1084+
return ModificationUtils.isTimeSeriesDeletedByMods(
10821085
currentModifications,
10831086
device,
10841087
timeseriesMetadata.getMeasurementId(),
@@ -1116,7 +1119,7 @@ public void clearTimeSeries() {
11161119
}
11171120

11181121
public void clearModificationsAndTimeIndex() {
1119-
currentModifications.clear();
1122+
currentModifications = PatternTreeMapFactory.getModsPatternTreeMap();
11201123
currentTimeIndex = null;
11211124
block.reduceMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
11221125
block.reduceMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,18 @@
1919

2020
package org.apache.iotdb.db.utils;
2121

22-
import org.apache.iotdb.commons.conf.IoTDBConstant;
2322
import org.apache.iotdb.commons.exception.IllegalPathException;
2423
import org.apache.iotdb.commons.path.AlignedPath;
2524
import org.apache.iotdb.commons.path.MeasurementPath;
2625
import org.apache.iotdb.commons.path.PartialPath;
26+
import org.apache.iotdb.commons.path.PatternTreeMap;
27+
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
2728
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl;
2829
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
2930
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
3031
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
3132
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
33+
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
3234

3335
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
3436
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -37,7 +39,6 @@
3739
import org.apache.tsfile.utils.Pair;
3840

3941
import java.util.ArrayList;
40-
import java.util.Collection;
4142
import java.util.List;
4243
import java.util.Objects;
4344

@@ -195,33 +196,31 @@ public static boolean isPointDeleted(long timestamp, List<TimeRange> deletionLis
195196
* There are some slight differences from that in {@link SettleSelectorImpl}.
196197
*/
197198
public static boolean isDeviceDeletedByMods(
198-
Collection<Modification> modifications, IDeviceID device, long startTime, long endTime)
199+
final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications,
200+
final IDeviceID device,
201+
final long startTime,
202+
final long endTime)
199203
throws IllegalPathException {
200-
for (Modification modification : modifications) {
201-
PartialPath path = modification.getPath();
202-
if (path.include(new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD))
203-
&& ((Deletion) modification).getTimeRange().contains(startTime, endTime)) {
204-
return true;
205-
}
206-
}
207-
return false;
204+
final List<Modification> mods =
205+
modifications.getOverlapped(
206+
CompactionPathUtils.getPath(device, AlignedPath.VECTOR_PLACEHOLDER));
207+
return mods.stream()
208+
.anyMatch(
209+
modification -> ((Deletion) modification).getTimeRange().contains(startTime, endTime));
208210
}
209211

210-
public static boolean isTimeseriesDeletedByMods(
211-
Collection<Modification> modifications,
212-
IDeviceID device,
213-
String timeseriesId,
214-
long startTime,
215-
long endTime)
212+
public static boolean isTimeSeriesDeletedByMods(
213+
final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications,
214+
final IDeviceID device,
215+
final String timeSeriesId,
216+
final long startTime,
217+
final long endTime)
216218
throws IllegalPathException {
217-
for (Modification modification : modifications) {
218-
PartialPath path = modification.getPath();
219-
if (path.include(new PartialPath(device, timeseriesId))
220-
&& ((Deletion) modification).getTimeRange().contains(startTime, endTime)) {
221-
return true;
222-
}
223-
}
224-
return false;
219+
final List<Modification> mods =
220+
modifications.getOverlapped(CompactionPathUtils.getPath(device, timeSeriesId));
221+
return mods.stream()
222+
.anyMatch(
223+
modification -> ((Deletion) modification).getTimeRange().contains(startTime, endTime));
225224
}
226225

227226
private static void doModifyChunkMetaData(Modification modification, IChunkMetadata metaData) {
@@ -298,7 +297,9 @@ private static List<Modification> getModificationsForMemtable(
298297
}
299298

300299
public static boolean isDeviceDeletedByMods(
301-
Collection<Modification> currentModifications, ITimeIndex currentTimeIndex, IDeviceID device)
300+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> currentModifications,
301+
ITimeIndex currentTimeIndex,
302+
IDeviceID device)
302303
throws IllegalPathException {
303304
return isDeviceDeletedByMods(
304305
currentModifications,

0 commit comments

Comments
 (0)