Skip to content

Commit f407b66

Browse files
authored
Transfer schema tree in batches & add memory control for schema tree
1 parent cffabae commit f407b66

14 files changed

Lines changed: 429 additions & 89 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ public class IoTDBOrderByIT {
100100
@BeforeClass
101101
public static void setUp() throws Exception {
102102
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(1024 * 1024L);
103+
EnvFactory.getEnv()
104+
.getConfig()
105+
.getDataNodeCommonConfig()
106+
.setQueryMemoryProportion("1:100:200:50:200:400:200:50");
103107
EnvFactory.getEnv().initClusterEnvironment();
104108
insertData();
105109
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Optional;
3838
import java.util.Set;
3939
import java.util.concurrent.ConcurrentHashMap;
40+
import java.util.function.LongConsumer;
4041

4142
/**
4243
* This class is used to record the context of a query including QueryId, query statement, session
@@ -89,6 +90,10 @@ public class MPPQueryContext {
8990
// the updateScanNum process in distributed planning can be skipped.
9091
private boolean needUpdateScanNumForLastQuery = false;
9192

93+
private long reservedMemoryCostForSchemaTree = 0;
94+
private boolean releaseSchemaTreeAfterAnalyzing = true;
95+
private LongConsumer reserveMemoryForSchemaTreeFunc = null;
96+
9297
private boolean userQuery = false;
9398

9499
public MPPQueryContext(QueryId queryId) {
@@ -129,6 +134,34 @@ public MPPQueryContext(
129134
this.initResultNodeContext();
130135
}
131136

137+
public void setReserveMemoryForSchemaTreeFunc(LongConsumer reserveMemoryForSchemaTreeFunc) {
138+
this.reserveMemoryForSchemaTreeFunc = reserveMemoryForSchemaTreeFunc;
139+
}
140+
141+
public void reserveMemoryForSchemaTree(long memoryCost) {
142+
if (reserveMemoryForSchemaTreeFunc == null) {
143+
return;
144+
}
145+
reserveMemoryForSchemaTreeFunc.accept(memoryCost);
146+
this.reservedMemoryCostForSchemaTree += memoryCost;
147+
}
148+
149+
public void setReleaseSchemaTreeAfterAnalyzing(boolean releaseSchemaTreeAfterAnalyzing) {
150+
this.releaseSchemaTreeAfterAnalyzing = releaseSchemaTreeAfterAnalyzing;
151+
}
152+
153+
public boolean releaseSchemaTreeAfterAnalyzing() {
154+
return releaseSchemaTreeAfterAnalyzing;
155+
}
156+
157+
public void releaseMemoryForSchemaTree() {
158+
if (reservedMemoryCostForSchemaTree <= 0) {
159+
return;
160+
}
161+
this.memoryReservationManager.releaseMemoryCumulatively(reservedMemoryCostForSchemaTree);
162+
reservedMemoryCostForSchemaTree = 0;
163+
}
164+
132165
public void prepareForRetry() {
133166
this.initResultNodeContext();
134167
this.releaseAllMemoryReservedForFrontEnd();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java

Lines changed: 149 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import org.apache.tsfile.file.metadata.IDeviceID;
4242
import org.apache.tsfile.utils.Pair;
43+
import org.apache.tsfile.utils.RamUsageEstimator;
4344
import org.apache.tsfile.utils.ReadWriteIOUtils;
4445
import org.apache.tsfile.write.schema.IMeasurementSchema;
4546

@@ -50,8 +51,10 @@
5051
import java.util.ArrayList;
5152
import java.util.Deque;
5253
import java.util.HashMap;
54+
import java.util.Iterator;
5355
import java.util.List;
5456
import java.util.Map;
57+
import java.util.NoSuchElementException;
5558
import java.util.Set;
5659

5760
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
@@ -61,6 +64,8 @@
6164
import static org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;
6265

6366
public class ClusterSchemaTree implements ISchemaTree {
67+
private static final long SHALLOW_SIZE =
68+
RamUsageEstimator.shallowSizeOfInstance(ClusterSchemaTree.class);
6469
private static final ClusterTemplateManager templateManager =
6570
ClusterTemplateManager.getInstance();
6671

@@ -76,6 +81,8 @@ public class ClusterSchemaTree implements ISchemaTree {
7681

7782
private Map<Integer, Template> templateMap = new HashMap<>();
7883

84+
private long ramBytesUsed;
85+
7986
public ClusterSchemaTree() {
8087
root = new SchemaInternalNode(PATH_ROOT);
8188
}
@@ -485,59 +492,158 @@ public void serialize(OutputStream outputStream) throws IOException {
485492
root.serialize(outputStream);
486493
}
487494

488-
public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
495+
public Iterator<SchemaNode> getIteratorForSerialize() {
496+
return new SchemaNodePostOrderIterator(root);
497+
}
489498

490-
byte nodeType;
491-
int childNum;
492-
Deque<SchemaNode> stack = new ArrayDeque<>();
493-
SchemaNode child;
494-
boolean hasLogicalView = false;
495-
boolean hasNormalTimeSeries = false;
496-
Map<Integer, Template> templateMap = new HashMap<>();
497-
498-
while (inputStream.available() > 0) {
499-
nodeType = ReadWriteIOUtils.readByte(inputStream);
500-
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
501-
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
502-
stack.push(measurementNode);
503-
if (measurementNode.isLogicalView()) {
504-
hasLogicalView = true;
499+
@Override
500+
public long ramBytesUsed() {
501+
if (ramBytesUsed > 0) {
502+
return ramBytesUsed;
503+
}
504+
ramBytesUsed =
505+
root.ramBytesUsed()
506+
+ SHALLOW_SIZE
507+
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
508+
templateMap,
509+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
510+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
511+
return ramBytesUsed;
512+
}
513+
514+
public void setRamBytesUsed(long ramBytesUsed) {
515+
this.ramBytesUsed = ramBytesUsed;
516+
}
517+
518+
private static class SchemaNodePostOrderIterator implements Iterator<SchemaNode> {
519+
// This class is likely to be faster than Stack when used as a stack
520+
private final Deque<Pair<SchemaNode, Iterator<SchemaNode>>> stack = new ArrayDeque<>();
521+
private SchemaNode nextNode;
522+
523+
public SchemaNodePostOrderIterator(SchemaNode root) {
524+
stack.push(new Pair<>(root, root.getChildrenIterator()));
525+
prepareNext();
526+
}
527+
528+
@Override
529+
public boolean hasNext() {
530+
return nextNode != null;
531+
}
532+
533+
@Override
534+
public SchemaNode next() {
535+
if (!hasNext()) {
536+
throw new NoSuchElementException();
537+
}
538+
SchemaNode result = nextNode;
539+
prepareNext();
540+
return result;
541+
}
542+
543+
private void prepareNext() {
544+
nextNode = null;
545+
while (!stack.isEmpty()) {
546+
Pair<SchemaNode, Iterator<SchemaNode>> pair = stack.peek();
547+
SchemaNode currentNode = pair.getLeft();
548+
Iterator<SchemaNode> childrenIterator = pair.getRight();
549+
if (childrenIterator.hasNext()) {
550+
SchemaNode child = childrenIterator.next();
551+
stack.push(new Pair<>(child, child.getChildrenIterator()));
552+
} else {
553+
stack.pop();
554+
nextNode = currentNode;
555+
return;
505556
}
506-
hasNormalTimeSeries = true;
507-
} else {
508-
SchemaInternalNode internalNode;
509-
if (nodeType == SCHEMA_ENTITY_NODE) {
510-
internalNode = SchemaEntityNode.deserialize(inputStream);
511-
int templateId = internalNode.getAsEntityNode().getTemplateId();
512-
if (templateId != NON_TEMPLATE) {
513-
templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId));
557+
}
558+
}
559+
}
560+
561+
public static class SchemaNodeBatchDeserializer {
562+
private byte nodeType;
563+
private int childNum;
564+
// This class is likely to be faster than Stack when used as a stack
565+
private final Deque<SchemaNode> stack = new ArrayDeque<>();
566+
private SchemaNode child;
567+
private boolean hasLogicalView = false;
568+
private boolean hasNormalTimeSeries = false;
569+
private Map<Integer, Template> templateMap = new HashMap<>();
570+
private boolean isFirstBatch = true;
571+
572+
public boolean isFirstBatch() {
573+
return isFirstBatch;
574+
}
575+
576+
public void deserializeFromBatch(InputStream inputStream) throws IOException {
577+
isFirstBatch = false;
578+
while (inputStream.available() > 0) {
579+
nodeType = ReadWriteIOUtils.readByte(inputStream);
580+
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
581+
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
582+
stack.push(measurementNode);
583+
if (measurementNode.isLogicalView()) {
584+
hasLogicalView = true;
514585
}
586+
hasNormalTimeSeries = true;
515587
} else {
516-
internalNode = SchemaInternalNode.deserialize(inputStream);
517-
}
588+
SchemaInternalNode internalNode;
589+
if (nodeType == SCHEMA_ENTITY_NODE) {
590+
internalNode = SchemaEntityNode.deserialize(inputStream);
591+
int templateId = internalNode.getAsEntityNode().getTemplateId();
592+
if (templateId != NON_TEMPLATE) {
593+
templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId));
594+
}
595+
} else {
596+
internalNode = SchemaInternalNode.deserialize(inputStream);
597+
}
518598

519-
childNum = ReadWriteIOUtils.readInt(inputStream);
520-
while (childNum > 0) {
521-
child = stack.pop();
522-
internalNode.addChild(child.getName(), child);
523-
if (child.isMeasurement()) {
524-
SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
525-
if (measurementNode.getAlias() != null) {
526-
internalNode
527-
.getAsEntityNode()
528-
.addAliasChild(measurementNode.getAlias(), measurementNode);
599+
childNum = ReadWriteIOUtils.readInt(inputStream);
600+
while (childNum > 0) {
601+
child = stack.pop();
602+
internalNode.addChild(child.getName(), child);
603+
if (child.isMeasurement()) {
604+
SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
605+
if (measurementNode.getAlias() != null) {
606+
internalNode
607+
.getAsEntityNode()
608+
.addAliasChild(measurementNode.getAlias(), measurementNode);
609+
}
529610
}
611+
childNum--;
530612
}
531-
childNum--;
613+
stack.push(internalNode);
532614
}
533-
stack.push(internalNode);
534615
}
535616
}
536-
ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
537-
result.templateMap = templateMap;
538-
result.hasLogicalMeasurementPath = hasLogicalView;
539-
result.hasNormalTimeSeries = hasNormalTimeSeries;
540-
return result;
617+
618+
public ClusterSchemaTree finish() {
619+
try {
620+
ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
621+
result.templateMap = templateMap;
622+
result.hasLogicalMeasurementPath = hasLogicalView;
623+
result.hasNormalTimeSeries = hasNormalTimeSeries;
624+
return result;
625+
} finally {
626+
reset();
627+
}
628+
}
629+
630+
private void reset() {
631+
nodeType = 0;
632+
childNum = 0;
633+
stack.clear();
634+
child = null;
635+
hasLogicalView = false;
636+
hasNormalTimeSeries = false;
637+
// templateMap is set to the returned schema tree, so we should create a new one
638+
templateMap = new HashMap<>();
639+
isFirstBatch = true;
640+
}
641+
}
642+
643+
public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
644+
SchemaNodeBatchDeserializer schemaNodeBatchDeserializer = new SchemaNodeBatchDeserializer();
645+
schemaNodeBatchDeserializer.deserializeFromBatch(inputStream);
646+
return schemaNodeBatchDeserializer.finish();
541647
}
542648

543649
/**

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@
2525
import org.apache.iotdb.db.schemaengine.template.Template;
2626

2727
import org.apache.tsfile.file.metadata.IDeviceID;
28+
import org.apache.tsfile.utils.Accountable;
2829
import org.apache.tsfile.utils.Pair;
2930

3031
import java.util.List;
3132
import java.util.Set;
3233

33-
public interface ISchemaTree {
34+
public interface ISchemaTree extends Accountable {
3435
/**
3536
* Return all measurement paths for given path pattern and filter the result by slimit and offset.
3637
*

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.queryengine.common.schematree.node;
2121

22+
import org.apache.tsfile.utils.RamUsageEstimator;
2223
import org.apache.tsfile.utils.ReadWriteIOUtils;
2324

2425
import java.io.IOException;
@@ -31,6 +32,9 @@
3132

3233
public class SchemaEntityNode extends SchemaInternalNode {
3334

35+
private static final long SHALLOW_SIZE =
36+
RamUsageEstimator.shallowSizeOfInstance(SchemaEntityNode.class);
37+
3438
private boolean isAligned;
3539

3640
private Map<String, SchemaMeasurementNode> aliasChildren;
@@ -117,6 +121,11 @@ public byte getType() {
117121
@Override
118122
public void serialize(OutputStream outputStream) throws IOException {
119123
serializeChildren(outputStream);
124+
this.serializeNodeOwnContent(outputStream);
125+
}
126+
127+
@Override
128+
public void serializeNodeOwnContent(OutputStream outputStream) throws IOException {
120129
ReadWriteIOUtils.write(getType(), outputStream);
121130
ReadWriteIOUtils.write(name, outputStream);
122131
ReadWriteIOUtils.write(isAligned, outputStream);
@@ -133,4 +142,18 @@ public static SchemaEntityNode deserialize(InputStream inputStream) throws IOExc
133142
entityNode.setTemplateId(templateId);
134143
return entityNode;
135144
}
145+
146+
@Override
147+
public long ramBytesUsed() {
148+
return SHALLOW_SIZE
149+
+ RamUsageEstimator.sizeOf(name)
150+
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
151+
children,
152+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
153+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY)
154+
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
155+
aliasChildren,
156+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
157+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
158+
}
136159
}

0 commit comments

Comments
 (0)