Skip to content

Commit 1a043bc

Browse files
committed
Tighten predicate: reject non-windowable aggregates and dotted-path BY keys
CI integration failures revealed two cases the rewrite shouldn't fire on: 1. testUnsupportedWindowFunctions — percentile / percentile_approx are in AGGREGATION_FUNC_MAPPING but not WINDOW_FUNC_MAPPING. The legacy rex visitor throws "Unexpected window function: ..." for them, and the test pins that error. My predicate used only ofAggregation so it accepted percentile and the rewrite ran instead of throwing. Now require presence in both maps — percentile (and take/first/last/median, all aggregation- only) fall through to the legacy throw; dc/distinct_count/row_number (all window-only) also fall through unchanged. 2. testEventstatsOnMapPath — `eventstats count() by doc.user.city`. The join condition uses relBuilder.field(2, side, name), which doesn't resolve nested paths; my predicate accepted any QualifiedName so the rewrite produced a plan that failed at field lookup. Now isSimpleQualifiedName requires parts.size() == 1; dotted paths fall through to the legacy RexOver lowering, which handles nested fields via the existing rexVisitor. Plus a spotless reformat to the earliest/latest test that wasn't picked up before push. Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent a444b56 commit 1a043bc

2 files changed

Lines changed: 36 additions & 16 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2115,9 +2115,9 @@ public RelNode visitWindow(Window node, CalcitePlanContext context) {
21152115
* Rewrites {@code eventstats} from a per-row {@link org.apache.calcite.rex.RexOver} window into a
21162116
* cross-join (or partition-key join) against a precomputed aggregate over the same input. The
21172117
* aggregate sits below the join, so {@code AggregateIndexScanRule.AGGREGATE_SCAN} (no-{@code BY})
2118-
* or {@code AggregateIndexScanRule.DEFAULT} / {@code BUCKET_NON_NULL_AGG} ({@code BY}) can push it
2119-
* to OpenSearch as {@code size:0+track_total_hits} or a {@code terms} aggregation. Without this
2120-
* rewrite the {@code RexOver} blocks every pushdown rule and the coordinator streams every
2118+
* or {@code AggregateIndexScanRule.DEFAULT} / {@code BUCKET_NON_NULL_AGG} ({@code BY}) can push
2119+
* it to OpenSearch as {@code size:0+track_total_hits} or a {@code terms} aggregation. Without
2120+
* this rewrite the {@code RexOver} blocks every pushdown rule and the coordinator streams every
21212121
* matching document just to count it.
21222122
*
21232123
* <p>The rewrite preserves the row type {@code [original cols, agg cols]} that the legacy
@@ -2194,8 +2194,7 @@ private RelNode rewriteWindowAsAggregateJoin(Window node, CalcitePlanContext con
21942194
joinCondition = context.relBuilder.and(perKeyConditions);
21952195
}
21962196

2197-
JoinRelType joinType =
2198-
(hasGroup && !bucketNullable) ? JoinRelType.LEFT : JoinRelType.INNER;
2197+
JoinRelType joinType = (hasGroup && !bucketNullable) ? JoinRelType.LEFT : JoinRelType.INNER;
21992198
context.relBuilder.join(joinType, joinCondition);
22002199

22012200
// Final projection: keep all original left columns, then append the aggregate output columns
@@ -2221,17 +2220,21 @@ private RelNode rewriteWindowAsAggregateJoin(Window node, CalcitePlanContext con
22212220

22222221
/**
22232222
* Returns true if {@code node} matches the shape PPL {@code eventstats} actually emits — all
2224-
* window functions resolve to a registered aggregation (no {@code ROW_NUMBER} / {@code LAG} /
2225-
* etc.), no {@code ORDER BY}, default frame, and all partition keys are bare field references.
2226-
* Anything outside that shape falls through to the legacy {@code RexOver} lowering, preserving
2227-
* existing behavior for any future {@link Window} producer.
2223+
* window functions resolve to a windowable aggregation (present in both {@link
2224+
* BuiltinFunctionName#ofWindowFunction} and {@link BuiltinFunctionName#ofAggregation}), no
2225+
* {@code ORDER BY}, default frame, and all partition keys are simple (non-dotted) field
2226+
* references. Anything outside that shape falls through to the legacy {@code RexOver} lowering,
2227+
* preserving existing behavior — including the {@code Unexpected window function} error for
2228+
* non-windowable aggregates like {@code percentile} (window-map miss), the legacy {@code
2229+
* ROW_NUMBER} window form (aggregation-map miss), {@code dc} / {@code distinct_count} which are
2230+
* only registered under their window names, and dotted-path BY keys (e.g. {@code by
2231+
* doc.user.city}) which would need nested-field resolution in the join condition.
22282232
*
22292233
* <p>PPL's {@code AstExpressionBuilder.visitWindowFunction} wraps the parsed function in a {@link
22302234
* WindowFunction} whose inner expression is a {@link Function} (not {@link AggregateFunction}) —
22312235
* SQL emits {@link AggregateFunction} for aggregate-as-window — so the predicate accepts either
2232-
* and classifies via {@link BuiltinFunctionName#ofAggregation(String)}, which is what {@code
2233-
* CalciteRexNodeVisitor.visitWindowFunction} also relies on to distinguish aggregate windows from
2234-
* pure window functions like {@code ROW_NUMBER}.
2236+
* inner type and classifies by function name. Requiring presence in both maps is the cleanest
2237+
* intersection: windowable AND resolvable as a regular aggregate.
22352238
*/
22362239
private static boolean canRewriteWindowAsAggregateJoin(Window node) {
22372240
if (node.getWindowFunctionList().isEmpty()) {
@@ -2243,7 +2246,9 @@ private static boolean canRewriteWindowAsAggregateJoin(Window node) {
22432246
return false;
22442247
}
22452248
String funcName = extractAggregateFunctionName(wf.getFunction());
2246-
if (funcName == null || BuiltinFunctionName.ofAggregation(funcName).isEmpty()) {
2249+
if (funcName == null
2250+
|| BuiltinFunctionName.ofWindowFunction(funcName).isEmpty()
2251+
|| BuiltinFunctionName.ofAggregation(funcName).isEmpty()) {
22472252
return false;
22482253
}
22492254
if (!wf.getSortList().isEmpty()) {
@@ -2274,16 +2279,30 @@ private static String extractAggregateFunctionName(UnresolvedExpression fn) {
22742279
return null;
22752280
}
22762281

2282+
/**
2283+
* A bare, simple (non-dotted) field reference. Dotted paths like {@code doc.user.city} are
2284+
* rejected here because the join condition references the partition key by field name via
2285+
* {@code relBuilder.field(2, side, name)}, which does not perform nested-field resolution.
2286+
*/
22772287
private static boolean isBareFieldReference(UnresolvedExpression expr) {
2278-
if (expr instanceof Field || expr instanceof QualifiedName) {
2279-
return true;
2288+
if (expr instanceof Field f) {
2289+
return isSimpleQualifiedName(f.getField());
2290+
}
2291+
if (expr instanceof QualifiedName qn) {
2292+
return isSimpleQualifiedName(qn);
22802293
}
22812294
if (expr instanceof Alias a) {
22822295
return isBareFieldReference(a.getDelegated());
22832296
}
22842297
return false;
22852298
}
22862299

2300+
private static boolean isSimpleQualifiedName(UnresolvedExpression nameExpr) {
2301+
return nameExpr instanceof QualifiedName qn
2302+
&& qn.getParts() != null
2303+
&& qn.getParts().size() == 1;
2304+
}
2305+
22872306
/**
22882307
* Strips the {@link WindowFunction} wrapper from an eventstats aggregate so {@code aggVisitor}
22892308
* resolves it as a regular aggregate. Preserves the outer {@link Alias} so the aggregate output

ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsEarliestLatestTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ public void testEventstatsEarliestWithOtherAggregatesWithoutSecondArgument() {
111111
+ " earliest_message=[$6], cnt=[$7])\n"
112112
+ " LogicalJoin(condition=[=($0, $5)], joinType=[inner])\n"
113113
+ " LogicalTableScan(table=[[POST, LOGS]])\n"
114-
+ " LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], cnt=[COUNT()])\n"
114+
+ " LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)],"
115+
+ " cnt=[COUNT()])\n"
115116
+ " LogicalProject(server=[$0], message=[$2], @timestamp=[$3])\n"
116117
+ " LogicalTableScan(table=[[POST, LOGS]])\n";
117118
verifyLogical(root, expectedLogical);

0 commit comments

Comments
 (0)