Skip to content

Commit 5d9d7ae

Browse files
authored
[to dev/1.3] Add memory control for mod entries in query
1 parent 489bd73 commit 5d9d7ae

12 files changed

Lines changed: 238 additions & 56 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,9 @@ public class IoTDBConfig {
454454
/** The buffer for sort operation */
455455
private long sortBufferSize = 32 * 1024 * 1024L;
456456

457+
/** Mods cache size limit per fi */
458+
private long modsCacheSizeLimitPerFI = 32 * 1024 * 1024;
459+
457460
/**
458461
* The strategy of inner space compaction task. There are just one inner space compaction strategy
459462
* SIZE_TIRED_COMPACTION:
@@ -4235,6 +4238,14 @@ public long getSortBufferSize() {
42354238
return sortBufferSize;
42364239
}
42374240

4241+
public void setModsCacheSizeLimitPerFI(long modsCacheSizeLimitPerFI) {
4242+
this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI;
4243+
}
4244+
4245+
public long getModsCacheSizeLimitPerFI() {
4246+
return modsCacheSizeLimitPerFI;
4247+
}
4248+
42384249
public void setSortTmpDir(String sortTmpDir) {
42394250
this.sortTmpDir = sortTmpDir;
42404251
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import java.util.Properties;
9191
import java.util.ServiceLoader;
9292
import java.util.Set;
93+
import java.util.function.LongConsumer;
9394
import java.util.regex.Pattern;
9495

9596
public class IoTDBDescriptor {
@@ -1081,7 +1082,10 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
10811082

10821083
// The buffer for sort operator to calculate
10831084

1084-
loadSortBuffer(properties);
1085+
loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);
1086+
1087+
loadFixedSizeLimitForQuery(
1088+
properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI);
10851089

10861090
// tmp filePath for sort operator
10871091
conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir()));
@@ -1106,21 +1110,17 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
11061110
loadTrustedUriPattern(properties);
11071111
}
11081112

1109-
private void loadSortBuffer(TrimProperties properties) {
1110-
long defaultValue = calculateDefaultSortBufferSize();
1111-
long sortBufferSize =
1112-
Long.parseLong(
1113-
properties.getProperty("sort_buffer_size_in_bytes", Long.toString(defaultValue)));
1114-
if (sortBufferSize <= 0) {
1115-
sortBufferSize = defaultValue;
1116-
}
1117-
// The buffer for sort operator to calculate
1118-
conf.setSortBufferSize(sortBufferSize);
1119-
}
1120-
1121-
public static long calculateDefaultSortBufferSize() {
1122-
return Math.min(
1123-
32 * 1024 * 1024L, conf.getAllocateMemoryForOperators() / conf.getQueryThreadCount() / 2);
1113+
private void loadFixedSizeLimitForQuery(
1114+
TrimProperties properties, String name, LongConsumer setFunction) {
1115+
long defaultValue =
1116+
Math.min(
1117+
32 * 1024 * 1024L,
1118+
conf.getAllocateMemoryForOperators() / conf.getQueryThreadCount() / 2);
1119+
long size = Long.parseLong(properties.getProperty(name, Long.toString(defaultValue)));
1120+
if (size <= 0) {
1121+
size = defaultValue;
1122+
}
1123+
setFunction.accept(size);
11241124
}
11251125

11261126
private void reloadConsensusProps(TrimProperties properties) throws IOException {
@@ -2058,7 +2058,10 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
20582058
ConfigurationFileUtils.getConfigurationDefaultValue("tvlist_sort_threshold"))));
20592059

20602060
// sort_buffer_size_in_bytes
2061-
loadSortBuffer(properties);
2061+
loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);
2062+
2063+
loadFixedSizeLimitForQuery(
2064+
properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI);
20622065
} catch (Exception e) {
20632066
if (e instanceof InterruptedException) {
20642067
Thread.currentThread().interrupt();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.path.MeasurementPath;
2424
import org.apache.iotdb.commons.path.PartialPath;
2525

26+
import org.apache.tsfile.read.common.TimeRange;
2627
import org.apache.tsfile.utils.Accountable;
2728
import org.apache.tsfile.utils.RamUsageEstimator;
2829

@@ -47,6 +48,8 @@ public class MemoryEstimationHelper {
4748
RamUsageEstimator.shallowSizeOfInstance(ArrayList.class);
4849
private static final long INTEGER_INSTANCE_SIZE =
4950
RamUsageEstimator.shallowSizeOfInstance(Integer.class);
51+
public static final long TIME_RANGE_INSTANCE_SIZE =
52+
RamUsageEstimator.shallowSizeOfInstance(TimeRange.class);
5053

5154
private MemoryEstimationHelper() {
5255
// hide the constructor
@@ -94,6 +97,19 @@ public static long getEstimatedSizeOfPartialPath(@Nullable final PartialPath par
9497
return totalSize;
9598
}
9699

100+
public static long getEstimatedSizeOfMeasurementPathNodes(
101+
@Nullable final PartialPath partialPath) {
102+
if (partialPath == null) {
103+
return 0;
104+
}
105+
long totalSize = MEASUREMENT_PATH_INSTANCE_SIZE;
106+
String[] nodes = partialPath.getNodes();
107+
if (nodes != null && nodes.length > 0) {
108+
totalSize += Arrays.stream(nodes).mapToLong(RamUsageEstimator::sizeOf).sum();
109+
}
110+
return totalSize;
111+
}
112+
97113
// This method should only be called if the content in the current PartialPath comes from other
98114
// structures whose memory cost have already been calculated.
99115
public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final PartialPath partialPath) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
import org.apache.iotdb.commons.exception.IoTDBException;
2424
import org.apache.iotdb.commons.path.AlignedPath;
2525
import org.apache.iotdb.commons.path.PartialPath;
26+
import org.apache.iotdb.commons.path.PatternTreeMap;
2627
import org.apache.iotdb.commons.utils.TestOnly;
28+
import org.apache.iotdb.db.conf.IoTDBConfig;
29+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2730
import org.apache.iotdb.db.exception.query.QueryProcessException;
2831
import org.apache.iotdb.db.queryengine.common.DeviceContext;
2932
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
@@ -38,17 +41,21 @@
3841
import org.apache.iotdb.db.storageengine.StorageEngine;
3942
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
4043
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
44+
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
45+
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
4146
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
4247
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
4348
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan;
4449
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
4550
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
4651
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
52+
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
4753
import org.apache.iotdb.db.utils.datastructure.TVList;
4854
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
4955

5056
import org.apache.tsfile.file.metadata.IDeviceID;
5157
import org.apache.tsfile.read.filter.basic.Filter;
58+
import org.apache.tsfile.utils.RamUsageEstimator;
5259
import org.slf4j.Logger;
5360
import org.slf4j.LoggerFactory;
5461

@@ -70,6 +77,7 @@
7077
public class FragmentInstanceContext extends QueryContext {
7178

7279
private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class);
80+
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
7381
private static final long END_TIME_INITIAL_VALUE = -1L;
7482
// wait over 5s for driver to close is abnormal
7583
private static final long LONG_WAIT_DURATION = 5_000_000_000L;
@@ -84,6 +92,8 @@ public class FragmentInstanceContext extends QueryContext {
8492

8593
// it will only be used once, after sharedQueryDataSource being inited, it will be set to null
8694
protected List<PartialPath> sourcePaths;
95+
96+
private boolean singleSourcePath = false;
8797
// Used for region scan.
8898
private Map<IDeviceID, DeviceContext> devicePathsToContext;
8999

@@ -307,6 +317,64 @@ public void start() {
307317
lastExecutionStartTime.set(now);
308318
}
309319

320+
@Override
321+
protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
322+
if (isSingleSourcePath()) {
323+
return tsFileResource.getModFile().exists();
324+
}
325+
if (nonExistentModFiles.contains(tsFileResource.getTsFileID())) {
326+
return false;
327+
}
328+
329+
ModificationFile modFile = tsFileResource.getModFile();
330+
if (!modFile.exists()) {
331+
if (nonExistentModFiles.add(tsFileResource.getTsFileID())
332+
&& memoryReservationManager != null) {
333+
memoryReservationManager.reserveMemoryCumulatively(RamUsageEstimator.NUM_BYTES_OBJECT_REF);
334+
}
335+
return false;
336+
}
337+
return true;
338+
}
339+
340+
@Override
341+
protected PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> getAllModifications(
342+
TsFileResource resource) {
343+
if (isSingleSourcePath() || memoryReservationManager == null) {
344+
return loadAllModificationsFromDisk(resource);
345+
}
346+
347+
AtomicReference<PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>>
348+
atomicReference = new AtomicReference<>();
349+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> cachedResult =
350+
fileModCache.computeIfAbsent(
351+
resource.getTsFileID(),
352+
k -> {
353+
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> allMods =
354+
loadAllModificationsFromDisk(resource);
355+
atomicReference.set(allMods);
356+
if (cachedModEntriesSize.get() >= config.getModsCacheSizeLimitPerFI()) {
357+
return null;
358+
}
359+
long memCost =
360+
RamUsageEstimator.sizeOfObject(allMods)
361+
+ RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY;
362+
long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
363+
while (alreadyUsedMemoryForCachedModEntries + memCost
364+
< config.getModsCacheSizeLimitPerFI()) {
365+
if (cachedModEntriesSize.compareAndSet(
366+
alreadyUsedMemoryForCachedModEntries,
367+
alreadyUsedMemoryForCachedModEntries + memCost)) {
368+
memoryReservationManager.reserveMemoryCumulatively(memCost);
369+
return allMods;
370+
}
371+
alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
372+
}
373+
return null;
374+
});
375+
return cachedResult == null ? atomicReference.get() : cachedResult;
376+
}
377+
310378
// the state change listener is added here in a separate initialize() method
311379
// instead of the constructor to prevent leaking the "this" reference to
312380
// another thread, which will cause unsafe publication of this instance.
@@ -450,6 +518,9 @@ public IDataRegionForQuery getDataRegion() {
450518

451519
public void setSourcePaths(List<PartialPath> sourcePaths) {
452520
this.sourcePaths = sourcePaths;
521+
if (sourcePaths != null && sourcePaths.size() == 1) {
522+
singleSourcePath = true;
523+
}
453524
}
454525

455526
public void setDevicePathsToContext(Map<IDeviceID, DeviceContext> devicePathsToContext) {
@@ -754,6 +825,8 @@ public synchronized void releaseResource() {
754825
// release TVList/AlignedTVList owned by current query
755826
releaseTVListOwnedByQuery();
756827

828+
fileModCache = null;
829+
nonExistentModFiles = null;
757830
dataRegion = null;
758831
globalTimeFilter = null;
759832
sharedQueryDataSource = null;
@@ -928,4 +1001,8 @@ public long getUnclosedSeqFileNum() {
9281001
public boolean ignoreNotExistsDevice() {
9291002
return ignoreNotExistsDevice;
9301003
}
1004+
1005+
public boolean isSingleSourcePath() {
1006+
return singleSourcePath;
1007+
}
9311008
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,23 @@
4141
import java.util.Set;
4242
import java.util.concurrent.ConcurrentHashMap;
4343
import java.util.concurrent.CopyOnWriteArraySet;
44+
import java.util.concurrent.atomic.AtomicLong;
4445

4546
/** QueryContext contains the shared information with in a query. */
4647
public class QueryContext {
4748

4849
private QueryStatistics queryStatistics = new QueryStatistics();
4950

5051
/**
51-
* The key is the path of a ModificationFile and the value is all Modifications in this file. We
52-
* use this field because each call of Modification.getModifications() return a copy of the
53-
* Modifications, and we do not want it to create multiple copies within a query.
52+
* The key is TsFileID and the value is all Modifications in this file. We use this field because
53+
* each call of Modification.getModifications() return a copy of the Modifications, and we do not
54+
* want it to create multiple copies within a query.
5455
*/
55-
private final Map<String, PatternTreeMap<Modification, ModsSerializer>> fileModCache =
56+
protected Map<TsFileID, PatternTreeMap<Modification, ModsSerializer>> fileModCache =
5657
new ConcurrentHashMap<>();
5758

59+
protected AtomicLong cachedModEntriesSize = new AtomicLong(0);
60+
5861
protected long queryId;
5962

6063
private boolean debug;
@@ -64,7 +67,7 @@ public class QueryContext {
6467

6568
private volatile boolean isInterrupted = false;
6669

67-
private final Set<TsFileID> nonExistentModFiles = new CopyOnWriteArraySet<>();
70+
protected Set<TsFileID> nonExistentModFiles = new CopyOnWriteArraySet<>();
6871

6972
// referenced TVLists for the query
7073
protected final Set<TVList> tvListSet = new HashSet<>();
@@ -96,18 +99,21 @@ protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
9699
return true;
97100
}
98101

99-
private PatternTreeMap<Modification, ModsSerializer> getAllModifications(
100-
ModificationFile modFile) {
102+
protected PatternTreeMap<Modification, ModsSerializer> getAllModifications(
103+
TsFileResource resource) {
101104
return fileModCache.computeIfAbsent(
102-
modFile.getFilePath(),
103-
k -> {
104-
PatternTreeMap<Modification, ModsSerializer> modifications =
105-
PatternTreeMapFactory.getModsPatternTreeMap();
106-
for (Modification modification : modFile.getModificationsIter()) {
107-
modifications.append(modification.getPath(), modification);
108-
}
109-
return modifications;
110-
});
105+
resource.getTsFileID(), k -> loadAllModificationsFromDisk(resource));
106+
}
107+
108+
public PatternTreeMap<Modification, ModsSerializer> loadAllModificationsFromDisk(
109+
TsFileResource resource) {
110+
PatternTreeMap<Modification, ModsSerializer> modifications =
111+
PatternTreeMapFactory.getModsPatternTreeMap();
112+
Iterable<Modification> modEntryIterator = resource.getModFile().getModificationsIter();
113+
for (Modification modification : modEntryIterator) {
114+
modifications.append(modification.getPath(), modification);
115+
}
116+
return modifications;
111117
}
112118

113119
public List<Modification> getPathModifications(
@@ -119,20 +125,16 @@ public List<Modification> getPathModifications(
119125
}
120126

121127
return ModificationFile.sortAndMerge(
122-
getAllModifications(tsFileResource.getModFile())
123-
.getOverlapped(new PartialPath(deviceID, measurement)));
128+
getAllModifications(tsFileResource).getOverlapped(new PartialPath(deviceID, measurement)));
124129
}
125130

126-
public List<Modification> getPathModifications(TsFileResource tsFileResource, IDeviceID deviceID)
131+
public List<Modification> getPathModifications(
132+
PatternTreeMap<Modification, ModsSerializer> fileMods, IDeviceID deviceID)
127133
throws IllegalPathException {
128-
// if the mods file does not exist, do not add it to the cache
129-
if (!checkIfModificationExists(tsFileResource)) {
134+
if (fileMods == null) {
130135
return Collections.emptyList();
131136
}
132-
133-
return ModificationFile.sortAndMerge(
134-
getAllModifications(tsFileResource.getModFile())
135-
.getDeviceOverlapped(new PartialPath(deviceID)));
137+
return ModificationFile.sortAndMerge(fileMods.getDeviceOverlapped(new PartialPath(deviceID)));
136138
}
137139

138140
/**
@@ -145,8 +147,15 @@ public List<Modification> getPathModifications(TsFileResource tsFileResource, Pa
145147
return Collections.emptyList();
146148
}
147149

148-
return ModificationFile.sortAndMerge(
149-
getAllModifications(tsFileResource.getModFile()).getOverlapped(path));
150+
return getPathModifications(getAllModifications(tsFileResource), path);
151+
}
152+
153+
public List<Modification> getPathModifications(
154+
PatternTreeMap<Modification, ModsSerializer> fileMods, PartialPath path) {
155+
if (fileMods == null) {
156+
return Collections.emptyList();
157+
}
158+
return ModificationFile.sortAndMerge(fileMods.getOverlapped(path));
150159
}
151160

152161
/**

0 commit comments

Comments
 (0)