Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public class IoTDBOrderByIT {
@BeforeClass
public static void setUp() throws Exception {
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(1024 * 1024L);
EnvFactory.getEnv()
.getConfig()
.getDataNodeCommonConfig()
.setQueryMemoryProportion("1:100:200:50:200:400:200:50");
EnvFactory.getEnv().initClusterEnvironment();
insertData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;

/**
* This class is used to record the context of a query including QueryId, query statement, session
Expand Down Expand Up @@ -89,6 +90,10 @@ public class MPPQueryContext {
// the updateScanNum process in distributed planning can be skipped.
private boolean needUpdateScanNumForLastQuery = false;

private long reservedMemoryCostForSchemaTree = 0;
private boolean releaseSchemaTreeAfterAnalyzing = true;
private LongConsumer reserveMemoryForSchemaTreeFunc = null;

private boolean userQuery = false;

public MPPQueryContext(QueryId queryId) {
Expand Down Expand Up @@ -129,6 +134,34 @@ public MPPQueryContext(
this.initResultNodeContext();
}

public void setReserveMemoryForSchemaTreeFunc(LongConsumer reserveMemoryForSchemaTreeFunc) {
this.reserveMemoryForSchemaTreeFunc = reserveMemoryForSchemaTreeFunc;
}

public void reserveMemoryForSchemaTree(long memoryCost) {
if (reserveMemoryForSchemaTreeFunc == null) {
return;
}
reserveMemoryForSchemaTreeFunc.accept(memoryCost);
this.reservedMemoryCostForSchemaTree += memoryCost;
}

public void setReleaseSchemaTreeAfterAnalyzing(boolean releaseSchemaTreeAfterAnalyzing) {
this.releaseSchemaTreeAfterAnalyzing = releaseSchemaTreeAfterAnalyzing;
}

public boolean releaseSchemaTreeAfterAnalyzing() {
return releaseSchemaTreeAfterAnalyzing;
}

public void releaseMemoryForSchemaTree() {
if (reservedMemoryCostForSchemaTree <= 0) {
return;
}
this.memoryReservationManager.releaseMemoryCumulatively(reservedMemoryCostForSchemaTree);
reservedMemoryCostForSchemaTree = 0;
}

public void prepareForRetry() {
this.initResultNodeContext();
this.releaseAllMemoryReservedForFrontEnd();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;

Expand All @@ -50,8 +51,10 @@
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
Expand All @@ -61,6 +64,8 @@
import static org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;

public class ClusterSchemaTree implements ISchemaTree {
private static final long SHALLOW_SIZE =
RamUsageEstimator.shallowSizeOfInstance(ClusterSchemaTree.class);
private static final ClusterTemplateManager templateManager =
ClusterTemplateManager.getInstance();

Expand All @@ -76,6 +81,8 @@ public class ClusterSchemaTree implements ISchemaTree {

private Map<Integer, Template> templateMap = new HashMap<>();

private long ramBytesUsed;

public ClusterSchemaTree() {
root = new SchemaInternalNode(PATH_ROOT);
}
Expand Down Expand Up @@ -485,59 +492,158 @@ public void serialize(OutputStream outputStream) throws IOException {
root.serialize(outputStream);
}

public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
public Iterator<SchemaNode> getIteratorForSerialize() {
return new SchemaNodePostOrderIterator(root);
}

byte nodeType;
int childNum;
Deque<SchemaNode> stack = new ArrayDeque<>();
SchemaNode child;
boolean hasLogicalView = false;
boolean hasNormalTimeSeries = false;
Map<Integer, Template> templateMap = new HashMap<>();

while (inputStream.available() > 0) {
nodeType = ReadWriteIOUtils.readByte(inputStream);
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
stack.push(measurementNode);
if (measurementNode.isLogicalView()) {
hasLogicalView = true;
@Override
public long ramBytesUsed() {
if (ramBytesUsed > 0) {
return ramBytesUsed;
}
ramBytesUsed =
root.ramBytesUsed()
+ SHALLOW_SIZE
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
templateMap,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
return ramBytesUsed;
}

public void setRamBytesUsed(long ramBytesUsed) {
this.ramBytesUsed = ramBytesUsed;
}

private static class SchemaNodePostOrderIterator implements Iterator<SchemaNode> {
// This class is likely to be faster than Stack when used as a stack
private final Deque<Pair<SchemaNode, Iterator<SchemaNode>>> stack = new ArrayDeque<>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Stack<Pair<SchemaNode, Boolean>> is enough

private SchemaNode nextNode;

public SchemaNodePostOrderIterator(SchemaNode root) {
stack.push(new Pair<>(root, root.getChildrenIterator()));
prepareNext();
}

@Override
public boolean hasNext() {
return nextNode != null;
}

@Override
public SchemaNode next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
SchemaNode result = nextNode;
prepareNext();
return result;
}

private void prepareNext() {
nextNode = null;
while (!stack.isEmpty()) {
Pair<SchemaNode, Iterator<SchemaNode>> pair = stack.peek();
SchemaNode currentNode = pair.getLeft();
Iterator<SchemaNode> childrenIterator = pair.getRight();
if (childrenIterator.hasNext()) {
SchemaNode child = childrenIterator.next();
stack.push(new Pair<>(child, child.getChildrenIterator()));
} else {
stack.pop();
nextNode = currentNode;
return;
}
hasNormalTimeSeries = true;
} else {
SchemaInternalNode internalNode;
if (nodeType == SCHEMA_ENTITY_NODE) {
internalNode = SchemaEntityNode.deserialize(inputStream);
int templateId = internalNode.getAsEntityNode().getTemplateId();
if (templateId != NON_TEMPLATE) {
templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId));
}
}
}

public static class SchemaNodeBatchDeserializer {
private byte nodeType;
private int childNum;
// This class is likely to be faster than Stack when used as a stack
private final Deque<SchemaNode> stack = new ArrayDeque<>();
private SchemaNode child;
private boolean hasLogicalView = false;
private boolean hasNormalTimeSeries = false;
private Map<Integer, Template> templateMap = new HashMap<>();
Comment thread
JackieTien97 marked this conversation as resolved.
private boolean isFirstBatch = true;

public boolean isFirstBatch() {
return isFirstBatch;
}

public void deserializeFromBatch(InputStream inputStream) throws IOException {
isFirstBatch = false;
while (inputStream.available() > 0) {
nodeType = ReadWriteIOUtils.readByte(inputStream);
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
stack.push(measurementNode);
if (measurementNode.isLogicalView()) {
hasLogicalView = true;
}
hasNormalTimeSeries = true;
} else {
internalNode = SchemaInternalNode.deserialize(inputStream);
}
SchemaInternalNode internalNode;
if (nodeType == SCHEMA_ENTITY_NODE) {
internalNode = SchemaEntityNode.deserialize(inputStream);
int templateId = internalNode.getAsEntityNode().getTemplateId();
if (templateId != NON_TEMPLATE) {
templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId));
}
} else {
internalNode = SchemaInternalNode.deserialize(inputStream);
}

childNum = ReadWriteIOUtils.readInt(inputStream);
while (childNum > 0) {
child = stack.pop();
internalNode.addChild(child.getName(), child);
if (child.isMeasurement()) {
SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
if (measurementNode.getAlias() != null) {
internalNode
.getAsEntityNode()
.addAliasChild(measurementNode.getAlias(), measurementNode);
childNum = ReadWriteIOUtils.readInt(inputStream);
while (childNum > 0) {
child = stack.pop();
internalNode.addChild(child.getName(), child);
if (child.isMeasurement()) {
SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
if (measurementNode.getAlias() != null) {
internalNode
.getAsEntityNode()
.addAliasChild(measurementNode.getAlias(), measurementNode);
}
}
childNum--;
}
childNum--;
stack.push(internalNode);
}
stack.push(internalNode);
}
}
ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
result.templateMap = templateMap;
result.hasLogicalMeasurementPath = hasLogicalView;
result.hasNormalTimeSeries = hasNormalTimeSeries;
return result;

public ClusterSchemaTree finish() {
try {
ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
result.templateMap = templateMap;
result.hasLogicalMeasurementPath = hasLogicalView;
result.hasNormalTimeSeries = hasNormalTimeSeries;
return result;
} finally {
reset();
}
}

private void reset() {
nodeType = 0;
childNum = 0;
stack.clear();
child = null;
hasLogicalView = false;
hasNormalTimeSeries = false;
// templateMap is set to the returned schema tree, so we should create a new one
templateMap = new HashMap<>();
isFirstBatch = true;
}
}

public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
SchemaNodeBatchDeserializer schemaNodeBatchDeserializer = new SchemaNodeBatchDeserializer();
schemaNodeBatchDeserializer.deserializeFromBatch(inputStream);
return schemaNodeBatchDeserializer.finish();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.Pair;

import java.util.List;
import java.util.Set;

public interface ISchemaTree {
public interface ISchemaTree extends Accountable {
/**
* Return all measurement paths for given path pattern and filter the result by slimit and offset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.common.schematree.node;

import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.IOException;
Expand All @@ -31,6 +32,9 @@

public class SchemaEntityNode extends SchemaInternalNode {

private static final long SHALLOW_SIZE =
RamUsageEstimator.shallowSizeOfInstance(SchemaEntityNode.class);

private boolean isAligned;

private Map<String, SchemaMeasurementNode> aliasChildren;
Expand Down Expand Up @@ -117,6 +121,11 @@ public byte getType() {
@Override
public void serialize(OutputStream outputStream) throws IOException {
serializeChildren(outputStream);
this.serializeNodeOwnContent(outputStream);
}

@Override
public void serializeNodeOwnContent(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(getType(), outputStream);
ReadWriteIOUtils.write(name, outputStream);
ReadWriteIOUtils.write(isAligned, outputStream);
Expand All @@ -133,4 +142,18 @@ public static SchemaEntityNode deserialize(InputStream inputStream) throws IOExc
entityNode.setTemplateId(templateId);
return entityNode;
}

@Override
public long ramBytesUsed() {
return SHALLOW_SIZE
+ RamUsageEstimator.sizeOf(name)
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
children,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY)
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
aliasChildren,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
}
}
Loading
Loading