Skip to content

Commit e07ed44

Browse files
committed
Fix chained appendpipe planner mismatch (#5173)
Chained appendpipe queries can produce literal-only projections during prepare-time field trimming. Calcite may simplify those projections into Values, which can trigger planner mismatch assertions during execution. Preserve the Project shape for this narrow case in OpenSearchRelFieldTrimmer and add YAML REST regression coverage for double and triple appendpipe.

Signed-off-by: Songkan Tang <songkant@amazon.com>
1 parent e1af042 commit e07ed44

3 files changed

Lines changed: 223 additions & 7 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -299,15 +299,22 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
299299
/**
 * Plans an {@code appendpipe} command: the sub-pipeline is evaluated on top of the main
 * pipeline and its result is merged with the main result.
 *
 * <p>This is a diff hunk. Lines marked {@code -} are the removed implementation, which peeked
 * the main plan from the RelBuilder stack and pushed it again before visiting the sub-query.
 * Lines marked {@code +} are the replacement, which attaches the AppendPipe's first child to
 * the bottom of the sub-query AST and then visits the sub-query as a whole.
 *
 * @param node the AppendPipe AST node being planned
 * @param context planning context holding the RelBuilder that is read from and built here
 * @return the result of {@code mergeTableAndResolveColumnConflict} applied to the main plan
 *     and the sub-pipeline plan
 * @throws RuntimeException if any node on the walked sub-query chain has more than one child
 */
@Override
300300
public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) {
301301
visitChildren(node, context);
302-
// Use the main plan from the relBuilder stack directly instead of re-visiting
303-
// the parent AST. Re-visiting causes "belongs to a different planner" assertion
304-
// when multiple appendpipe commands are chained.
305-
RelNode mainNode = context.relBuilder.peek();
306-
context.relBuilder.push(mainNode);
307-
node.getSubQuery().accept(this, context);
// Walk down the sub-query's first-child chain. The walk stops at the first node that has
// no children (null or empty list) or whose first child is a Values instance.
302+
UnresolvedPlan subqueryPlan = node.getSubQuery();
303+
UnresolvedPlan childNode = subqueryPlan;
304+
while (childNode.getChild() != null
305+
&& !childNode.getChild().isEmpty()
306+
&& !(childNode.getChild().getFirst() instanceof Values)) {
307+
if (childNode.getChild().size() > 1) {
// NOTE(review): the message says "multiply children"; this looks like a typo for
// "multiple children" -- confirm no test asserts on the text before changing it.
308+
throw new RuntimeException("AppendPipe doesn't support multiply children subquery.");
309+
}
310+
childNode = (UnresolvedPlan) childNode.getChild().getFirst();
311+
}
// Hook the AppendPipe's own first child underneath the deepest sub-query node found above.
312+
childNode.attach(node.getChild().getFirst());
313+
314+
subqueryPlan.accept(this, context);
308315

// The first build() result is used as the sub-pipeline plan, the second as the main plan.
309316
RelNode subPipelineNode = context.relBuilder.build();
310-
mainNode = context.relBuilder.build();
317+
RelNode mainNode = context.relBuilder.build();
311318
return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context);
312319
}
313320

core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelFieldTrimmer.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,31 @@
55

66
package org.opensearch.sql.calcite.utils;
77

