Skip to content

Commit e8678df

Browse files
committed
Make addcoltotals/addtotals summary row backend-deterministic
addcoltotals (and addtotals with col=true) lowers to UNION ALL(data, single SUM row). SQL UNION ALL has no ordering guarantee: the v2 engine happens to preserve input order, but the DataFusion backend on the analytics-engine route does not, so the summary row landed in a non-deterministic position. The command contract and user docs require the summary row to be last.

Separately, the synthesized column for a new `labelfield` was typed as a bounded VARCHAR(n), which renders as a Substrait VarChar(n) literal that the DataFusion Substrait consumer rejects ("Unsupported literal_type: VarChar").

Both fixes are in CalciteRelNodeVisitor.buildAddRowTotalAggregate:

- Tag each union branch with a constant ordering key (0 = data rows, 1 = summary row), sort on it, then drop the helper column, so the summary row is deterministically last regardless of backend. This reuses the existing streamstats sort-then-projectExcept idiom and adds the PlanUtils constant ORDER_COLUMN_FOR_ADDCOLTOTALS.
- Type the synthesized label column as an unbounded VARCHAR, matching how OpenSearchTypeFactory maps STRING fields.

Regenerated the affected CalcitePPLAddColTotalsTest / CalcitePPLAddTotalsTest plan and Spark-SQL expectations and the explain_add_col_totals / explain_add_totals YAML plans; result rows are unchanged.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent bcbbc59 commit e8678df

6 files changed

Lines changed: 271 additions & 166 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
1717
import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupNotNull;
1818
import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupOrNull;
19+
import static org.opensearch.sql.calcite.utils.PlanUtils.ORDER_COLUMN_FOR_ADDCOLTOTALS;
1920
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_MAIN;
2021
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_RARE_TOP;
2122
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_STREAMSTATS;
@@ -3370,13 +3371,12 @@ public RelNode buildAddRowTotalAggregate(
33703371
RelNode originalData = context.relBuilder.peek();
33713372
List<String> fieldNames = originalData.getRowType().getFieldNames();
33723373
boolean foundLabelField = false;
3373-
int labelLength =
3374-
(labelField != null) && (labelField.length() > label.length())
3375-
? labelField.length()
3376-
: label.length();
33773374

3375+
// Type the synthesized label column as an unbounded VARCHAR, matching how OpenSearchTypeFactory
3376+
// maps STRING fields. A length-bounded VARCHAR(n) would render as a Substrait VarChar(n)
3377+
// literal, which the DataFusion backend (analytics-engine route) does not support.
33783378
RelDataType labelVarcharType =
3379-
context.relBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR, labelLength);
3379+
context.relBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
33803380

