Commit 419b45a

[BugFix] Fix sort order not preserved through dedup in Calcite engine (#3922) (#5353)
* [BugFix] Fix sort order not preserved through dedup in Calcite engine (#3922)

Signed-off-by: Songkan Tang <songkant@amazon.com>

* Update integration test expected outputs for sort-then-dedup plan changes

Update CalcitePPLDedupIT.testSortThenDedupKeepEmpty expected rows from 7 to 9 to match the corrected dedup behavior. Update all CalciteExplainIT expected YAML/JSON output files to reflect the new logical/physical plan structure where Sort is restored above dedup and ROW_NUMBER includes ORDER BY. Skip testExplain in CalciteNoPushdownIT since pushdown-disabled produces different physical plans.

Signed-off-by: Songkan Tang <songkant@amazon.com>

* refactor(dedup): propagate sort collation through LogicalDedup and push to top_hits

Add inputCollation field to Dedup/LogicalDedup so sort order from upstream Sort nodes is preserved across rule transformations. The collation flows through the full pipeline:
- visitDedupe strips the Sort and captures the remapped collation
- PPLSimplifyDedupRule extracts ORDER BY from ROW_NUMBER window
- PPLDedupConvertRule uses inputCollation for ORDER BY and restore Sort
- DedupPushdownRule passes sort info via hint to AggregateAnalyzer
- AggregateAnalyzer sets top_hits sort for correct intra-bucket ordering

Also fixes an edge case where sort field is projected away before dedup (e.g. sort DEPTNO | fields ENAME, JOB | dedup 1 JOB) by remapping collation field indices through intermediate Projects in stripInputSort.

Signed-off-by: Songkan Tang <songkant@amazon.com>

* fix(dedup): correct sort field mapping and null ordering for dedup pushdown

Fixes CI failures on PR #5353 by addressing three issues introduced by the sort-preserved-through-dedup work:

1. DedupPushdownRule passed the *reordered* targetChildProject's field names to addDedupSortHintToAggregate, but the collation's field indices are relative to the dedup's input row type (the original project). This caused the wrong field to be pushed into top_hits' inner sort (e.g. sort on `age` instead of `new_gender`). Pass `project.getRowType().getFieldNames()` instead.
2. The pushed-down top_hits sort inherited OpenSearch's default null ordering (ASC -> NULLS LAST), which disagreed with PPL's ASC-NULLS-FIRST default. This made dedup select a different row between pushdown-on and pushdown-off paths. Apply `.missing("_first")` / `"_last"` on the top_hits sort in the LITERAL_AGG (dedup) branch only, so other top_hits callers (FIRST/LAST/MIN/MAX/ARG_*) keep their existing sort semantics.
3. testSortThenDedup's expected rows assumed NULLS LAST (expected `[Z, B]` for name=B) but PPL default is NULLS FIRST, under which B's first occurrence is the null-category row (#14). Correct the expected rows to match Calcite semantics.

Regenerate the expected explain YAML/JSON outputs for the changed plans:
- EnumerableWindow now carries `order by [1 ASC-nulls-first]` for PR's window-ORDER-BY change.
- top_hits inner sort includes the newly pushed `"sort":[{field:{order,missing}}]` clause for dedup-pushdown tests.
- Extended-mode Janino comparator code now calls `compareNullsFirst` because the window has an ORDER BY.

Signed-off-by: Songkan Tang <songkant@amazon.com>

* feat(dedup): push all sort fields to top_hits and capture collation field names

Extend the dedup sort hint to carry every field collation instead of only the first one, so a multi-field PPL `sort` preserves its full ordering through the pushed-down `top_hits`:
- `PPLHintUtils.addDedupSortHintToAggregate` now encodes the full collation as `field:ORDER|field:ORDER|...` in a single hint option; the getter returns a `List<DedupSortKey>`. `AggregateAnalyzer` emits one sort entry per key, preserving ASC/DESC and the Calcite-aligned NULL ordering.
- Collation field names are now captured on `LogicalDedup` itself at creation time (against the row type that produced the collation), rather than being resolved later in `DedupPushdownRule` via the dedup's current input row type. This is necessary because planner rules can narrow the dedup's input between `PPLSimplifyDedupRule` and `DedupPushdownRule`, making the index-based `RelCollation` unsafe to resolve against the (then-narrower) input.
- New integration test `testMultiColumnSortThenDedup` verifies that `sort state, age, account_number | dedup 1 gender` returns the exact rows that the full three-field ordering dictates, which is impossible to achieve if only the first sort field were pushed down.
- Update the four existing dedup-expr / dedup-with-expr / complex1 expected plans to reflect the second-field sort entry now present in `top_hits`.

Signed-off-by: Songkan Tang <songkant@amazon.com>

* refactor(dedup): address PR review: simplify collation handling

Two PR review comments:

1. core/.../CalciteRelNodeVisitor.java:735: the private `backtrackForCollation` method was a one-line wrapper around `PlanUtils.findInputCollation`. Remove the wrapper and call `PlanUtils.findInputCollation` directly; make the util public since it is now used from another package.
2. core/.../PlanUtils.java:660: `remapCollationThroughProjects` hand-rolled a project-to-top index remapping that was fragile (it only understood Logical Projects with simple `RexInputRef`s). Replace it with Calcite's metadata framework: `RelMetadataQuery.collations(input)` returns the subtree's output collation with project remapping handled by `RelMdCollation`, which knows about more operators than our manual walk did.

No behavior change expected: every existing CalciteExplainIT / CalcitePPLDedupIT / CalcitePPLSortIT / CalciteSortCommandIT / CalciteReverseCommandIT integration test still passes, as do the core/opensearch/ppl unit tests.

Signed-off-by: Songkan Tang <songkant@amazon.com>

* refactor(dedup): permute collation to scan schema instead of name workaround

Replace the name-based `inputCollationFieldNames` captured on `LogicalDedup` with Calcite's standard index-based collation propagation. The name approach would break under any downstream rename (`RelCollation` itself stores only indices, never names; a rename changes the name but keeps the index).

`DedupPushdownRule` now permutes `dedup.inputCollation` into scan-schema indices via `RexInputRef`s of the immediate child `LogicalProject`, mirroring what `Project.getMapping` + `RelCollations.permute` do for Calcite's own trait-propagation paths (cf. `EnumerableMergeJoin.passThroughTraits`). Two cases are handled:
- Collation still addresses the child project's output (no rule inserted a narrower project in between): permute each key through the project's projection list. A non-`RexInputRef` projection (computed column, e.g. `lower(gender)`) cannot be expressed as an OS field sort, so the hint is dropped; Calcite's outer sort still restores order.
- Collation indices are out of range for the child project's output but within the scan's schema (a narrower project got pushed below dedup after creation): indices are already scan-schema indices; use as-is.

Drop `Dedup.inputCollationFieldNames` and the 7-arg `copy` signature: the name capture was a workaround that couldn't survive renames and is no longer needed.

Update the two `explain_dedup_expr_complex1*` YAMLs: in that test the sort keys are computed columns (`lower(gender)` / `lower(state)`), which we now correctly refuse to push as `top_hits` sort entries. The outer Calcite sort still runs and the test's final row order is unaffected.

Signed-off-by: Songkan Tang <songkant@amazon.com>

* fix(dedup): resolve stale inputCollation after planner narrows dedup's input

CI revealed that after `PPLSimplifyDedupRule` captures `inputCollation` against the dedup's original input row type, a later Calcite rule (typically scan absorbing a narrowing project) can swap in a different input row type without going through `Dedup.copy()`, so the collation's indices become stale. Symptom: `IndexOutOfBoundsException` in `PPLDedupConvertRule.collationToOrderKeys` (`field ordinal [7] out of range; input fields are: [...]`) in NoPushdown mode, and silently-wrong top_hits sort in pushdown mode.

Fix:
- Reintroduce `Dedup.inputCollationFieldNames` captured at `LogicalDedup.create` time, strictly as a name-based *fallback anchor* for the "replacement is not a Project" case (scans don't rename, so names are stable there; rename-through-Project scenarios are already handled by Calcite's own index propagation).
- `PPLDedupConvertRule.onMatch` now resolves the collation to the current input's schema: if indices are still valid, use as-is; otherwise look each sort key up by original name in the current row type.
- `DedupPushdownRule` uses the same two-case resolution (Project permute → name-based fallback) to produce scan-schema indices for the top_hits sort hint, dropping the hint cleanly if any key still can't be resolved.

Verified on the integ-test worktree: full `CalciteExplainIT`, `CalcitePPLDedupIT`, and the entire `CalciteNoPushdownIT` suite all pass, as do core/opensearch/ppl unit tests.

Signed-off-by: Songkan Tang <songkant@amazon.com>

* test(dedup): extend #3922 fixture to actually reproduce the bug

The original 5-row fixture happened to pass on pre-fix because the single-partition EnumerableWindow path preserved input order by accident. Adding 5 more rows across a wider category range forces the collation to get shuffled pre-fix (verified locally: output reorders to [X,X,Z,A,B,C,D]), so the expected-datarows match now fails without the fix.

Signed-off-by: Songkan Tang <songkant@amazon.com>

* refactor(dedup): address PR review comments

- PPLDedupConvertRule / DedupPushdownRule: replace fully-qualified org.apache.calcite.rel.* references with imports.
- PPLHintUtils: import java.util.Objects; throw IllegalStateException on out-of-range collation index instead of silently skipping. The index is resolved against scan schema upstream, so mismatch is a bug signal.
- CalciteExplainIT: drop the testExplain override that forced pushdown-only. Update the corresponding no-pushdown expected plan (explain_output.yaml under calcite_no_pushdown/) to reflect the post-dedup Sort + ROW_NUMBER ORDER BY introduced by this PR.

Signed-off-by: Songkan Tang <songkant@amazon.com>

---------

Signed-off-by: Songkan Tang <songkant@amazon.com>
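
The `field:ORDER|field:ORDER|...` hint encoding mentioned in the message can be pictured with a small standalone sketch. This is not the code in `PPLHintUtils`: the class name `DedupSortHintSketch`, the `SortKey` record, and the literal `ASC` / `DESC` tokens are assumptions made for illustration only, and null ordering is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of a "field:ORDER|field:ORDER" hint string.
// Names and the ASC/DESC tokens are assumptions, not the project's API.
final class DedupSortHintSketch {
  record SortKey(String field, boolean ascending) {}

  // Join every sort key into one option string, preserving key order.
  static String encode(List<SortKey> keys) {
    return keys.stream()
        .map(k -> k.field() + ":" + (k.ascending() ? "ASC" : "DESC"))
        .collect(Collectors.joining("|"));
  }

  // Split the option string back into keys. Field names containing '|'
  // are not supported by this simple format.
  static List<SortKey> decode(String hint) {
    List<SortKey> keys = new ArrayList<>();
    if (hint == null || hint.isEmpty()) {
      return keys;
    }
    for (String part : hint.split("\\|")) {
      int sep = part.lastIndexOf(':');
      if (sep <= 0) {
        throw new IllegalArgumentException("Malformed sort key: " + part);
      }
      String order = part.substring(sep + 1);
      keys.add(new SortKey(part.substring(0, sep), !"DESC".equals(order)));
    }
    return keys;
  }

  public static void main(String[] args) {
    List<SortKey> keys = List.of(new SortKey("state", true), new SortKey("age", false));
    String hint = encode(keys);
    System.out.println(hint); // state:ASC|age:DESC
    System.out.println(decode(hint).equals(keys));
  }
}
```

Carrying every key in one option keeps the hint a single string while still letting the consumer emit one sort entry per key, which is the property the multi-field test in the message depends on.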
1 parent fc2dd11 commit 419b45a

28 files changed

Lines changed: 948 additions & 252 deletions

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 7 additions & 60 deletions
@@ -22,6 +22,7 @@
 import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_SUBSEARCH;
 import static org.opensearch.sql.calcite.utils.PlanUtils.getRelation;
 import static org.opensearch.sql.calcite.utils.PlanUtils.getRexCall;
+import static org.opensearch.sql.calcite.utils.PlanUtils.stripInputSort;
 import static org.opensearch.sql.calcite.utils.PlanUtils.transformPlanToAttachChild;
 import static org.opensearch.sql.utils.SystemIndexUtils.DATASOURCES_TABLE_NAME;

@@ -48,16 +49,12 @@
 import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.ViewExpanders;
-import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelHomogeneousShuttle;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.core.Uncollect;
-import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rel.type.RelDataType;
@@ -766,57 +763,6 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
     return context.relBuilder.peek();
   }

-  /**
-   * Backtrack through the RelNode tree to find the first Sort node with non-empty collation. Stops
-   * at blocking operators that break ordering:
-   *
-   * <ul>
-   *   <li>Aggregate - aggregation destroys input ordering
-   *   <li>BiRel - covers Join, Correlate, and other binary relations
-   *   <li>SetOp - covers Union, Intersect, Except
-   *   <li>Uncollect - unnesting operation that may change ordering
-   *   <li>Project with window functions (RexOver) - ordering determined by window's ORDER BY
-   * </ul>
-   *
-   * @param node the starting RelNode to backtrack from
-   * @return the collation found, or null if no sort or blocking operator encountered
-   */
-  private RelCollation backtrackForCollation(RelNode node) {
-    while (node != null) {
-      // Check for blocking operators that destroy collation
-      // BiRel covers Join, Correlate, and other binary relations
-      // SetOp covers Union, Intersect, Except
-      // Uncollect unnests arrays/multisets which may change ordering
-      if (node instanceof Aggregate
-          || node instanceof BiRel
-          || node instanceof SetOp
-          || node instanceof Uncollect) {
-        return null;
-      }
-
-      // Project with window functions has ordering determined by the window's ORDER BY clause
-      // We should not destroy its output order by inserting a reversed sort
-      if (node instanceof LogicalProject && ((LogicalProject) node).containsOver()) {
-        return null;
-      }
-
-      // Check for Sort node with collation
-      if (node instanceof Sort) {
-        Sort sort = (Sort) node;
-        if (sort.getCollation() != null && !sort.getCollation().getFieldCollations().isEmpty()) {
-          return sort.getCollation();
-        }
-      }
-
-      // Continue to child node
-      if (node.getInputs().isEmpty()) {
-        break;
-      }
-      node = node.getInput(0);
-    }
-    return null;
-  }
-
   /**
    * Insert a reversed sort node after finding the original sort in the tree. This rebuilds the tree
    * with the reversed sort inserted right after the original sort.
@@ -899,7 +845,7 @@ public RelNode visitReverse(
     } else {
       // Collation not found on current node - try backtracking
       RelNode currentNode = context.relBuilder.peek();
-      RelCollation backtrackCollation = backtrackForCollation(currentNode);
+      RelCollation backtrackCollation = PlanUtils.findInputCollation(currentNode);

       if (backtrackCollation != null && !backtrackCollation.getFieldCollations().isEmpty()) {
         // Found collation through backtracking - rebuild tree with reversed sort
@@ -1765,7 +1711,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
               : duplicatedFieldNames.stream()
                   .map(a -> (RexNode) context.relBuilder.field(a))
                   .toList();
-      buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication);
+      buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication, null);
     }
     // add LogicalSystemLimit after dedup
     addSysLimitForJoinSubsearch(context);
@@ -1823,7 +1769,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
         List<RexNode> dedupeFields =
             getRightColumnsInJoinCriteria(context.relBuilder, joinCondition);

-        buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication);
+        buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication, null);
       }
       // add LogicalSystemLimit after dedup
       addSysLimitForJoinSubsearch(context);
@@ -1999,10 +1945,11 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) {
     // Columns to deduplicate
     List<RexNode> dedupeFields =
         node.getFields().stream().map(f -> rexVisitor.analyze(f, context)).toList();
+    RelCollation inputCollation = stripInputSort(context.relBuilder);
     if (keepEmpty) {
-      buildDedupOrNull(context.relBuilder, dedupeFields, allowedDuplication);
+      buildDedupOrNull(context.relBuilder, dedupeFields, allowedDuplication, inputCollation);
     } else {
-      buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication);
+      buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication, inputCollation);
     }
     return context.relBuilder.peek();
   }
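
The row-level behavior this hunk is aiming for (sort first, then keep the first N rows per dedup key, with the sorted order still visible afterwards) can be modeled in plain Java. The sketch below is only a model under stated assumptions: it does not use Calcite, the `Row` record and its fields are hypothetical, and it mirrors the not-null variant by skipping rows whose dedup key is null. Null sort values are ordered first, matching the PPL default described in the commit message.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "sort category | dedup N name". Not the Calcite plan;
// Row and its fields are hypothetical names for illustration.
final class SortThenDedupSketch {
  record Row(String name, String category) {}

  static List<Row> sortThenDedup(List<Row> rows, int allowedDuplication) {
    // Ascending sort on category with nulls first; List.sort is stable.
    Comparator<Row> byCategory =
        Comparator.comparing(
            Row::category, Comparator.nullsFirst(Comparator.<String>naturalOrder()));
    List<Row> sorted = new ArrayList<>(rows);
    sorted.sort(byCategory);

    // Walk the sorted rows and keep at most N per dedup key. Because rows are
    // visited in sorted order, the output stays sorted as well.
    Map<String, Integer> seen = new HashMap<>();
    List<Row> out = new ArrayList<>();
    for (Row r : sorted) {
      if (r.name() == null) {
        continue; // assumption: null dedup keys are dropped (keepempty=false)
      }
      int count = seen.merge(r.name(), 1, Integer::sum);
      if (count <= allowedDuplication) {
        out.add(r);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Row> rows =
        List.of(
            new Row("B", "Z"), new Row("A", "X"), new Row("B", null), new Row("C", "Y"));
    // For name=B the null-category row wins because nulls sort first.
    System.out.println(sortThenDedup(rows, 1));
  }
}
```

In the example data, the row kept for `B` is the one with a null category, which is the same nulls-first effect the message describes for `testSortThenDedup`.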

core/src/main/java/org/opensearch/sql/calcite/plan/rel/Dedup.java

Lines changed: 34 additions & 6 deletions
@@ -6,10 +6,12 @@
 package org.opensearch.sql.calcite.plan.rel;

 import java.util.List;
+import javax.annotation.Nullable;
 import lombok.Getter;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
@@ -23,16 +25,33 @@ public abstract class Dedup extends SingleRel {
   final Integer allowedDuplication;
   final Boolean keepEmpty;
   final Boolean consecutive;
+  final @Nullable RelCollation inputCollation;
+
+  /**
+   * Field names of the row type that {@link #inputCollation} was captured against. Used as a
+   * name-based anchor so callers can resolve the collation's stale indices after a planner rule has
+   * narrowed or replaced the dedup's input (typically a scan absorbing a narrowing project).
+   *
+   * <p>Renames are handled by Calcite's own {@code Project.getMapping} propagation when a {@code
+   * Project} sits between dedup's old and new input - see {@code Dedup.copy}. This name list is
+   * only the fallback for cases where the replacement is not a {@code Project} (e.g. a scan that
+   * swaps in a narrower row type without a {@code Project} RelNode). Scans don't rename, so name
+   * equality is a stable identifier for that specific fallback.
+   *
+   * <p>{@code null} iff {@link #inputCollation} is {@code null}.
+   */
+  final @Nullable List<String> inputCollationFieldNames;

-  /** */
   protected Dedup(
       RelOptCluster cluster,
       RelTraitSet traitSet,
       RelNode input,
       List<RexNode> dedupeFields,
       Integer allowedDuplication,
       Boolean keepEmpty,
-      Boolean consecutive) {
+      Boolean consecutive,
+      @Nullable RelCollation inputCollation,
+      @Nullable List<String> inputCollationFieldNames) {
     super(cluster, traitSet, input);
     if (allowedDuplication <= 0) {
       throw new IllegalArgumentException("Number of duplicate events must be greater than 0");
@@ -44,6 +63,8 @@ protected Dedup(
     this.allowedDuplication = allowedDuplication;
     this.keepEmpty = keepEmpty;
     this.consecutive = consecutive;
+    this.inputCollation = inputCollation;
+    this.inputCollationFieldNames = inputCollationFieldNames;
   }

   @Override
@@ -54,7 +75,9 @@ public final RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
         this.dedupeFields,
         this.allowedDuplication,
         this.keepEmpty,
-        this.consecutive);
+        this.consecutive,
+        this.inputCollation,
+        this.inputCollationFieldNames);
   }

   public abstract Dedup copy(
@@ -63,7 +86,9 @@ public abstract Dedup copy(
       List<RexNode> dedupeFields,
       Integer allowedDuplication,
       Boolean keepEmpty,
-      Boolean consecutive);
+      Boolean consecutive,
+      @Nullable RelCollation inputCollation,
+      @Nullable List<String> inputCollationFieldNames);

   public Dedup copy(RelNode input, List<RexNode> dedupeFields) {
     return this.copy(
@@ -72,7 +97,9 @@ public Dedup copy(RelNode input, List<RexNode> dedupeFields) {
         dedupeFields,
         this.allowedDuplication,
         this.keepEmpty,
-        this.consecutive);
+        this.consecutive,
+        this.inputCollation,
+        this.inputCollationFieldNames);
   }

   @Override
@@ -81,7 +108,8 @@ public RelWriter explainTerms(RelWriter pw) {
         .item("dedup_fields", dedupeFields)
         .item("allowed_dedup", allowedDuplication)
         .item("keepEmpty", keepEmpty)
-        .item("consecutive", consecutive);
+        .item("consecutive", consecutive)
+        .itemIf("inputCollation", inputCollation, inputCollation != null);
   }

   @Override
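
The role of `inputCollationFieldNames` as a fallback anchor can be sketched without Calcite. The following is a simplified model, not the logic in `PPLDedupConvertRule` or `DedupPushdownRule`: collation keys are reduced to bare field indices, "still valid" is assumed to mean "in range for the current row type", and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified model of resolving possibly stale collation indices.
// Assumption: an index is treated as valid when it is in range for the
// current row type; the real rules may apply different checks.
final class StaleCollationSketch {
  static Optional<List<Integer>> resolve(
      List<Integer> indices, List<String> capturedNames, List<String> currentNames) {
    // Case 1: every index still addresses the current row type, use as-is.
    boolean allInRange = indices.stream().allMatch(i -> i >= 0 && i < currentNames.size());
    if (allInRange) {
      return Optional.of(List.copyOf(indices));
    }
    // Case 2: look each key up by the name it had when the collation was captured.
    List<Integer> resolved = new ArrayList<>();
    for (int i : indices) {
      if (i < 0 || i >= capturedNames.size()) {
        return Optional.empty(); // nothing to anchor on
      }
      int pos = currentNames.indexOf(capturedNames.get(i));
      if (pos < 0) {
        return Optional.empty(); // field no longer present, give up cleanly
      }
      resolved.add(pos);
    }
    return Optional.of(resolved);
  }

  public static void main(String[] args) {
    List<String> captured = List.of("a", "b", "c", "d", "e", "f", "g", "age");
    List<String> current = List.of("gender", "age");
    // Index 7 is out of range for the narrowed input; "age" is found at 1.
    System.out.println(resolve(List.of(7), captured, current));
  }
}
```

Returning an empty `Optional` models the "drop the hint cleanly" behavior from the commit message: when a key cannot be resolved, the caller skips the optimization rather than sorting on the wrong field.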

core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalDedup.java

Lines changed: 51 additions & 5 deletions
@@ -8,10 +8,12 @@
 import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.DEDUP_CONVERT_RULE;

 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rex.RexNode;

@@ -24,8 +26,19 @@ protected LogicalDedup(
       List<RexNode> dedupeFields,
       Integer allowedDuplication,
       Boolean keepEmpty,
-      Boolean consecutive) {
-    super(cluster, traitSet, input, dedupeFields, allowedDuplication, keepEmpty, consecutive);
+      Boolean consecutive,
+      @Nullable RelCollation inputCollation,
+      @Nullable List<String> inputCollationFieldNames) {
+    super(
+        cluster,
+        traitSet,
+        input,
+        dedupeFields,
+        allowedDuplication,
+        keepEmpty,
+        consecutive,
+        inputCollation,
+        inputCollationFieldNames);
   }

   @Override
@@ -35,10 +48,20 @@ public Dedup copy(
       List<RexNode> dedupeFields,
       Integer allowedDuplication,
       Boolean keepEmpty,
-      Boolean consecutive) {
+      Boolean consecutive,
+      @Nullable RelCollation inputCollation,
+      @Nullable List<String> inputCollationFieldNames) {
     assert traitSet.containsIfApplicable(Convention.NONE);
     return new LogicalDedup(
-        getCluster(), traitSet, input, dedupeFields, allowedDuplication, keepEmpty, consecutive);
+        getCluster(),
+        traitSet,
+        input,
+        dedupeFields,
+        allowedDuplication,
+        keepEmpty,
+        consecutive,
+        inputCollation,
+        inputCollationFieldNames);
   }

   public static LogicalDedup create(
@@ -47,10 +70,33 @@ public static LogicalDedup create(
       Integer allowedDuplication,
       Boolean keepEmpty,
       Boolean consecutive) {
+    return create(input, dedupeFields, allowedDuplication, keepEmpty, consecutive, null);
+  }
+
+  public static LogicalDedup create(
+      RelNode input,
+      List<RexNode> dedupeFields,
+      Integer allowedDuplication,
+      Boolean keepEmpty,
+      Boolean consecutive,
+      @Nullable RelCollation inputCollation) {
+    // Record the field names from the current input's row type so callers that encounter a stale
+    // collation (after a planner rule has swapped in a different, non-Project-derived input) can
+    // still resolve the sort keys to positions in the new input by name. See
+    // Dedup.inputCollationFieldNames.
+    List<String> fieldNames = inputCollation == null ? null : input.getRowType().getFieldNames();
     final RelOptCluster cluster = input.getCluster();
     RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
     return new LogicalDedup(
-        cluster, traitSet, input, dedupeFields, allowedDuplication, keepEmpty, consecutive);
+        cluster,
+        traitSet,
+        input,
+        dedupeFields,
+        allowedDuplication,
+        keepEmpty,
+        consecutive,
+        inputCollation,
+        fieldNames);
   }

   @Override
