From 13198f0605ad41678834f73f3ced48c6444d6304 Mon Sep 17 00:00:00 2001 From: Songkan Tang Date: Wed, 1 Apr 2026 16:41:19 +0800 Subject: [PATCH 1/6] Fix double appendpipe planner assertion error (#5173) visitAppendPipe re-visits parent AST in new planner context, causing assertion failure. Use relBuilder stack directly instead of AST re-visitation to stay within the same planner context. Signed-off-by: Songkan Tang --- .../sql/calcite/CalciteRelNodeVisitor.java | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index e4e036da3a6..b284a37f205 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -299,22 +299,15 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) { @Override public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) { visitChildren(node, context); - UnresolvedPlan subqueryPlan = node.getSubQuery(); - UnresolvedPlan childNode = subqueryPlan; - while (childNode.getChild() != null - && !childNode.getChild().isEmpty() - && !(childNode.getChild().getFirst() instanceof Values)) { - if (childNode.getChild().size() > 1) { - throw new RuntimeException("AppendPipe doesn't support multiply children subquery."); - } - childNode = (UnresolvedPlan) childNode.getChild().getFirst(); - } - childNode.attach(node.getChild().getFirst()); - - subqueryPlan.accept(this, context); + // Use the main plan from the relBuilder stack directly instead of re-visiting + // the parent AST. Re-visiting causes "belongs to a different planner" assertion + // when multiple appendpipe commands are chained. 
+ RelNode mainNode = context.relBuilder.peek(); + context.relBuilder.push(mainNode); + node.getSubQuery().accept(this, context); RelNode subPipelineNode = context.relBuilder.build(); - RelNode mainNode = context.relBuilder.build(); + mainNode = context.relBuilder.build(); return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context); } From 9b371671f3531d231439e32aba47cae587f52b7b Mon Sep 17 00:00:00 2001 From: Songkan Tang Date: Tue, 7 Apr 2026 16:17:21 +0800 Subject: [PATCH 2/6] Add regression tests for chained appendpipe (double and triple) Signed-off-by: Songkan Tang --- .../remote/CalcitePPLAppendPipeCommandIT.java | 61 ++++++++ .../ppl/calcite/CalcitePPLAppendPipeTest.java | 148 ++++++++++++++++++ 2 files changed, 209 insertions(+) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java index d25d3ca80db..f0f2f3ddbc8 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java @@ -10,6 +10,7 @@ import static org.opensearch.sql.util.MatcherUtils.rows; import static org.opensearch.sql.util.MatcherUtils.schema; import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; import java.io.IOException; @@ -87,4 +88,64 @@ public void testAppendpipeWithConflictTypeColumn() throws IOException { TEST_INDEX_ACCOUNT))); assertTrue(exception.getMessage().contains("due to incompatible types")); } + + /** Regression test: double appendpipe with different aggregations (issue #5173). 
*/ + @Test + public void testDoubleAppendPipe() throws IOException { + // stats by gender gives 2 rows (F, M), then two appendpipe aggregations add 1 row each + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age by gender" + + " | appendpipe [ stats avg(sum_age) as avg_sum_age ]" + + " | appendpipe [ stats max(sum_age) as max_sum_age ]", + TEST_INDEX_ACCOUNT)); + // 2 original + 1 avg + 1 max = 4 rows + verifyNumOfRows(actual, 4); + verifySchemaInOrder( + actual, + schema("sum_age", "bigint"), + schema("gender", "string"), + schema("avg_sum_age", "double"), + schema("max_sum_age", "bigint")); + } + + /** Regression test: triple appendpipe with different aggregations (issue #5173). */ + @Test + public void testTripleAppendPipe() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age by gender" + + " | appendpipe [ stats avg(sum_age) as avg_sum_age ]" + + " | appendpipe [ stats max(sum_age) as max_sum_age ]" + + " | appendpipe [ stats min(sum_age) as min_sum_age ]", + TEST_INDEX_ACCOUNT)); + // 2 original + 1 avg + 1 max + 1 min = 5 rows + verifyNumOfRows(actual, 5); + verifySchemaInOrder( + actual, + schema("sum_age", "bigint"), + schema("gender", "string"), + schema("avg_sum_age", "double"), + schema("max_sum_age", "bigint"), + schema("min_sum_age", "bigint")); + } + + /** Regression test: double appendpipe with non-aggregation (filter) subpipeline. 
*/ + @Test + public void testDoubleAppendPipeWithFilter() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age by gender" + + " | appendpipe [ where gender = 'F' ]" + + " | appendpipe [ where gender = 'M' ]", + TEST_INDEX_ACCOUNT)); + // 2 original + 1 (F filter) + 1 (M filter from cumulative 3 rows) = 4 rows + verifyNumOfRows(actual, 4); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java index faf944da4a0..56ed409b4d7 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java @@ -59,4 +59,152 @@ public void testAppendPipeWithMergedColumns() { + "FROM `scott`.`EMP`"; verifyPPLToSparkSQL(root, expectedSparkSql); } + + /** + * Regression test: double appendpipe with different aggregations. Result count (16 = 14 + 1 avg + + * 1 max) is verified in integration tests only because RelRunners.run() creates a new planner + * that conflicts with shared RelNode subtrees — a test framework limitation that does not affect + * the production path. 
+ */ + @Test + public void testDoubleAppendPipe() { + String ppl = + "source=EMP | appendpipe [stats avg(SAL) as avg_sal] | appendpipe [stats max(SAL) as" + + " max_sal]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[$8], max_sal=[null:DECIMAL(7, 2)])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[null:DECIMAL(11, 6)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[$0])\n" + + " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[null:DECIMAL(11, 6)], max_sal=[$0])\n" + + " LogicalAggregate(group=[{}], max_sal=[MAX($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7]," + + " avg_sal=[null:DECIMAL(11, 6)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[$0])\n" + + " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n" + + " 
LogicalProject(SAL=[$5])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + } + + /** + * Regression test: triple appendpipe with different aggregations. Result count (17 = 14 + 1 avg + + * 1 max + 1 min) is verified in integration tests only — see testDoubleAppendPipe for rationale. + */ + @Test + public void testTripleAppendPipe() { + String ppl = + "source=EMP | appendpipe [stats avg(SAL) as avg_sal] | appendpipe [stats max(SAL) as" + + " max_sal] | appendpipe [stats min(SAL) as min_sal]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[$8], max_sal=[$9]," + + " min_sal=[null:DECIMAL(7, 2)])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[$8]," + + " max_sal=[null:DECIMAL(7, 2)])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7]," + + " avg_sal=[null:DECIMAL(11, 6)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[$0])\n" + + " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[null:DECIMAL(11, 6)], max_sal=[$0])\n" + + " LogicalAggregate(group=[{}], 
max_sal=[MAX($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7]," + + " avg_sal=[null:DECIMAL(11, 6)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[$0])\n" + + " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[null:DECIMAL(11, 6)], max_sal=[null:DECIMAL(7, 2)], min_sal=[$0])\n" + + " LogicalAggregate(group=[{}], min_sal=[MIN($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], avg_sal=[$8]," + + " max_sal=[null:DECIMAL(7, 2)])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7]," + + " avg_sal=[null:DECIMAL(11, 6)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[$0])\n" + + " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], 
ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[null:DECIMAL(11, 6)], max_sal=[$0])\n" + + " LogicalAggregate(group=[{}], max_sal=[MAX($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalUnion(all=[true])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," + + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7]," + + " avg_sal=[null:DECIMAL(11, 6)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," + + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," + + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," + + " avg_sal=[$0])\n" + + " LogicalAggregate(group=[{}], avg_sal=[AVG($0)])\n" + + " LogicalProject(SAL=[$5])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + } + + /** Regression test: double appendpipe with non-aggregation (filter) subpipeline. 
*/ + @Test + public void testDoubleAppendPipeWithFilter() { + String ppl = "source=EMP | appendpipe [where DEPTNO = 20] | appendpipe [where DEPTNO = 30]"; + RelNode root = getRelNode(ppl); + verifyResultCount(root, 25); // 14 original + 5 (dept 20) + 6 (dept 30) + } } From 7cf6f5d769e5dad661be9d002275e7329be5a531 Mon Sep 17 00:00:00 2001 From: Songkan Tang Date: Wed, 8 Apr 2026 14:07:33 +0800 Subject: [PATCH 3/6] Minor change of assertion on rows Signed-off-by: Songkan Tang --- .../remote/CalcitePPLAppendPipeCommandIT.java | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java index f0f2f3ddbc8..6ae37a027ba 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java @@ -10,7 +10,6 @@ import static org.opensearch.sql.util.MatcherUtils.rows; import static org.opensearch.sql.util.MatcherUtils.schema; import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; -import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; import java.io.IOException; @@ -92,7 +91,6 @@ public void testAppendpipeWithConflictTypeColumn() throws IOException { /** Regression test: double appendpipe with different aggregations (issue #5173). 
*/ @Test public void testDoubleAppendPipe() throws IOException { - // stats by gender gives 2 rows (F, M), then two appendpipe aggregations add 1 row each JSONObject actual = executeQuery( String.format( @@ -101,14 +99,19 @@ public void testDoubleAppendPipe() throws IOException { + " | appendpipe [ stats avg(sum_age) as avg_sum_age ]" + " | appendpipe [ stats max(sum_age) as max_sum_age ]", TEST_INDEX_ACCOUNT)); - // 2 original + 1 avg + 1 max = 4 rows - verifyNumOfRows(actual, 4); verifySchemaInOrder( actual, schema("sum_age", "bigint"), schema("gender", "string"), schema("avg_sum_age", "double"), schema("max_sum_age", "bigint")); + // 2 original rows + 1 avg row + 1 max row + verifyDataRows( + actual, + rows(14947, "F", null, null), + rows(15224, "M", null, null), + rows(null, null, 15085.5, null), + rows(null, null, null, 15224)); } /** Regression test: triple appendpipe with different aggregations (issue #5173). */ @@ -123,8 +126,6 @@ public void testTripleAppendPipe() throws IOException { + " | appendpipe [ stats max(sum_age) as max_sum_age ]" + " | appendpipe [ stats min(sum_age) as min_sum_age ]", TEST_INDEX_ACCOUNT)); - // 2 original + 1 avg + 1 max + 1 min = 5 rows - verifyNumOfRows(actual, 5); verifySchemaInOrder( actual, schema("sum_age", "bigint"), @@ -132,6 +133,14 @@ public void testTripleAppendPipe() throws IOException { schema("avg_sum_age", "double"), schema("max_sum_age", "bigint"), schema("min_sum_age", "bigint")); + // 2 original rows + 1 avg + 1 max + 1 min + verifyDataRows( + actual, + rows(14947, "F", null, null, null), + rows(15224, "M", null, null, null), + rows(null, null, 15085.5, null, null), + rows(null, null, null, 15224, null), + rows(null, null, null, null, 14947)); } /** Regression test: double appendpipe with non-aggregation (filter) subpipeline. 
*/ @@ -145,7 +154,7 @@ public void testDoubleAppendPipeWithFilter() throws IOException { + " | appendpipe [ where gender = 'F' ]" + " | appendpipe [ where gender = 'M' ]", TEST_INDEX_ACCOUNT)); - // 2 original + 1 (F filter) + 1 (M filter from cumulative 3 rows) = 4 rows - verifyNumOfRows(actual, 4); + // 2 original + 1 (F filter from original) + 1 (M filter from cumulative 3 rows) + verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(14947, "F"), rows(15224, "M")); } } From e07ed449c6ee2ca8087f639f3df0e11028ff0e99 Mon Sep 17 00:00:00 2001 From: Songkan Tang Date: Wed, 8 Apr 2026 16:50:43 +0800 Subject: [PATCH 4/6] Fix chained appendpipe planner mismatch (#5173) Chained appendpipe queries can produce literal-only projections during prepare-time field trimming. Calcite may simplify those projections into Values, which can trigger planner mismatch assertions during execution. Preserve the Project shape for this narrow case in OpenSearchRelFieldTrimmer and add YAML REST regression coverage for double and triple appendpipe. 
Signed-off-by: Songkan Tang --- .../sql/calcite/CalciteRelNodeVisitor.java | 21 ++-- .../utils/OpenSearchRelFieldTrimmer.java | 110 ++++++++++++++++++ .../rest-api-spec/test/issues/5173.yml | 99 ++++++++++++++++ 3 files changed, 223 insertions(+), 7 deletions(-) create mode 100644 integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/5173.yml diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index b284a37f205..e4e036da3a6 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -299,15 +299,22 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) { @Override public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) { visitChildren(node, context); - // Use the main plan from the relBuilder stack directly instead of re-visiting - // the parent AST. Re-visiting causes "belongs to a different planner" assertion - // when multiple appendpipe commands are chained. 
- RelNode mainNode = context.relBuilder.peek(); - context.relBuilder.push(mainNode); - node.getSubQuery().accept(this, context); + UnresolvedPlan subqueryPlan = node.getSubQuery(); + UnresolvedPlan childNode = subqueryPlan; + while (childNode.getChild() != null + && !childNode.getChild().isEmpty() + && !(childNode.getChild().getFirst() instanceof Values)) { + if (childNode.getChild().size() > 1) { + throw new RuntimeException("AppendPipe doesn't support multiply children subquery."); + } + childNode = (UnresolvedPlan) childNode.getChild().getFirst(); + } + childNode.attach(node.getChild().getFirst()); + + subqueryPlan.accept(this, context); RelNode subPipelineNode = context.relBuilder.build(); - mainNode = context.relBuilder.build(); + RelNode mainNode = context.relBuilder.build(); return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelFieldTrimmer.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelFieldTrimmer.java index 398b82530c9..29bd2007bf4 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelFieldTrimmer.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelFieldTrimmer.java @@ -5,21 +5,31 @@ package org.opensearch.sql.calcite.utils; +import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import org.apache.calcite.linq4j.Ord; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Values; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexPermuteInputsShuttle; +import 
org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexUtil; import org.apache.calcite.rex.RexVisitor; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql2rel.RelFieldTrimmer; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.mapping.Mapping; +import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.sql.calcite.plan.rel.Dedup; @@ -30,9 +40,94 @@ *

This class extends Calcite's RelFieldTrimmer to support trimming customized operators. */ public class OpenSearchRelFieldTrimmer extends RelFieldTrimmer { + private final RelBuilder openSearchRelBuilder; public OpenSearchRelFieldTrimmer(@Nullable SqlValidator validator, RelBuilder relBuilder) { super(validator, relBuilder); + this.openSearchRelBuilder = relBuilder; + } + + @Override + public TrimResult trimFields( + Project project, ImmutableBitSet fieldsUsed, Set extraFields) { + final RelDataType rowType = project.getRowType(); + final int fieldCount = rowType.getFieldCount(); + final RelNode input = project.getInput(); + + final Set inputExtraFields = new LinkedHashSet<>(extraFields); + RelOptUtil.InputFinder inputFinder = new RelOptUtil.InputFinder(inputExtraFields); + for (Ord ord : Ord.zip(project.getProjects())) { + if (fieldsUsed.get(ord.i)) { + ord.e.accept(inputFinder); + } + } + + List subQueries = RexUtil.SubQueryCollector.collect(project); + Set correlationIds = RelOptUtil.getVariablesUsed(subQueries); + ImmutableBitSet requiredColumns = ImmutableBitSet.of(); + if (!correlationIds.isEmpty()) { + assert correlationIds.size() == 1; + requiredColumns = RelOptUtil.correlationColumns(correlationIds.iterator().next(), project); + } + + ImmutableBitSet finderFields = inputFinder.build(); + ImmutableBitSet inputFieldsUsed = + ImmutableBitSet.builder().addAll(requiredColumns).addAll(finderFields).build(); + + TrimResult trimResult = trimChild(project, input, inputFieldsUsed, inputExtraFields); + RelNode newInput = trimResult.left; + final Mapping inputMapping = trimResult.right; + + if (newInput == input && fieldsUsed.cardinality() == fieldCount) { + return result(project, Mappings.createIdentity(fieldCount)); + } + + if (fieldsUsed.cardinality() == 0) { + return dummyProject(fieldCount, newInput, project); + } + + final List newProjects = new ArrayList<>(); + final RexVisitor shuttle; + if (!correlationIds.isEmpty()) { + assert correlationIds.size() == 1; + 
shuttle = + new RexPermuteInputsShuttle(inputMapping, newInput) { + @Override + public RexNode visitSubQuery(RexSubQuery subQuery) { + subQuery = (RexSubQuery) super.visitSubQuery(subQuery); + return RelOptUtil.remapCorrelatesInSuqQuery( + openSearchRelBuilder.getRexBuilder(), + subQuery, + correlationIds.iterator().next(), + newInput.getRowType(), + inputMapping); + } + }; + } else { + shuttle = new RexPermuteInputsShuttle(inputMapping, newInput); + } + + final Mapping mapping = + Mappings.create(MappingType.INVERSE_SURJECTION, fieldCount, fieldsUsed.cardinality()); + for (Ord ord : Ord.zip(project.getProjects())) { + if (fieldsUsed.get(ord.i)) { + mapping.set(ord.i, newProjects.size()); + RexNode newProjectExpr = ord.e.accept(shuttle); + newProjects.add(newProjectExpr); + } + } + + final RelDataType newRowType = + RelOptUtil.permute(project.getCluster().getTypeFactory(), rowType, mapping); + + if (shouldAvoidSimplifyValues(newProjects, newInput)) { + return result( + project.copy(project.getTraitSet(), newInput, newProjects, newRowType), mapping, project); + } + + openSearchRelBuilder.push(newInput); + openSearchRelBuilder.project(newProjects, newRowType.getFieldNames(), false, correlationIds); + return result(openSearchRelBuilder.build(), mapping, project); } public TrimResult trimFields( @@ -67,4 +162,19 @@ public TrimResult trimFields( // needs them for its condition. 
return result(dedup.copy(newInput, newDedupFields), inputMapping); } + + private boolean shouldAvoidSimplifyValues(List projects, RelNode input) { + return projects.stream().allMatch(RexLiteral.class::isInstance) && isFixedRowCount(input); + } + + private boolean isFixedRowCount(RelNode input) { + if (input instanceof Values) { + return true; + } + if (input instanceof Aggregate aggregate) { + return aggregate.getGroupSet().isEmpty() + && aggregate.getGroupType() == Aggregate.Group.SIMPLE; + } + return false; + } } diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/5173.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/5173.yml new file mode 100644 index 00000000000..3db25e24f56 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/5173.yml @@ -0,0 +1,99 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: true + + - do: + indices.create: + index: issue5173 + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + mappings: + properties: + gender: + type: keyword + age: + type: integer + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "issue5173", "_id": "1"}}' + - '{"gender": "F", "age": 10}' + - '{"index": {"_index": "issue5173", "_id": "2"}}' + - '{"gender": "F", "age": 20}' + - '{"index": {"_index": "issue5173", "_id": "3"}}' + - '{"gender": "M", "age": 30}' + - '{"index": {"_index": "issue5173", "_id": "4"}}' + - '{"gender": "M", "age": 40}' + +--- +teardown: + - do: + indices.delete: + index: issue5173 + ignore_unavailable: true + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: false + +--- +"Issue 5173: double appendpipe with different aggregations should succeed": + - skip: + features: + - headers + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: "source=issue5173 | stats sum(age) as sum_age by gender | appendpipe [ stats avg(sum_age) as avg_sum_age ] | appendpipe [ stats 
max(sum_age) as max_sum_age ]" + + - match: { total: 4 } + - match: + schema: + - { name: sum_age, type: bigint } + - { name: gender, type: string } + - { name: avg_sum_age, type: double } + - { name: max_sum_age, type: bigint } + - match: + datarows: + - [ 30, "F", null, null ] + - [ 70, "M", null, null ] + - [ null, null, 50.0, null ] + - [ null, null, null, 70 ] + +--- +"Issue 5173: triple appendpipe with different aggregations should succeed": + - skip: + features: + - headers + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: "source=issue5173 | stats sum(age) as sum_age by gender | appendpipe [ stats avg(sum_age) as avg_sum_age ] | appendpipe [ stats max(sum_age) as max_sum_age ] | appendpipe [ stats min(sum_age) as min_sum_age ]" + + - match: { total: 5 } + - match: + schema: + - { name: sum_age, type: bigint } + - { name: gender, type: string } + - { name: avg_sum_age, type: double } + - { name: max_sum_age, type: bigint } + - { name: min_sum_age, type: bigint } + - match: + datarows: + - [ 30, "F", null, null, null ] + - [ 70, "M", null, null, null ] + - [ null, null, 50.0, null, null ] + - [ null, null, null, 70, null ] + - [ null, null, null, null, 30 ] From 0c3a6953b8d59993b7ffe1fedf4da5e5d57ec2d3 Mon Sep 17 00:00:00 2001 From: Songkan Tang Date: Thu, 9 Apr 2026 17:43:55 +0800 Subject: [PATCH 5/6] Use consistent prepareStatement to resolve the issue Signed-off-by: Songkan Tang --- .../sql/calcite/utils/CalciteToolsHelper.java | 31 +++++ .../utils/OpenSearchRelFieldTrimmer.java | 110 ------------------ 2 files changed, 31 insertions(+), 110 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index a6d57ea01f6..a9dc054bc99 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -63,6 
+63,7 @@ import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelOptTable.ViewExpander; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.hep.HepPlanner; import org.apache.calcite.plan.hep.HepProgram; import org.apache.calcite.plan.hep.HepProgramBuilder; @@ -367,6 +368,36 @@ protected SqlToRelConverter getSqlToRelConverter( return new OpenSearchSqlToRelConverter( this, validator, catalogReader, this.cluster, convertletTable, config); } + + @Override + protected RelRoot trimUnusedFields(RelRoot root) { + final SqlToRelConverter.Config config = + SqlToRelConverter.config() + .withTrimUnusedFields(shouldTrim(root.rel)) + .withExpand(THREAD_EXPAND.get()) + .withInSubQueryThreshold(requireNonNull(THREAD_INSUBQUERY_THRESHOLD.get())); + // PPL analyzes into a pre-built RelNode before prepareStatement(rel). Reuse the incoming + // RelNode's cluster here so prepare-time trimming does not create replacement nodes under a + // different planner than the rest of the tree. + final SqlToRelConverter converter = + new OpenSearchSqlToRelConverter( + this, + getSqlValidator(), + catalogReader, + root.rel.getCluster(), + convertletTable, + config); + final boolean ordered = !root.collation.getFieldCollations().isEmpty(); + final boolean dml = SqlKind.DML.contains(root.kind); + return root.withRel(converter.trimUnusedFields(dml || ordered, root.rel)); + } + + private static boolean shouldTrim(RelNode rootRel) { + // Unless THREAD_TRIM forces it, don't trim when there are 2 or more joins. The projects + // near the leaves created by trim migrate past joins and seem to + // prevent join-reordering. 
+ return THREAD_TRIM.get() || RelOptUtil.countJoins(rootRel) < 2; + } } public static class OpenSearchSqlToRelConverter extends SqlToRelConverter { diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelFieldTrimmer.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelFieldTrimmer.java index 29bd2007bf4..398b82530c9 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelFieldTrimmer.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchRelFieldTrimmer.java @@ -5,31 +5,21 @@ package org.opensearch.sql.calcite.utils; -import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; -import org.apache.calcite.linq4j.Ord; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.CorrelationId; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.Values; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexPermuteInputsShuttle; -import org.apache.calcite.rex.RexSubQuery; -import org.apache.calcite.rex.RexUtil; import org.apache.calcite.rex.RexVisitor; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql2rel.RelFieldTrimmer; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.mapping.Mapping; -import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; import org.checkerframework.checker.nullness.qual.Nullable; import org.opensearch.sql.calcite.plan.rel.Dedup; @@ -40,94 +30,9 @@ *

This class extends Calcite's RelFieldTrimmer to support trimming customized operators. */ public class OpenSearchRelFieldTrimmer extends RelFieldTrimmer { - private final RelBuilder openSearchRelBuilder; public OpenSearchRelFieldTrimmer(@Nullable SqlValidator validator, RelBuilder relBuilder) { super(validator, relBuilder); - this.openSearchRelBuilder = relBuilder; - } - - @Override - public TrimResult trimFields( - Project project, ImmutableBitSet fieldsUsed, Set extraFields) { - final RelDataType rowType = project.getRowType(); - final int fieldCount = rowType.getFieldCount(); - final RelNode input = project.getInput(); - - final Set inputExtraFields = new LinkedHashSet<>(extraFields); - RelOptUtil.InputFinder inputFinder = new RelOptUtil.InputFinder(inputExtraFields); - for (Ord ord : Ord.zip(project.getProjects())) { - if (fieldsUsed.get(ord.i)) { - ord.e.accept(inputFinder); - } - } - - List subQueries = RexUtil.SubQueryCollector.collect(project); - Set correlationIds = RelOptUtil.getVariablesUsed(subQueries); - ImmutableBitSet requiredColumns = ImmutableBitSet.of(); - if (!correlationIds.isEmpty()) { - assert correlationIds.size() == 1; - requiredColumns = RelOptUtil.correlationColumns(correlationIds.iterator().next(), project); - } - - ImmutableBitSet finderFields = inputFinder.build(); - ImmutableBitSet inputFieldsUsed = - ImmutableBitSet.builder().addAll(requiredColumns).addAll(finderFields).build(); - - TrimResult trimResult = trimChild(project, input, inputFieldsUsed, inputExtraFields); - RelNode newInput = trimResult.left; - final Mapping inputMapping = trimResult.right; - - if (newInput == input && fieldsUsed.cardinality() == fieldCount) { - return result(project, Mappings.createIdentity(fieldCount)); - } - - if (fieldsUsed.cardinality() == 0) { - return dummyProject(fieldCount, newInput, project); - } - - final List newProjects = new ArrayList<>(); - final RexVisitor shuttle; - if (!correlationIds.isEmpty()) { - assert correlationIds.size() == 1; - 
shuttle = - new RexPermuteInputsShuttle(inputMapping, newInput) { - @Override - public RexNode visitSubQuery(RexSubQuery subQuery) { - subQuery = (RexSubQuery) super.visitSubQuery(subQuery); - return RelOptUtil.remapCorrelatesInSuqQuery( - openSearchRelBuilder.getRexBuilder(), - subQuery, - correlationIds.iterator().next(), - newInput.getRowType(), - inputMapping); - } - }; - } else { - shuttle = new RexPermuteInputsShuttle(inputMapping, newInput); - } - - final Mapping mapping = - Mappings.create(MappingType.INVERSE_SURJECTION, fieldCount, fieldsUsed.cardinality()); - for (Ord ord : Ord.zip(project.getProjects())) { - if (fieldsUsed.get(ord.i)) { - mapping.set(ord.i, newProjects.size()); - RexNode newProjectExpr = ord.e.accept(shuttle); - newProjects.add(newProjectExpr); - } - } - - final RelDataType newRowType = - RelOptUtil.permute(project.getCluster().getTypeFactory(), rowType, mapping); - - if (shouldAvoidSimplifyValues(newProjects, newInput)) { - return result( - project.copy(project.getTraitSet(), newInput, newProjects, newRowType), mapping, project); - } - - openSearchRelBuilder.push(newInput); - openSearchRelBuilder.project(newProjects, newRowType.getFieldNames(), false, correlationIds); - return result(openSearchRelBuilder.build(), mapping, project); } public TrimResult trimFields( @@ -162,19 +67,4 @@ public TrimResult trimFields( // needs them for its condition. 
return result(dedup.copy(newInput, newDedupFields), inputMapping); } - - private boolean shouldAvoidSimplifyValues(List projects, RelNode input) { - return projects.stream().allMatch(RexLiteral.class::isInstance) && isFixedRowCount(input); - } - - private boolean isFixedRowCount(RelNode input) { - if (input instanceof Values) { - return true; - } - if (input instanceof Aggregate aggregate) { - return aggregate.getGroupSet().isEmpty() - && aggregate.getGroupType() == Aggregate.Group.SIMPLE; - } - return false; - } } From 91247029aae2cd99c954e7b76c35a78a457ff178 Mon Sep 17 00:00:00 2001 From: Songkan Tang Date: Thu, 9 Apr 2026 18:05:11 +0800 Subject: [PATCH 6/6] Preserve agg hint after reusing cluster during prepare and optimize Signed-off-by: Songkan Tang --- .../sql/calcite/utils/CalciteToolsHelper.java | 36 +++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index a9dc054bc99..ffd1dc3c229 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -75,6 +75,7 @@ import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.RelShuttle; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.hint.HintStrategyTable; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.rules.FilterMergeRule; import org.apache.calcite.rel.type.RelDataType; @@ -410,22 +411,51 @@ public OpenSearchSqlToRelConverter( RelOptCluster cluster, SqlRexConvertletTable convertletTable, Config config) { - super(viewExpander, validator, catalogReader, cluster, convertletTable, config); + this( + viewExpander, + validator, + catalogReader, + cluster, + convertletTable, + preserveHintStrategies(cluster, config), + true); + } + + private 
OpenSearchSqlToRelConverter( + ViewExpander viewExpander, + @Nullable SqlValidator validator, + CatalogReader catalogReader, + RelOptCluster cluster, + SqlRexConvertletTable convertletTable, + Config effectiveConfig, + boolean ignored) { + super(viewExpander, validator, catalogReader, cluster, convertletTable, effectiveConfig); this.relBuilder = - config + effectiveConfig .getRelBuilderFactory() .create( cluster, validator != null ? validator.getCatalogReader().unwrap(RelOptSchema.class) : null) - .transform(config.getRelBuilderConfigTransform()); + .transform(effectiveConfig.getRelBuilderConfigTransform()); } @Override protected RelFieldTrimmer newFieldTrimmer() { return new OpenSearchRelFieldTrimmer(validator, this.relBuilder); } + + // SqlToRelConverter always installs the hint strategy table from its config onto the cluster. + // When prepare-time trimming reuses an incoming RelNode cluster, preserve any PPL-specific + // aggregate hint strategies that were already registered during analysis. + private static Config preserveHintStrategies(RelOptCluster cluster, Config config) { + if (config.getHintStrategyTable() == HintStrategyTable.EMPTY + && cluster.getHintStrategies() != HintStrategyTable.EMPTY) { + return config.withHintStrategyTable(cluster.getHintStrategies()); + } + return config; + } } public static class OpenSearchRelRunners {