Skip to content

Commit 0bf8f7c

Browse files
committed
[BugFix] Make dedup simplify operand order-independent (#5488)
Address review feedback on #5488: extend mayBeFilterFromBucketNonNull to accept the merged conjunction shape FilterMergeRule produces, so PPLSimplifyDedupRule fires regardless of whether FilterMergeRule has already merged the user where clause into the bucket-non-null filter. PPLSimplifyDedupRule.apply now splits the bottom filter into IS_NOT_NULL conjuncts on partition keys (absorbed into LogicalDedup semantics) and any remaining conjuncts (preserved as a separate filter below the new LogicalDedup), so a user predicate that was folded in is no longer dropped. With the operand predicate order-independent, the HEP rule order is no longer a load-bearing invariant. Revert the addRuleCollection -> addRuleInstance change in CalciteToolsHelper.HEP_PROGRAM that the previous patch introduced. Replace the regression test that pinned the buggy rule order with one that asserts the user-visible contract: with where preceding dedup, a LogicalDedup is produced and the user predicate is preserved regardless of which order FilterMergeRule and PPLSimplifyDedupRule fire. Signed-off-by: ryan-gh-bot <ryan-gh-bot@users.noreply.github.com>
1 parent 19f506d commit 0bf8f7c

4 files changed

Lines changed: 126 additions & 52 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLSimplifyDedupRule.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66
package org.opensearch.sql.calcite.plan.rule;
77

88
import java.util.ArrayList;
9+
import java.util.HashSet;
910
import java.util.List;
11+
import java.util.Set;
1012
import java.util.function.Predicate;
1113
import java.util.stream.Collectors;
1214
import javax.annotation.Nullable;
1315
import org.apache.calcite.plan.RelOptRuleCall;
16+
import org.apache.calcite.plan.RelOptUtil;
1417
import org.apache.calcite.plan.RelRule;
1518
import org.apache.calcite.rel.RelCollation;
1619
import org.apache.calcite.rel.RelCollations;
@@ -22,6 +25,7 @@
2225
import org.apache.calcite.rex.RexInputRef;
2326
import org.apache.calcite.rex.RexLiteral;
2427
import org.apache.calcite.rex.RexNode;
28+
import org.apache.calcite.rex.RexUtil;
2529
import org.apache.calcite.rex.RexWindow;
2630
import org.apache.calcite.sql.SqlKind;
2731
import org.apache.calcite.sql.type.SqlTypeName;
@@ -115,8 +119,41 @@ protected void apply(
115119

116120
RelCollation inputCollation = extractCollationFromWindow(windows.get(0));
117121

122+
// Split the bucket-non-null filter into two parts:
123+
// 1) IS_NOT_NULL conjuncts on a partition key — these are the bucket-non-null guards PPL
124+
// emits as part of the dedup pattern; LogicalDedup absorbs their semantics.
125+
// 2) Everything else — for example, a user `where` predicate that FilterMergeRule may
126+
// have folded into the same conjunction, or a user IS_NOT_NULL filter on a non-partition
127+
// column. These must be preserved as a separate filter below the new LogicalDedup so
128+
// user-visible behavior is unchanged regardless of whether FilterMergeRule fired.
129+
Set<Integer> partitionKeyIndices = new HashSet<>();
130+
for (RexNode key : dedupColumns) {
131+
if (key instanceof RexInputRef ref) {
132+
partitionKeyIndices.add(ref.getIndex());
133+
}
134+
}
135+
List<RexNode> bucketNonNullConjuncts = new ArrayList<>();
136+
List<RexNode> remainingConjuncts = new ArrayList<>();
137+
for (RexNode conjunct : RelOptUtil.conjunctions(bucketNonNullFilter.getCondition())) {
138+
if (isNotNullOnPartitionKey(conjunct, partitionKeyIndices)) {
139+
bucketNonNullConjuncts.add(conjunct);
140+
} else {
141+
remainingConjuncts.add(conjunct);
142+
}
143+
}
144+
// Defensive: if no IS_NOT_NULL conjunct on a partition key is present, this filter is not
145+
// actually a bucket-non-null filter — bail out without transforming. The loose operand
146+
// predicate may have matched on an unrelated AND that happens to contain an IS_NOT_NULL on
147+
// some other ref.
148+
if (bucketNonNullConjuncts.isEmpty()) {
149+
return;
150+
}
151+
118152
RelBuilder relBuilder = call.builder();
119153
relBuilder.push(bucketNonNullFilter.getInput());
154+
if (!remainingConjuncts.isEmpty()) {
155+
relBuilder.filter(RexUtil.composeConjunction(relBuilder.getRexBuilder(), remainingConjuncts));
156+
}
120157
List<Pair<RexNode, String>> targetProjections =
121158
projectWithWindow.getNamedProjects().stream()
122159
.filter(p -> !p.getKey().isA(SqlKind.ROW_NUMBER))
@@ -134,6 +171,14 @@ protected void apply(
134171
call.transformTo(relBuilder.build());
135172
}
136173

174+
private static boolean isNotNullOnPartitionKey(RexNode rex, Set<Integer> partitionKeyIndices) {
175+
return rex instanceof RexCall rexCall
176+
&& rexCall.isA(SqlKind.IS_NOT_NULL)
177+
&& !rexCall.getOperands().isEmpty()
178+
&& rexCall.getOperands().get(0) instanceof RexInputRef ref
179+
&& partitionKeyIndices.contains(ref.getIndex());
180+
}
181+
137182
private static @Nullable RelCollation extractCollationFromWindow(RexWindow window) {
138183
if (window.orderKeys.isEmpty()) {
139184
return null;

core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.sql.PreparedStatement;
3737
import java.sql.SQLException;
3838
import java.time.Instant;
39+
import java.util.List;
3940
import java.util.Properties;
4041
import java.util.function.Consumer;
4142
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
@@ -58,6 +59,7 @@
5859
import org.apache.calcite.plan.Convention;
5960
import org.apache.calcite.plan.RelOptCluster;
6061
import org.apache.calcite.plan.RelOptPlanner;
62+
import org.apache.calcite.plan.RelOptRule;
6163
import org.apache.calcite.plan.RelOptSchema;
6264
import org.apache.calcite.plan.RelOptTable;
6365
import org.apache.calcite.plan.RelOptTable.ViewExpander;
@@ -549,25 +551,12 @@ public RelNode visit(TableScan scan) {
549551
}
550552
}
551553

552-
/**
553-
* Try to optimize the plan by using HepPlanner.
554-
*
555-
* <p>Rules are added as separate {@code addRuleInstance} phases so each rule is run to a fixed
556-
* point before the next phase begins. {@code PPLSimplifyDedupRule} must run before {@code
557-
* FilterMergeRule}: PPL emits the dedup pattern with an {@code IS NOT NULL} bucket-non-null
558-
* filter directly above the scan, and the simplify rule's operand requires that filter to remain
559-
* a pure {@code IS NOT NULL} (or {@code AND} of {@code IS NOT NULL}s). When a user {@code where}
560-
* clause precedes the dedup, the user's filter sits adjacent to the bucket-non-null filter; if
561-
* {@code FilterMergeRule} fires first it folds the two into a single conjunction, the simplify
562-
* rule's operand match fails, no {@link org.opensearch.sql.calcite.plan.rel.LogicalDedup} is
563-
* produced, and dedup pushdown is silently disabled. See
564-
* https://github.com/opensearch-project/sql/issues/5482.
565-
*/
554+
/** Try to optimize the plan by using HepPlanner */
555+
private static final List<RelOptRule> hepRuleList =
556+
List.of(FilterMergeRule.Config.DEFAULT.toRule(), PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE);
557+
566558
private static final HepProgram HEP_PROGRAM =
567-
new HepProgramBuilder()
568-
.addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE)
569-
.addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule())
570-
.build();
559+
new HepProgramBuilder().addRuleCollection(hepRuleList).build();
571560

572561
public static RelNode optimize(RelNode plan, CalcitePlanContext context) {
573562
Util.discard(context);

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -785,17 +785,45 @@ static Mapping mapping(List<RexNode> rexNodes, RelDataType schema) {
785785
return Mappings.target(getSelectColumns(rexNodes), schema.getFieldCount());
786786
}
787787

788+
/**
789+
* Loose structural check for the bucket-non-null filter PPL emits as part of the {@code dedup}
790+
* pattern. Returns true for either:
791+
*
792+
* <ul>
793+
* <li>a pure {@code IS NOT NULL($ref)}, or
794+
* <li>any {@code AND} whose conjuncts contain at least one {@code IS NOT NULL($ref)}.
795+
* </ul>
796+
*
797+
* <p>This intentionally accepts the un-merged shape ({@code IS NOT NULL} alone) and the merged
798+
* shape that {@link org.apache.calcite.rel.rules.FilterMergeRule} produces when a user {@code
799+
* where} clause precedes the dedup ({@code AND(IS NOT NULL($pk), <user predicate>)}). The
800+
* concrete partition-key match — and the split-out of any user predicate that was folded in —
801+
* happens inside {@code PPLSimplifyDedupRule#apply}, which has access to the partition keys
802+
* declared by the {@code ROW_NUMBER} window above this filter. Keeping the operand predicate
803+
* order-independent means the simplify rule fires regardless of whether {@code FilterMergeRule}
804+
* has already run, so dedup pushdown is no longer load-bearing on HEP rule ordering.
805+
*
806+
* <p>Misclassification is bounded by the surrounding four-level operand chain: a project that
807+
* does not contain the {@code _row_number_dedup_} column, above a filter whose condition is
808+
* {@code _row_number_ <= N} and which does contain {@code _row_number_dedup_}, above a project
809+
* that does contain {@code _row_number_dedup_} (with a {@code ROW_NUMBER} window function), above
810+
* this filter. That signature is unique to the PPL-emitted dedup pattern, so a user-written
811+
* filter such as {@code IS NOT NULL(x) AND ...} cannot accidentally match.
812+
*/
788813
static boolean mayBeFilterFromBucketNonNull(LogicalFilter filter) {
789814
RexNode condition = filter.getCondition();
790-
return isNotNullOnRef(condition)
791-
|| (condition instanceof RexCall rexCall
792-
&& rexCall.getOperator().equals(SqlStdOperatorTable.AND)
793-
&& rexCall.getOperands().stream().allMatch(PlanUtils::isNotNullOnRef));
815+
if (isNotNullOnRef(condition)) {
816+
return true;
817+
}
818+
return condition instanceof RexCall rexCall
819+
&& rexCall.getOperator().equals(SqlStdOperatorTable.AND)
820+
&& rexCall.getOperands().stream().anyMatch(PlanUtils::isNotNullOnRef);
794821
}
795822

796-
private static boolean isNotNullOnRef(RexNode rex) {
823+
static boolean isNotNullOnRef(RexNode rex) {
797824
return rex instanceof RexCall rexCall
798825
&& rexCall.isA(SqlKind.IS_NOT_NULL)
826+
&& !rexCall.getOperands().isEmpty()
799827
&& rexCall.getOperands().get(0) instanceof RexInputRef;
800828
}
801829

ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -362,41 +362,37 @@ public void testSortFieldProjectedAwayBeforeDedup() {
362362

363363
/**
364364
* When a user {@code where} precedes {@code dedup}, the user filter sits adjacent to the
365-
* bucket-non-null filter that PPL emits for the dedup pattern. The HEP optimizer must run {@link
366-
* PPLSimplifyDedupRule} before {@link FilterMergeRule}; otherwise the merged condition breaks the
367-
* simplify-rule's operand match and dedup falls through to the in-memory {@code ROW_NUMBER}
368-
* window form, defeating dedup pushdown to the shard.
369-
*
370-
* <p>This test mirrors the production HEP rule sequence in {@code CalciteToolsHelper#HEP_PROGRAM}
371-
* and asserts that a {@code LogicalDedup} is produced in the presence of a user {@code where}.
365+
* bucket-non-null filter that PPL emits for the dedup pattern. {@link PPLSimplifyDedupRule} must
366+
* fold the dedup pattern into a {@link org.opensearch.sql.calcite.plan.rel.LogicalDedup} so that
367+
* {@code DedupPushdownRule} can match it; otherwise dedup falls through to the in-memory {@code
368+
* ROW_NUMBER} window form, defeating dedup pushdown to the shard. The user predicate must be
369+
* preserved as a separate filter below the new {@code LogicalDedup}.
372370
*/
373371
@Test
374372
public void testDedupAfterWhereProducesLogicalDedup() {
375373
String ppl = "source=EMP | where SAL > 1000 | dedup DEPTNO";
376374
RelNode raw = getRelNodeRaw(ppl);
377375

378376
// Sanity: the un-merged plan has both the bucket-non-null filter and the user where filter
379-
// adjacent to each other above the scan — exactly the shape that triggers the bug.
377+
// adjacent to each other above the scan — exactly the shape that triggered the original bug.
380378
String rawExplain = raw.explain();
381379
Assert.assertTrue(
382380
"Raw plan should contain the bucket-non-null filter:\n" + rawExplain,
383381
rawExplain.contains("IS NOT NULL"));
384382
Assert.assertTrue(
385383
"Raw plan should contain the user where filter:\n" + rawExplain,
386-
rawExplain.contains("1000"));
384+
rawExplain.contains(">($5, 1000)"));
387385
Assert.assertTrue(
388386
"Raw plan should contain ROW_NUMBER prior to simplification:\n" + rawExplain,
389387
rawExplain.contains("ROW_NUMBER"));
390388

391-
// Apply the production HEP rule sequence: PPLSimplifyDedupRule first, then FilterMergeRule.
389+
// Apply rules in the order PPLSimplifyDedupRule -> FilterMergeRule.
392390
HepProgram program =
393391
new HepProgramBuilder()
394392
.addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE)
395393
.addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule())
396394
.build();
397-
HepPlanner planner = new HepPlanner(program);
398-
planner.setRoot(raw);
399-
RelNode optimized = planner.findBestExp();
395+
RelNode optimized = runHepPlanner(raw, program);
400396

401397
String optimizedExplain = optimized.explain();
402398
Assert.assertTrue(
@@ -406,40 +402,56 @@ public void testDedupAfterWhereProducesLogicalDedup() {
406402
Assert.assertFalse(
407403
"Optimized plan should not retain ROW_NUMBER after simplification:\n" + optimizedExplain,
408404
optimizedExplain.contains("ROW_NUMBER"));
405+
Assert.assertTrue(
406+
"User where predicate must be preserved as a filter below LogicalDedup:\n"
407+
+ optimizedExplain,
408+
optimizedExplain.contains(">($5, 1000)"));
409409
}
410410

411411
/**
412-
* Companion to {@link #testDedupAfterWhereProducesLogicalDedup} that pins the buggy behavior:
413-
* when {@link FilterMergeRule} runs before {@link PPLSimplifyDedupRule}, the merged conjunction
414-
* defeats the simplify-rule's operand match and no {@code LogicalDedup} is produced. This guards
415-
* against accidentally swapping the rule order in {@code CalciteToolsHelper#HEP_PROGRAM} back to
416-
* the buggy ordering.
412+
* Companion to {@link #testDedupAfterWhereProducesLogicalDedup} that pins the user-visible
413+
* contract: with a {@code where} preceding {@code dedup}, a {@code LogicalDedup} must be produced
414+
* regardless of the order in which {@link FilterMergeRule} and {@link PPLSimplifyDedupRule} fire.
415+
*
416+
* <p>The simplify rule's bucket-non-null operand predicate is order-independent — it accepts both
417+
* a pure {@code IS NOT NULL} and an {@code AND} that contains an {@code IS NOT NULL} on a
418+
* partition key — so {@code FilterMergeRule} firing first no longer disables dedup pushdown. This
419+
* guards against re-introducing the original regression by reordering, removing, or adding rules
420+
* to {@code CalciteToolsHelper#HEP_PROGRAM}.
417421
*/
418422
@Test
419-
public void testDedupAfterWhereWithFilterMergeFirstFailsToSimplify() {
423+
public void testDedupAfterWhereProducesLogicalDedupRegardlessOfRuleOrder() {
420424
String ppl = "source=EMP | where SAL > 1000 | dedup DEPTNO";
421425
RelNode raw = getRelNodeRaw(ppl);
422426

423-
// Buggy order: FilterMergeRule first, then PPLSimplifyDedupRule.
427+
// Swapped order: FilterMergeRule first, then PPLSimplifyDedupRule. This is the order that
428+
// produced the original bug (#5482) before the fix.
424429
HepProgram program =
425430
new HepProgramBuilder()
426431
.addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule())
427432
.addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE)
428433
.build();
429-
HepPlanner planner = new HepPlanner(program);
430-
planner.setRoot(raw);
431-
RelNode optimized = planner.findBestExp();
434+
RelNode optimized = runHepPlanner(raw, program);
432435

433436
String optimizedExplain = optimized.explain();
434-
Assert.assertFalse(
435-
"Buggy order: PPLSimplifyDedupRule should not match after FilterMergeRule consolidates"
436-
+ " the bucket-non-null filter with the user where filter, so no LogicalDedup is"
437-
+ " produced:\n"
437+
Assert.assertTrue(
438+
"Even with FilterMergeRule firing first, PPLSimplifyDedupRule must still produce"
439+
+ " LogicalDedup so DedupPushdownRule can match it:\n"
438440
+ optimizedExplain,
439441
optimizedExplain.contains("LogicalDedup"));
442+
Assert.assertFalse(
443+
"ROW_NUMBER window form should be removed after simplification:\n" + optimizedExplain,
444+
optimizedExplain.contains("ROW_NUMBER"));
440445
Assert.assertTrue(
441-
"Buggy order: ROW_NUMBER window form should remain since simplification was skipped:\n"
446+
"User where predicate must be preserved as a filter below LogicalDedup, even when"
447+
+ " FilterMergeRule fired first and folded it into the bucket-non-null filter:\n"
442448
+ optimizedExplain,
443-
optimizedExplain.contains("ROW_NUMBER"));
449+
optimizedExplain.contains(">($5, 1000)"));
450+
}
451+
452+
private static RelNode runHepPlanner(RelNode root, HepProgram program) {
453+
HepPlanner planner = new HepPlanner(program);
454+
planner.setRoot(root);
455+
return planner.findBestExp();
444456
}
445457
}

0 commit comments

Comments
 (0)