Commit 817f7f3

Close gaps from top/rare analytics-engine wiring (opensearch-project#5433)
* Forward PPL_SYNTAX_LEGACY_PREFERRED through UnifiedQueryContext

`RestUnifiedQueryAction.applyClusterOverrides` previously only forwarded `PPL_REX_MAX_MATCH_LIMIT` into the per-request `UnifiedQueryContext`. As a result, cluster-side updates to `plugins.ppl.syntax.legacy.preferred` were ignored on the analytics-engine route: `PPLQueryParser` -> `AstBuilder` -> `ArgumentFactory` read the legacy-preferred flag from the unified context's settings map, which never received the override. This caused queries like `top age` / `rare state` with `usenull` defaulting off to behave as if `usenull=true` on the analytics route.

Refactor the override builder into a small helper and forward both `PPL_REX_MAX_MATCH_LIMIT` and `PPL_SYNTAX_LEGACY_PREFERRED`. Future keys can be added with a one-liner.

Fixes `CalciteTopCommandIT.testTopCommandLegacyFalse` and `CalciteRareCommandIT.testRareCommandLegacyFalse` against the analytics route (`-Dtests.analytics.force_routing=true`).

Signed-off-by: Kai Huang <huangkaics@gmail.com>

* Stabilize rare/top tie-break by appending field columns to ROW_NUMBER order

`CalciteRelNodeVisitor.visitRareTopN` lowers `rare`/`top` to a `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY count [DESC])` window. With only the count column in the ORDER BY clause, ties at the same count resolved via the upstream operator's insertion order, which differed between backends (in-process Calcite vs. analytics-engine vs. Lucene pushdown). On the analytics-engine route, `testRareWithGroup` failed because ROW_NUMBER picked NV at count=8 while the test expected AR.

Append the rare/top field columns as secondary ASC keys so ties resolve alphabetically and deterministically across backends. This matches the behavior of the existing OpenSearch terms-aggregation pushdown, which tie-breaks on `_key:asc`.

Update `RareTopPushdownRule` to accept the new shape: 1 or 2 order keys, where the (optional) second key must be the rare/top target field in ASC direction. The pushdown's wire payload is unchanged.

Update the matching unit-test expectations in `CalcitePPLRareTopNTest` (11 RelNode/result/SparkSQL strings) and 5 explain YAML fixtures.

Fixes `CalciteRareCommandIT.testRareWithGroup` against the analytics route (and removes the same class of tie-break flakiness across other rare/top tests).

Signed-off-by: Kai Huang <huangkaics@gmail.com>

* Restrict ROW_NUMBER tie-break to comparable columns + sync no-pushdown fixtures

Three follow-ups to the rare/top tie-break commit, surfaced by CI:

1. **Skip non-comparable tie-break columns.** `top address` on a nested-array field reached the in-process `EnumerableWindow` with `ORDER BY count DESC, address` and crashed at runtime: `class java.util.ArrayList cannot be cast to class java.lang.Comparable`. Filter array / multiset / map / struct (row) columns out of the tie-break list before appending. If all field columns are non-comparable, no tie-break is added (falling back to the original count-only ORDER BY).

2. **Sync the parallel `calcite_no_pushdown/` explain fixtures.** `PPLIntegTestCase.loadExpectedPlan` switches to `expectedOutput/calcite_no_pushdown/` when pushdown is disabled, which `CalciteNoPushdownIT` exercises. The previous commit only updated the `expectedOutput/calcite/` copies, so `CalciteNoPushdownIT > CalciteExplainIT` read stale (no-tie-break) YAML. Update the four `explain_{rare,top}_usenull_*` YAMLs under `calcite_no_pushdown/` to include the `, $N` tie-break key in both the logical plan and the `EnumerableWindow` collation.

3. **Collapse the pushdown-disabled branch in `RareCommandIT.testRareWithGroup`.** With the stable tie-break, both pushdown-enabled and pushdown-disabled paths now resolve the count=8 tie deterministically to `AR` (alphabetical). Drop the `isPushdownDisabled() ? rows("F", "NV", 8) : rows("F", "AR", 8)` conditional in favor of the single `AR` expectation.

Signed-off-by: Kai Huang <huangkaics@gmail.com>

---------

Signed-off-by: Kai Huang <huangkaics@gmail.com>
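The tie-break problem described in the commit message can be reproduced without Calcite. The following is a minimal plain-Java sketch, not code from this commit: `TieBreakSketch`, `Bucket`, and both helper methods are hypothetical names, and the two input lists simply stand in for two backends that emit equal-count rows in different arrival orders.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class TieBreakSketch {
  /** One aggregated row: a field value and its count. Hypothetical type, for illustration. */
  record Bucket(String state, long count) {}

  /** Count-only ordering: the sort is stable, so ties keep the input's arrival order. */
  static List<Bucket> orderByCountOnly(List<Bucket> input) {
    List<Bucket> sorted = new ArrayList<>(input);
    sorted.sort(Comparator.comparingLong(Bucket::count));
    return sorted;
  }

  /** Count first, then the field value ascending, so ties resolve alphabetically. */
  static List<Bucket> orderByCountThenKey(List<Bucket> input) {
    List<Bucket> sorted = new ArrayList<>(input);
    sorted.sort(Comparator.comparingLong(Bucket::count).thenComparing(Bucket::state));
    return sorted;
  }

  public static void main(String[] args) {
    // Same rows, different arrival order for the two count=8 entries.
    List<Bucket> backendA =
        List.of(new Bucket("NV", 8), new Bucket("AR", 8), new Bucket("OK", 7));
    List<Bucket> backendB =
        List.of(new Bucket("AR", 8), new Bucket("NV", 8), new Bucket("OK", 7));

    System.out.println(orderByCountOnly(backendA));
    System.out.println(orderByCountOnly(backendB));
    System.out.println(orderByCountThenKey(backendA));
    System.out.println(orderByCountThenKey(backendB));
  }
}
```

With the count-only comparator the two inputs rank NV and AR differently; with the secondary key both inputs produce the same ranking, with AR ahead of NV.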
1 parent 5851bdb commit 817f7f3

14 files changed

Lines changed: 148 additions & 86 deletions

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 27 additions & 1 deletion
@@ -3071,14 +3071,31 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
       countField = context.relBuilder.field(countFieldName);
     }
 
+    // Append the rare/top field columns as secondary order keys so ties in the count column
+    // resolve deterministically. Without this, ROW_NUMBER's tie-break is insertion-order
+    // dependent and varies between backends (e.g. analytics-engine vs in-process Calcite).
+    // Skip collection / map / struct columns — Calcite's executor can't compare those at
+    // runtime (`ArrayList cannot be cast to Comparable`), and pushdown to the OpenSearch
+    // terms aggregation only supports scalar keys. RelDataType#getComparability would be
+    // the natural Calcite primitive but every concrete RelDataTypeImpl subclass (including
+    // ArraySqlType / MapSqlType / RelRecordType) returns ALL by default in calcite 1.41, so
+    // it doesn't actually discriminate the cases we need to reject here.
+    List<RexNode> tieBreakKeys =
+        rexVisitor.analyze(fieldList, context).stream()
+            .filter(CalciteRelNodeVisitor::isComparableOrderKey)
+            .toList();
+    List<RexNode> orderKeys = new ArrayList<>(tieBreakKeys.size() + 1);
+    orderKeys.add(countField);
+    orderKeys.addAll(tieBreakKeys);
+
     RexNode rowNumberWindowOver =
         PlanUtils.makeOver(
             context,
             BuiltinFunctionName.ROW_NUMBER,
             null,
             List.of(),
             partitionKeys,
-            List.of(countField),
+            orderKeys,
             WindowFrame.toCurrentRow());
     context.relBuilder.projectPlus(
         context.relBuilder.alias(rowNumberWindowOver, ROW_NUMBER_COLUMN_FOR_RARE_TOP));
@@ -3102,6 +3119,15 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
     return context.relBuilder.peek();
   }
 
+  private static boolean isComparableOrderKey(RexNode key) {
+    RelDataType type = key.getType();
+    // SqlTypeUtil#isCollection covers ARRAY + MULTISET; RelDataType#isStruct covers ROW +
+    // STRUCTURED. MAP has no dedicated predicate, so check it explicitly.
+    return !SqlTypeUtil.isCollection(type)
+        && !type.isStruct()
+        && type.getSqlTypeName() != SqlTypeName.MAP;
+  }
+
   @Override
   public RelNode visitTableFunction(TableFunction node, CalcitePlanContext context) {
     throw new CalciteUnsupportedException("Table function is unsupported in Calcite");
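The filtering step in this hunk can be pictured without the Calcite types. The sketch below is an assumption-laden stand-in, not the commit's code: `Kind`, `Column`, and `orderKeys` are hypothetical, and the enum only mimics the type families that `isComparableOrderKey` rejects. It shows the count column always leading the order keys, with only scalar fields appended after it.

```java
import java.util.ArrayList;
import java.util.List;

class OrderKeyFilterSketch {
  /** Hypothetical stand-in for a column's type family; this is not a Calcite type. */
  enum Kind { SCALAR, ARRAY, MULTISET, MAP, STRUCT }

  /** Hypothetical stand-in for a field reference in the plan. */
  record Column(String name, Kind kind) {}

  /** Only scalar columns are treated as safe to sort on. */
  static boolean isComparable(Column column) {
    return column.kind() == Kind.SCALAR;
  }

  /** The count column always comes first; comparable field columns follow as tie-breaks. */
  static List<String> orderKeys(String countColumn, List<Column> fields) {
    List<String> keys = new ArrayList<>(fields.size() + 1);
    keys.add(countColumn);
    for (Column field : fields) {
      if (isComparable(field)) {
        keys.add(field.name());
      }
    }
    return keys;
  }

  public static void main(String[] args) {
    // A scalar field is appended as a tie-break key.
    System.out.println(orderKeys("count", List.of(new Column("state", Kind.SCALAR))));
    // An array field is skipped, leaving the count-only ordering.
    System.out.println(orderKeys("count", List.of(new Column("address", Kind.ARRAY))));
  }
}
```

When every field is filtered out, the result is the single-element count-only list, which corresponds to the fallback the commit message describes.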

integ-test/src/test/java/org/opensearch/sql/ppl/RareCommandIT.java

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ public void testRareWithGroup() throws IOException {
         rows("F", "OK", 7),
         rows("F", "KS", 7),
         rows("F", "CO", 7),
-        isPushdownDisabled() ? rows("F", "NV", 8) : rows("F", "AR", 8),
+        rows("F", "AR", 8),
         rows("M", "NE", 5),
         rows("M", "RI", 5),
         rows("M", "NV", 5),

integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_top_push.yaml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ calcite:
     LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
       LogicalProject(address.city=[$0], count=[$1])
         LogicalFilter(condition=[<=($2, 10)])
-          LogicalProject(address.city=[$0], count=[$1], _row_number_rare_top_=[ROW_NUMBER() OVER (ORDER BY $1 DESC)])
+          LogicalProject(address.city=[$0], count=[$1], _row_number_rare_top_=[ROW_NUMBER() OVER (ORDER BY $1 DESC, $0)])
             LogicalAggregate(group=[{0}], count=[COUNT()])
               LogicalProject(address.city=[$3])
                 LogicalFilter(condition=[IS NOT NULL($3)])

integ-test/src/test/resources/expectedOutput/calcite/explain_rare_usenull_false.yaml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ calcite:
     LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
       LogicalProject(gender=[$0], state=[$1], count=[$2])
         LogicalFilter(condition=[<=($3, 2)])
-          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2)])
+          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2, $1)])
             LogicalAggregate(group=[{0, 1}], count=[COUNT()])
               LogicalProject(gender=[$4], state=[$7])
                 LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])

integ-test/src/test/resources/expectedOutput/calcite/explain_rare_usenull_true.yaml

Lines changed: 2 additions & 2 deletions
@@ -3,12 +3,12 @@ calcite:
     LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
       LogicalProject(gender=[$0], state=[$1], count=[$2])
         LogicalFilter(condition=[<=($3, 2)])
-          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2)])
+          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2, $1)])
             LogicalAggregate(group=[{0, 1}], count=[COUNT()])
               LogicalProject(gender=[$4], state=[$7])
                 CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
   physical: |
     EnumerableLimit(fetch=[10000])
       EnumerableCalc(expr#0..3=[{inputs}], expr#4=[2], expr#5=[<=($t3, $t4)], proj#0..2=[{exprs}], $condition=[$t5])
-        EnumerableWindow(window#0=[window(partition {0} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
+        EnumerableWindow(window#0=[window(partition {0} order by [2, 1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
           CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
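The plan shape in this fixture (aggregate, then `ROW_NUMBER()` partitioned by `$0` and ordered by `[2, 1]`, then a `<= 2` filter) can be emulated in a few lines of plain Java. This is only a sketch under stated assumptions: `RareWindowSketch`, `Row`, and `rare` are hypothetical names, the input is assumed to be already aggregated, and the `TreeMap` is used solely to make the printed partition order predictable, which the real plan does not promise.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RareWindowSketch {
  /** One already-aggregated row: (gender, state, count). Hypothetical type. */
  record Row(String gender, String state, long count) {}

  /** Keeps the k lowest-count rows per gender, breaking count ties by state ascending. */
  static List<Row> rare(List<Row> aggregated, int k) {
    // PARTITION BY gender
    Map<String, List<Row>> partitions = new TreeMap<>();
    for (Row row : aggregated) {
      partitions.computeIfAbsent(row.gender(), g -> new ArrayList<>()).add(row);
    }
    List<Row> result = new ArrayList<>();
    for (List<Row> partition : partitions.values()) {
      // ORDER BY count, state
      partition.sort(Comparator.comparingLong(Row::count).thenComparing(Row::state));
      // ROW_NUMBER() <= k
      for (int rowNumber = 1; rowNumber <= partition.size() && rowNumber <= k; rowNumber++) {
        result.add(partition.get(rowNumber - 1));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Row> aggregated =
        List.of(
            new Row("F", "NV", 8),
            new Row("F", "AR", 8),
            new Row("F", "OK", 7),
            new Row("M", "NE", 5),
            new Row("M", "RI", 5));
    rare(aggregated, 2).forEach(System.out::println);
  }
}
```

For a `top`-style plan the count comparator would be reversed while the state key stays ascending, mirroring the `[2 DESC, 1]` collation in the `top` fixtures.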

integ-test/src/test/resources/expectedOutput/calcite/explain_top_usenull_false.yaml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ calcite:
     LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
       LogicalProject(gender=[$0], state=[$1], count=[$2])
         LogicalFilter(condition=[<=($3, 2)])
-          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC)])
+          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC, $1)])
             LogicalAggregate(group=[{0, 1}], count=[COUNT()])
               LogicalProject(gender=[$4], state=[$7])
                 LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])

integ-test/src/test/resources/expectedOutput/calcite/explain_top_usenull_true.yaml

Lines changed: 2 additions & 2 deletions
@@ -3,12 +3,12 @@ calcite:
     LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
       LogicalProject(gender=[$0], state=[$1], count=[$2])
         LogicalFilter(condition=[<=($3, 2)])
-          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC)])
+          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC, $1)])
             LogicalAggregate(group=[{0, 1}], count=[COUNT()])
               LogicalProject(gender=[$4], state=[$7])
                 CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
   physical: |
     EnumerableLimit(fetch=[10000])
       EnumerableCalc(expr#0..3=[{inputs}], expr#4=[2], expr#5=[<=($t3, $t4)], proj#0..2=[{exprs}], $condition=[$t5])
-        EnumerableWindow(window#0=[window(partition {0} order by [2 DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
+        EnumerableWindow(window#0=[window(partition {0} order by [2 DESC, 1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
           CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_rare_usenull_false.yaml

Lines changed: 2 additions & 2 deletions
@@ -3,15 +3,15 @@ calcite:
     LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
       LogicalProject(gender=[$0], state=[$1], count=[$2])
         LogicalFilter(condition=[<=($3, 2)])
-          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2)])
+          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2, $1)])
             LogicalAggregate(group=[{0, 1}], count=[COUNT()])
               LogicalProject(gender=[$4], state=[$7])
                 LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
                   CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
   physical: |
     EnumerableLimit(fetch=[10000])
       EnumerableCalc(expr#0..3=[{inputs}], expr#4=[2], expr#5=[<=($t3, $t4)], proj#0..2=[{exprs}], $condition=[$t5])
-        EnumerableWindow(window#0=[window(partition {0} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
+        EnumerableWindow(window#0=[window(partition {0} order by [2, 1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
           EnumerableAggregate(group=[{4, 7}], count=[COUNT()])
             EnumerableCalc(expr#0..16=[{inputs}], expr#17=[IS NOT NULL($t4)], expr#18=[IS NOT NULL($t7)], expr#19=[AND($t17, $t18)], proj#0..16=[{exprs}], $condition=[$t19])
               CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])

integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_rare_usenull_true.yaml

Lines changed: 2 additions & 2 deletions
@@ -3,13 +3,13 @@ calcite:
     LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
       LogicalProject(gender=[$0], state=[$1], count=[$2])
         LogicalFilter(condition=[<=($3, 2)])
-          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2)])
+          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2, $1)])
             LogicalAggregate(group=[{0, 1}], count=[COUNT()])
               LogicalProject(gender=[$4], state=[$7])
                 CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
   physical: |
     EnumerableLimit(fetch=[10000])
       EnumerableCalc(expr#0..3=[{inputs}], expr#4=[2], expr#5=[<=($t3, $t4)], proj#0..2=[{exprs}], $condition=[$t5])
-        EnumerableWindow(window#0=[window(partition {0} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
+        EnumerableWindow(window#0=[window(partition {0} order by [2, 1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
           EnumerableAggregate(group=[{4, 7}], count=[COUNT()])
             CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])

integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_top_usenull_false.yaml

Lines changed: 2 additions & 2 deletions
@@ -3,15 +3,15 @@ calcite:
     LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
       LogicalProject(gender=[$0], state=[$1], count=[$2])
         LogicalFilter(condition=[<=($3, 2)])
-          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC)])
+          LogicalProject(gender=[$0], state=[$1], count=[$2], _row_number_rare_top_=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC, $1)])
             LogicalAggregate(group=[{0, 1}], count=[COUNT()])
               LogicalProject(gender=[$4], state=[$7])
                 LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
                   CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
   physical: |
     EnumerableLimit(fetch=[10000])
       EnumerableCalc(expr#0..3=[{inputs}], expr#4=[2], expr#5=[<=($t3, $t4)], proj#0..2=[{exprs}], $condition=[$t5])
-        EnumerableWindow(window#0=[window(partition {0} order by [2 DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
+        EnumerableWindow(window#0=[window(partition {0} order by [2 DESC, 1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
           EnumerableAggregate(group=[{4, 7}], count=[COUNT()])
             EnumerableCalc(expr#0..16=[{inputs}], expr#17=[IS NOT NULL($t4)], expr#18=[IS NOT NULL($t7)], expr#19=[AND($t17, $t18)], proj#0..16=[{exprs}], $condition=[$t19])
               CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