8+
import java.util.ArrayList;
89
import java.util.LinkedHashSet;
910
import java.util.List;
1011
import java.util.Set;
12+
import org.apache.calcite.linq4j.Ord;
1113
import org.apache.calcite.plan.RelOptUtil;
1214
import org.apache.calcite.rel.RelNode;
15+
import org.apache.calcite.rel.core.Aggregate;
16+
import org.apache.calcite.rel.core.CorrelationId;
17+
import org.apache.calcite.rel.core.Project;
18+
import org.apache.calcite.rel.core.Values;
1319
import org.apache.calcite.rel.type.RelDataType;
1420
import org.apache.calcite.rel.type.RelDataTypeField;
21+
import org.apache.calcite.rex.RexLiteral;
1522
import org.apache.calcite.rex.RexNode;
1623
import org.apache.calcite.rex.RexPermuteInputsShuttle;
24+
import org.apache.calcite.rex.RexSubQuery;
25+
import org.apache.calcite.rex.RexUtil;
1726
import org.apache.calcite.rex.RexVisitor;
1827
import org.apache.calcite.sql.validate.SqlValidator;
1928
import org.apache.calcite.sql2rel.RelFieldTrimmer;
2029
import org.apache.calcite.tools.RelBuilder;
2130
import org.apache.calcite.util.ImmutableBitSet;
2231
import org.apache.calcite.util.mapping.Mapping;
32+
import org.apache.calcite.util.mapping.MappingType;
2333
import org.apache.calcite.util.mapping.Mappings;
2434
import org.checkerframework.checker.nullness.qual.Nullable;
2535
import org.opensearch.sql.calcite.plan.rel.Dedup;
@@ -30,9 +40,94 @@
3040
* <p>This class extends Calcite's RelFieldTrimmer to support trimming customized operators.
3141
*/
3242
public class OpenSearchRelFieldTrimmer extends RelFieldTrimmer {
43+
private final RelBuilder openSearchRelBuilder;
3344

3445
/**
 * Creates the trimmer.
 *
 * @param validator validator forwarded to the superclass constructor; may be null
 * @param relBuilder builder forwarded to the superclass constructor and also stored in
 *     {@code openSearchRelBuilder} for direct use by this class
 */
public OpenSearchRelFieldTrimmer(@Nullable SqlValidator validator, RelBuilder relBuilder) {
3546
super(validator, relBuilder);
47+
this.openSearchRelBuilder = relBuilder;
48+
}
49+
50+
/**
 * Trims the fields of a {@link Project}: keeps only the project expressions selected by
 * {@code fieldsUsed}, trims the input accordingly, and rewrites input references to the
 * trimmed input's field positions.
 *
 * <p>When every kept expression is a literal and the trimmed input is recognised by
 * {@code isFixedRowCount}, the Project is rebuilt with {@code project.copy(...)} instead of
 * going through the RelBuilder.
 *
 * <p>NOTE(review): the rest of the body appears to mirror the superclass's handling of
 * Project -- confirm against the Calcite version in use when upgrading.
 *
 * @param project the projection to trim
 * @param fieldsUsed bit set indexed by the project's output field ordinal; a set bit means the
 *     field is kept
 * @param extraFields extra fields; copied into the set handed to the input finder and to
 *     {@code trimChild}
 * @return the trimmed relational expression together with a mapping from original output
 *     field ordinals to their new ordinals
 */
@Override
51+
public TrimResult trimFields(
52+
Project project, ImmutableBitSet fieldsUsed, Set<RelDataTypeField> extraFields) {
53+
final RelDataType rowType = project.getRowType();
54+
final int fieldCount = rowType.getFieldCount();
55+
final RelNode input = project.getInput();
56+
// Collect the input fields referenced by the project expressions that are kept.
57+
final Set<RelDataTypeField> inputExtraFields = new LinkedHashSet<>(extraFields);
58+
RelOptUtil.InputFinder inputFinder = new RelOptUtil.InputFinder(inputExtraFields);
59+
for (Ord<RexNode> ord : Ord.zip(project.getProjects())) {
60+
if (fieldsUsed.get(ord.i)) {
61+
ord.e.accept(inputFinder);
62+
}
63+
}
64+
// Columns needed by correlated sub-queries in the project; at most one correlation id is
// supported (enforced by the assert).
65+
List<RexSubQuery> subQueries = RexUtil.SubQueryCollector.collect(project);
66+
Set<CorrelationId> correlationIds = RelOptUtil.getVariablesUsed(subQueries);
67+
ImmutableBitSet requiredColumns = ImmutableBitSet.of();
68+
if (!correlationIds.isEmpty()) {
69+
assert correlationIds.size() == 1;
70+
requiredColumns = RelOptUtil.correlationColumns(correlationIds.iterator().next(), project);
71+
}
72+
// Input fields to keep = correlation columns + fields found by the input finder.
73+
ImmutableBitSet finderFields = inputFinder.build();
74+
ImmutableBitSet inputFieldsUsed =
75+
ImmutableBitSet.builder().addAll(requiredColumns).addAll(finderFields).build();
76+
77+
TrimResult trimResult = trimChild(project, input, inputFieldsUsed, inputExtraFields);
78+
RelNode newInput = trimResult.left;
79+
final Mapping inputMapping = trimResult.right;
80+
// Nothing changed: the input is the same instance and every output field is used.
81+
if (newInput == input && fieldsUsed.cardinality() == fieldCount) {
82+
return result(project, Mappings.createIdentity(fieldCount));
83+
}
84+
// No output field is used: delegate to dummyProject.
85+
if (fieldsUsed.cardinality() == 0) {
86+
return dummyProject(fieldCount, newInput, project);
87+
}
88+
// Shuttle that rewrites input references using inputMapping; when a correlation id is
// present it additionally remaps correlates inside sub-queries.
89+
final List<RexNode> newProjects = new ArrayList<>();
90+
final RexVisitor<RexNode> shuttle;
91+
if (!correlationIds.isEmpty()) {
92+
assert correlationIds.size() == 1;
93+
shuttle =
94+
new RexPermuteInputsShuttle(inputMapping, newInput) {
95+
@Override
96+
public RexNode visitSubQuery(RexSubQuery subQuery) {
97+
subQuery = (RexSubQuery) super.visitSubQuery(subQuery);
98+
return RelOptUtil.remapCorrelatesInSuqQuery(
99+
openSearchRelBuilder.getRexBuilder(),
100+
subQuery,
101+
correlationIds.iterator().next(),
102+
newInput.getRowType(),
103+
inputMapping);
104+
}
105+
};
106+
} else {
107+
shuttle = new RexPermuteInputsShuttle(inputMapping, newInput);
108+
}
109+
// Build the kept expressions and record old ordinal -> new ordinal for each of them.
110+
final Mapping mapping =
111+
Mappings.create(MappingType.INVERSE_SURJECTION, fieldCount, fieldsUsed.cardinality());
112+
for (Ord<RexNode> ord : Ord.zip(project.getProjects())) {
113+
if (fieldsUsed.get(ord.i)) {
114+
mapping.set(ord.i, newProjects.size());
115+
RexNode newProjectExpr = ord.e.accept(shuttle);
116+
newProjects.add(newProjectExpr);
117+
}
118+
}
119+
120+
final RelDataType newRowType =
121+
RelOptUtil.permute(project.getCluster().getTypeFactory(), rowType, mapping);
122+
// Literal-only projection over a fixed-row-count input: copy the Project directly and skip
// the RelBuilder path below. Presumably this keeps the builder from simplifying the node
// into Values -- TODO confirm against RelBuilder.project behaviour.
123+
if (shouldAvoidSimplifyValues(newProjects, newInput)) {
124+
return result(
125+
project.copy(project.getTraitSet(), newInput, newProjects, newRowType), mapping, project);
126+
}
127+
// Default path: rebuild the projection through the RelBuilder.
128+
openSearchRelBuilder.push(newInput);
129+
openSearchRelBuilder.project(newProjects, newRowType.getFieldNames(), false, correlationIds);
130+
return result(openSearchRelBuilder.build(), mapping, project);
36131
}
37132

38133
public TrimResult trimFields(
@@ -67,4 +162,19 @@ public TrimResult trimFields(
67162
// needs them for its condition.
68163
return result(dedup.copy(newInput, newDedupFields), inputMapping);
69164
}
165+
166+
/**
 * Tells whether a trimmed projection should bypass the RelBuilder.
 *
 * @param projects the projection expressions after trimming
 * @param input the trimmed input of the projection
 * @return true when every expression is a {@link RexLiteral} (also true for an empty list,
 *     per {@code allMatch}) and {@code isFixedRowCount(input)} holds
 */
private boolean shouldAvoidSimplifyValues(List<RexNode> projects, RelNode input) {
167+
return projects.stream().allMatch(RexLiteral.class::isInstance) && isFixedRowCount(input);
168+
}
169+
170+
/**
 * Checks whether the node is one of the two shapes this class treats as having a fixed row
 * count.
 *
 * @param input the relational node to inspect; only the node itself is examined, not its
 *     inputs
 * @return true for a {@link Values} node, or for an {@link Aggregate} whose group set is empty
 *     and whose group type is {@code SIMPLE}; false for anything else
 */
private boolean isFixedRowCount(RelNode input) {
171+
if (input instanceof Values) {
172+
return true;
173+
}
174+
if (input instanceof Aggregate aggregate) {
// An aggregate without grouping keys (presumably yields a single row -- confirm).
175+
return aggregate.getGroupSet().isEmpty()
176+
&& aggregate.getGroupType() == Aggregate.Group.SIMPLE;
177+
}
178+
return false;
179+
}
70180
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
setup:
2+
- do:
3+
query.settings:
4+
body:
5+
transient:
6+
plugins.calcite.enabled: true
7+
8+
- do:
9+
indices.create:
10+
index: issue5173
11+
body:
12+
settings:
13+
number_of_shards: 1
14+
number_of_replicas: 0
15+
mappings:
16+
properties:
17+
gender:
18+
type: keyword
19+
age:
20+
type: integer
21+
22+
- do:
23+
bulk:
24+
refresh: true
25+
body:
26+
- '{"index": {"_index": "issue5173", "_id": "1"}}'
27+
- '{"gender": "F", "age": 10}'
28+
- '{"index": {"_index": "issue5173", "_id": "2"}}'
29+
- '{"gender": "F", "age": 20}'
30+
- '{"index": {"_index": "issue5173", "_id": "3"}}'
31+
- '{"gender": "M", "age": 30}'
32+
- '{"index": {"_index": "issue5173", "_id": "4"}}'
33+
- '{"gender": "M", "age": 40}'
34+
35+
---
36+
teardown:
37+
- do:
38+
indices.delete:
39+
index: issue5173
40+
ignore_unavailable: true
41+
- do:
42+
query.settings:
43+
body:
44+
transient:
45+
plugins.calcite.enabled: false
46+
47+
---
48+
"Issue 5173: double appendpipe with different aggregations should succeed":
49+
- skip:
50+
features:
51+
- headers
52+
- do:
53+
headers:
54+
Content-Type: 'application/json'
55+
ppl:
56+
body:
57+
query: "source=issue5173 | stats sum(age) as sum_age by gender | appendpipe [ stats avg(sum_age) as avg_sum_age ] | appendpipe [ stats max(sum_age) as max_sum_age ]"
58+
59+
- match: { total: 4 }
60+
- match:
61+
schema:
62+
- { name: sum_age, type: bigint }
63+
- { name: gender, type: string }
64+
- { name: avg_sum_age, type: double }
65+
- { name: max_sum_age, type: bigint }
66+
- match:
67+
datarows:
68+
- [ 30, "F", null, null ]
69+
- [ 70, "M", null, null ]
70+
- [ null, null, 50.0, null ]
71+
- [ null, null, null, 70 ]
72+
73+
---
74+
"Issue 5173: triple appendpipe with different aggregations should succeed":
75+
- skip:
76+
features:
77+
- headers
78+
- do:
79+
headers:
80+
Content-Type: 'application/json'
81+
ppl:
82+
body:
83+
query: "source=issue5173 | stats sum(age) as sum_age by gender | appendpipe [ stats avg(sum_age) as avg_sum_age ] | appendpipe [ stats max(sum_age) as max_sum_age ] | appendpipe [ stats min(sum_age) as min_sum_age ]"
84+
85+
- match: { total: 5 }
86+
- match:
87+
schema:
88+
- { name: sum_age, type: bigint }
89+
- { name: gender, type: string }
90+
- { name: avg_sum_age, type: double }
91+
- { name: max_sum_age, type: bigint }
92+
- { name: min_sum_age, type: bigint }
93+
- match:
94+
datarows:
95+
- [ 30, "F", null, null, null ]
96+
- [ 70, "M", null, null, null ]
97+
- [ null, null, 50.0, null, null ]
98+
- [ null, null, null, 70, null ]
99+
- [ null, null, null, null, 30 ]

0 commit comments

Comments
 (0)