Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ public class IoTDBConfig {
/** The buffer for sort operation */
private long sortBufferSize = 32 * 1024 * 1024L;

/** Mods cache size limit per fi */
private long modsCacheSizeLimitPerFI = 32 * 1024 * 1024;

/**
* The strategy of inner space compaction task. There are just one inner space compaction strategy
* SIZE_TIRED_COMPACTION:
Expand Down Expand Up @@ -4235,6 +4238,14 @@ public long getSortBufferSize() {
return sortBufferSize;
}

public void setModsCacheSizeLimitPerFI(long modsCacheSizeLimitPerFI) {
this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI;
}

public long getModsCacheSizeLimitPerFI() {
return modsCacheSizeLimitPerFI;
}

public void setSortTmpDir(String sortTmpDir) {
this.sortTmpDir = sortTmpDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.LongConsumer;
import java.util.regex.Pattern;

public class IoTDBDescriptor {
Expand Down Expand Up @@ -1081,7 +1082,10 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException

// The buffer for sort operator to calculate

loadSortBuffer(properties);
loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);

loadFixedSizeLimitForQuery(
properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI);

// tmp filePath for sort operator
conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir()));
Expand All @@ -1106,21 +1110,17 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
loadTrustedUriPattern(properties);
}

private void loadSortBuffer(TrimProperties properties) {
long defaultValue = calculateDefaultSortBufferSize();
long sortBufferSize =
Long.parseLong(
properties.getProperty("sort_buffer_size_in_bytes", Long.toString(defaultValue)));
if (sortBufferSize <= 0) {
sortBufferSize = defaultValue;
}
// The buffer for sort operator to calculate
conf.setSortBufferSize(sortBufferSize);
}

public static long calculateDefaultSortBufferSize() {
return Math.min(
32 * 1024 * 1024L, conf.getAllocateMemoryForOperators() / conf.getQueryThreadCount() / 2);
private void loadFixedSizeLimitForQuery(
TrimProperties properties, String name, LongConsumer setFunction) {
long defaultValue =
Math.min(
32 * 1024 * 1024L,
conf.getAllocateMemoryForOperators() / conf.getQueryThreadCount() / 2);
long size = Long.parseLong(properties.getProperty(name, Long.toString(defaultValue)));
if (size <= 0) {
size = defaultValue;
}
setFunction.accept(size);
}

private void reloadConsensusProps(TrimProperties properties) throws IOException {
Expand Down Expand Up @@ -2058,7 +2058,10 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
ConfigurationFileUtils.getConfigurationDefaultValue("tvlist_sort_threshold"))));

// sort_buffer_size_in_bytes
loadSortBuffer(properties);
loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);

loadFixedSizeLimitForQuery(
properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;

import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.RamUsageEstimator;

Expand All @@ -47,6 +48,8 @@ public class MemoryEstimationHelper {
RamUsageEstimator.shallowSizeOfInstance(ArrayList.class);
private static final long INTEGER_INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(Integer.class);
public static final long TIME_RANGE_INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TimeRange.class);

private MemoryEstimationHelper() {
// hide the constructor
Expand Down Expand Up @@ -94,6 +97,19 @@ public static long getEstimatedSizeOfPartialPath(@Nullable final PartialPath par
return totalSize;
}

public static long getEstimatedSizeOfMeasurementPathNodes(
@Nullable final PartialPath partialPath) {
if (partialPath == null) {
return 0;
}
long totalSize = MEASUREMENT_PATH_INSTANCE_SIZE;
String[] nodes = partialPath.getNodes();
if (nodes != null && nodes.length > 0) {
totalSize += Arrays.stream(nodes).mapToLong(RamUsageEstimator::sizeOf).sum();
}
return totalSize;
}

// This method should only be called if the content in the current PartialPath comes from other
// structures whose memory cost have already been calculated.
public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final PartialPath partialPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.DeviceContext;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
Expand All @@ -38,17 +41,21 @@
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -70,6 +77,7 @@
public class FragmentInstanceContext extends QueryContext {

private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long END_TIME_INITIAL_VALUE = -1L;
// wait over 5s for driver to close is abnormal
private static final long LONG_WAIT_DURATION = 5_000_000_000L;
Expand All @@ -84,6 +92,8 @@ public class FragmentInstanceContext extends QueryContext {

// it will only be used once, after sharedQueryDataSource being inited, it will be set to null
protected List<PartialPath> sourcePaths;

private boolean singleSourcePath = false;
// Used for region scan.
private Map<IDeviceID, DeviceContext> devicePathsToContext;

Expand Down Expand Up @@ -307,6 +317,64 @@ public void start() {
lastExecutionStartTime.set(now);
}

@Override
protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
if (isSingleSourcePath()) {
return tsFileResource.getModFile().exists();
}
if (nonExistentModFiles.contains(tsFileResource.getTsFileID())) {
return false;
}

ModificationFile modFile = tsFileResource.getModFile();
if (!modFile.exists()) {
if (nonExistentModFiles.add(tsFileResource.getTsFileID())
&& memoryReservationManager != null) {
memoryReservationManager.reserveMemoryCumulatively(RamUsageEstimator.NUM_BYTES_OBJECT_REF);
}
return false;
}
return true;
}

@Override
protected PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> getAllModifications(
TsFileResource resource) {
if (isSingleSourcePath() || memoryReservationManager == null) {
return loadAllModificationsFromDisk(resource);
}

AtomicReference<PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>>
atomicReference = new AtomicReference<>();
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> cachedResult =
fileModCache.computeIfAbsent(
resource.getTsFileID(),
k -> {
PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> allMods =
loadAllModificationsFromDisk(resource);
atomicReference.set(allMods);
if (cachedModEntriesSize.get() >= config.getModsCacheSizeLimitPerFI()) {
return null;
}
long memCost =
RamUsageEstimator.sizeOfObject(allMods)
+ RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY;
long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
while (alreadyUsedMemoryForCachedModEntries + memCost
< config.getModsCacheSizeLimitPerFI()) {
if (cachedModEntriesSize.compareAndSet(
alreadyUsedMemoryForCachedModEntries,
alreadyUsedMemoryForCachedModEntries + memCost)) {
memoryReservationManager.reserveMemoryCumulatively(memCost);
return allMods;
}
alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
}
return null;
});
return cachedResult == null ? atomicReference.get() : cachedResult;
}

// the state change listener is added here in a separate initialize() method
// instead of the constructor to prevent leaking the "this" reference to
// another thread, which will cause unsafe publication of this instance.
Expand Down Expand Up @@ -450,6 +518,9 @@ public IDataRegionForQuery getDataRegion() {

public void setSourcePaths(List<PartialPath> sourcePaths) {
this.sourcePaths = sourcePaths;
if (sourcePaths != null && sourcePaths.size() == 1) {
singleSourcePath = true;
}
}

public void setDevicePathsToContext(Map<IDeviceID, DeviceContext> devicePathsToContext) {
Expand Down Expand Up @@ -754,6 +825,8 @@ public synchronized void releaseResource() {
// release TVList/AlignedTVList owned by current query
releaseTVListOwnedByQuery();

fileModCache = null;
nonExistentModFiles = null;
dataRegion = null;
globalTimeFilter = null;
sharedQueryDataSource = null;
Expand Down Expand Up @@ -928,4 +1001,8 @@ public long getUnclosedSeqFileNum() {
public boolean ignoreNotExistsDevice() {
return ignoreNotExistsDevice;
}

public boolean isSingleSourcePath() {
return singleSourcePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,23 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;

/** QueryContext contains the shared information with in a query. */
public class QueryContext {

private QueryStatistics queryStatistics = new QueryStatistics();

/**
* The key is the path of a ModificationFile and the value is all Modifications in this file. We
* use this field because each call of Modification.getModifications() return a copy of the
* Modifications, and we do not want it to create multiple copies within a query.
* The key is TsFileID and the value is all Modifications in this file. We use this field because
* each call of Modification.getModifications() return a copy of the Modifications, and we do not
* want it to create multiple copies within a query.
*/
private final Map<String, PatternTreeMap<Modification, ModsSerializer>> fileModCache =
protected Map<TsFileID, PatternTreeMap<Modification, ModsSerializer>> fileModCache =
new ConcurrentHashMap<>();

protected AtomicLong cachedModEntriesSize = new AtomicLong(0);

protected long queryId;

private boolean debug;
Expand All @@ -64,7 +67,7 @@ public class QueryContext {

private volatile boolean isInterrupted = false;

private final Set<TsFileID> nonExistentModFiles = new CopyOnWriteArraySet<>();
protected Set<TsFileID> nonExistentModFiles = new CopyOnWriteArraySet<>();

// referenced TVLists for the query
protected final Set<TVList> tvListSet = new HashSet<>();
Expand Down Expand Up @@ -96,18 +99,21 @@ protected boolean checkIfModificationExists(TsFileResource tsFileResource) {
return true;
}

private PatternTreeMap<Modification, ModsSerializer> getAllModifications(
ModificationFile modFile) {
protected PatternTreeMap<Modification, ModsSerializer> getAllModifications(
TsFileResource resource) {
return fileModCache.computeIfAbsent(
modFile.getFilePath(),
k -> {
PatternTreeMap<Modification, ModsSerializer> modifications =
PatternTreeMapFactory.getModsPatternTreeMap();
for (Modification modification : modFile.getModificationsIter()) {
modifications.append(modification.getPath(), modification);
}
return modifications;
});
resource.getTsFileID(), k -> loadAllModificationsFromDisk(resource));
}

public PatternTreeMap<Modification, ModsSerializer> loadAllModificationsFromDisk(
TsFileResource resource) {
PatternTreeMap<Modification, ModsSerializer> modifications =
PatternTreeMapFactory.getModsPatternTreeMap();
Iterable<Modification> modEntryIterator = resource.getModFile().getModificationsIter();
for (Modification modification : modEntryIterator) {
modifications.append(modification.getPath(), modification);
}
return modifications;
}

public List<Modification> getPathModifications(
Expand All @@ -119,20 +125,16 @@ public List<Modification> getPathModifications(
}

return ModificationFile.sortAndMerge(
getAllModifications(tsFileResource.getModFile())
.getOverlapped(new PartialPath(deviceID, measurement)));
getAllModifications(tsFileResource).getOverlapped(new PartialPath(deviceID, measurement)));
}

public List<Modification> getPathModifications(TsFileResource tsFileResource, IDeviceID deviceID)
public List<Modification> getPathModifications(
PatternTreeMap<Modification, ModsSerializer> fileMods, IDeviceID deviceID)
throws IllegalPathException {
// if the mods file does not exist, do not add it to the cache
if (!checkIfModificationExists(tsFileResource)) {
if (fileMods == null) {
return Collections.emptyList();
}

return ModificationFile.sortAndMerge(
getAllModifications(tsFileResource.getModFile())
.getDeviceOverlapped(new PartialPath(deviceID)));
return ModificationFile.sortAndMerge(fileMods.getDeviceOverlapped(new PartialPath(deviceID)));
}

/**
Expand All @@ -145,8 +147,15 @@ public List<Modification> getPathModifications(TsFileResource tsFileResource, Pa
return Collections.emptyList();
}

return ModificationFile.sortAndMerge(
getAllModifications(tsFileResource.getModFile()).getOverlapped(path));
return getPathModifications(getAllModifications(tsFileResource), path);
}

public List<Modification> getPathModifications(
PatternTreeMap<Modification, ModsSerializer> fileMods, PartialPath path) {
if (fileMods == null) {
return Collections.emptyList();
}
return ModificationFile.sortAndMerge(fileMods.getOverlapped(path));
}

/**
Expand Down
Loading
Loading