Skip to content

Commit bbb74b1

Browse files
authored
[to dev/1.3] Transfer schema tree in batches & add memory control for schema tree
1 parent b9c2124 commit bbb74b1

17 files changed

Lines changed: 438 additions & 90 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderBy2IT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public class IoTDBOrderBy2IT extends IoTDBOrderByIT {
3737
public static void setUp() throws Exception {
3838
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(2048);
3939
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setMaxTsBlockSizeInByte(200);
40+
EnvFactory.getEnv()
41+
.getConfig()
42+
.getDataNodeCommonConfig()
43+
.setQueryMemoryProportion("1:100:200:50:400:200:200:50");
4044
EnvFactory.getEnv().initClusterEnvironment();
4145
insertData();
4246
}

integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByForDebugIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ public class IoTDBOrderByForDebugIT {
104104
@BeforeClass
105105
public static void setUp() throws Exception {
106106
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(1024 * 1024L);
107+
EnvFactory.getEnv()
108+
.getConfig()
109+
.getDataNodeCommonConfig()
110+
.setQueryMemoryProportion("1:100:200:50:400:200:200:50");
107111
EnvFactory.getEnv().initClusterEnvironment();
108112
insertData();
109113
}

integration-test/src/test/java/org/apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ public class IoTDBOrderByIT {
107107
@BeforeClass
108108
public static void setUp() throws Exception {
109109
EnvFactory.getEnv().getConfig().getDataNodeCommonConfig().setSortBufferSize(1024 * 1024L);
110+
EnvFactory.getEnv()
111+
.getConfig()
112+
.getDataNodeCommonConfig()
113+
.setQueryMemoryProportion("1:100:200:50:400:200:200:50");
110114
EnvFactory.getEnv().initClusterEnvironment();
111115
insertData();
112116
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.LinkedList;
3838
import java.util.List;
3939
import java.util.Set;
40+
import java.util.function.LongConsumer;
4041

4142
/**
4243
* This class is used to record the context of a query including QueryId, query statement, session
@@ -88,6 +89,10 @@ public class MPPQueryContext {
8889
// the updateScanNum process in distributed planning can be skipped.
8990
private boolean needUpdateScanNumForLastQuery = false;
9091

92+
private long reservedMemoryCostForSchemaTree = 0;
93+
private boolean releaseSchemaTreeAfterAnalyzing = true;
94+
private LongConsumer reserveMemoryForSchemaTreeFunc = null;
95+
9196
private boolean userQuery = false;
9297

9398
public MPPQueryContext(QueryId queryId) {
@@ -128,6 +133,34 @@ public MPPQueryContext(
128133
this.initResultNodeContext();
129134
}
130135

136+
public void setReserveMemoryForSchemaTreeFunc(LongConsumer reserveMemoryForSchemaTreeFunc) {
137+
this.reserveMemoryForSchemaTreeFunc = reserveMemoryForSchemaTreeFunc;
138+
}
139+
140+
public void reserveMemoryForSchemaTree(long memoryCost) {
141+
if (reserveMemoryForSchemaTreeFunc == null) {
142+
return;
143+
}
144+
reserveMemoryForSchemaTreeFunc.accept(memoryCost);
145+
this.reservedMemoryCostForSchemaTree += memoryCost;
146+
}
147+
148+
public void setReleaseSchemaTreeAfterAnalyzing(boolean releaseSchemaTreeAfterAnalyzing) {
149+
this.releaseSchemaTreeAfterAnalyzing = releaseSchemaTreeAfterAnalyzing;
150+
}
151+
152+
public boolean releaseSchemaTreeAfterAnalyzing() {
153+
return releaseSchemaTreeAfterAnalyzing;
154+
}
155+
156+
public void releaseMemoryForSchemaTree() {
157+
if (reservedMemoryCostForSchemaTree <= 0) {
158+
return;
159+
}
160+
this.memoryReservationManager.releaseMemoryCumulatively(reservedMemoryCostForSchemaTree);
161+
reservedMemoryCostForSchemaTree = 0;
162+
}
163+
131164
public void prepareForRetry() {
132165
this.initResultNodeContext();
133166
this.releaseAllMemoryReservedForFrontEnd();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java

Lines changed: 149 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.iotdb.db.schemaengine.template.Template;
4040

4141
import org.apache.tsfile.utils.Pair;
42+
import org.apache.tsfile.utils.RamUsageEstimator;
4243
import org.apache.tsfile.utils.ReadWriteIOUtils;
4344
import org.apache.tsfile.write.schema.IMeasurementSchema;
4445

@@ -49,8 +50,10 @@
4950
import java.util.ArrayList;
5051
import java.util.Deque;
5152
import java.util.HashMap;
53+
import java.util.Iterator;
5254
import java.util.List;
5355
import java.util.Map;
56+
import java.util.NoSuchElementException;
5457
import java.util.Set;
5558

5659
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
@@ -60,6 +63,8 @@
6063
import static org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode.SCHEMA_MEASUREMENT_NODE;
6164

6265
public class ClusterSchemaTree implements ISchemaTree {
66+
private static final long SHALLOW_SIZE =
67+
RamUsageEstimator.shallowSizeOfInstance(ClusterSchemaTree.class);
6368
private static final ClusterTemplateManager templateManager =
6469
ClusterTemplateManager.getInstance();
6570

@@ -75,6 +80,8 @@ public class ClusterSchemaTree implements ISchemaTree {
7580

7681
private Map<Integer, Template> templateMap = new HashMap<>();
7782

83+
private long ramBytesUsed;
84+
7885
public ClusterSchemaTree() {
7986
root = new SchemaInternalNode(PATH_ROOT);
8087
}
@@ -484,59 +491,158 @@ public void serialize(OutputStream outputStream) throws IOException {
484491
root.serialize(outputStream);
485492
}
486493

487-
public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
494+
public Iterator<SchemaNode> getIteratorForSerialize() {
495+
return new SchemaNodePostOrderIterator(root);
496+
}
488497

489-
byte nodeType;
490-
int childNum;
491-
Deque<SchemaNode> stack = new ArrayDeque<>();
492-
SchemaNode child;
493-
boolean hasLogicalView = false;
494-
boolean hasNormalTimeSeries = false;
495-
Map<Integer, Template> templateMap = new HashMap<>();
496-
497-
while (inputStream.available() > 0) {
498-
nodeType = ReadWriteIOUtils.readByte(inputStream);
499-
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
500-
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
501-
stack.push(measurementNode);
502-
if (measurementNode.isLogicalView()) {
503-
hasLogicalView = true;
498+
@Override
499+
public long ramBytesUsed() {
500+
if (ramBytesUsed > 0) {
501+
return ramBytesUsed;
502+
}
503+
ramBytesUsed =
504+
root.ramBytesUsed()
505+
+ SHALLOW_SIZE
506+
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
507+
templateMap,
508+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
509+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
510+
return ramBytesUsed;
511+
}
512+
513+
public void setRamBytesUsed(long ramBytesUsed) {
514+
this.ramBytesUsed = ramBytesUsed;
515+
}
516+
517+
private static class SchemaNodePostOrderIterator implements Iterator<SchemaNode> {
518+
// This class is likely to be faster than Stack when used as a stack
519+
private final Deque<Pair<SchemaNode, Iterator<SchemaNode>>> stack = new ArrayDeque<>();
520+
private SchemaNode nextNode;
521+
522+
public SchemaNodePostOrderIterator(SchemaNode root) {
523+
stack.push(new Pair<>(root, root.getChildrenIterator()));
524+
prepareNext();
525+
}
526+
527+
@Override
528+
public boolean hasNext() {
529+
return nextNode != null;
530+
}
531+
532+
@Override
533+
public SchemaNode next() {
534+
if (!hasNext()) {
535+
throw new NoSuchElementException();
536+
}
537+
SchemaNode result = nextNode;
538+
prepareNext();
539+
return result;
540+
}
541+
542+
private void prepareNext() {
543+
nextNode = null;
544+
while (!stack.isEmpty()) {
545+
Pair<SchemaNode, Iterator<SchemaNode>> pair = stack.peek();
546+
SchemaNode currentNode = pair.getLeft();
547+
Iterator<SchemaNode> childrenIterator = pair.getRight();
548+
if (childrenIterator.hasNext()) {
549+
SchemaNode child = childrenIterator.next();
550+
stack.push(new Pair<>(child, child.getChildrenIterator()));
551+
} else {
552+
stack.pop();
553+
nextNode = currentNode;
554+
return;
504555
}
505-
hasNormalTimeSeries = true;
506-
} else {
507-
SchemaInternalNode internalNode;
508-
if (nodeType == SCHEMA_ENTITY_NODE) {
509-
internalNode = SchemaEntityNode.deserialize(inputStream);
510-
int templateId = internalNode.getAsEntityNode().getTemplateId();
511-
if (templateId != NON_TEMPLATE) {
512-
templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId));
556+
}
557+
}
558+
}
559+
560+
public static class SchemaNodeBatchDeserializer {
561+
private byte nodeType;
562+
private int childNum;
563+
// This class is likely to be faster than Stack when used as a stack
564+
private final Deque<SchemaNode> stack = new ArrayDeque<>();
565+
private SchemaNode child;
566+
private boolean hasLogicalView = false;
567+
private boolean hasNormalTimeSeries = false;
568+
private Map<Integer, Template> templateMap = new HashMap<>();
569+
private boolean isFirstBatch = true;
570+
571+
public boolean isFirstBatch() {
572+
return isFirstBatch;
573+
}
574+
575+
public void deserializeFromBatch(InputStream inputStream) throws IOException {
576+
isFirstBatch = false;
577+
while (inputStream.available() > 0) {
578+
nodeType = ReadWriteIOUtils.readByte(inputStream);
579+
if (nodeType == SCHEMA_MEASUREMENT_NODE) {
580+
SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
581+
stack.push(measurementNode);
582+
if (measurementNode.isLogicalView()) {
583+
hasLogicalView = true;
513584
}
585+
hasNormalTimeSeries = true;
514586
} else {
515-
internalNode = SchemaInternalNode.deserialize(inputStream);
516-
}
587+
SchemaInternalNode internalNode;
588+
if (nodeType == SCHEMA_ENTITY_NODE) {
589+
internalNode = SchemaEntityNode.deserialize(inputStream);
590+
int templateId = internalNode.getAsEntityNode().getTemplateId();
591+
if (templateId != NON_TEMPLATE) {
592+
templateMap.putIfAbsent(templateId, templateManager.getTemplate(templateId));
593+
}
594+
} else {
595+
internalNode = SchemaInternalNode.deserialize(inputStream);
596+
}
517597

518-
childNum = ReadWriteIOUtils.readInt(inputStream);
519-
while (childNum > 0) {
520-
child = stack.pop();
521-
internalNode.addChild(child.getName(), child);
522-
if (child.isMeasurement()) {
523-
SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
524-
if (measurementNode.getAlias() != null) {
525-
internalNode
526-
.getAsEntityNode()
527-
.addAliasChild(measurementNode.getAlias(), measurementNode);
598+
childNum = ReadWriteIOUtils.readInt(inputStream);
599+
while (childNum > 0) {
600+
child = stack.pop();
601+
internalNode.addChild(child.getName(), child);
602+
if (child.isMeasurement()) {
603+
SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
604+
if (measurementNode.getAlias() != null) {
605+
internalNode
606+
.getAsEntityNode()
607+
.addAliasChild(measurementNode.getAlias(), measurementNode);
608+
}
528609
}
610+
childNum--;
529611
}
530-
childNum--;
612+
stack.push(internalNode);
531613
}
532-
stack.push(internalNode);
533614
}
534615
}
535-
ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
536-
result.templateMap = templateMap;
537-
result.hasLogicalMeasurementPath = hasLogicalView;
538-
result.hasNormalTimeSeries = hasNormalTimeSeries;
539-
return result;
616+
617+
public ClusterSchemaTree finish() {
618+
try {
619+
ClusterSchemaTree result = new ClusterSchemaTree(stack.poll());
620+
result.templateMap = templateMap;
621+
result.hasLogicalMeasurementPath = hasLogicalView;
622+
result.hasNormalTimeSeries = hasNormalTimeSeries;
623+
return result;
624+
} finally {
625+
reset();
626+
}
627+
}
628+
629+
private void reset() {
630+
nodeType = 0;
631+
childNum = 0;
632+
stack.clear();
633+
child = null;
634+
hasLogicalView = false;
635+
hasNormalTimeSeries = false;
636+
// templateMap is set to the returned schema tree, so we should create a new one
637+
templateMap = new HashMap<>();
638+
isFirstBatch = true;
639+
}
640+
}
641+
642+
public static ClusterSchemaTree deserialize(InputStream inputStream) throws IOException {
643+
SchemaNodeBatchDeserializer schemaNodeBatchDeserializer = new SchemaNodeBatchDeserializer();
644+
schemaNodeBatchDeserializer.deserializeFromBatch(inputStream);
645+
return schemaNodeBatchDeserializer.finish();
540646
}
541647

542648
/**

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ISchemaTree.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,13 @@
2424
import org.apache.iotdb.commons.utils.TestOnly;
2525
import org.apache.iotdb.db.schemaengine.template.Template;
2626

27+
import org.apache.tsfile.utils.Accountable;
2728
import org.apache.tsfile.utils.Pair;
2829

2930
import java.util.List;
3031
import java.util.Set;
3132

32-
public interface ISchemaTree {
33+
public interface ISchemaTree extends Accountable {
3334
/**
3435
* Return all measurement paths for given path pattern and filter the result by slimit and offset.
3536
*

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/node/SchemaEntityNode.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.queryengine.common.schematree.node;
2121

22+
import org.apache.tsfile.utils.RamUsageEstimator;
2223
import org.apache.tsfile.utils.ReadWriteIOUtils;
2324

2425
import java.io.IOException;
@@ -31,6 +32,9 @@
3132

3233
public class SchemaEntityNode extends SchemaInternalNode {
3334

35+
private static final long SHALLOW_SIZE =
36+
RamUsageEstimator.shallowSizeOfInstance(SchemaEntityNode.class);
37+
3438
private boolean isAligned;
3539

3640
private Map<String, SchemaMeasurementNode> aliasChildren;
@@ -117,6 +121,11 @@ public byte getType() {
117121
@Override
118122
public void serialize(OutputStream outputStream) throws IOException {
119123
serializeChildren(outputStream);
124+
this.serializeNodeOwnContent(outputStream);
125+
}
126+
127+
@Override
128+
public void serializeNodeOwnContent(OutputStream outputStream) throws IOException {
120129
ReadWriteIOUtils.write(getType(), outputStream);
121130
ReadWriteIOUtils.write(name, outputStream);
122131
ReadWriteIOUtils.write(isAligned, outputStream);
@@ -133,4 +142,18 @@ public static SchemaEntityNode deserialize(InputStream inputStream) throws IOExc
133142
entityNode.setTemplateId(templateId);
134143
return entityNode;
135144
}
145+
146+
@Override
147+
public long ramBytesUsed() {
148+
return SHALLOW_SIZE
149+
+ RamUsageEstimator.sizeOf(name)
150+
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
151+
children,
152+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
153+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY)
154+
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
155+
aliasChildren,
156+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
157+
RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
158+
}
136159
}

0 commit comments

Comments
 (0)