Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public class IoTDBOrderBy2IT extends IoTDBOrderByIT {
public static void setUp() throws Exception {
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(2048);
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setMaxTsBlockSizeInByte(200);
EnvFactory.getEnv()
.getConfig()
.getDataNodeCommonConfig()
.setQueryMemoryProportion("1:100:200:50:400:200:200:50");
EnvFactory.getEnv().initClusterEnvironment();
insertData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public class IoTDBOrderByForDebugIT {
@BeforeClass
public static void setUp() throws Exception {
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(1024 * 1024L);
EnvFactory.getEnv()
.getConfig()
.getDataNodeCommonConfig()
.setQueryMemoryProportion("1:100:200:50:400:200:200:50");
EnvFactory.getEnv().initClusterEnvironment();
insertData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public class IoTDBOrderByIT {
@BeforeClass
public static void setUp() throws Exception {
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(1024 * 1024L);
EnvFactory.getEnv()
.getConfig()
.getDataNodeCommonConfig()
.setQueryMemoryProportion("1:100:200:50:400:200:200:50");
EnvFactory.getEnv().initClusterEnvironment();
insertData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;

/**
* This class is used to record the context of a query including QueryId, query statement, session
Expand Down Expand Up @@ -88,6 +89,10 @@ public class MPPQueryContext {
// the updateScanNum process in distributed planning can be skipped.
private boolean needUpdateScanNumForLastQuery = false;

private long reservedMemoryCostForSchemaTree = 0;
private boolean releaseSchemaTreeAfterAnalyzing = true;
private LongConsumer reserveMemoryForSchemaTreeFunc = null;

private boolean userQuery = false;

public MPPQueryContext(QueryId queryId) {
Expand Down Expand Up @@ -128,6 +133,34 @@ public MPPQueryContext(
this.initResultNodeContext();
}

public void setReserveMemoryForSchemaTreeFunc(LongConsumer reserveMemoryForSchemaTreeFunc) {
this.reserveMemoryForSchemaTreeFunc = reserveMemoryForSchemaTreeFunc;
}

public void reserveMemoryForSchemaTree(long memoryCost) {
if (reserveMemoryForSchemaTreeFunc == null) {
return;
}
reserveMemoryForSchemaTreeFunc.accept(memoryCost);
this.reservedMemoryCostForSchemaTree += memoryCost;
}

public void setReleaseSchemaTreeAfterAnalyzing(boolean releaseSchemaTreeAfterAnalyzing) {
this.releaseSchemaTreeAfterAnalyzing = releaseSchemaTreeAfterAnalyzing;
}

public boolean releaseSchemaTreeAfterAnalyzing() {
return releaseSchemaTreeAfterAnalyzing;
}

public void releaseMemoryForSchemaTree() {
if (reservedMemoryCostForSchemaTree <= 0) {
return;
}
this.memoryReservationManager.releaseMemoryCumulatively(reservedMemoryCostForSchemaTree);
reservedMemoryCostForSchemaTree = 0;
}

public void prepareForRetry() {
this.initResultNodeContext();
this.releaseAllMemoryReservedForFrontEnd();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;

Expand All @@ -49,8 +50,10 @@
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
Expand All @@ -60,6 +63,8 @@
import static org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;

public class ClusterSchemaTree implements ISchemaTree {
private static final long SHALLOW_SIZE =
RamUsageEstimator.shallowSizeOfInstance(ClusterSchemaTree.class);
private static final ClusterTemplateManager templateManager =
ClusterTemplateManager.getInstance();

Expand All @@ -75,6 +80,8 @@ public class ClusterSchemaTree implements ISchemaTree {

private Map<Integer, Template> templateMap = new HashMap<>();

private long ramBytesUsed;

public ClusterSchemaTree() {
root = new SchemaInternalNode(PATH_ROOT);
}
Expand Down Expand Up @@ -484,59 +491,158 @@ public void serialize(OutputStream outputStream) throws IOException {
root.serialize(outputStream);
}

public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
public Iterator<SchemaNode> getIteratorForSerialize() {
return new SchemaNodePostOrderIterator(root);
}

byte nodeType;
int childNum;
Deque<SchemaNode> stack = new ArrayDeque<>();
SchemaNode child;
boolean hasLogicalView = false;
boolean hasNormalTimeSeries = false;
Map<Integer, Template> templateMap = new HashMap<>();

while (inputStream.available() > 0) {
nodeType = ReadWriteIOUtils.readByte(inputStream);
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
stack.push(measurementNode);
if (measurementNode.isLogicalView()) {
hasLogicalView = true;
@Override
public long ramBytesUsed() {
if (ramBytesUsed > 0) {
return ramBytesUsed;
}
ramBytesUsed =
root.ramBytesUsed()
+ SHALLOW_SIZE
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
templateMap,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
return ramBytesUsed;
}

public void setRamBytesUsed(long ramBytesUsed) {
this.ramBytesUsed = ramBytesUsed;
}

private static class SchemaNodePostOrderIterator implements Iterator<SchemaNode> {
// This class is likely to be faster than Stack when used as a stack
private final Deque<Pair<SchemaNode, Iterator<SchemaNode>>> stack = new ArrayDeque<>();
private SchemaNode nextNode;

public SchemaNodePostOrderIterator(SchemaNode root) {
stack.push(new Pair<>(root, root.getChildrenIterator()));
prepareNext();
}

@Override
public boolean hasNext() {
return nextNode != null;
}

@Override
public SchemaNode next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
SchemaNode result = nextNode;
prepareNext();
return result;
}

private void prepareNext() {
nextNode = null;
while (!stack.isEmpty()) {
Pair<SchemaNode, Iterator<SchemaNode>> pair = stack.peek();
SchemaNode currentNode = pair.getLeft();
Iterator<SchemaNode> childrenIterator = pair.getRight();
if (childrenIterator.hasNext()) {
SchemaNode child = childrenIterator.next();
stack.push(new Pair<>(child, child.getChildrenIterator()));
} else {
stack.pop();
nextNode = currentNode;
return;
}
hasNormalTimeSeries = true;
} else {
SchemaInternalNode internalNode;
if (nodeType == SCHEMA_ENTITY_NODE) {
internalNode = SchemaEntityNode.deserialize(inputStream);
int templateId = internalNode.getAsEntityNode().getTemplateId();
if (templateId != NON_TEMPLATE) {
templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId));
}
}
}

public static class SchemaNodeBatchDeserializer {
private byte nodeType;
private int childNum;
// This class is likely to be faster than Stack when used as a stack
private final Deque<SchemaNode> stack = new ArrayDeque<>();
private SchemaNode child;
private boolean hasLogicalView = false;
private boolean hasNormalTimeSeries = false;
private Map<Integer, Template> templateMap = new HashMap<>();
private boolean isFirstBatch = true;

public boolean isFirstBatch() {
return isFirstBatch;
}

public void deserializeFromBatch(InputStream inputStream) throws IOException {
isFirstBatch = false;
while (inputStream.available() > 0) {
nodeType = ReadWriteIOUtils.readByte(inputStream);
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
stack.push(measurementNode);
if (measurementNode.isLogicalView()) {
hasLogicalView = true;
}
hasNormalTimeSeries = true;
} else {
internalNode = SchemaInternalNode.deserialize(inputStream);
}
SchemaInternalNode internalNode;
if (nodeType == SCHEMA_ENTITY_NODE) {
internalNode = SchemaEntityNode.deserialize(inputStream);
int templateId = internalNode.getAsEntityNode().getTemplateId();
if (templateId != NON_TEMPLATE) {
templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId));
}
} else {
internalNode = SchemaInternalNode.deserialize(inputStream);
}

childNum = ReadWriteIOUtils.readInt(inputStream);
while (childNum > 0) {
child = stack.pop();
internalNode.addChild(child.getName(), child);
if (child.isMeasurement()) {
SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
if (measurementNode.getAlias() != null) {
internalNode
.getAsEntityNode()
.addAliasChild(measurementNode.getAlias(), measurementNode);
childNum = ReadWriteIOUtils.readInt(inputStream);
while (childNum > 0) {
child = stack.pop();
internalNode.addChild(child.getName(), child);
if (child.isMeasurement()) {
SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
if (measurementNode.getAlias() != null) {
internalNode
.getAsEntityNode()
.addAliasChild(measurementNode.getAlias(), measurementNode);
}
}
childNum--;
}
childNum--;
stack.push(internalNode);
}
stack.push(internalNode);
}
}
ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
result.templateMap = templateMap;
result.hasLogicalMeasurementPath = hasLogicalView;
result.hasNormalTimeSeries = hasNormalTimeSeries;
return result;

public ClusterSchemaTree finish() {
try {
ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
result.templateMap = templateMap;
result.hasLogicalMeasurementPath = hasLogicalView;
result.hasNormalTimeSeries = hasNormalTimeSeries;
return result;
} finally {
reset();
}
}

private void reset() {
nodeType = 0;
childNum = 0;
stack.clear();
child = null;
hasLogicalView = false;
hasNormalTimeSeries = false;
// templateMap is set to the returned schema tree, so we should create a new one
templateMap = new HashMap<>();
isFirstBatch = true;
}
}

public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
SchemaNodeBatchDeserializer schemaNodeBatchDeserializer = new SchemaNodeBatchDeserializer();
schemaNodeBatchDeserializer.deserializeFromBatch(inputStream);
return schemaNodeBatchDeserializer.finish();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.Pair;

import java.util.List;
import java.util.Set;

public interface ISchemaTree {
public interface ISchemaTree extends Accountable {
/**
* Return all measurement paths for given path pattern and filter the result by slimit and offset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.common.schematree.node;

import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.IOException;
Expand All @@ -31,6 +32,9 @@

public class SchemaEntityNode extends SchemaInternalNode {

private static final long SHALLOW_SIZE =
RamUsageEstimator.shallowSizeOfInstance(SchemaEntityNode.class);

private boolean isAligned;

private Map<String, SchemaMeasurementNode> aliasChildren;
Expand Down Expand Up @@ -117,6 +121,11 @@ public byte getType() {
@Override
public void serialize(OutputStream outputStream) throws IOException {
serializeChildren(outputStream);
this.serializeNodeOwnContent(outputStream);
}

@Override
public void serializeNodeOwnContent(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(getType(), outputStream);
ReadWriteIOUtils.write(name, outputStream);
ReadWriteIOUtils.write(isAligned, outputStream);
Expand All @@ -133,4 +142,18 @@ public static SchemaEntityNode deserialize(InputStream inputStream) throws IOExc
entityNode.setTemplateId(templateId);
return entityNode;
}

@Override
public long ramBytesUsed() {
return SHALLOW_SIZE
+ RamUsageEstimator.sizeOf(name)
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
children,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY)
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
aliasChildren,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
}
}
Loading
Loading