Skip to content

Commit 35ca89e

Browse files
authored
Replace duplicated aggregation logic with aggregateWithTrimming() (#4926)
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
1 parent 1a3f400 commit 35ca89e

10 files changed

Lines changed: 65 additions & 76 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 1 addition & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -1810,7 +1810,6 @@ private RelNode buildStreamWindowJoinPlan(
18101810
context.relBuilder.push(leftWithHelpers);
18111811
context.relBuilder.variable(v::set);
18121812

1813-
context.relBuilder.push(leftWithHelpers);
18141813
RexNode rightSeq = context.relBuilder.field(seqCol);
18151814
RexNode outerSeq = context.relBuilder.field(v.get(), seqCol);
18161815

@@ -1829,8 +1828,7 @@ private RelNode buildStreamWindowJoinPlan(
18291828
context.relBuilder.filter(filter);
18301829

18311830
// aggregate all window functions on right side
1832-
List<AggCall> aggCalls = buildAggCallsForWindowFunctions(node.getWindowFunctionList(), context);
1833-
context.relBuilder.aggregate(context.relBuilder.groupKey(), aggCalls);
1831+
aggregateWithTrimming(List.of(), node.getWindowFunctionList(), context, false);
18341832
RelNode rightAgg = context.relBuilder.build();
18351833

18361834
// correlate LEFT with RIGHT using seq + group fields
@@ -2016,27 +2014,6 @@ private String extractGroupFieldName(UnresolvedExpression groupExpr) {
20162014
}
20172015
}
20182016

2019-
private List<AggCall> buildAggCallsForWindowFunctions(
2020-
List<UnresolvedExpression> windowExprs, CalcitePlanContext context) {
2021-
List<AggCall> aggCalls = new ArrayList<>();
2022-
for (UnresolvedExpression expr : windowExprs) {
2023-
if (expr instanceof Alias a && a.getDelegated() instanceof WindowFunction wf) {
2024-
Function func = (Function) wf.getFunction();
2025-
List<UnresolvedExpression> args = func.getFuncArgs();
2026-
// first argument is the input field, others are function params
2027-
UnresolvedExpression field = args.isEmpty() ? null : args.get(0);
2028-
List<UnresolvedExpression> rest =
2029-
args.size() <= 1 ? List.of() : args.subList(1, args.size());
2030-
AggregateFunction aggFunc = new AggregateFunction(func.getFuncName(), field, rest);
2031-
AggCall call = aggVisitor.analyze(new Alias(a.getName(), aggFunc), context);
2032-
aggCalls.add(call);
2033-
} else {
2034-
throw new IllegalArgumentException("Unsupported window function in streamstats");
2035-
}
2036-
}
2037-
return aggCalls;
2038-
}
2039-
20402017
private List<RexNode> buildRequiredLeft(
20412018
CalcitePlanContext context, String seqCol, List<UnresolvedExpression> groupList) {
20422019
List<RexNode> requiredLeft = new ArrayList<>();

integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -6,10 +6,11 @@ calcite:
66
LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}])
77
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
88
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9-
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
10-
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
11-
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
12-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9+
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
10+
LogicalProject(age=[$8])
11+
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
12+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
13+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1314
physical: |
1415
EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18])
1516
EnumerableLimit(fetch=[10000])

integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -6,10 +6,11 @@ calcite:
66
LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}])
77
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
88
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9-
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
10-
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))])
11-
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
12-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9+
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
10+
LogicalProject(age=[$8])
11+
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))])
12+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
13+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1314
physical: |
1415
EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16])
1516
EnumerableLimit(fetch=[10000])

integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset.yaml

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -7,11 +7,12 @@ calcite:
77
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
88
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
99
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
10-
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
11-
LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
12-
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
13-
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
14-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
10+
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
11+
LogicalProject(age=[$8])
12+
LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
13+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
14+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
15+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1516
physical: |
1617
EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18])
1718
EnumerableLimit(fetch=[10000])

integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_reset_null_bucket.yaml

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -7,11 +7,12 @@ calcite:
77
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
88
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
99
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
10-
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
11-
LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))])
12-
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
13-
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
14-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
10+
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
11+
LogicalProject(age=[$8])
12+
LogicalFilter(condition=[AND(<($17, $cor0.__stream_seq__), =($20, $cor0.__seg_id__), =($4, $cor0.gender))])
13+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[$17], __reset_before_flag__=[$18], __reset_after_flag__=[$19], __seg_id__=[+(SUM($18) OVER (ROWS UNBOUNDED PRECEDING), COALESCE(SUM($19) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0))])
14+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()], __reset_before_flag__=[CASE(>($8, 34), 1, 0)], __reset_after_flag__=[CASE(<($8, 25), 1, 0)])
15+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1516
physical: |
1617
EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16])
1718
EnumerableLimit(fetch=[10000])

integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -6,10 +6,11 @@ calcite:
66
LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}])
77
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
88
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9-
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
10-
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
11-
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
12-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9+
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
10+
LogicalProject(age=[$8])
11+
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))])
12+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
13+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1314
physical: |
1415
EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18])
1516
EnumerableLimit(fetch=[10000])

integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -6,10 +6,11 @@ calcite:
66
LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}])
77
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
88
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9-
LogicalAggregate(group=[{}], avg_age=[AVG($8)])
10-
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))])
11-
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
12-
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
9+
LogicalAggregate(group=[{}], avg_age=[AVG($0)])
10+
LogicalProject(age=[$8])
11+
LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))])
12+
LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()])
13+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1314
physical: |
1415
EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16])
1516
EnumerableLimit(fetch=[10000])

0 commit comments

Comments (0)