Skip to content

Commit 5851bdb

Browse files
authored
Resolve SQL unified query gaps for SELECT clauses and window functions (#5450)
* fix(calcite): handle Limit, Alias projection, RelationSubquery, and Values dual-table in visitor

  Add visitLimit to support LIMIT/OFFSET clauses. Handle Alias nodes in project list by referencing already-computed aggregate fields instead of re-analyzing. Add visitRelationSubquery for derived tables in FROM clause. Fix visitValues to treat a single empty row as a dual-table (SELECT without FROM). Make bucketNullable lookup null-safe with getOrDefault in visitAggregation.

  Add integration tests covering LIMIT OFFSET, aggregate with alias, GROUP BY without bucket nullable, SELECT with alias, derived table subquery, and SELECT without FROM clause.

  Signed-off-by: Chen Dai <daichen@amazon.com>

* feat(calcite): support window functions, ROW_NUMBER, and register ISNULL

  Add AggregateFunction handling in visitWindowFunction to support aggregate-based window expressions with DISTINCT and ORDER BY keys. Add translateOrderKeys utility for window ORDER BY translation. Register row_number in WINDOW_FUNC_MAPPING and skip aggregate signature validation for it (it has no field/args). Pass distinct flag through makeOver call chain.

  RANK and DENSE_RANK are deferred to a follow-up alongside the open PPL eventstats/streamstats issue (#5168) which involves the same function registration and a separate ORDER BY semantics question.

  Register ISNULL as alias for IS_NULL in PPLFuncImpTable. Add integration tests for window functions with ORDER BY, ROW_NUMBER, COUNT DISTINCT OVER, and ISNULL.

  Signed-off-by: Chen Dai <daichen@amazon.com>

---------

Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent dcdb5ba commit 5851bdb

6 files changed

Lines changed: 267 additions & 15 deletions

File tree

api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerSqlTest.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,4 +259,154 @@ SELECT department, count(*)
259259
""")
260260
.assertErrorMessage("Encountered");
261261
}
262+
263+
@Test
264+
public void testSqlWindowFunctionWithOrderBy() {
// Plans SUM(age) as a window aggregate with both PARTITION BY and ORDER BY and checks that the
// OVER expression in the projection carries the partition key and the order key.
// NOTE(review): the expected frame covers the whole partition (UNBOUNDED PRECEDING to UNBOUNDED
// FOLLOWING) although the column is aliased running_sum; standard SQL defaults to "up to
// CURRENT ROW" when ORDER BY is present — confirm the whole-partition frame is intended.
265+
givenQuery(
266+
"""
267+
SELECT name, SUM(age) OVER (PARTITION BY department ORDER BY id) AS running_sum
268+
FROM catalog.employees\
269+
""")
270+
.assertPlan(
271+
"""
272+
LogicalProject(name=[$1], running_sum=[SUM($2) OVER (PARTITION BY $3 ORDER BY $0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)])
273+
LogicalTableScan(table=[[catalog, employees]])
274+
""");
275+
}
276+
277+
@Test
278+
public void testSqlWindowRowNumber() {
// Plans ROW_NUMBER() with only an ORDER BY in the OVER clause (no arguments, no partition) and
// checks that the order key appears in the projected window expression.
279+
givenQuery(
280+
"""
281+
SELECT name, ROW_NUMBER() OVER (ORDER BY id) AS rn
282+
FROM catalog.employees\
283+
""")
284+
.assertPlan(
285+
"""
286+
LogicalProject(name=[$1], rn=[ROW_NUMBER() OVER (ORDER BY $0)])
287+
LogicalTableScan(table=[[catalog, employees]])
288+
""");
289+
}
290+
291+
@Test
292+
public void testSqlWindowDistinctAggregate() {
// Plans COUNT(DISTINCT ...) used as a window aggregate and checks that the DISTINCT qualifier
// survives into the planned OVER expression.
293+
givenQuery(
294+
"""
295+
SELECT name, COUNT(DISTINCT department) OVER (PARTITION BY department) AS dist_cnt
296+
FROM catalog.employees\
297+
""")
298+
.assertPlan(
299+
"""
300+
LogicalProject(name=[$1], dist_cnt=[COUNT(DISTINCT $3) OVER (PARTITION BY $3)])
301+
LogicalTableScan(table=[[catalog, employees]])
302+
""");
303+
}
304+
305+
@Test
306+
public void testSqlIsNullFunction() {
// Checks that ISNULL(column) resolves as a function and plans to a projection; the expected
// projection is the literal false rather than an IS NULL call (see the note below).
307+
// ISNULL(field) — exercises the ISNULL alias registration in PPLFuncImpTable.
308+
// Calcite constant-folds to false since test schema columns are NOT NULL.
309+
givenQuery(
310+
"""
311+
SELECT ISNULL(department) AS is_null
312+
FROM catalog.employees\
313+
""")
314+
.assertPlan(
315+
"""
316+
LogicalProject(is_null=[false])
317+
LogicalTableScan(table=[[catalog, employees]])
318+
""");
319+
}
320+
321+
@Test
322+
public void testSqlLimitOffset() {
// Plans LIMIT 10 OFFSET 5 and checks that it becomes a LogicalSort carrying offset=5 and
// fetch=10, placed between the table scan and the final projection.
323+
givenQuery(
324+
"""
325+
SELECT name
326+
FROM catalog.employees
327+
LIMIT 10 OFFSET 5\
328+
""")
329+
.assertPlan(
330+
"""
331+
LogicalProject(name=[$1])
332+
LogicalSort(offset=[5], fetch=[10])
333+
LogicalTableScan(table=[[catalog, employees]])
334+
""");
335+
}
336+
337+
@Test
338+
public void testSqlAggregateWithAlias() {
// Plans a GROUP BY query whose aggregate is given an alias in the SELECT list and checks the
// resulting aggregate-over-projection plan.
// NOTE(review): the query aliases COUNT(*) as cnt, but the expected plan names the aggregate
// column COUNT(*) and has no projection above the aggregate that renames it — confirm that
// dropping the alias from the plan is the intended outcome.
339+
givenQuery(
340+
"""
341+
SELECT department, COUNT(*) AS cnt
342+
FROM catalog.employees
343+
GROUP BY department\
344+
""")
345+
.assertPlan(
346+
"""
347+
LogicalAggregate(group=[{0}], COUNT(*)=[COUNT()])
348+
LogicalProject(department=[$3])
349+
LogicalTableScan(table=[[catalog, employees]])
350+
""");
351+
}
352+
353+
@Test
354+
public void testSqlGroupByWithoutBucketNullable() {
// Plans a plain SQL GROUP BY on age and checks that it yields an aggregate over a projection
// of the grouping column. Per the test name, this targets aggregation planning when the query
// supplies no bucket-nullable argument.
355+
givenQuery(
356+
"""
357+
SELECT age, COUNT(*) AS cnt
358+
FROM catalog.employees
359+
GROUP BY age\
360+
""")
361+
.assertPlan(
362+
"""
363+
LogicalAggregate(group=[{0}], COUNT(*)=[COUNT()])
364+
LogicalProject(age=[$2])
365+
LogicalTableScan(table=[[catalog, employees]])
366+
""");
367+
}
368+
369+
@Test
370+
public void testSqlSelectWithAlias() {
// Plans plain column aliases (no aggregate involved) and checks that the projection uses the
// alias names and keeps the SELECT-list column order.
371+
givenQuery(
372+
"""
373+
SELECT age AS employee_age, name AS employee_name
374+
FROM catalog.employees\
375+
""")
376+
.assertPlan(
377+
"""
378+
LogicalProject(employee_age=[$2], employee_name=[$1])
379+
LogicalTableScan(table=[[catalog, employees]])
380+
""");
381+
}
382+
383+
@Test
384+
public void testSqlDerivedTableInFromClause() {
// Plans a query over an aliased subquery in FROM and checks that the outer column reference
// t.id resolves; the expected plan is a projection named t.id over the subquery's filter.
385+
// SELECT ... FROM (SELECT ...) AS t — exercises visitRelationSubquery override.
386+
givenQuery(
387+
"""
388+
SELECT t.id
389+
FROM (SELECT id, name FROM catalog.employees WHERE age > 30) AS t\
390+
""")
391+
.assertPlan(
392+
"""
393+
LogicalProject(t.id=[$0])
394+
LogicalFilter(condition=[>($2, 30)])
395+
LogicalTableScan(table=[[catalog, employees]])
396+
""");
397+
}
398+
399+
@Test
400+
public void testSqlSelectWithoutFromClause() {
// Plans a SELECT that has no FROM clause and checks the plan built on top of a LogicalValues.
// NOTE(review): the query has no ORDER BY, yet the expected root is a LogicalSort on $0, and no
// node in the expected plan projects the literal 1 — confirm this expected plan is intended.
401+
// SELECT 1 — exercises visitValues dual-table case (single empty row).
402+
givenQuery(
403+
"""
404+
SELECT 1\
405+
""")
406+
.assertPlan(
407+
"""
408+
LogicalSort(sort0=[$0], dir0=[ASC])
409+
LogicalValues(tuples=[[]])
410+
""");
411+
}
262412
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
import org.opensearch.sql.ast.tree.Head;
133133
import org.opensearch.sql.ast.tree.Join;
134134
import org.opensearch.sql.ast.tree.Kmeans;
135+
import org.opensearch.sql.ast.tree.Limit;
135136
import org.opensearch.sql.ast.tree.Lookup;
136137
import org.opensearch.sql.ast.tree.Lookup.OutputStrategy;
137138
import org.opensearch.sql.ast.tree.ML;
@@ -146,6 +147,7 @@
146147
import org.opensearch.sql.ast.tree.RareTopN;
147148
import org.opensearch.sql.ast.tree.Regex;
148149
import org.opensearch.sql.ast.tree.Relation;
150+
import org.opensearch.sql.ast.tree.RelationSubquery;
149151
import org.opensearch.sql.ast.tree.Rename;
150152
import org.opensearch.sql.ast.tree.Replace;
151153
import org.opensearch.sql.ast.tree.ReplacePair;
@@ -542,7 +544,18 @@ private List<RexNode> expandProjectFields(
542544
.forEach(field -> expandedFields.add(context.relBuilder.field(field)));
543545
}
544546
case Alias alias -> {
545-
expandedFields.add(rexVisitor.analyze(alias, context));
547+
// SQL aggregate aliases (e.g., COUNT(*) AS cnt): reference the already-computed field
548+
// and rebind under the user's alias, since re-analyzing the alias returns null.
549+
if (alias.getDelegated() instanceof AggregateFunction
550+
&& alias.getName() != null
551+
&& currentFields.contains(alias.getName())) {
552+
String displayName =
553+
Strings.isNullOrEmpty(alias.getAlias()) ? alias.getName() : alias.getAlias();
554+
expandedFields.add(
555+
context.relBuilder.alias(context.relBuilder.field(alias.getName()), displayName));
556+
} else {
557+
expandedFields.add(rexVisitor.analyze(alias, context));
558+
}
546559
}
547560
default ->
548561
throw new IllegalStateException(
@@ -766,6 +779,13 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
766779
return context.relBuilder.peek();
767780
}
768781

782+
@Override
783+
public RelNode visitLimit(Limit node, CalcitePlanContext context) {
// Translates a Limit AST node: the children are planned first so their relational expression
// is on the builder stack, then the node's offset and limit are applied through
// RelBuilder.limit, and the builder's current top node is returned.
784+
visitChildren(node, context);
785+
context.relBuilder.limit(node.getOffset(), node.getLimit());
786+
return context.relBuilder.peek();
787+
}
788+
769789
/**
770790
* Insert a reversed sort node after finding the original sort in the tree. This rebuilds the tree
771791
* with the reversed sort inserted right after the original sort.
@@ -1624,7 +1644,9 @@ private Pair<List<RexNode>, List<AggCall>> resolveAttributesForAggregation(
16241644
@Override
16251645
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
16261646
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
1627-
Boolean bucketNullable = (Boolean) statsArgs.get(Argument.BUCKET_NULLABLE).getValue();
1647+
// SQL aggregations don't carry the PPL-only BUCKET_NULLABLE argument; default to true.
1648+
boolean bucketNullable =
1649+
(Boolean) statsArgs.getOrDefault(Argument.BUCKET_NULLABLE, Literal.TRUE).getValue();
16281650
int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 1 : 0);
16291651
BitSet nonNullGroupMask = new BitSet(nGroup);
16301652
if (!bucketNullable) {
@@ -1931,6 +1953,14 @@ public RelNode visitSubqueryAlias(SubqueryAlias node, CalcitePlanContext context
19311953
return context.relBuilder.peek();
19321954
}
19331955

1956+
@Override
1957+
public RelNode visitRelationSubquery(RelationSubquery node, CalcitePlanContext context) {
// Plans the subquery's children first, then tags the resulting top of the builder stack with
// the subquery's alias via RelBuilder.as, and returns that top node.
1958+
// Handle SQL derived tables in FROM clause: SELECT ... FROM (SELECT ...) AS t.
1959+
visitChildren(node, context);
1960+
context.relBuilder.as(node.getAliasAsTableName());
1961+
return context.relBuilder.peek();
1962+
}
1963+
19341964
@Override
19351965
public RelNode visitLookup(Lookup node, CalcitePlanContext context) {
19361966
// 1. resolve source side
@@ -4125,12 +4155,13 @@ public RelNode visitMvExpand(MvExpand mvExpand, CalcitePlanContext context) {
41254155

41264156
@Override
41274157
public RelNode visitValues(Values values, CalcitePlanContext context) {
// Builds a Values relation with an empty (zero-column) row type when the node has no rows
// (null or empty list) or exactly one row with no literals; any node that carries literal
// rows is rejected with CalciteUnsupportedException.
// NOTE(review): relBuilder.values(rowType) is given no tuples here, and the planner test for
// "SELECT 1" expects LogicalValues(tuples=[[]]) — confirm that a Values without tuples is the
// intended encoding of the one-row dual table described below.
4128-
if (values.getValues() == null || values.getValues().isEmpty()) {
4158+
// Accept SQL SELECT without FROM (dual table), encoded as Values([[]]) — one row, zero columns.
4159+
List<List<Literal>> rows = values.getValues();
4160+
if (rows == null || rows.isEmpty() || (rows.size() == 1 && rows.get(0).isEmpty())) {
41294161
context.relBuilder.values(context.relBuilder.getTypeFactory().builder().build());
41304162
return context.relBuilder.peek();
4131-
} else {
4132-
throw new CalciteUnsupportedException("Explicit values node is unsupported in Calcite");
41334163
}
4164+
throw new CalciteUnsupportedException("Inline VALUES with literal rows is unsupported");
41344165
}
41354166

41364167
@Override

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,14 @@
3737
import org.apache.calcite.sql.type.ArraySqlType;
3838
import org.apache.calcite.sql.type.SqlTypeName;
3939
import org.apache.calcite.sql.type.SqlTypeUtil;
40+
import org.apache.calcite.tools.RelBuilder;
4041
import org.apache.calcite.util.DateString;
4142
import org.apache.calcite.util.TimeString;
4243
import org.apache.calcite.util.TimestampString;
44+
import org.apache.commons.lang3.tuple.Pair;
4345
import org.apache.logging.log4j.util.Strings;
4446
import org.opensearch.sql.ast.AbstractNodeVisitor;
47+
import org.opensearch.sql.ast.expression.AggregateFunction;
4548
import org.opensearch.sql.ast.expression.Alias;
4649
import org.opensearch.sql.ast.expression.And;
4750
import org.opensearch.sql.ast.expression.Between;
@@ -72,6 +75,8 @@
7275
import org.opensearch.sql.ast.expression.subquery.InSubquery;
7376
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
7477
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
78+
import org.opensearch.sql.ast.tree.Sort.SortOption;
79+
import org.opensearch.sql.ast.tree.Sort.SortOrder;
7580
import org.opensearch.sql.ast.tree.UnresolvedPlan;
7681
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
7782
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
@@ -563,47 +568,96 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
563568

564569
@Override
565570
public RexNode visitWindowFunction(WindowFunction node, CalcitePlanContext context) {
// Translates a WindowFunction AST node into a windowed (OVER) RexNode. The function name,
// analyzed arguments and DISTINCT flag are taken from the wrapped function; partition keys and
// ORDER BY keys are analyzed from the node; the name is then resolved through
// BuiltinFunctionName.ofWindowFunction and handed to PlanUtils.makeOver.
// Throws UnsupportedOperationException when the name is not a known window function.
566-
Function windowFunction = (Function) node.getFunction();
567-
List<RexNode> arguments =
568-
windowFunction.getFuncArgs().stream().map(arg -> analyze(arg, context)).toList();
571+
// SQL emits AggregateFunction for aggregate-as-window (e.g., SUM(x) OVER); PPL emits Function.
572+
final String funcName;
573+
final List<RexNode> arguments;
574+
final boolean isDistinct;
575+
if (node.getFunction() instanceof AggregateFunction aggFunc) {
576+
funcName = aggFunc.getFuncName();
577+
isDistinct = Boolean.TRUE.equals(aggFunc.getDistinct());
578+
List<UnresolvedExpression> argExprs = new ArrayList<>();
579+
if (aggFunc.getField() != null) {
580+
argExprs.add(aggFunc.getField());
581+
}
582+
argExprs.addAll(aggFunc.getArgList());
583+
arguments = argExprs.stream().map(arg -> analyze(arg, context)).toList();
584+
} else {
585+
Function windowFunction = (Function) node.getFunction();
586+
funcName = windowFunction.getFuncName();
587+
isDistinct = false;
588+
arguments = windowFunction.getFuncArgs().stream().map(arg -> analyze(arg, context)).toList();
589+
}
569590
List<RexNode> partitions =
570591
node.getPartitionByList().stream()
571592
.map(arg -> analyze(arg, context))
572593
.map(this::extractRexNodeFromAlias)
573594
.toList();
574-
return BuiltinFunctionName.ofWindowFunction(windowFunction.getFuncName())
595+
List<RexNode> orderKeys = translateOrderKeys(node.getSortList(), context);
596+
return BuiltinFunctionName.ofWindowFunction(funcName)
575597
.map(
576598
functionName -> {
577599
RexNode field = arguments.isEmpty() ? null : arguments.getFirst();
// The first analyzed argument is treated as the aggregated field (null when there are no
// arguments); any remaining arguments are passed along as extra args.
578600
List<RexNode> args =
579601
(arguments.isEmpty() || arguments.size() == 1)
580602
? Collections.emptyList()
581603
: arguments.subList(1, arguments.size());
604+
// ROW_NUMBER takes no field/args and isn't in aggFunctionRegistry,
605+
// so skip aggregate signature validation.
// NOTE(review): this makeOver call omits the isDistinct argument that the two calls below
// pass — presumably a separate PlanUtils.makeOver overload; confirm it exists.
606+
if (functionName == BuiltinFunctionName.ROW_NUMBER) {
607+
return PlanUtils.makeOver(
608+
context,
609+
functionName,
610+
field,
611+
args,
612+
partitions,
613+
orderKeys,
614+
node.getWindowFrame());
615+
}
582616
List<RexNode> nodes =
583617
PPLFuncImpTable.INSTANCE.validateAggFunctionSignature(
584618
functionName, field, args, context.rexBuilder);
// When validation returns a list, its elements replace the raw field/args; when it returns
// null, the raw field/args are used unchanged.
585619
return nodes != null
586620
? PlanUtils.makeOver(
587621
context,
588622
functionName,
623+
isDistinct,
589624
nodes.getFirst(),
590625
nodes.size() <= 1 ? Collections.emptyList() : nodes.subList(1, nodes.size()),
591626
partitions,
592-
List.of(),
627+
orderKeys,
593628
node.getWindowFrame())
594629
: PlanUtils.makeOver(
595630
context,
596631
functionName,
632+
isDistinct,
597633
field,
598634
args,
599635
partitions,
600-
List.of(),
636+
orderKeys,
601637
node.getWindowFrame());
602638
})
603639
.orElseThrow(
604-
() ->
605-
new UnsupportedOperationException(
606-
"Unexpected window function: " + windowFunction.getFuncName()));
640+
() -> new UnsupportedOperationException("Unexpected window function: " + funcName));
641+
}
642+
643+
private List<RexNode> translateOrderKeys(
644+
List<Pair<SortOption, UnresolvedExpression>> sortList, CalcitePlanContext context) {
645+
RelBuilder b = context.relBuilder;
646+
return sortList.stream()
647+
.map(
648+
p -> {
649+
SortOption opt = p.getLeft();
650+
RexNode field = analyze(p.getRight(), context);
651+
if (opt.getSortOrder() == SortOrder.DESC) {
652+
field = b.desc(field);
653+
}
654+
return switch (opt.getNullOrder()) {
655+
case NULL_LAST -> b.nullsLast(field);
656+
case NULL_FIRST -> b.nullsFirst(field);
657+
default -> field;
658+
};
659+
})
660+
.toList();
607661
}
608662

609663
/** extract the expression of Alias from a node */

0 commit comments

Comments
 (0)