Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ public class IoTDBConfig {
/** The buffer for sort operation */
private long sortBufferSize = 32 * 1024 * 1024L;

/** Mods cache size limit per fi */
private long modsCacheSizeLimitPerFI = 32 * 1024 * 1024;

/**
* The strategy of inner space compaction task. There are just one inner space compaction strategy
* SIZE_TIRED_COMPACTION:
Expand Down Expand Up @@ -4038,6 +4041,14 @@ public long getSortBufferSize() {
return sortBufferSize;
}

public void setModsCacheSizeLimitPerFI(long modsCacheSizeLimitPerFI) {
this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI;
}

public long getModsCacheSizeLimitPerFI() {
return modsCacheSizeLimitPerFI;
}

public void setSortTmpDir(String sortTmpDir) {
this.sortTmpDir = sortTmpDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.regex.Pattern;

public class IoTDBDescriptor {
Expand Down Expand Up @@ -1027,7 +1028,10 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
properties.getProperty("quota_enable", String.valueOf(conf.isQuotaEnable()))));

// The buffer for sort operator to calculate
loadSortBuffer(properties);
loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);

loadFixedSizeLimitForQuery(
properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI);

// tmp filePath for sort operator
conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir()));
Expand Down Expand Up @@ -1096,24 +1100,19 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
String.valueOf(conf.isIncludeNullValueInWriteThroughputMetric()))));
}

private void loadSortBuffer(TrimProperties properties) {
long defaultValue = calculateDefaultSortBufferSize(memoryConfig);
long sortBufferSize =
Long.parseLong(
properties.getProperty("sort_buffer_size_in_bytes", Long.toString(defaultValue)));
if (sortBufferSize <= 0) {
sortBufferSize = defaultValue;
}
// The buffer for sort operator to calculate
conf.setSortBufferSize(sortBufferSize);
}

public static long calculateDefaultSortBufferSize(DataNodeMemoryConfig memoryConfig) {
return Math.min(
32 * 1024 * 1024L,
memoryConfig.getOperatorsMemoryManager().getTotalMemorySizeInBytes()
/ memoryConfig.getQueryThreadCount()
/ 2);
private void loadFixedSizeLimitForQuery(
TrimProperties properties, String name, LongConsumer setFunction) {
long defaultValue =
Math.min(
32 * 1024 * 1024L,
memoryConfig.getOperatorsMemoryManager().getTotalMemorySizeInBytes()
/ memoryConfig.getQueryThreadCount()
/ 2);
long size = Long.parseLong(properties.getProperty(name, Long.toString(defaultValue)));
if (size <= 0) {
size = defaultValue;
}
setFunction.accept(size);
}

private void reloadConsensusProps(TrimProperties properties) throws IOException {
Expand Down Expand Up @@ -2089,7 +2088,10 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
ConfigurationFileUtils.getConfigurationDefaultValue("tvlist_sort_threshold"))));

// sort_buffer_size_in_bytes
loadSortBuffer(properties);
loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);

loadFixedSizeLimitForQuery(
properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI);

conf.setIncludeNullValueInWriteThroughputMetric(
Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;

import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.RamUsageEstimator;

Expand All @@ -47,6 +48,8 @@ public class MemoryEstimationHelper {
RamUsageEstimator.shallowSizeOfInstance(ArrayList.class);
private static final long INTEGER_INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(Integer.class);
public static final long TIME_RANGE_INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TimeRange.class);

private MemoryEstimationHelper() {
// hide the constructor
Expand Down Expand Up @@ -99,6 +102,19 @@ public static long getEstimatedSizeOfPartialPath(@Nullable final PartialPath par
return totalSize;
}

public static long getEstimatedSizeOfMeasurementPathNodes(
@Nullable final PartialPath partialPath) {
if (partialPath == null) {
return 0;
}
long totalSize = MEASUREMENT_PATH_INSTANCE_SIZE;
String[] nodes = partialPath.getNodes();
if (nodes != null && nodes.length > 0) {
totalSize += Arrays.stream(nodes).mapToLong(RamUsageEstimator::sizeOf).sum();
}
return totalSize;
}

// This method should only be called if the content in the current PartialPath comes from other
// structures whose memory cost have already been calculated.
public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final PartialPath partialPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.DeviceContext;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
Expand All @@ -39,19 +42,22 @@
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.filter.factory.FilterFactory;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -78,6 +84,7 @@
public class FragmentInstanceContext extends QueryContext {

private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long END_TIME_INITIAL_VALUE = -1L;
// wait over 5s for driver to close is abnormal
private static final long LONG_WAIT_DURATION = 5_000_000_000L;
Expand All @@ -93,6 +100,8 @@ public class FragmentInstanceContext extends QueryContext {
// it will only be used once, after sharedQueryDataSource being inited, it will be set to null
private List<IFullPath> sourcePaths;

private boolean singleSourcePath = false;

// Used for region scan, relating methods are to be added.
private Map<IDeviceID, DeviceContext> devicePathsToContext;

Expand Down Expand Up @@ -320,6 +329,44 @@ public void start() {
lastExecutionStartTime.set(now);
}

@Override
protected PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> getAllModifications(
TsFileResource resource) {
if (isSingleSourcePath() || memoryReservationManager == null) {
return loadAllModificationsFromDisk(resource);
}

AtomicReference<PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>>
atomicReference = new AtomicReference<>();
PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> cachedResult =
fileModCache.computeIfAbsent(
resource.getTsFileID(),
k -> {
PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> allMods =
loadAllModificationsFromDisk(resource);
atomicReference.set(allMods);
if (cachedModEntriesSize.get() >= config.getModsCacheSizeLimitPerFI()) {
return null;
}
long memCost =
RamUsageEstimator.sizeOfObject(allMods)
+ RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY;
long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
while (alreadyUsedMemoryForCachedModEntries + memCost
< config.getModsCacheSizeLimitPerFI()) {
if (cachedModEntriesSize.compareAndSet(
alreadyUsedMemoryForCachedModEntries,
alreadyUsedMemoryForCachedModEntries + memCost)) {
memoryReservationManager.reserveMemoryCumulatively(memCost);
return allMods;
}
alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
}
return null;
});
return cachedResult == null ? atomicReference.get() : cachedResult;
}

// the state change listener is added here in a separate initialize() method
// instead of the constructor to prevent leaking the "this" reference to
// another thread, which will cause unsafe publication of this instance.
Expand Down Expand Up @@ -479,12 +526,25 @@ public void setTimeFilterForTableModel(Filter timeFilter) {
}
}

@Override
public boolean collectTable(String table) {
boolean added = super.collectTable(table);
if (added && memoryReservationManager != null) {
memoryReservationManager.reserveMemoryCumulatively(
RamUsageEstimator.sizeOf(table) + RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
}
return added;
}

public IDataRegionForQuery getDataRegion() {
return dataRegion;
}

public void setSourcePaths(List<IFullPath> sourcePaths) {
this.sourcePaths = sourcePaths;
if (sourcePaths != null && sourcePaths.size() == 1) {
singleSourcePath = true;
}
}

public void setDevicePathsToContext(Map<IDeviceID, DeviceContext> devicePathsToContext) {
Expand Down Expand Up @@ -791,6 +851,8 @@ public synchronized void releaseResource() {
// release TVList/AlignedTVList owned by current query
releaseTVListOwnedByQuery();

fileModCache = null;
tables = null;
dataRegion = null;
globalTimeFilter = null;
sharedQueryDataSource = null;
Expand Down Expand Up @@ -967,4 +1029,8 @@ public long getUnclosedSeqFileNum() {
public boolean ignoreNotExistsDevice() {
return ignoreNotExistsDevice;
}

public boolean isSingleSourcePath() {
return singleSourcePath;
}
}
Loading
Loading