diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLSimplifyDedupRule.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLSimplifyDedupRule.java index 11eabfd483c..676b1e9a776 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLSimplifyDedupRule.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLSimplifyDedupRule.java @@ -6,11 +6,14 @@ package org.opensearch.sql.calcite.plan.rule; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; @@ -22,6 +25,7 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; import org.apache.calcite.rex.RexWindow; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlTypeName; @@ -115,8 +119,41 @@ protected void apply( RelCollation inputCollation = extractCollationFromWindow(windows.get(0)); + // Split the bucket-non-null filter into two parts: + // 1) IS_NOT_NULL conjuncts on a partition key — these are the bucket-non-null guards PPL + // emits as part of the dedup pattern; LogicalDedup absorbs their semantics. + // 2) Everything else — for example, a user `where` predicate that FilterMergeRule may + // have folded into the same conjunction, or a user IS_NOT_NULL filter on a non-partition + // column. These must be preserved as a separate filter below the new LogicalDedup so + // user-visible behavior is unchanged regardless of whether FilterMergeRule fired. + Set partitionKeyIndices = new HashSet<>(); + for (RexNode key : dedupColumns) { + if (key instanceof RexInputRef ref) { + partitionKeyIndices.add(ref.getIndex()); + } + } + List bucketNonNullConjuncts = new ArrayList<>(); + List remainingConjuncts = new ArrayList<>(); + for (RexNode conjunct : RelOptUtil.conjunctions(bucketNonNullFilter.getCondition())) { + if (isNotNullOnPartitionKey(conjunct, partitionKeyIndices)) { + bucketNonNullConjuncts.add(conjunct); + } else { + remainingConjuncts.add(conjunct); + } + } + // Defensive: if no IS_NOT_NULL conjunct on a partition key is present, this filter is not + // actually a bucket-non-null filter — bail out without transforming. The loose operand + // predicate may have matched on an unrelated AND that happens to contain an IS_NOT_NULL on + // some other ref. + if (bucketNonNullConjuncts.isEmpty()) { + return; + } + RelBuilder relBuilder = call.builder(); relBuilder.push(bucketNonNullFilter.getInput()); + if (!remainingConjuncts.isEmpty()) { + relBuilder.filter(RexUtil.composeConjunction(relBuilder.getRexBuilder(), remainingConjuncts)); + } List> targetProjections = projectWithWindow.getNamedProjects().stream() .filter(p -> !p.getKey().isA(SqlKind.ROW_NUMBER)) @@ -134,6 +171,14 @@ protected void apply( call.transformTo(relBuilder.build()); } + private static boolean isNotNullOnPartitionKey(RexNode rex, Set partitionKeyIndices) { + if (!PlanUtils.isNotNullOnRef(rex)) { + return false; + } + RexInputRef ref = (RexInputRef) ((RexCall) rex).getOperands().get(0); + return partitionKeyIndices.contains(ref.getIndex()); + } + private static @Nullable RelCollation extractCollationFromWindow(RexWindow window) { if (window.orderKeys.isEmpty()) { return null; diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java index 42ece7dc509..b63c4a0237f 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java @@ -785,15 +785,21 @@ static Mapping mapping(List rexNodes, RelDataType schema) { return Mappings.target(getSelectColumns(rexNodes), schema.getFieldCount()); } + /** + * Accepts the un-merged {@code IS NOT NULL($ref)} shape and the merged-{@code AND} shape that + * {@link org.apache.calcite.rel.rules.FilterMergeRule} produces when a user {@code where} + * precedes the dedup. The concrete partition-key match — and the split-out of any user predicate + * folded into the AND — happens in {@link PPLSimplifyDedupRule#apply}. + */ static boolean mayBeFilterFromBucketNonNull(LogicalFilter filter) { RexNode condition = filter.getCondition(); return isNotNullOnRef(condition) || (condition instanceof RexCall rexCall && rexCall.getOperator().equals(SqlStdOperatorTable.AND) - && rexCall.getOperands().stream().allMatch(PlanUtils::isNotNullOnRef)); + && rexCall.getOperands().stream().anyMatch(PlanUtils::isNotNullOnRef)); } - private static boolean isNotNullOnRef(RexNode rex) { + public static boolean isNotNullOnRef(RexNode rex) { return rex instanceof RexCall rexCall && rexCall.isA(SqlKind.IS_NOT_NULL) && rexCall.getOperands().get(0) instanceof RexInputRef; diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 8ad0be5cc88..d25631c5ace 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -2153,6 +2153,29 @@ public void testTransposeExplain() throws IOException { + "| transpose 4 column_name='column_names'")); } + /** + * With a user {@code where} clause preceding {@code dedup}, the physical plan must push both the + * filter and the dedup-as-aggregation into the OpenSearch scan, not fall back to an in-memory + * {@code ROW_NUMBER} window above a row-fetching scan. + */ + @Test + public void testDedupAfterWherePushDown() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + String result = + explainQueryToString( + "source=opensearch-sql_test_index_account | where age > 25 | dedup gender"); + assertTrue( + "Expected user where filter pushed down to the scan:\n" + result, + result.contains("FILTER->>($8, 25)")); + assertTrue( + "Expected dedup pushed down as AGGREGATION (composite + top_hits):\n" + result, + result.contains("AGGREGATION->")); + assertFalse( + "Unexpected EnumerableWindow — dedup fell back to the in-memory ROW_NUMBER form:\n" + + result, + result.contains("EnumerableWindow")); + } + public void testComplexDedup() throws IOException { enabledOnlyWhenPushdownIsEnabled(); String expected = loadExpectedPlan("explain_dedup_complex1.yaml"); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java index d37957d4a9f..648888ce9b7 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java @@ -17,6 +17,7 @@ import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.rules.SubstitutionRule; import org.apache.calcite.rex.RexInputRef; @@ -47,6 +48,24 @@ protected DedupPushdownRule(Config config) { @Override protected void onMatchImpl(RelOptRuleCall call) { final LogicalDedup logicalDedup = call.rel(0); + if (call.rels[1] instanceof LogicalFilter filter) { + // Push the filter into the scan, then synthesize an identity project so the standard + // apply() can run on the resulting Dedup → Project → Scan shape. If the filter is only + // partially pushable, pushDownFilter returns a Filter wrapping the residual condition over + // the new scan; we can't strip a residual filter without breaking semantics, so bail. + final CalciteLogicalIndexScan scan = call.rel(2); + if (!(scan.pushDownFilter(filter) instanceof CalciteLogicalIndexScan newScan)) { + return; + } + RelBuilder relBuilder = call.builder(); + relBuilder.push(newScan); + // force=true so the identity project is materialized (apply() requires a LogicalProject) + relBuilder.project( + relBuilder.fields(), newScan.getRowType().getFieldNames(), /* force= */ true); + LogicalProject identityProject = (LogicalProject) relBuilder.build(); + apply(call, logicalDedup, identityProject, newScan); + return; + } final LogicalProject projectWithExpr = call.rel(1); final CalciteLogicalIndexScan scan = call.rel(2); apply(call, logicalDedup, projectWithExpr, scan); @@ -226,6 +245,27 @@ public interface Config extends OpenSearchRuleConfig { .predicate(Config::tableScanChecker) .noInputs()))); + // +- LogicalDedup + // +- LogicalFilter (e.g. a `where` predicate preserved below dedup by + // +- CalciteLogicalIndexScan PPLSimplifyDedupRule when an upstream `where` is folded + // into the bucket-non-null filter) + Config WITH_FILTER = + ImmutableDedupPushdownRule.Config.builder() + .build() + .withDescription("Dedup-to-Aggregate-WithFilter") + .withOperandSupplier( + b0 -> + b0.operand(LogicalDedup.class) + .predicate(dedup -> !dedup.getKeepEmpty()) + .oneInput( + b1 -> + b1.operand(LogicalFilter.class) + .oneInput( + b2 -> + b2.operand(CalciteLogicalIndexScan.class) + .predicate(Config::tableScanChecker) + .noInputs()))); + /** * Project must be not pushed since the name of expression would lose after project pushed. E.g. * in query "eval new_a = a + 1 | dedup b", the "new_a" will lose. diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java index 0068f445ce7..3c8508cc455 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java @@ -50,6 +50,8 @@ public class OpenSearchIndexRules { SortIndexScanRule.Config.DEFAULT.toRule(); private static final DedupPushdownRule DEDUP_PUSH_DOWN = DedupPushdownRule.Config.DEFAULT.toRule(); + private static final DedupPushdownRule DEDUP_PUSH_DOWN_WITH_FILTER = + DedupPushdownRule.Config.WITH_FILTER.toRule(); private static final SortProjectExprTransposeRule SORT_PROJECT_EXPR_TRANSPOSE = SortProjectExprTransposeRule.Config.DEFAULT.toRule(); private static final ExpandCollationOnProjectExprRule EXPAND_COLLATION_ON_PROJECT_EXPR = @@ -75,6 +77,7 @@ public class OpenSearchIndexRules { LIMIT_INDEX_SCAN, SORT_INDEX_SCAN, DEDUP_PUSH_DOWN, + DEDUP_PUSH_DOWN_WITH_FILTER, SORT_PROJECT_EXPR_TRANSPOSE, SORT_AGGREGATION_METRICS_RULE, RARE_TOP_PUSH_DOWN, diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java index ab07cd9b5c1..ec3b57d02cc 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java @@ -110,6 +110,20 @@ public RelNode getRelNode(String ppl) { return root; } + /** + * Get the root RelNode of the given PPL query without merging adjacent filters. Useful for + * regression tests that need to exercise rule ordering against the un-merged shape that PPL + * actually emits to the production HEP optimizer. + */ + public RelNode getRelNodeRaw(String ppl) { + CalcitePlanContext context = createBuilderContext(); + Query query = (Query) plan(pplParser, ppl); + planTransformer.analyze(query.getPlan(), context); + RelNode root = context.relBuilder.build(); + System.out.println(root.explain()); + return root; + } + private RelNode mergeAdjacentFilters(RelNode relNode) { HepProgram program = new HepProgramBuilder().addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule()).build(); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java index ca1a789b0f4..5f32c1b85bb 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java @@ -5,9 +5,15 @@ package org.opensearch.sql.ppl.calcite; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgram; +import org.apache.calcite.plan.hep.HepProgramBuilder; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.rules.FilterMergeRule; import org.apache.calcite.test.CalciteAssert; +import org.junit.Assert; import org.junit.Test; +import org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule; public class CalcitePPLDedupTest extends CalcitePPLAbstractTest { @@ -353,4 +359,129 @@ public void testSortFieldProjectedAwayBeforeDedup() { + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); } + + /** + * When a user {@code where} precedes {@code dedup}, the user filter sits adjacent to the + * bucket-non-null filter that PPL emits for the dedup pattern. {@link PPLSimplifyDedupRule} must + * fold the dedup pattern into a {@link org.opensearch.sql.calcite.plan.rel.LogicalDedup} so that + * {@code DedupPushdownRule} can match it; otherwise dedup falls through to the in-memory {@code + * ROW_NUMBER} window form, defeating dedup pushdown to the shard. The user predicate must be + * preserved as a separate filter below the new {@code LogicalDedup}. + */ + @Test + public void testDedupAfterWhereProducesLogicalDedup() { + String ppl = "source=EMP | where SAL > 1000 | dedup DEPTNO"; + RelNode raw = getRelNodeRaw(ppl); + + // Sanity: the un-merged plan has both the bucket-non-null filter and the user where filter + // adjacent to each other above the scan — exactly the shape that triggered the original bug. + String rawExplain = raw.explain(); + Assert.assertTrue( + "Raw plan should contain the bucket-non-null filter:\n" + rawExplain, + rawExplain.contains("IS NOT NULL")); + Assert.assertTrue( + "Raw plan should contain the user where filter:\n" + rawExplain, + rawExplain.contains(">($5, 1000)")); + Assert.assertTrue( + "Raw plan should contain ROW_NUMBER prior to simplification:\n" + rawExplain, + rawExplain.contains("ROW_NUMBER")); + + // Apply rules in the order PPLSimplifyDedupRule -> FilterMergeRule. + HepProgram program = + new HepProgramBuilder() + .addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE) + .addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule()) + .build(); + RelNode optimized = runHepPlanner(raw, program); + + String optimizedExplain = optimized.explain(); + Assert.assertTrue( + "Optimized plan should contain LogicalDedup so DedupPushdownRule can match it:\n" + + optimizedExplain, + optimizedExplain.contains("LogicalDedup")); + Assert.assertFalse( + "Optimized plan should not retain ROW_NUMBER after simplification:\n" + optimizedExplain, + optimizedExplain.contains("ROW_NUMBER")); + Assert.assertTrue( + "User where predicate must be preserved as a filter below LogicalDedup:\n" + + optimizedExplain, + optimizedExplain.contains(">($5, 1000)")); + } + + /** + * Companion to {@link #testDedupAfterWhereProducesLogicalDedup} that pins the user-visible + * contract: with a {@code where} preceding {@code dedup}, a {@code LogicalDedup} must be produced + * regardless of the order in which {@link FilterMergeRule} and {@link PPLSimplifyDedupRule} fire. + * + *

The simplify rule's bucket-non-null operand predicate is order-independent — it accepts both + * a pure {@code IS NOT NULL} and an {@code AND} that contains an {@code IS NOT NULL} on a + * partition key — so {@code FilterMergeRule} firing first no longer disables dedup pushdown. This + * guards against re-introducing the original regression by reordering, removing, or adding rules + * to {@code CalciteToolsHelper#HEP_PROGRAM}. + */ + @Test + public void testDedupAfterWhereProducesLogicalDedupRegardlessOfRuleOrder() { + String ppl = "source=EMP | where SAL > 1000 | dedup DEPTNO"; + RelNode raw = getRelNodeRaw(ppl); + + HepProgram program = + new HepProgramBuilder() + .addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule()) + .addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE) + .build(); + RelNode optimized = runHepPlanner(raw, program); + + String optimizedExplain = optimized.explain(); + Assert.assertTrue( + "Even with FilterMergeRule firing first, PPLSimplifyDedupRule must still produce" + + " LogicalDedup so DedupPushdownRule can match it:\n" + + optimizedExplain, + optimizedExplain.contains("LogicalDedup")); + Assert.assertFalse( + "ROW_NUMBER window form should be removed after simplification:\n" + optimizedExplain, + optimizedExplain.contains("ROW_NUMBER")); + Assert.assertTrue( + "User where predicate must be preserved as a filter below LogicalDedup, even when" + + " FilterMergeRule fired first and folded it into the bucket-non-null filter:\n" + + optimizedExplain, + optimizedExplain.contains(">($5, 1000)")); + } + + /** + * Mirrors the exact shape of {@code CalciteToolsHelper.HEP_PROGRAM} used in production ({@code + * addRuleCollection(List.of(FilterMergeRule, PPLSimplifyDedupRule))}). The two sequenced-{@code + * addRuleInstance} tests above prove the rule fires under either ordering, but production runs + * both rules in a single collection instruction. This test pins that exact shape to catch any + * addRuleCollection-vs-addRuleInstance traversal differences. + */ + @Test + public void testDedupAfterWhereProducesLogicalDedupWithProductionHepProgram() { + String ppl = "source=EMP | where SAL > 1000 | dedup DEPTNO"; + RelNode raw = getRelNodeRaw(ppl); + + HepProgram program = + new HepProgramBuilder() + .addRuleCollection( + java.util.List.of( + FilterMergeRule.Config.DEFAULT.toRule(), + PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE)) + .build(); + RelNode optimized = runHepPlanner(raw, program); + + String optimizedExplain = optimized.explain(); + Assert.assertTrue( + "Production HEP_PROGRAM (addRuleCollection) must still produce LogicalDedup:\n" + + optimizedExplain, + optimizedExplain.contains("LogicalDedup")); + Assert.assertFalse( + "ROW_NUMBER window form should be removed under production HEP_PROGRAM:\n" + + optimizedExplain, + optimizedExplain.contains("ROW_NUMBER")); + } + + private static RelNode runHepPlanner(RelNode root, HepProgram program) { + HepPlanner planner = new HepPlanner(program); + planner.setRoot(root); + return planner.findBestExp(); + } }