Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
Expand All @@ -59,7 +58,6 @@
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptTable.ViewExpander;
Expand Down Expand Up @@ -551,12 +549,30 @@ public RelNode visit(TableScan scan) {
}
}

/** Try to optimize the plan by using HepPlanner */
private static final List<RelOptRule> hepRuleList =
List.of(FilterMergeRule.Config.DEFAULT.toRule(), PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE);

/**
* Try to optimize the plan by using HepPlanner.
*
* <p>The two rules MUST run as separate instructions in this order: {@link
* PPLSimplifyDedupRule#DEDUP_SIMPLIFY_RULE} first, {@link FilterMergeRule} second. {@code
* PPLSimplifyDedupRule} matches a four-operator pattern terminating in a {@link
* org.apache.calcite.rel.logical.LogicalFilter} whose condition is a pure {@code IS_NOT_NULL} (or
* {@code AND}-of-{@code IS_NOT_NULL}s) on the partition columns - the bucket-non-null filter the
* dedup analyzer inserts directly above the scan. When a user {@code where} clause sits between
* the bucket-non-null filter and the scan, those two filters are adjacent. If {@code
* FilterMergeRule} runs first, it folds them into a single filter whose condition is {@code
* AND(IS_NOT_NULL(field), <user predicate>)}; that condition no longer satisfies {@link
* org.opensearch.sql.calcite.utils.PlanUtils#mayBeFilterFromBucketNonNull} and {@code
* PPLSimplifyDedupRule} never matches. The {@code LogicalDedup} is therefore not produced and
* {@code DedupPushdownRule} cannot push the dedup down to the scan as a {@code composite +
* top_hits} aggregation; the dedup falls through to a coordinator-side {@code ROW_NUMBER} window.
* Putting {@code PPLSimplifyDedupRule} in its own instruction guarantees it runs to fixpoint
* against the original adjacent-filter shape before any merge happens.
*/
private static final HepProgram HEP_PROGRAM =
// NOTE(review): the next line looks like the pre-change initializer (single rule collection)
// carried over from the diff view; the builder chain after it is the new form - confirm.
new HepProgramBuilder().addRuleCollection(hepRuleList).build();
new HepProgramBuilder()
// Registered as its own instruction, first: the dedup simplification has to see the
// adjacent, not-yet-merged filters.
.addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE)
// Registered as its own instruction, second: filter merging happens only after the dedup
// rule has had its pass.
.addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule())
.build();

public static RelNode optimize(RelNode plan, CalcitePlanContext context) {
Util.discard(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ public RelNode getRelNode(String ppl) {
return root;
}

/**
 * Analyzes a PPL query and returns the resulting root {@link RelNode} as-is, i.e. without the
 * test's auxiliary {@link FilterMergeRule} pass.
 *
 * <p>Intended for tests whose assertions depend on the production {@link
 * org.opensearch.sql.calcite.utils.CalciteToolsHelper#optimize HEP optimize} flow receiving the
 * un-merged adjacent-filter shape.
 *
 * @param ppl the PPL query text to parse and analyze
 * @return the plan built from the analysis context's {@code relBuilder}
 */
public RelNode getAnalyzerRelNode(String ppl) {
  final CalcitePlanContext planContext = createBuilderContext();
  final Query parsedQuery = (Query) plan(pplParser, ppl);
  // Analysis populates the context's relBuilder; the finished plan is read back from it.
  planTransformer.analyze(parsedQuery.getPlan(), planContext);
  return planContext.relBuilder.build();
}

private RelNode mergeAdjacentFilters(RelNode relNode) {
HepProgram program =
new HepProgramBuilder().addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@

package org.opensearch.sql.ppl.calcite;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.test.CalciteAssert;
import org.junit.Test;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;

public class CalcitePPLDedupTest extends CalcitePPLAbstractTest {

Expand Down Expand Up @@ -353,4 +358,59 @@ public void testSortFieldProjectedAwayBeforeDedup() {
+ " LogicalTableScan(table=[[scott, EMP]])\n";
verifyLogical(root, expectedLogical);
}

/**
 * Regression test for opensearch-project/sql#5482. When a user {@code where} clause sits between
 * the analyzer-inserted bucket-non-null filter and the table scan, {@link
 * org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule} must reliably fold the dedup pattern
 * into a {@link org.opensearch.sql.calcite.plan.rel.LogicalDedup}.
 *
 * <p>The previous HEP program registered {@code FilterMergeRule} and {@code PPLSimplifyDedupRule}
 * in a single rule collection, which uses {@code MatchOrder.ARBITRARY}. Whichever rule the
 * planner happened to fire first determined the outcome: if {@code FilterMergeRule} fired first,
 * it merged the user predicate with the bucket-non-null filter, the merged condition no longer
 * satisfied {@code mayBeFilterFromBucketNonNull}, and {@code PPLSimplifyDedupRule} could no
 * longer match. After the fix the two rules are scheduled as separate instructions in dedup-first
 * order so {@code PPLSimplifyDedupRule} always runs to fixpoint against the original
 * adjacent-filter shape.
 */
@Test
public void testDedupAfterWhereSimplifiesToLogicalDedup() {
  String ppl = "source=EMP | where DEPTNO > 10 | dedup 1 DEPTNO | fields DEPTNO";
  // Use the analyzer plan WITHOUT the test's auxiliary FilterMerge step; the production HEP
  // (CalciteToolsHelper.optimize) is responsible for filter merging.
  RelNode analyzerPlan = getAnalyzerRelNode(ppl);
  String analyzerExplain = analyzerPlan.explain();
  // Sanity: the analyzer plan still has the un-merged adjacent filters (bucket-non-null over
  // user where) and has not been simplified to a LogicalDedup yet.
  assertThat(analyzerExplain, containsString("LogicalFilter(condition=[IS NOT NULL($7)])"));
  assertThat(analyzerExplain, containsString("LogicalFilter(condition=[>($7, 10)])"));
  assertThat(analyzerExplain, not(containsString("LogicalDedup")));

  RelNode optimized = CalciteToolsHelper.optimize(analyzerPlan, null);
  String optimizedExplain = optimized.explain();
  // PPLSimplifyDedupRule must have produced a LogicalDedup (otherwise the dedup falls through
  // to a coordinator-side ROW_NUMBER window and dedup pushdown is disabled).
  assertThat(optimizedExplain, containsString("LogicalDedup"));
  // The user predicate must still be present in the optimized plan (not merged away or dropped
  // by the simplify rule).
  assertThat(optimizedExplain, containsString(">($7, 10)"));
  // The ROW_NUMBER window is gone - the dedup has been folded.
  assertThat(optimizedExplain, not(containsString("ROW_NUMBER")));
}
}