Skip to content

Commit 04b7e23

Browse files
authored
[to dev/1.3] Using PatternTreeMap to cache mod entries in ReadChunkCompactionPerformer (#16183)
* Using PatternTreeMap to cache mod entries in ReadChunkCompactionPerformer * modify SettleSelector * fix * fix
1 parent bee05d1 commit 04b7e23

8 files changed

Lines changed: 71 additions & 57 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,10 @@ public List<Modification> getPathModifications(
134134
if (fileMods == null) {
135135
return Collections.emptyList();
136136
}
137-
return ModificationFile.sortAndMerge(fileMods.getDeviceOverlapped(new PartialPath(deviceID)));
137+
138+
return ModificationFile.sortAndMerge(
139+
fileMods.getOverlapped(
140+
new PartialPath(deviceID).concatAsMeasurementPath(AlignedPath.VECTOR_PLACEHOLDER)));
138141
}
139142

140143
/**

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,7 @@ private void readModification(List<TsFileResource> resources) {
362362
}
363363
// read mods
364364
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications =
365-
PatternTreeMapFactory.getModsPatternTreeMap();
366-
for (Modification modification : resource.getModFile().getModificationsIter()) {
367-
modifications.append(modification.getPath(), modification);
368-
}
365+
CompactionUtils.buildModEntryPatternTreeMap(resource);
369366
modificationCache.put(resource.getTsFile().getName(), modifications);
370367
}
371368
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
import org.apache.iotdb.commons.conf.CommonDescriptor;
2323
import org.apache.iotdb.commons.conf.IoTDBConstant;
24+
import org.apache.iotdb.commons.exception.IllegalPathException;
25+
import org.apache.iotdb.commons.path.PartialPath;
26+
import org.apache.iotdb.commons.path.PatternTreeMap;
2427
import org.apache.iotdb.commons.service.metric.MetricService;
2528
import org.apache.iotdb.commons.service.metric.enums.Tag;
2629
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
@@ -31,6 +34,7 @@
3134
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
3235
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3336
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
37+
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
3438
import org.apache.iotdb.metrics.utils.MetricLevel;
3539
import org.apache.iotdb.metrics.utils.SystemMetric;
3640

@@ -45,6 +49,7 @@
4549
import java.io.File;
4650
import java.io.IOException;
4751
import java.util.Collection;
52+
import java.util.Collections;
4853
import java.util.HashSet;
4954
import java.util.List;
5055
import java.util.Set;
@@ -357,6 +362,29 @@ public static void deleteTsFileResourceWithoutLock(TsFileResource resource) {
357362
}
358363
}
359364

365+
public static PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
366+
buildModEntryPatternTreeMap(TsFileResource resource) {
367+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> patternTreeMap =
368+
PatternTreeMapFactory.getModsPatternTreeMap();
369+
Iterable<Modification> iterator = resource.getModFile().getModificationsIter();
370+
for (Modification modification : iterator) {
371+
patternTreeMap.append(modification.getPath(), modification);
372+
}
373+
return patternTreeMap;
374+
}
375+
376+
public static List<Modification> getMatchedModifications(
377+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> patternTreeMap,
378+
IDeviceID deviceID,
379+
String measurement)
380+
throws IllegalPathException {
381+
if (patternTreeMap == null) {
382+
return Collections.emptyList();
383+
}
384+
PartialPath path = CompactionPathUtils.getPath(deviceID, measurement);
385+
return ModificationFile.sortAndMerge(patternTreeMap.getOverlapped(path));
386+
}
387+
360388
public static boolean isDiskHasSpace() {
361389
return isDiskHasSpace(0d);
362390
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,17 @@
2222
import org.apache.iotdb.commons.conf.IoTDBConstant;
2323
import org.apache.iotdb.commons.exception.IllegalPathException;
2424
import org.apache.iotdb.commons.path.PartialPath;
25+
import org.apache.iotdb.commons.path.PatternTreeMap;
2526
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
2627
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
2728
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
2829
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
2930
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
3031
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
31-
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
3232
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
3333
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3434
import org.apache.iotdb.db.utils.ModificationUtils;
35+
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
3536

3637
import org.apache.tsfile.enums.TSDataType;
3738
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
@@ -69,7 +70,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
6970
private List<TsFileResource> tsFileResourcesSortedByAsc;
7071
private Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>();
7172
private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>();
72-
private final Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
73+
private final Map<
74+
TsFileResource, PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>>
75+
modificationCache = new HashMap<>();
7376
private Pair<IDeviceID, Boolean> currentDevice = null;
7477
private long ttlForCurrentDevice;
7578
private long timeLowerBoundForCurrentDevice;
@@ -433,10 +436,9 @@ private void applyModificationForAlignedChunkMetadataList(
433436
timeLowerBoundForCurrentDevice);
434437
}
435438

436-
List<Modification> modifications =
439+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications =
437440
modificationCache.computeIfAbsent(
438-
tsFileResource,
439-
r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));
441+
tsFileResource, CompactionUtils::buildModEntryPatternTreeMap);
440442

441443
// construct the input params List<List<Modification>> for QueryUtils.modifyAlignedChunkMetaData
442444
AlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0);
@@ -447,15 +449,9 @@ private void applyModificationForAlignedChunkMetadataList(
447449
modificationForCurDevice.add(Collections.emptyList());
448450
continue;
449451
}
450-
List<Modification> modificationList = new ArrayList<>();
451-
PartialPath path =
452-
CompactionPathUtils.getPath(
453-
currentDevice.getLeft(), valueChunkMetadata.getMeasurementUid());
454-
for (Modification modification : modifications) {
455-
if (modification.getPath().matchFullPath(path)) {
456-
modificationList.add(modification);
457-
}
458-
}
452+
List<Modification> modificationList =
453+
CompactionUtils.getMatchedModifications(
454+
modifications, device, valueChunkMetadata.getMeasurementUid());
459455
if (ttlDeletion != null) {
460456
modificationList.add(ttlDeletion);
461457
}
@@ -663,17 +659,14 @@ public String nextSeries() throws IllegalPathException {
663659
chunkMetadataListMap.get(currentCompactingSeries);
664660
chunkMetadataListMap.remove(currentCompactingSeries);
665661

666-
List<Modification> modificationsInThisResource =
667-
modificationCache.computeIfAbsent(
668-
resource,
669-
r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications()));
670-
LinkedList<Modification> modificationForCurrentSeries = new LinkedList<>();
662+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
663+
modificationsInThisResource =
664+
modificationCache.computeIfAbsent(
665+
resource, CompactionUtils::buildModEntryPatternTreeMap);
671666
// collect the modifications for current series
672-
for (Modification modification : modificationsInThisResource) {
673-
if (modification.getPath().matchFullPath(path)) {
674-
modificationForCurrentSeries.add(modification);
675-
}
676-
}
667+
List<Modification> modificationForCurrentSeries =
668+
CompactionUtils.getMatchedModifications(
669+
modificationsInThisResource, device, currentCompactingSeries);
677670
// add ttl deletion for current series
678671
if (ttlDeletion != null) {
679672
modificationForCurrentSeries.add(ttlDeletion);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.iotdb.commons.path.PatternTreeMap;
2424
import org.apache.iotdb.db.exception.WriteProcessException;
2525
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
26-
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
2726
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
2827
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils;
2928
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
@@ -242,9 +241,7 @@ protected List<AlignedChunkMetadata> getAlignedChunkMetadataList(TsFileResource
242241
valueModifications.add(null);
243242
} else {
244243
valueModifications.add(
245-
getModificationsFromCache(
246-
resource,
247-
CompactionPathUtils.getPath(deviceId, x.getMeasurementUid())));
244+
getModificationsFromCache(resource, deviceId, x.getMeasurementUid()));
248245
}
249246
} catch (IllegalPathException e) {
250247
throw new RuntimeException(e);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.iotdb.commons.path.PatternTreeMap;
2424
import org.apache.iotdb.db.exception.WriteProcessException;
2525
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
26-
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
2726
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
2827
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
2928
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement;
@@ -147,9 +146,7 @@ void deserializeFileIntoChunkMetadataQueue(List<FileElement> fileElements)
147146
ModificationUtils.modifyChunkMetaData(
148147
iChunkMetadataList,
149148
getModificationsFromCache(
150-
resource,
151-
CompactionPathUtils.getPath(
152-
deviceId, iChunkMetadataList.get(0).getMeasurementUid())));
149+
resource, deviceId, iChunkMetadataList.get(0).getMeasurementUid()));
153150
if (iChunkMetadataList.isEmpty()) {
154151
// all chunks has been deleted in this file, just remove it
155152
removeFile(fileElement);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,17 @@
2020
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast;
2121

2222
import org.apache.iotdb.commons.exception.IllegalPathException;
23-
import org.apache.iotdb.commons.path.PartialPath;
2423
import org.apache.iotdb.commons.path.PatternTreeMap;
2524
import org.apache.iotdb.db.exception.WriteProcessException;
2625
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
26+
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
2727
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
2828
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
2929
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement;
3030
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.PageElement;
3131
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.PointPriorityReader;
3232
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
3333
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
34-
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
3534
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3635
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
3736

@@ -46,7 +45,6 @@
4645
import java.io.IOException;
4746
import java.util.ArrayList;
4847
import java.util.Collection;
49-
import java.util.Collections;
5048
import java.util.List;
5149
import java.util.Map;
5250
import java.util.Optional;
@@ -474,17 +472,13 @@ protected void removeFile(FileElement fileElement) throws IllegalPathException,
474472
/**
475473
* Get the modifications of a timeseries in the ModificationFile of a TsFile. Create ttl
476474
* modification from ttl cache.
477-
*
478-
* @param path name of the time series
479475
*/
480476
protected List<Modification> getModificationsFromCache(
481-
TsFileResource tsFileResource, PartialPath path) {
477+
TsFileResource tsFileResource, IDeviceID deviceId, String measurement)
478+
throws IllegalPathException {
482479
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> allModifications =
483480
modificationCacheMap.get(tsFileResource.getTsFile().getName());
484-
if (allModifications == null) {
485-
return Collections.emptyList();
486-
}
487-
return ModificationFile.sortAndMerge(allModifications.getOverlapped(path));
481+
return CompactionUtils.getMatchedModifications(allModifications, deviceId, measurement);
488482
}
489483

490484
@SuppressWarnings("squid:S3776")

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020

2121
import org.apache.iotdb.commons.conf.IoTDBConstant;
2222
import org.apache.iotdb.commons.exception.IllegalPathException;
23-
import org.apache.iotdb.commons.path.PartialPath;
23+
import org.apache.iotdb.commons.path.AlignedPath;
24+
import org.apache.iotdb.commons.path.PatternTreeMap;
2425
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
2526
import org.apache.iotdb.db.conf.IoTDBConfig;
2627
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -38,6 +39,7 @@
3839
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
3940
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
4041
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
42+
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
4143

4244
import org.apache.tsfile.file.metadata.IDeviceID;
4345
import org.apache.tsfile.file.metadata.PlainDeviceID;
@@ -46,7 +48,6 @@
4648

4749
import java.io.IOException;
4850
import java.util.ArrayList;
49-
import java.util.Collection;
5051
import java.util.Collections;
5152
import java.util.HashSet;
5253
import java.util.List;
@@ -213,9 +214,9 @@ private FileDirtyInfo selectFileBaseOnModSize(TsFileResource resource) {
213214
*
214215
* @return dirty status means the status of current resource.
215216
*/
217+
@SuppressWarnings("OptionalGetWithoutIsPresent") // iterating the index, must present
216218
private FileDirtyInfo selectFileBaseOnDirtyData(TsFileResource resource)
217219
throws IOException, IllegalPathException {
218-
ModificationFile modFile = resource.getModFile();
219220
ITimeIndex timeIndex = resource.getTimeIndex();
220221
if (timeIndex instanceof FileTimeIndex) {
221222
timeIndex = CompactionUtils.buildDeviceTimeIndex(resource);
@@ -224,7 +225,8 @@ private FileDirtyInfo selectFileBaseOnDirtyData(TsFileResource resource)
224225
boolean hasExpiredTooLong = false;
225226
long currentTime = CommonDateTimeUtils.currentTime();
226227

227-
Collection<Modification> modifications = modFile.getModifications();
228+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications =
229+
CompactionUtils.buildModEntryPatternTreeMap(resource);
228230
for (IDeviceID device : ((DeviceTimeIndex) timeIndex).getDevices()) {
229231
// check expired device by ttl
230232
// TODO: remove deviceId conversion
@@ -279,13 +281,16 @@ private FileDirtyInfo selectFileBaseOnDirtyData(TsFileResource resource)
279281

280282
/** Check whether the device is completely deleted by mods or not. */
281283
private boolean isDeviceDeletedByMods(
282-
Collection<Modification> modifications, IDeviceID device, long startTime, long endTime)
284+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications,
285+
IDeviceID device,
286+
long startTime,
287+
long endTime)
283288
throws IllegalPathException {
284-
for (Modification modification : modifications) {
285-
PartialPath path = modification.getPath();
286-
if (path.endWithMultiLevelWildcard()
287-
&& path.getDevicePath().matchFullPath(new PartialPath(device))
288-
&& ((Deletion) modification).getTimeRange().contains(startTime, endTime)) {
289+
List<Modification> deviceModifications =
290+
CompactionUtils.getMatchedModifications(
291+
modifications, device, AlignedPath.VECTOR_PLACEHOLDER);
292+
for (Modification modification : deviceModifications) {
293+
if (((Deletion) modification).getTimeRange().contains(startTime, endTime)) {
289294
return true;
290295
}
291296
}

0 commit comments

Comments
 (0)