33813381
// If no specific fields specified, use all numeric fields
33823382
if (fieldsToAggregate.isEmpty()) {
@@ -3483,10 +3483,24 @@ public RelNode buildAddRowTotalAggregate(
34833483
// Project the totals row with proper field order and labels
34843484
context.relBuilder.project(selectList);
34853485
RelNode totalsRow = context.relBuilder.build();
3486-
// 4. Union original data with totals row
3486+
// 4. Union original data with totals row.
3487+
// UNION ALL does not guarantee row order, and some backends (e.g. DataFusion on the
3488+
// analytics-engine route) do not preserve input order, so the summary row can land
3489+
// anywhere. Tag each branch with a constant ordering key (0 = data rows, 1 = summary row),
3490+
// union, sort on it so the summary row is deterministically last, then drop the helper.
34873491
context.relBuilder.push(originalData);
3492+
context.relBuilder.projectPlus(
3493+
context.relBuilder.alias(context.relBuilder.literal(0), ORDER_COLUMN_FOR_ADDCOLTOTALS));
3494+
RelNode dataWithOrder = context.relBuilder.build();
34883495
context.relBuilder.push(totalsRow);
3489-
context.relBuilder.union(true); // Use UNION ALL to preserve order
3496+
context.relBuilder.projectPlus(
3497+
context.relBuilder.alias(context.relBuilder.literal(1), ORDER_COLUMN_FOR_ADDCOLTOTALS));
3498+
RelNode totalsWithOrder = context.relBuilder.build();
3499+
context.relBuilder.push(dataWithOrder);
3500+
context.relBuilder.push(totalsWithOrder);
3501+
context.relBuilder.union(true); // UNION ALL
3502+
context.relBuilder.sort(context.relBuilder.field(ORDER_COLUMN_FOR_ADDCOLTOTALS));
3503+
context.relBuilder.projectExcept(context.relBuilder.field(ORDER_COLUMN_FOR_ADDCOLTOTALS));
34903504
}
34913505
return context.relBuilder.peek();
34923506
}

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ public interface PlanUtils {
9090
String ROW_NUMBER_COLUMN_FOR_TRANSPOSE = "_row_number_transpose_";
9191
String VALUE_COLUMN_FOR_TRANSPOSE = "_value_transpose_";
9292

93+
/**
94+
* Ordering key for the addcoltotals UNION ALL: 0 on the original rows, 1 on the summary row, so a
95+
* sort on it keeps the summary row last regardless of backend union ordering.
96+
*/
97+
String ORDER_COLUMN_FOR_ADDCOLTOTALS = "_addcoltotals_order_";
98+
9399
static SpanUnit intervalUnitToSpanUnit(IntervalUnit unit) {
94100
return switch (unit) {
95101
case MICROSECOND -> SpanUnit.MICROSECOND;

integ-test/src/test/resources/expectedOutput/calcite/explain_add_col_totals.yaml

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,21 @@ calcite:
22
logical: |
33
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
44
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10])
5-
LogicalUnion(all=[true])
6-
LogicalSort(fetch=[5])
7-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
8-
LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], balance=[$0], gender=[null:VARCHAR], city=[null:VARCHAR], employer=[null:VARCHAR], state=[null:VARCHAR], age=[$1], email=[null:VARCHAR], lastname=[null:VARCHAR], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR])
9-
LogicalAggregate(group=[{}], balance=[SUM($3)], age=[SUM($8)])
5+
LogicalSort(sort0=[$17], dir0=[ASC])
6+
LogicalUnion(all=[true])
7+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], _addcoltotals_order_=[0])
108
LogicalSort(fetch=[5])
119
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
10+
LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], balance=[$0], gender=[null:VARCHAR], city=[null:VARCHAR], employer=[null:VARCHAR], state=[null:VARCHAR], age=[$1], email=[null:VARCHAR], lastname=[null:VARCHAR], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], _addcoltotals_order_=[1])
11+
LogicalAggregate(group=[{}], balance=[SUM($3)], age=[SUM($8)])
12+
LogicalSort(fetch=[5])
13+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1214
physical: |
13-
EnumerableLimit(fetch=[10000])
14-
EnumerableUnion(all=[true])
15-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
16-
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], account_number=[$t2], firstname=[$t3], address=[$t3], balance=[$t0], gender=[$t3], city=[$t3], employer=[$t3], state=[$t3], age=[$t1], email=[$t3], lastname=[$t3])
17-
EnumerableAggregate(group=[{}], balance=[SUM($0)], age=[SUM($1)])
18-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[balance, age], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["balance","age"]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
15+
EnumerableCalc(expr#0..11=[{inputs}], proj#0..10=[{exprs}])
16+
CalciteEnumerableTopK(sort0=[$11], dir0=[ASC], fetch=[10000])
17+
EnumerableUnion(all=[true])
18+
EnumerableCalc(expr#0..10=[{inputs}], expr#11=[0], proj#0..11=[{exprs}])
19+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
20+
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[1], account_number=[$t2], firstname=[$t3], address=[$t3], balance=[$t0], gender=[$t3], city=[$t3], employer=[$t3], state=[$t3], age=[$t1], email=[$t3], lastname=[$t3], _addcoltotals_order_=[$t4])
21+
EnumerableAggregate(group=[{}], balance=[SUM($0)], age=[SUM($1)])
22+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[balance, age], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["balance","age"]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/explain_add_totals.yaml

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,22 @@ calcite:
22
logical: |
33
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
44
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], CustomSum=[$17], all_emp_total=[$18])
5-
LogicalUnion(all=[true])
6-
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], CustomSum=[+($3, $8)], all_emp_total=[null:VARCHAR(13)])
7-
LogicalSort(fetch=[5])
8-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9-
LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], balance=[$0], gender=[null:VARCHAR], city=[null:VARCHAR], employer=[null:VARCHAR], state=[null:VARCHAR], age=[$1], email=[null:VARCHAR], lastname=[null:VARCHAR], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], CustomSum=[null:BIGINT], all_emp_total=['ColTotal':VARCHAR(13)])
10-
LogicalAggregate(group=[{}], balance=[SUM($0)], age=[SUM($1)])
11-
LogicalProject(balance=[$3], age=[$8])
12-
LogicalSort(fetch=[5])
13-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
5+
LogicalSort(sort0=[$19], dir0=[ASC])
6+
LogicalUnion(all=[true])
7+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], CustomSum=[+($3, $8)], all_emp_total=[null:VARCHAR], _addcoltotals_order_=[0])
8+
LogicalSort(fetch=[5])
9+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
10+
LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], balance=[$0], gender=[null:VARCHAR], city=[null:VARCHAR], employer=[null:VARCHAR], state=[null:VARCHAR], age=[$1], email=[null:VARCHAR], lastname=[null:VARCHAR], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], CustomSum=[null:BIGINT], all_emp_total=['ColTotal':VARCHAR], _addcoltotals_order_=[1])
11+
LogicalAggregate(group=[{}], balance=[SUM($0)], age=[SUM($1)])
12+
LogicalProject(balance=[$3], age=[$8])
13+
LogicalSort(fetch=[5])
14+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1415
physical: |
15-
EnumerableLimit(fetch=[10000])
16-
EnumerableUnion(all=[true])
17-
EnumerableCalc(expr#0..10=[{inputs}], expr#11=[+($t3, $t8)], expr#12=[null:VARCHAR(13)], proj#0..12=[{exprs}])
18-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], LIMIT->5, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
19-
EnumerableLimit(fetch=[10000])
20-
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=['ColTotal':VARCHAR(13)], account_number=[$t2], firstname=[$t3], address=[$t3], balance=[$t0], gender=[$t3], city=[$t3], employer=[$t3], state=[$t3], age=[$t1], email=[$t3], lastname=[$t3], CustomSum=[$t2], all_emp_total=[$t4])
16+
EnumerableCalc(expr#0..13=[{inputs}], proj#0..12=[{exprs}])
17+
CalciteEnumerableTopK(sort0=[$13], dir0=[ASC], fetch=[10000])
18+
EnumerableUnion(all=[true])
19+
EnumerableCalc(expr#0..10=[{inputs}], expr#11=[+($t3, $t8)], expr#12=[null:VARCHAR], expr#13=[0], proj#0..13=[{exprs}])
20+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
21+
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=['ColTotal':VARCHAR], expr#5=[1], account_number=[$t2], firstname=[$t3], address=[$t3], balance=[$t0], gender=[$t3], city=[$t3], employer=[$t3], state=[$t3], age=[$t1], email=[$t3], lastname=[$t3], CustomSum=[$t2], all_emp_total=[$t4], _addcoltotals_order_=[$t5])
2122
EnumerableAggregate(group=[{}], balance=[SUM($0)], age=[SUM($1)])
2223
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[balance, age], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["balance","age"]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)