Skip to content

Commit 47fb192

Browse files
authored
Add memory control for mod entries in query
1 parent a18c780 commit 47fb192

17 files changed

Lines changed: 326 additions & 66 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,9 @@ public class IoTDBConfig {
417417
/** The buffer for sort operation */
418418
private long sortBufferSize = 32 * 1024 * 1024L;
419419

420+
/** Mods cache size limit per fi */
421+
private long modsCacheSizeLimitPerFI = 32 * 1024 * 1024;
422+
420423
/**
421424
* The strategy of inner space compaction task. There are just one inner space compaction strategy
422425
* SIZE_TIRED_COMPACTION:
@@ -4038,6 +4041,14 @@ public long getSortBufferSize() {
40384041
return sortBufferSize;
40394042
}
40404043

4044+
public void setModsCacheSizeLimitPerFI(long modsCacheSizeLimitPerFI) {
4045+
this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI;
4046+
}
4047+
4048+
public long getModsCacheSizeLimitPerFI() {
4049+
return modsCacheSizeLimitPerFI;
4050+
}
4051+
40414052
public void setSortTmpDir(String sortTmpDir) {
40424053
this.sortTmpDir = sortTmpDir;
40434054
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import java.util.ServiceLoader;
9393
import java.util.Set;
9494
import java.util.concurrent.TimeUnit;
95+
import java.util.function.LongConsumer;
9596
import java.util.regex.Pattern;
9697

9798
public class IoTDBDescriptor {
@@ -1027,7 +1028,10 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
10271028
properties.getProperty("quota_enable", String.valueOf(conf.isQuotaEnable()))));
10281029

10291030
// The buffer for sort operator to calculate
1030-
loadSortBuffer(properties);
1031+
loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);
1032+
1033+
loadFixedSizeLimitForQuery(
1034+
properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI);
10311035

10321036
// tmp filePath for sort operator
10331037
conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir()));
@@ -1096,24 +1100,19 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
10961100
String.valueOf(conf.isIncludeNullValueInWriteThroughputMetric()))));
10971101
}
10981102

1099-
private void loadSortBuffer(TrimProperties properties) {
1100-
long defaultValue = calculateDefaultSortBufferSize(memoryConfig);
1101-
long sortBufferSize =
1102-
Long.parseLong(
1103-
properties.getProperty("sort_buffer_size_in_bytes", Long.toString(defaultValue)));
1104-
if (sortBufferSize <= 0) {
1105-
sortBufferSize = defaultValue;
1106-
}
1107-
// The buffer for sort operator to calculate
1108-
conf.setSortBufferSize(sortBufferSize);
1109-
}
1110-
1111-
public static long calculateDefaultSortBufferSize(DataNodeMemoryConfig memoryConfig) {
1112-
return Math.min(
1113-
32 * 1024 * 1024L,
1114-
memoryConfig.getOperatorsMemoryManager().getTotalMemorySizeInBytes()
1115-
/ memoryConfig.getQueryThreadCount()
1116-
/ 2);
1103+
private void loadFixedSizeLimitForQuery(
1104+
TrimProperties properties, String name, LongConsumer setFunction) {
1105+
long defaultValue =
1106+
Math.min(
1107+
32 * 1024 * 1024L,
1108+
memoryConfig.getOperatorsMemoryManager().getTotalMemorySizeInBytes()
1109+
/ memoryConfig.getQueryThreadCount()
1110+
/ 2);
1111+
long size = Long.parseLong(properties.getProperty(name, Long.toString(defaultValue)));
1112+
if (size <= 0) {
1113+
size = defaultValue;
1114+
}
1115+
setFunction.accept(size);
11171116
}
11181117

11191118
private void reloadConsensusProps(TrimProperties properties) throws IOException {
@@ -2089,7 +2088,10 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
20892088
ConfigurationFileUtils.getConfigurationDefaultValue("tvlist_sort_threshold"))));
20902089

20912090
// sort_buffer_size_in_bytes
2092-
loadSortBuffer(properties);
2091+
loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);
2092+
2093+
loadFixedSizeLimitForQuery(
2094+
properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI);
20932095

20942096
conf.setIncludeNullValueInWriteThroughputMetric(
20952097
Boolean.parseBoolean(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.path.MeasurementPath;
2424
import org.apache.iotdb.commons.path.PartialPath;
2525

26+
import org.apache.tsfile.read.common.TimeRange;
2627
import org.apache.tsfile.utils.Accountable;
2728
import org.apache.tsfile.utils.RamUsageEstimator;
2829

@@ -47,6 +48,8 @@ public class MemoryEstimationHelper {
4748
RamUsageEstimator.shallowSizeOfInstance(ArrayList.class);
4849
private static final long INTEGER_INSTANCE_SIZE =
4950
RamUsageEstimator.shallowSizeOfInstance(Integer.class);
51+
public static final long TIME_RANGE_INSTANCE_SIZE =
52+
RamUsageEstimator.shallowSizeOfInstance(TimeRange.class);
5053

5154
private MemoryEstimationHelper() {
5255
// hide the constructor
@@ -99,6 +102,19 @@ public static long getEstimatedSizeOfPartialPath(@Nullable final PartialPath par
99102
return totalSize;
100103
}
101104

105+
public static long getEstimatedSizeOfMeasurementPathNodes(
106+
@Nullable final PartialPath partialPath) {
107+
if (partialPath == null) {
108+
return 0;
109+
}
110+
long totalSize = MEASUREMENT_PATH_INSTANCE_SIZE;
111+
String[] nodes = partialPath.getNodes();
112+
if (nodes != null && nodes.length > 0) {
113+
totalSize += Arrays.stream(nodes).mapToLong(RamUsageEstimator::sizeOf).sum();
114+
}
115+
return totalSize;
116+
}
117+
102118
// This method should only be called if the content in the current PartialPath comes from other
103119
// structures whose memory cost have already been calculated.
104120
public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final PartialPath partialPath) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
2525
import org.apache.iotdb.commons.path.AlignedFullPath;
2626
import org.apache.iotdb.commons.path.IFullPath;
27+
import org.apache.iotdb.commons.path.PatternTreeMap;
2728
import org.apache.iotdb.commons.utils.TestOnly;
29+
import org.apache.iotdb.db.conf.IoTDBConfig;
30+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2831
import org.apache.iotdb.db.exception.query.QueryProcessException;
2932
import org.apache.iotdb.db.queryengine.common.DeviceContext;
3033
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
@@ -39,19 +42,22 @@
3942
import org.apache.iotdb.db.storageengine.StorageEngine;
4043
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
4144
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
45+
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
4246
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
4347
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
4448
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan;
4549
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
4650
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
4751
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
4852
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
53+
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
4954
import org.apache.iotdb.db.utils.datastructure.TVList;
5055
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
5156

5257
import org.apache.tsfile.file.metadata.IDeviceID;
5358
import org.apache.tsfile.read.filter.basic.Filter;
5459
import org.apache.tsfile.read.filter.factory.FilterFactory;
60+
import org.apache.tsfile.utils.RamUsageEstimator;
5561
import org.slf4j.Logger;
5662
import org.slf4j.LoggerFactory;
5763

@@ -78,6 +84,7 @@
7884
public class FragmentInstanceContext extends QueryContext {
7985

8086
private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class);
87+
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
8188
private static final long END_TIME_INITIAL_VALUE = -1L;
8289
// wait over 5s for driver to close is abnormal
8390
private static final long LONG_WAIT_DURATION = 5_000_000_000L;
@@ -93,6 +100,8 @@ public class FragmentInstanceContext extends QueryContext {
93100
// it will only be used once, after sharedQueryDataSource being inited, it will be set to null
94101
private List<IFullPath> sourcePaths;
95102

103+
private boolean singleSourcePath = false;
104+
96105
// Used for region scan, relating methods are to be added.
97106
private Map<IDeviceID, DeviceContext> devicePathsToContext;
98107

@@ -320,6 +329,44 @@ public void start() {
320329
lastExecutionStartTime.set(now);
321330
}
322331

332+
@Override
333+
protected PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> getAllModifications(
334+
TsFileResource resource) {
335+
if (isSingleSourcePath() || memoryReservationManager == null) {
336+
return loadAllModificationsFromDisk(resource);
337+
}
338+
339+
AtomicReference<PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>>
340+
atomicReference = new AtomicReference<>();
341+
PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> cachedResult =
342+
fileModCache.computeIfAbsent(
343+
resource.getTsFileID(),
344+
k -> {
345+
PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> allMods =
346+
loadAllModificationsFromDisk(resource);
347+
atomicReference.set(allMods);
348+
if (cachedModEntriesSize.get() >= config.getModsCacheSizeLimitPerFI()) {
349+
return null;
350+
}
351+
long memCost =
352+
RamUsageEstimator.sizeOfObject(allMods)
353+
+ RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY;
354+
long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
355+
while (alreadyUsedMemoryForCachedModEntries + memCost
356+
< config.getModsCacheSizeLimitPerFI()) {
357+
if (cachedModEntriesSize.compareAndSet(
358+
alreadyUsedMemoryForCachedModEntries,
359+
alreadyUsedMemoryForCachedModEntries + memCost)) {
360+
memoryReservationManager.reserveMemoryCumulatively(memCost);
361+
return allMods;
362+
}
363+
alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
364+
}
365+
return null;
366+
});
367+
return cachedResult == null ? atomicReference.get() : cachedResult;
368+
}
369+
323370
// the state change listener is added here in a separate initialize() method
324371
// instead of the constructor to prevent leaking the "this" reference to
325372
// another thread, which will cause unsafe publication of this instance.
@@ -479,12 +526,25 @@ public void setTimeFilterForTableModel(Filter timeFilter) {
479526
}
480527
}
481528

529+
@Override
530+
public boolean collectTable(String table) {
531+
boolean added = super.collectTable(table);
532+
if (added && memoryReservationManager != null) {
533+
memoryReservationManager.reserveMemoryCumulatively(
534+
RamUsageEstimator.sizeOf(table) + RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
535+
}
536+
return added;
537+
}
538+
482539
public IDataRegionForQuery getDataRegion() {
483540
return dataRegion;
484541
}
485542

486543
public void setSourcePaths(List<IFullPath> sourcePaths) {
487544
this.sourcePaths = sourcePaths;
545+
if (sourcePaths != null && sourcePaths.size() == 1) {
546+
singleSourcePath = true;
547+
}
488548
}
489549

490550
public void setDevicePathsToContext(Map<IDeviceID, DeviceContext> devicePathsToContext) {
@@ -791,6 +851,8 @@ public synchronized void releaseResource() {
791851
// release TVList/AlignedTVList owned by current query
792852
releaseTVListOwnedByQuery();
793853

854+
fileModCache = null;
855+
tables = null;
794856
dataRegion = null;
795857
globalTimeFilter = null;
796858
sharedQueryDataSource = null;
@@ -967,4 +1029,8 @@ public long getUnclosedSeqFileNum() {
9671029
public boolean ignoreNotExistsDevice() {
9681030
return ignoreNotExistsDevice;
9691031
}
1032+
1033+
public boolean isSingleSourcePath() {
1034+
return singleSourcePath;
1035+
}
9701036
}

0 commit comments

Comments
 (0)