Skip to content

Commit bb8d9b2

Browse files
committed
Extend rewrite to dc / distinct_count via canonical-name translation
PPL eventstats accepts three aliases for the cardinality aggregation — dc, distinct_count, distinct_count_approx — all resolving to BuiltinFunctionName.DISTINCT_COUNT_APPROX. The stats command only accepts distinct_count_approx, so AGGREGATION_FUNC_MAPPING registers only that name; the other two are window-only aliases in WINDOW_FUNC_MAPPING. The previous predicate required intersection of both maps, which rejected dc and distinct_count. They fell through to the legacy RexOver lowering — which is the exact buggy "EnumerableWindow over a row-fetching scan" shape opensearch-project#5483 was filed against. Fix was incomplete. Replace the intersection check with: name is in ofWindowFunction AND its canonical aggregation name (BuiltinFunctionName.name().toLowerCase, e.g. "distinct_count_approx") is in ofAggregation. Translate the same way in stripWindowFunctionForAggregate so aggVisitor sees the registered name. For names already in both maps (count/sum/avg/etc.) the canonical name equals the user-typed name, so the lookup is a no-op — no behavior change for the cases that already worked. ROW_NUMBER still falls through because its canonical name "row_number" isn't in the aggregation map. Same for percentile / take / first / last / median / list / values — all rejected by the canonical-name lookup. Verified locally: - eventstats dc(state) → cardinality agg, size:0 - eventstats distinct_count(state) by gender → composite over gender + nested cardinality on state, size:0 Regenerated explain_eventstats_dc.json and explain_eventstats_distinct_count.json with the new shape (composite size 1000 to match CI). Both tests are pushdown-only (enabledOnlyWhenPushdownIsEnabled() + loadFromFile hardcoded to calcite/), so no calcite_no_pushdown/ variants needed. Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent 515f35e commit bb8d9b2

3 files changed

Lines changed: 40 additions & 9 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.HashSet;
4040
import java.util.LinkedHashSet;
4141
import java.util.List;
42+
import java.util.Locale;
4243
import java.util.Map;
4344
import java.util.Objects;
4445
import java.util.Optional;
@@ -2246,9 +2247,19 @@ private static boolean canRewriteWindowAsAggregateJoin(Window node) {
22462247
return false;
22472248
}
22482249
String funcName = extractAggregateFunctionName(wf.getFunction());
2249-
if (funcName == null
2250-
|| BuiltinFunctionName.ofWindowFunction(funcName).isEmpty()
2251-
|| BuiltinFunctionName.ofAggregation(funcName).isEmpty()) {
2250+
if (funcName == null) {
2251+
return false;
2252+
}
2253+
// The user-typed name must resolve to a window function, and the windowable
2254+
// {@link BuiltinFunctionName}'s canonical name (e.g. {@code distinct_count_approx} for the
2255+
// {@code dc} / {@code distinct_count} aliases) must also be registered as a regular
2256+
// aggregation so {@code aggVisitor} can resolve it for the right-side LogicalAggregate.
2257+
Optional<BuiltinFunctionName> windowName = BuiltinFunctionName.ofWindowFunction(funcName);
2258+
if (windowName.isEmpty()) {
2259+
return false;
2260+
}
2261+
String canonical = canonicalAggregationName(windowName.get());
2262+
if (BuiltinFunctionName.ofAggregation(canonical).isEmpty()) {
22522263
return false;
22532264
}
22542265
if (!wf.getSortList().isEmpty()) {
@@ -2279,6 +2290,19 @@ private static String extractAggregateFunctionName(UnresolvedExpression fn) {
22792290
return null;
22802291
}
22812292

2293+
/**
2294+
* Maps a windowable {@link BuiltinFunctionName} to the function name registered in {@code
2295+
* BuiltinFunctionName.AGGREGATION_FUNC_MAPPING}. Most names match directly (e.g. {@code COUNT} →
2296+
* {@code "count"}), but the {@code dc} and {@code distinct_count} aliases are window-only — they
2297+
* map to {@link BuiltinFunctionName#DISTINCT_COUNT_APPROX}, whose registered aggregation name is
2298+
* {@code "distinct_count_approx"}. Translating to the canonical name lets the right-side
2299+
* aggregate go through {@code aggVisitor}'s standard {@link BuiltinFunctionName#ofAggregation}
2300+
* lookup with no special case for the alias.
2301+
*/
2302+
private static String canonicalAggregationName(BuiltinFunctionName windowName) {
2303+
return windowName.name().toLowerCase(Locale.ROOT);
2304+
}
2305+
22822306
/**
22832307
* A bare, simple (non-dotted) field reference. Dotted paths like {@code doc.user.city} are
22842308
* rejected here because the join condition references the partition key by field name via {@code
@@ -2321,12 +2345,19 @@ private UnresolvedExpression stripWindowFunctionForAggregate(UnresolvedExpressio
23212345
return fn;
23222346
}
23232347
if (fn instanceof Function f) {
2348+
// Translate window-only aliases (e.g. `dc`, `distinct_count`) to the canonical
2349+
// aggregation name (`distinct_count_approx`) so aggVisitor's ofAggregation lookup
2350+
// resolves them. For names already in the aggregation map (count, sum, avg, max, min,
2351+
// stddev*, var*, earliest, latest, distinct_count_approx, pattern), this is a no-op.
2352+
String funcName =
2353+
BuiltinFunctionName.ofWindowFunction(f.getFuncName())
2354+
.map(CalciteRelNodeVisitor::canonicalAggregationName)
2355+
.orElse(f.getFuncName());
23242356
List<UnresolvedExpression> args = f.getFuncArgs();
23252357
UnresolvedExpression field = args.isEmpty() ? null : args.get(0);
23262358
List<UnresolvedExpression> argList =
23272359
args.size() <= 1 ? List.of() : args.subList(1, args.size());
2328-
AggregateFunction agg = new AggregateFunction(f.getFuncName(), field, argList);
2329-
return agg;
2360+
return new AggregateFunction(funcName, field, argList);
23302361
}
23312362
return fn;
23322363
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
3-
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(aggs [DISTINCT_COUNT_APPROX($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[$17])\n LogicalJoin(condition=[true], joinType=[inner])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n LogicalAggregate(group=[{}], distinct_states=[DISTINCT_COUNT_APPROX($0)])\n LogicalProject(state=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableNestedLoopJoin(condition=[true], joinType=[inner])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#48:LogicalAggregate.NONE.[](input=RelSubset#47,group={},distinct_states=DISTINCT_COUNT_APPROX($0))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"distinct_states\":{\"cardinality\":{\"field\":\"state.keyword\"}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
55
}
66
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
3-
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(partition {4} aggs [DISTINCT_COUNT_APPROX($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[$18])\n LogicalJoin(condition=[IS NOT DISTINCT FROM($4, $17)], joinType=[inner])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n LogicalAggregate(group=[{0}], distinct_states=[DISTINCT_COUNT_APPROX($1)])\n LogicalProject(gender=[$4], state=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableCalc(expr#0..12=[{inputs}], proj#0..10=[{exprs}], distinct_states=[$t12])\n EnumerableLimit(fetch=[10000])\n EnumerableNestedLoopJoin(condition=[IS NOT DISTINCT FROM($4, $11)], joinType=[inner])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#163:LogicalAggregate.NONE.[](input=RelSubset#162,group={0},distinct_states=DISTINCT_COUNT_APPROX($1))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"distinct_states\":{\"cardinality\":{\"field\":\"state.keyword\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
55
}
66
}

0 commit comments

Comments
 (0)