Skip to content

Commit 4afb9f2

Browse files
committed
Fix forced Exchange branch isolation in distributed align-by-device planning.
This preserves forced InnerTimeJoin-related exchanges from being merged into shared sinks and adds an integration test for wildcard align-by-device aggregation across time partitions.
1 parent 8657782 commit 4afb9f2

4 files changed

Lines changed: 195 additions & 24 deletions

File tree

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.it.aggregation;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
25+
26+
import org.junit.AfterClass;
27+
import org.junit.BeforeClass;
28+
import org.junit.Test;
29+
import org.junit.experimental.categories.Category;
30+
import org.junit.runner.RunWith;
31+
32+
import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
33+
import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
34+
35+
@RunWith(IoTDBTestRunner.class)
36+
@Category({LocalStandaloneIT.class})
37+
public class IoTDBAlignByDeviceWildcardIT {
38+
39+
private static final String[] SQL_LIST =
40+
new String[] {
41+
"CREATE DATABASE root.min",
42+
"CREATE TIMESERIES root.min.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
43+
"CREATE TIMESERIES root.min.d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
44+
"CREATE TIMESERIES root.min.d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
45+
"CREATE TIMESERIES root.min.d2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
46+
"INSERT INTO root.min.d1(time, s1, s2) VALUES(1, 1, 1)",
47+
"INSERT INTO root.min.d1(time, s1, s2) VALUES(2, 1, 1)",
48+
"FLUSH",
49+
"INSERT INTO root.min.d2(time, s1, s2) VALUES(5, 1, 1)",
50+
"FLUSH"
51+
};
52+
53+
@BeforeClass
54+
public static void setUp() throws Exception {
55+
EnvFactory.getEnv()
56+
.getConfig()
57+
.getCommonConfig()
58+
.setDefaultDataRegionGroupNumPerDatabase(1)
59+
.setTimePartitionInterval(1)
60+
.setEnableSeqSpaceCompaction(false)
61+
.setEnableUnseqSpaceCompaction(false)
62+
.setEnableCrossSpaceCompaction(false);
63+
EnvFactory.getEnv().initClusterEnvironment();
64+
prepareData(SQL_LIST);
65+
}
66+
67+
@AfterClass
68+
public static void tearDown() throws Exception {
69+
// EnvFactory.getEnv().cleanClusterEnvironment();
70+
}
71+
72+
@Test
73+
public void testWildcardAlignByDeviceWithTimePartitionSplit() {
74+
String sql =
75+
"SELECT count(s1) FROM root.min.** "
76+
+ "WHERE s2 is not null and s1 is not null "
77+
+ "GROUP BY([1, 6), 1ms) ALIGN BY DEVICE";
78+
String[] expectedHeader = new String[] {"Time", "Device", "count(s1)"};
79+
String[] expectedRows =
80+
new String[] {
81+
"1,root.min.d1,1,",
82+
"2,root.min.d1,1,",
83+
"3,root.min.d1,0,",
84+
"4,root.min.d1,0,",
85+
"5,root.min.d1,0,",
86+
"1,root.min.d2,0,",
87+
"2,root.min.d2,0,",
88+
"3,root.min.d2,0,",
89+
"4,root.min.d2,0,",
90+
"5,root.min.d2,1,",
91+
};
92+
resultSetEqualTest(sql, expectedHeader, expectedRows);
93+
}
94+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,13 +145,23 @@ private void adjustUpStreamHelper(
145145
ExchangeNode exchangeNode = (ExchangeNode) child;
146146
TRegionReplicaSet regionOfChild =
147147
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).getRegion();
148-
MultiChildrenSinkNode newChild =
149-
memo.computeIfAbsent(
150-
regionOfChild,
151-
tRegionReplicaSet ->
152-
needShuffleSinkNode
153-
? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
154-
: new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId()));
148+
MultiChildrenSinkNode newChild;
149+
if (exchangeNode.isForcedExchange()) {
150+
// Keep forced exchange branch isolated: do not merge into shared sink memo.
151+
newChild =
152+
needShuffleSinkNode
153+
? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
154+
: new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId());
155+
} else {
156+
newChild =
157+
memo.computeIfAbsent(
158+
regionOfChild,
159+
tRegionReplicaSet ->
160+
needShuffleSinkNode
161+
? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
162+
: new IdentitySinkNode(
163+
context.queryContext.getQueryId().genPlanNodeId()));
164+
}
155165
newChild.addChild(exchangeNode.getChild());
156166
newChild.addDownStreamChannelLocation(
157167
new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,17 @@
6969
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
7070
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
7171

72+
import java.util.ArrayList;
7273
import java.util.Arrays;
7374
import java.util.HashMap;
7475
import java.util.List;
7576
import java.util.Map;
7677
import java.util.stream.Collectors;
7778

78-
import static com.google.common.collect.ImmutableList.toImmutableList;
79-
8079
public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
8180

8281
private final Analysis analysis;
82+
private boolean containsInnerTimeJoinInCurrentSubtree = false;
8383

8484
public ExchangeNodeAdder(Analysis analysis) {
8585
this.analysis = analysis;
@@ -92,10 +92,7 @@ public PlanNode visitPlan(PlanNode node, NodeGroupContext context) {
9292
return node;
9393
}
9494
// Visit all the children of current node
95-
List<PlanNode> children =
96-
node.getChildren().stream()
97-
.map(child -> child.accept(this, context))
98-
.collect(toImmutableList());
95+
List<PlanNode> children = visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
9996

10097
// Calculate the node distribution info according to its children
10198

@@ -215,7 +212,13 @@ private PlanNode processNoChildSourceNode(SourceNode node, NodeGroupContext cont
215212

216213
@Override
217214
public PlanNode visitDeviceView(DeviceViewNode node, NodeGroupContext context) {
218-
return processMultiChildNode(node, context);
215+
List<PlanNode> visitedChildren =
216+
visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
217+
// Force Exchange for multi-child DeviceView if any child subtree contains InnerTimeJoin.
218+
if (node.getChildren().size() > 1 && containsInnerTimeJoinInCurrentSubtree) {
219+
return processMultiChildNodeWithForcedExchange(node, context, visitedChildren);
220+
}
221+
return processMultiChildNode(node, context, visitedChildren);
219222
}
220223

221224
@Override
@@ -236,7 +239,13 @@ public PlanNode visitSingleDeviceView(SingleDeviceViewNode node, NodeGroupContex
236239

237240
@Override
238241
public PlanNode visitMergeSort(MergeSortNode node, NodeGroupContext context) {
239-
return processMultiChildNode(node, context);
242+
List<PlanNode> visitedChildren =
243+
visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
244+
// Force Exchange if any child subtree contains InnerTimeJoin.
245+
if (containsInnerTimeJoinInCurrentSubtree) {
246+
return processMultiChildNodeWithForcedExchange(node, context, visitedChildren);
247+
}
248+
return processMultiChildNode(node, context, visitedChildren);
240249
}
241250

242251
@Override
@@ -428,11 +437,14 @@ private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupCont
428437
return processMultiChildNodeByLocation(node, context);
429438
}
430439

431-
MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
432440
List<PlanNode> visitedChildren =
433-
node.getChildren().stream()
434-
.map(child -> visit(child, context))
435-
.collect(Collectors.toList());
441+
visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
442+
return processMultiChildNode(node, context, visitedChildren);
443+
}
444+
445+
private PlanNode processMultiChildNode(
446+
MultiChildProcessNode node, NodeGroupContext context, List<PlanNode> visitedChildren) {
447+
MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
436448

437449
// DataRegion in which node locates
438450
TRegionReplicaSet dataRegion;
@@ -546,13 +558,47 @@ private PlanNode processTopKNode(
546558
}
547559

548560
private ExchangeNode genExchangeNode(NodeGroupContext context, PlanNode child) {
561+
return genExchangeNode(context, child, false);
562+
}
563+
564+
private ExchangeNode genExchangeNode(
565+
NodeGroupContext context, PlanNode child, boolean forcedExchange) {
549566
ExchangeNode exchangeNode = new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
550567
exchangeNode.setChild(child);
551568
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
569+
exchangeNode.setForcedExchange(forcedExchange);
552570
context.hasExchangeNode = true;
553571
return exchangeNode;
554572
}
555573

574+
private PlanNode processMultiChildNodeWithForcedExchange(
575+
MultiChildProcessNode node, NodeGroupContext context, List<PlanNode> visitedChildren) {
576+
MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
577+
for (PlanNode child : visitedChildren) {
578+
newNode.addChild(genExchangeNode(context, child, true));
579+
}
580+
context.putNodeDistribution(
581+
newNode.getPlanNodeId(),
582+
new NodeDistribution(
583+
NodeDistributionType.SAME_WITH_SOME_CHILD, context.getMostlyUsedDataRegion()));
584+
return newNode;
585+
}
586+
587+
private List<PlanNode> visitChildrenAndRecordInnerTimeJoin(
588+
List<PlanNode> children, NodeGroupContext context) {
589+
List<PlanNode> result = new ArrayList<>(children.size());
590+
boolean originalTimeJoin = containsInnerTimeJoinInCurrentSubtree;
591+
boolean hasInnerTimeJoinInChildren = false;
592+
for (PlanNode child : children) {
593+
containsInnerTimeJoinInCurrentSubtree = false;
594+
PlanNode visitedChild = visit(child, context);
595+
hasInnerTimeJoinInChildren |= containsInnerTimeJoinInCurrentSubtree;
596+
result.add(visitedChild);
597+
}
598+
containsInnerTimeJoinInCurrentSubtree = originalTimeJoin || hasInnerTimeJoinInChildren;
599+
return result;
600+
}
601+
556602
@Override
557603
public PlanNode visitSlidingWindowAggregation(
558604
SlidingWindowAggregationNode node, NodeGroupContext context) {
@@ -584,9 +630,9 @@ private TRegionReplicaSet calculateDataRegionByChildren(
584630
if (region == null
585631
&& context.getNodeDistribution(child.getPlanNodeId()).getType()
586632
== NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
587-
return calculateSchemaRegionByChildren(child.getChildren(), context);
633+
region = calculateSchemaRegionByChildren(child.getChildren(), context);
588634
}
589-
return region;
635+
return region == null ? DataPartition.NOT_ASSIGNED : region;
590636
},
591637
Collectors.counting()));
592638

@@ -644,6 +690,13 @@ private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext
644690
}
645691

646692
public PlanNode visit(PlanNode node, NodeGroupContext context) {
647-
return node.accept(this, context);
693+
boolean originalTimeJoin = containsInnerTimeJoinInCurrentSubtree;
694+
containsInnerTimeJoinInCurrentSubtree = false;
695+
PlanNode visitedNode = node.accept(this, context);
696+
containsInnerTimeJoinInCurrentSubtree =
697+
originalTimeJoin
698+
|| containsInnerTimeJoinInCurrentSubtree
699+
|| node instanceof InnerTimeJoinNode;
700+
return visitedNode;
648701
}
649702
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ExchangeNode.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ public class ExchangeNode extends SingleChildProcessNode {
4848
/** Exchange needs to know which child of IdentitySinkNode/ShuffleSinkNode it matches */
4949
private int indexOfUpstreamSinkHandle = 0;
5050

51+
/** Planner-only flag: this exchange is forced and should keep independent upstream sink. */
52+
private transient boolean forcedExchange = false;
53+
5154
public ExchangeNode(PlanNodeId id) {
5255
super(id);
5356
}
@@ -72,6 +75,7 @@ public PlanNode clone() {
7275
ExchangeNode node = new ExchangeNode(getPlanNodeId());
7376
node.setOutputColumnNames(outputColumnNames);
7477
node.setIndexOfUpstreamSinkHandle(indexOfUpstreamSinkHandle);
78+
node.setForcedExchange(forcedExchange);
7579
return node;
7680
}
7781

@@ -162,6 +166,14 @@ public void setIndexOfUpstreamSinkHandle(int indexOfUpstreamSinkHandle) {
162166
this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
163167
}
164168

169+
public boolean isForcedExchange() {
170+
return forcedExchange;
171+
}
172+
173+
public void setForcedExchange(boolean forcedExchange) {
174+
this.forcedExchange = forcedExchange;
175+
}
176+
165177
public TEndPoint getUpstreamEndpoint() {
166178
return upstreamEndpoint;
167179
}
@@ -188,11 +200,13 @@ public boolean equals(Object o) {
188200
ExchangeNode that = (ExchangeNode) o;
189201
return Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
190202
&& Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
191-
&& Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
203+
&& Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId)
204+
&& forcedExchange == that.forcedExchange;
192205
}
193206

194207
@Override
195208
public int hashCode() {
196-
return Objects.hash(super.hashCode(), upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
209+
return Objects.hash(
210+
super.hashCode(), upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId, forcedExchange);
197211
}
198212
}

0 commit comments

Comments
 (0)