diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 2f6accf13f791..3bfa24641b97a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -417,6 +417,9 @@ public class IoTDBConfig { /** The buffer for sort operation */ private long sortBufferSize = 32 * 1024 * 1024L; + /** Mods cache size limit per fi */ + private long modsCacheSizeLimitPerFI = 32 * 1024 * 1024; + /** * The strategy of inner space compaction task. There are just one inner space compaction strategy * SIZE_TIRED_COMPACTION: @@ -4038,6 +4041,14 @@ public long getSortBufferSize() { return sortBufferSize; } + public void setModsCacheSizeLimitPerFI(long modsCacheSizeLimitPerFI) { + this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI; + } + + public long getModsCacheSizeLimitPerFI() { + return modsCacheSizeLimitPerFI; + } + public void setSortTmpDir(String sortTmpDir) { this.sortTmpDir = sortTmpDir; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 753607db4a62a..f9377892ef38b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -92,6 +92,7 @@ import java.util.ServiceLoader; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.LongConsumer; import java.util.regex.Pattern; public class IoTDBDescriptor { @@ -1027,7 +1028,10 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException properties.getProperty("quota_enable", String.valueOf(conf.isQuotaEnable())))); // The buffer for sort operator to calculate - loadSortBuffer(properties); + loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize); + + loadFixedSizeLimitForQuery( + properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI); // tmp filePath for sort operator conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir())); @@ -1096,24 +1100,19 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException String.valueOf(conf.isIncludeNullValueInWriteThroughputMetric())))); } - private void loadSortBuffer(TrimProperties properties) { - long defaultValue = calculateDefaultSortBufferSize(memoryConfig); - long sortBufferSize = - Long.parseLong( - properties.getProperty("sort_buffer_size_in_bytes", Long.toString(defaultValue))); - if (sortBufferSize <= 0) { - sortBufferSize = defaultValue; - } - // The buffer for sort operator to calculate - conf.setSortBufferSize(sortBufferSize); - } - - public static long calculateDefaultSortBufferSize(DataNodeMemoryConfig memoryConfig) { - return Math.min( - 32 * 1024 * 1024L, - memoryConfig.getOperatorsMemoryManager().getTotalMemorySizeInBytes() - / memoryConfig.getQueryThreadCount() - / 2); + private void loadFixedSizeLimitForQuery( + TrimProperties properties, String name, LongConsumer setFunction) { + long defaultValue = + Math.min( + 32 * 1024 * 1024L, + memoryConfig.getOperatorsMemoryManager().getTotalMemorySizeInBytes() + / memoryConfig.getQueryThreadCount() + / 2); + long size = Long.parseLong(properties.getProperty(name, Long.toString(defaultValue))); + if (size <= 0) { + size = defaultValue; + } + setFunction.accept(size); } private void reloadConsensusProps(TrimProperties properties) throws IOException { @@ -2089,7 +2088,10 @@ public synchronized void loadHotModifiedProps(TrimProperties properties) ConfigurationFileUtils.getConfigurationDefaultValue("tvlist_sort_threshold")))); // sort_buffer_size_in_bytes - loadSortBuffer(properties); + loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize); + + loadFixedSizeLimitForQuery( + properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI); conf.setIncludeNullValueInWriteThroughputMetric( Boolean.parseBoolean( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java index f7318865c2ed7..f5db0ccb0a668 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.RamUsageEstimator; @@ -47,6 +48,8 @@ public class MemoryEstimationHelper { RamUsageEstimator.shallowSizeOfInstance(ArrayList.class); private static final long INTEGER_INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(Integer.class); + public static final long TIME_RANGE_INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TimeRange.class); private MemoryEstimationHelper() { // hide the constructor @@ -99,6 +102,19 @@ public static long getEstimatedSizeOfPartialPath(@Nullable final PartialPath par return totalSize; } + public static long getEstimatedSizeOfMeasurementPathNodes( + @Nullable final PartialPath partialPath) { + if (partialPath == null) { + return 0; + } + long totalSize = MEASUREMENT_PATH_INSTANCE_SIZE; + String[] nodes = partialPath.getNodes(); + if (nodes != null && nodes.length > 0) { + totalSize += Arrays.stream(nodes).mapToLong(RamUsageEstimator::sizeOf).sum(); + } + return totalSize; + } + // This method should only be called if the content in the current PartialPath comes from other // structures whose memory cost have already been calculated. public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final PartialPath partialPath) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 8fc462bbd4285..50c9afd7ff05d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -24,7 +24,10 @@ import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.IFullPath; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; @@ -39,6 +42,7 @@ import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan; @@ -46,12 +50,14 @@ import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.filter.factory.FilterFactory; +import org.apache.tsfile.utils.RamUsageEstimator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +84,7 @@ public class FragmentInstanceContext extends QueryContext { private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class); + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private static final long END_TIME_INITIAL_VALUE = -1L; // wait over 5s for driver to close is abnormal private static final long LONG_WAIT_DURATION = 5_000_000_000L; @@ -93,6 +100,8 @@ public class FragmentInstanceContext extends QueryContext { // it will only be used once, after sharedQueryDataSource being inited, it will be set to null private List sourcePaths; + private boolean singleSourcePath = false; + // Used for region scan, relating methods are to be added. private Map devicePathsToContext; @@ -320,6 +329,44 @@ public void start() { lastExecutionStartTime.set(now); } + @Override + protected PatternTreeMap getAllModifications( + TsFileResource resource) { + if (isSingleSourcePath() || memoryReservationManager == null) { + return loadAllModificationsFromDisk(resource); + } + + AtomicReference> + atomicReference = new AtomicReference<>(); + PatternTreeMap cachedResult = + fileModCache.computeIfAbsent( + resource.getTsFileID(), + k -> { + PatternTreeMap allMods = + loadAllModificationsFromDisk(resource); + atomicReference.set(allMods); + if (cachedModEntriesSize.get() >= config.getModsCacheSizeLimitPerFI()) { + return null; + } + long memCost = + RamUsageEstimator.sizeOfObject(allMods) + + RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY; + long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get(); + while (alreadyUsedMemoryForCachedModEntries + memCost + < config.getModsCacheSizeLimitPerFI()) { + if (cachedModEntriesSize.compareAndSet( + alreadyUsedMemoryForCachedModEntries, + alreadyUsedMemoryForCachedModEntries + memCost)) { + memoryReservationManager.reserveMemoryCumulatively(memCost); + return allMods; + } + alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get(); + } + return null; + }); + return cachedResult == null ? atomicReference.get() : cachedResult; + } + // the state change listener is added here in a separate initialize() method // instead of the constructor to prevent leaking the "this" reference to // another thread, which will cause unsafe publication of this instance. @@ -479,12 +526,25 @@ public void setTimeFilterForTableModel(Filter timeFilter) { } } + @Override + public boolean collectTable(String table) { + boolean added = super.collectTable(table); + if (added && memoryReservationManager != null) { + memoryReservationManager.reserveMemoryCumulatively( + RamUsageEstimator.sizeOf(table) + RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY); + } + return added; + } + public IDataRegionForQuery getDataRegion() { return dataRegion; } public void setSourcePaths(List sourcePaths) { this.sourcePaths = sourcePaths; + if (sourcePaths != null && sourcePaths.size() == 1) { + singleSourcePath = true; + } } public void setDevicePathsToContext(Map devicePathsToContext) { @@ -791,6 +851,8 @@ public synchronized void releaseResource() { // release TVList/AlignedTVList owned by current query releaseTVListOwnedByQuery(); + fileModCache = null; + tables = null; dataRegion = null; globalTimeFilter = null; sharedQueryDataSource = null; @@ -967,4 +1029,8 @@ public long getUnclosedSeqFileNum() { public boolean ignoreNotExistsDevice() { return ignoreNotExistsDevice; } + + public boolean isSingleSourcePath() { + return singleSourcePath; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java index c2f95e39b1727..e115132265676 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.ModificationUtils; @@ -41,7 +42,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /** QueryContext contains the shared information with in a query. */ @@ -51,13 +52,15 @@ public class QueryContext { private QueryStatistics queryStatistics = new QueryStatistics(); /** - * The key is the path of a ModificationFile and the value is all Modifications in this file. We - * use this field because each call of Modification.getModifications() return a copy of the - * Modifications, and we do not want it to create multiple copies within a query. + * The key is TsFileID and the value is all Modifications in this file. We use this field because + * each call of Modification.getModifications() return a copy of the Modifications, and we do not + * want it to create multiple copies within a query. */ - private final Map> fileModCache = + protected Map> fileModCache = new ConcurrentHashMap<>(); + protected AtomicLong cachedModEntriesSize = new AtomicLong(0); + protected long queryId; private boolean debug; @@ -71,11 +74,11 @@ public class QueryContext { // for tree model, it will be true private boolean ignoreAllNullRows = true; - private final Set nonExistentModFiles = new CopyOnWriteArraySet<>(); - // referenced TVLists for the query protected final Set tvListSet = new HashSet<>(); + protected Set tables; + public QueryContext() {} public QueryContext(long queryId) { @@ -90,30 +93,48 @@ public QueryContext(long queryId, boolean debug, long startTime, long timeout) { this.timeout = timeout; } - // if the mods file does not exist, do not add it to the cache - protected boolean checkIfModificationExists(TsFileResource tsFileResource) { - if (nonExistentModFiles.contains(tsFileResource.getTsFileID())) { - return false; + // Only used for query with table data(Tree view is not included) + public boolean collectTable(String table) { + // In the current version (2025.08.14), there is only one table under one FI + // Therefore, SingletonSet is initially used here for better performance + if (tables == null) { + tables = Collections.singleton(table); + return true; } - - if (!tsFileResource.anyModFileExists()) { - nonExistentModFiles.add(tsFileResource.getTsFileID()); - return false; + if (!(tables instanceof HashSet)) { + tables = new HashSet<>(tables); } - return true; + return tables.add(table); } - private PatternTreeMap getAllModifications(TsFileResource resource) { + // if the mods file does not exist, do not add it to the cache + protected boolean checkIfModificationExists(TsFileResource tsFileResource) { + // The exists state of ModificationFile is maintained in memory, and ModificationFile instance + // is set to the related TsFileResource instance after it is constructed. + return tsFileResource.anyModFileExists(); + } + + protected PatternTreeMap getAllModifications(TsFileResource resource) { return fileModCache.computeIfAbsent( - resource.getTsFilePath(), - k -> { - PatternTreeMap modifications = - PatternTreeMapFactory.getModsPatternTreeMap(); - for (ModEntry modification : resource.getAllModEntries()) { - modifications.append(modification.keyOfPatternTree(), modification); - } - return modifications; - }); + resource.getTsFileID(), k -> loadAllModificationsFromDisk(resource)); + } + + public PatternTreeMap loadAllModificationsFromDisk( + TsFileResource resource) { + PatternTreeMap modifications = + PatternTreeMapFactory.getModsPatternTreeMap(); + TsFileResource.ModIterator modEntryIterator = resource.getModEntryIterator(); + while (modEntryIterator.hasNext()) { + ModEntry modification = modEntryIterator.next(); + if (tables != null && modification instanceof TableDeletionEntry) { + String tableName = ((TableDeletionEntry) modification).getTableName(); + if (!tables.contains(tableName)) { + continue; + } + } + modifications.append(modification.keyOfPatternTree(), modification); + } + return modifications; } public List getPathModifications( @@ -123,8 +144,17 @@ public List getPathModifications( return Collections.emptyList(); } - List modEntries = - getAllModifications(tsFileResource).getOverlapped(deviceID, measurement); + return getPathModifications(getAllModifications(tsFileResource), deviceID, measurement); + } + + public List getPathModifications( + PatternTreeMap fileModEntries, + IDeviceID deviceID, + String measurement) { + if (fileModEntries == null) { + return Collections.emptyList(); + } + List modEntries = fileModEntries.getOverlapped(deviceID, measurement); if (deviceID.isTableModel()) { // the pattern tree has false-positive for table model deletion, so we do a further // filtering @@ -144,8 +174,16 @@ public List getPathModifications(TsFileResource tsFileResource, IDevic if (!checkIfModificationExists(tsFileResource)) { return Collections.emptyList(); } - List modEntries = - getAllModifications(tsFileResource).getOverlapped(new PartialPath(deviceID)); + return getPathModifications(getAllModifications(tsFileResource), deviceID); + } + + public List getPathModifications( + PatternTreeMap fileModEntries, IDeviceID deviceID) + throws IllegalPathException { + if (fileModEntries == null) { + return Collections.emptyList(); + } + List modEntries = fileModEntries.getDeviceOverlapped(new PartialPath(deviceID)); if (deviceID.isTableModel()) { // the pattern tree has false-positive for table model deletion, so we do a further // filtering diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 52e1afe754a17..a27d28d0f30d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -1207,6 +1207,7 @@ public Operator visitDeviceTableScan( TableScanOperator tableScanOperator = new TableScanOperator(parameter); + context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); addSource( tableScanOperator, context, @@ -3021,6 +3022,7 @@ public Operator visitAggregationTableScan( } else { DefaultAggTableScanOperator aggTableScanOperator = new DefaultAggTableScanOperator(parameter); + context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); addSource( aggTableScanOperator, context, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java index d5bbe88501d84..7e79e8f580dc5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java @@ -23,6 +23,8 @@ import org.apache.iotdb.db.utils.io.StreamSerializable; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -35,8 +37,10 @@ import java.util.List; import java.util.Objects; -public class DeletionPredicate implements StreamSerializable, BufferSerializable { +public class DeletionPredicate implements StreamSerializable, BufferSerializable, Accountable { + public static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DeletionPredicate.class); private String tableName; private IDPredicate idPredicate = new NOP(); // an empty list means affecting all columns @@ -180,4 +184,12 @@ public String toString() { + measurementNames + '}'; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + + RamUsageEstimator.sizeOf(tableName) + + RamUsageEstimator.sizeOfObject(idPredicate) + + RamUsageEstimator.sizeOfArrayList(measurementNames); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java index e2de0d8bd91bd..44741f9e67940 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java @@ -24,6 +24,8 @@ import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Deserializer; +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -36,7 +38,7 @@ import java.util.List; import java.util.Objects; -public abstract class IDPredicate implements StreamSerializable, BufferSerializable { +public abstract class IDPredicate implements StreamSerializable, BufferSerializable, Accountable { public int serializedSize() { // type @@ -124,6 +126,7 @@ public static IDPredicate createFrom(InputStream stream) throws IOException { } public static class NOP extends IDPredicate { + public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(NOP.class); public NOP() { super(IDPredicateType.NOP); @@ -158,10 +161,17 @@ public boolean equals(Object obj) { public String toString() { return "NOP"; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE; + } } public static class FullExactMatch extends IDPredicate { + public static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(FullExactMatch.class); private IDeviceID deviceID; public FullExactMatch(IDeviceID deviceID) { @@ -228,10 +238,17 @@ public int hashCode() { public String toString() { return "FullExactMatch{" + "deviceID=" + deviceID + '}'; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + RamUsageEstimator.sizeOfObject(deviceID); + } } public static class SegmentExactMatch extends IDPredicate { + public static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SegmentExactMatch.class); private String pattern; private int segmentIndex; @@ -318,10 +335,16 @@ public String toString() { + segmentIndex + '}'; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + RamUsageEstimator.sizeOf(pattern); + } } public static class And extends IDPredicate { + public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(And.class); private final List predicates = new ArrayList<>(); public And(IDPredicate... predicates) { @@ -405,5 +428,10 @@ public int hashCode() { public String toString() { return "And{" + "predicates=" + predicates + '}'; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + RamUsageEstimator.sizeOfArrayList(predicates); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java index 36b943291b8ad..eef66bd8675ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java @@ -25,15 +25,17 @@ import org.apache.tsfile.annotations.TreeModel; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.ReadWriteIOUtils; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; public abstract class ModEntry - implements StreamSerializable, BufferSerializable, Comparable { + implements StreamSerializable, BufferSerializable, Comparable, Accountable { protected ModType modType; protected TimeRange timeRange; @@ -177,7 +179,13 @@ public static ModType deserialize(ByteBuffer buffer) { } public static ModType deserialize(InputStream stream) throws IOException { - byte typeNum = ReadWriteIOUtils.readByte(stream); + // The ModIterator needs to use this EOFException to determine whether it has finished + // reading. And we should not use InputStream.available() to make this judgments outside, + // because calling it frequently will have a certain overhead. + int typeNum = stream.read(); + if (typeNum == -1) { + throw new EOFException(); + } switch (typeNum) { case 0x00: return TABLE_DELETION; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java index e72b06b9a1af8..bc0e86a18c99e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java @@ -304,10 +304,6 @@ public boolean hasNext() { } if (nextEntry == null) { try { - if (inputStream.available() == 0) { - close(); - return false; - } nextEntry = ModEntry.createFrom(inputStream); } catch (EOFException e) { close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java index 9dfbe6102a46c..858b6645b2f9a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java @@ -20,10 +20,12 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; import java.io.InputStream; @@ -32,6 +34,8 @@ import java.util.Objects; public class TableDeletionEntry extends ModEntry { + public static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TableDeletionEntry.class); private DeletionPredicate predicate; public TableDeletionEntry() { @@ -152,4 +156,11 @@ public int hashCode() { public DeletionPredicate getPredicate() { return predicate; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + + MemoryEstimationHelper.TIME_RANGE_INSTANCE_SIZE + + RamUsageEstimator.sizeOfObject(predicate); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java index 02fb95680f5ee..eefe86e78b322 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java @@ -23,13 +23,17 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternUtil; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; import org.apache.iotdb.db.storageengine.dataregion.modification.v1.Deletion; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -44,6 +48,8 @@ public class TreeDeletionEntry extends ModEntry { + public static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TreeDeletionEntry.class); private static final Logger LOGGER = LoggerFactory.getLogger(TreeDeletionEntry.class); private MeasurementPath pathPattern; @@ -104,7 +110,7 @@ public long serialize(ByteBuffer buffer) { public void deserialize(InputStream stream) throws IOException { super.deserialize(stream); try { - this.pathPattern = new MeasurementPath(ReadWriteIOUtils.readVarIntString(stream)); + this.pathPattern = getMeasurementPath(ReadWriteIOUtils.readVarIntString(stream)); } catch (IllegalPathException e) { throw new IOException(e); } @@ -114,12 +120,23 @@ public void deserialize(InputStream stream) throws IOException { public void deserialize(ByteBuffer buffer) { super.deserialize(buffer); try { - this.pathPattern = new MeasurementPath(ReadWriteIOUtils.readVarIntString(buffer)); + this.pathPattern = getMeasurementPath(ReadWriteIOUtils.readVarIntString(buffer)); } catch (IllegalPathException e) { throw new IllegalArgumentException(e); } } + private MeasurementPath getMeasurementPath(String path) throws IllegalPathException { + // In this place, we can be sure that the path pattern here has been checked by antlr before, so + // when conditions permit, a lighter split method can be used here. + if (path.contains(TsFileConstant.BACK_QUOTE_STRING)) { + return new MeasurementPath(PathUtils.splitPathToDetachedNodes(path)); + } else { + String[] nodes = path.split(TsFileConstant.PATH_SEPARATER_NO_REGEX); + return new MeasurementPath(nodes); + } + } + @Override public boolean matches(PartialPath path) { return pathPattern.matchFullPath(path); @@ -226,4 +243,11 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(pathPattern, timeRange); } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + + MemoryEstimationHelper.TIME_RANGE_INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfMeasurementPathNodes(pathPattern); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java index 3471cd4539b21..6ebb6f7073bbc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; @@ -35,6 +36,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; @@ -60,6 +62,7 @@ public class ClosedFileScanHandleImpl implements IFileScanHandle { private final TsFileResource tsFileResource; private final QueryContext queryContext; + private PatternTreeMap curFileModEntries = null; // Used to cache the modifications of each timeseries private final Map>> deviceToModifications; @@ -80,7 +83,11 @@ public TsFileDeviceStartEndTimeIterator getDeviceStartEndTimeIterator() throws I @Override public boolean isDeviceTimeDeleted(IDeviceID deviceID, long timestamp) throws IllegalPathException { - List modifications = queryContext.getPathModifications(tsFileResource, deviceID); + curFileModEntries = + curFileModEntries != null + ? curFileModEntries + : queryContext.loadAllModificationsFromDisk(tsFileResource); + List modifications = queryContext.getPathModifications(curFileModEntries, deviceID); List timeRangeList = modifications.stream().map(ModEntry::getTimeRange).collect(Collectors.toList()); return ModificationUtils.isPointDeletedWithoutOrderedRange(timestamp, timeRangeList); @@ -89,14 +96,17 @@ public boolean isDeviceTimeDeleted(IDeviceID deviceID, long timestamp) @Override public boolean isTimeSeriesTimeDeleted(IDeviceID deviceID, String timeSeriesName, long timestamp) throws IllegalPathException { - + curFileModEntries = + curFileModEntries != null + ? curFileModEntries + : queryContext.loadAllModificationsFromDisk(tsFileResource); Map> modificationTimeRange = deviceToModifications.get(deviceID); if (modificationTimeRange != null && modificationTimeRange.containsKey(timeSeriesName)) { return ModificationUtils.isPointDeleted(timestamp, modificationTimeRange.get(timeSeriesName)); } List modifications = - queryContext.getPathModifications(tsFileResource, deviceID, timeSeriesName); + queryContext.getPathModifications(curFileModEntries, deviceID, timeSeriesName); List timeRangeList = modifications.stream().map(ModEntry::getTimeRange).collect(Collectors.toList()); TimeRange.sortAndMerge(timeRangeList); diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 288c814613ea6..f0957132d2d3f 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1106,6 +1106,13 @@ batch_size=100000 # Datatype: long sort_buffer_size_in_bytes=0 +# The maximum mod entries size that each FragmentInstance can cache. +# if mods_cache_size_limit_per_fi_in_bytes <= 0, default value will be used, default value = min(32MB, memory for query operators / query_thread_count / 2) +# if mods_cache_size_limit_per_fi_in_bytes > 0, the specified value will be used. +# effectiveMode: hot_reload +# Datatype: long +mods_cache_size_limit_per_fi_in_bytes=0 + # The threshold of operator count in the result set of EXPLAIN ANALYZE, if the number of operator in the result set is larger than this threshold, operator will be merged. # effectiveMode: hot_reload # Datatype: int diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java index b5da17d92bdef..67616c5070b49 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java @@ -19,7 +19,9 @@ package org.apache.iotdb.commons.path; +import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -40,8 +42,10 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD; -public class PathPatternNode> { +public class PathPatternNode> implements Accountable { + private static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(PathPatternNode.class); private final String name; private final Map> children; private Set valueSet; @@ -273,6 +277,18 @@ public static > PathPatternNode return node; } + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + + RamUsageEstimator.sizeOf(name) + + RamUsageEstimator.sizeOfHashSet(valueSet) + + RamUsageEstimator.sizeOfHashSet(childrenNamesWithNonTrivialWildcard) + + RamUsageEstimator.sizeOfMapWithKnownShallowSize( + children, + RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP, + RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY); + } + /** * Interface to support serialize and deserialize valueSet. * diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java index 99216d10f4542..8e3bbdf5f9a64 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java @@ -21,6 +21,8 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.RamUsageEstimator; import javax.annotation.concurrent.NotThreadSafe; @@ -34,7 +36,9 @@ import java.util.function.Supplier; @NotThreadSafe -public class PatternTreeMap> { +public class PatternTreeMap> + implements Accountable { + private final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(PatternTreeMap.class); private final Map> rootMap; private final Supplier> supplier; private final BiConsumer> appendFunction; @@ -269,4 +273,13 @@ private void searchDeviceOverlapped( searchDeviceOverlapped(child, deviceNodes, pos + 1, resultSet); } } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + + RamUsageEstimator.sizeOfMapWithKnownShallowSize( + rootMap, + RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP, + RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY); + } } diff --git a/pom.xml b/pom.xml index da509c4b16dc8..b58f90db5d420 100644 --- a/pom.xml +++ b/pom.xml @@ -175,7 +175,7 @@ 0.14.1 1.9 1.5.6-3 - 2.2.0-250730-SNAPSHOT + 2.2.0-250813-SNAPSHOT