Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
package org.opensearch.sql.calcite.plan.rule;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
Expand All @@ -22,6 +25,7 @@
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.rex.RexWindow;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
Expand Down Expand Up @@ -115,8 +119,41 @@ protected void apply(

RelCollation inputCollation = extractCollationFromWindow(windows.get(0));

// Split the bucket-non-null filter into two parts:
// 1) IS_NOT_NULL conjuncts on a partition key — these are the bucket-non-null guards PPL
// emits as part of the dedup pattern; LogicalDedup absorbs their semantics.
// 2) Everything else — for example, a user `where` predicate that FilterMergeRule may
// have folded into the same conjunction, or a user IS_NOT_NULL filter on a non-partition
// column. These must be preserved as a separate filter below the new LogicalDedup so
// user-visible behavior is unchanged regardless of whether FilterMergeRule fired.
Set<Integer> partitionKeyIndices = new HashSet<>();
for (RexNode key : dedupColumns) {
if (key instanceof RexInputRef ref) {
partitionKeyIndices.add(ref.getIndex());
}
}
List<RexNode> bucketNonNullConjuncts = new ArrayList<>();
List<RexNode> remainingConjuncts = new ArrayList<>();
for (RexNode conjunct : RelOptUtil.conjunctions(bucketNonNullFilter.getCondition())) {
if (isNotNullOnPartitionKey(conjunct, partitionKeyIndices)) {
bucketNonNullConjuncts.add(conjunct);
} else {
remainingConjuncts.add(conjunct);
}
}
// Defensive: if no IS_NOT_NULL conjunct on a partition key is present, this filter is not
// actually a bucket-non-null filter — bail out without transforming. The loose operand
// predicate may have matched on an unrelated AND that happens to contain an IS_NOT_NULL on
// some other ref.
if (bucketNonNullConjuncts.isEmpty()) {
return;
}

RelBuilder relBuilder = call.builder();
relBuilder.push(bucketNonNullFilter.getInput());
if (!remainingConjuncts.isEmpty()) {
relBuilder.filter(RexUtil.composeConjunction(relBuilder.getRexBuilder(), remainingConjuncts));
}
List<Pair<RexNode, String>> targetProjections =
projectWithWindow.getNamedProjects().stream()
.filter(p -> !p.getKey().isA(SqlKind.ROW_NUMBER))
Expand All @@ -134,6 +171,14 @@ protected void apply(
call.transformTo(relBuilder.build());
}

/**
 * Tells whether {@code rex} is an {@code IS NOT NULL} check whose operand is an input reference
 * to one of the dedup partition keys.
 *
 * @param rex conjunct to inspect
 * @param partitionKeyIndices input-field indices of the partition keys
 * @return {@code true} only when {@link PlanUtils#isNotNullOnRef} accepts {@code rex} and the
 *     referenced field index is contained in {@code partitionKeyIndices}
 */
private static boolean isNotNullOnPartitionKey(RexNode rex, Set<Integer> partitionKeyIndices) {
  if (PlanUtils.isNotNullOnRef(rex)) {
    // The shape check above is what makes the two casts below safe.
    RexCall notNullCall = (RexCall) rex;
    RexInputRef target = (RexInputRef) notNullCall.getOperands().get(0);
    return partitionKeyIndices.contains(target.getIndex());
  }
  return false;
}

private static @Nullable RelCollation extractCollationFromWindow(RexWindow window) {
if (window.orderKeys.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,15 +785,21 @@ static Mapping mapping(List<RexNode> rexNodes, RelDataType schema) {
return Mappings.target(getSelectColumns(rexNodes), schema.getFieldCount());
}

/**
* Accepts the un-merged {@code IS NOT NULL($ref)} shape and the merged-{@code AND} shape that
* {@link org.apache.calcite.rel.rules.FilterMergeRule} produces when a user {@code where}
* precedes the dedup. The concrete partition-key match — and the split-out of any user predicate
* folded into the AND — happens in {@link PPLSimplifyDedupRule#apply}.
*/
Comment thread
RyanL1997 marked this conversation as resolved.
static boolean mayBeFilterFromBucketNonNull(LogicalFilter filter) {
RexNode condition = filter.getCondition();
return isNotNullOnRef(condition)
|| (condition instanceof RexCall rexCall
&& rexCall.getOperator().equals(SqlStdOperatorTable.AND)
&& rexCall.getOperands().stream().allMatch(PlanUtils::isNotNullOnRef));
&& rexCall.getOperands().stream().anyMatch(PlanUtils::isNotNullOnRef));
}

private static boolean isNotNullOnRef(RexNode rex) {
public static boolean isNotNullOnRef(RexNode rex) {
return rex instanceof RexCall rexCall
&& rexCall.isA(SqlKind.IS_NOT_NULL)
&& rexCall.getOperands().get(0) instanceof RexInputRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2153,6 +2153,29 @@ public void testTransposeExplain() throws IOException {
+ "| transpose 4 column_name='column_names'"));
}

/**
 * Explain-plan check for {@code where ... | dedup ...}: the filter and the dedup (expressed as an
 * aggregation) must both appear as scan push-downs, and the plan must not contain an
 * {@code EnumerableWindow}, which would indicate the in-memory {@code ROW_NUMBER} fallback.
 */
@Test
public void testDedupAfterWherePushDown() throws IOException {
  enabledOnlyWhenPushdownIsEnabled();
  final String ppl = "source=opensearch-sql_test_index_account | where age > 25 | dedup gender";
  final String plan = explainQueryToString(ppl);

  // The user predicate must reach the scan.
  final boolean filterPushed = plan.contains("FILTER->>($8, 25)");
  assertTrue("Expected user where filter pushed down to the scan:\n" + plan, filterPushed);

  // The dedup itself must be pushed as an aggregation.
  final boolean dedupPushed = plan.contains("AGGREGATION->");
  assertTrue(
      "Expected dedup pushed down as AGGREGATION (composite + top_hits):\n" + plan, dedupPushed);

  // No window operator may remain above the scan.
  final boolean hasWindow = plan.contains("EnumerableWindow");
  assertFalse(
      "Unexpected EnumerableWindow — dedup fell back to the in-memory ROW_NUMBER form:\n"
          + plan,
      hasWindow);
}

public void testComplexDedup() throws IOException {
enabledOnlyWhenPushdownIsEnabled();
String expected = loadExpectedPlan("explain_dedup_complex1.yaml");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.rules.SubstitutionRule;
import org.apache.calcite.rex.RexInputRef;
Expand Down Expand Up @@ -47,6 +48,24 @@ protected DedupPushdownRule(Config config) {
@Override
protected void onMatchImpl(RelOptRuleCall call) {
final LogicalDedup logicalDedup = call.rel(0);
if (call.rels[1] instanceof LogicalFilter filter) {
// Push the filter into the scan, then synthesize an identity project so the standard
// apply() can run on the resulting Dedup → Project → Scan shape. If the filter is only
// partially pushable, pushDownFilter returns a Filter wrapping the residual condition over
// the new scan; we can't strip a residual filter without breaking semantics, so bail.
final CalciteLogicalIndexScan scan = call.rel(2);
if (!(scan.pushDownFilter(filter) instanceof CalciteLogicalIndexScan newScan)) {
return;
}
RelBuilder relBuilder = call.builder();
relBuilder.push(newScan);
// force=true so the identity project is materialized (apply() requires a LogicalProject)
relBuilder.project(
relBuilder.fields(), newScan.getRowType().getFieldNames(), /* force= */ true);
LogicalProject identityProject = (LogicalProject) relBuilder.build();
apply(call, logicalDedup, identityProject, newScan);
return;
}
final LogicalProject projectWithExpr = call.rel(1);
final CalciteLogicalIndexScan scan = call.rel(2);
apply(call, logicalDedup, projectWithExpr, scan);
Expand Down Expand Up @@ -226,6 +245,27 @@ public interface Config extends OpenSearchRuleConfig {
.predicate(Config::tableScanChecker)
.noInputs())));

// Operand tree matched by this config:
//   +- LogicalDedup                 (only when getKeepEmpty() is false)
//      +- LogicalFilter
//         +- CalciteLogicalIndexScan (must satisfy Config::tableScanChecker)
// The filter is e.g. a `where` predicate preserved below dedup by PPLSimplifyDedupRule when an
// upstream `where` is folded into the bucket-non-null filter.
Config WITH_FILTER =
ImmutableDedupPushdownRule.Config.builder()
.build()
.withDescription("Dedup-to-Aggregate-WithFilter")
.withOperandSupplier(
b0 ->
b0.operand(LogicalDedup.class)
.predicate(dedup -> !dedup.getKeepEmpty())
.oneInput(
b1 ->
b1.operand(LogicalFilter.class)
.oneInput(
b2 ->
b2.operand(CalciteLogicalIndexScan.class)
.predicate(Config::tableScanChecker)
.noInputs())));

/**
* Project must be not pushed since the name of expression would lose after project pushed. E.g.
* in query "eval new_a = a + 1 | dedup b", the "new_a" will lose.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class OpenSearchIndexRules {
SortIndexScanRule.Config.DEFAULT.toRule();
private static final DedupPushdownRule DEDUP_PUSH_DOWN =
DedupPushdownRule.Config.DEFAULT.toRule();
// Dedup push-down variant built from DedupPushdownRule.Config.WITH_FILTER.
private static final DedupPushdownRule DEDUP_PUSH_DOWN_WITH_FILTER =
DedupPushdownRule.Config.WITH_FILTER.toRule();
private static final SortProjectExprTransposeRule SORT_PROJECT_EXPR_TRANSPOSE =
SortProjectExprTransposeRule.Config.DEFAULT.toRule();
private static final ExpandCollationOnProjectExprRule EXPAND_COLLATION_ON_PROJECT_EXPR =
Expand All @@ -75,6 +77,7 @@ public class OpenSearchIndexRules {
LIMIT_INDEX_SCAN,
SORT_INDEX_SCAN,
DEDUP_PUSH_DOWN,
DEDUP_PUSH_DOWN_WITH_FILTER,
SORT_PROJECT_EXPR_TRANSPOSE,
SORT_AGGREGATION_METRICS_RULE,
RARE_TOP_PUSH_DOWN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ public RelNode getRelNode(String ppl) {
return root;
}

/**
 * Builds the root RelNode for a PPL query and returns it as-is, i.e. without the adjacent-filter
 * merge step. Intended for regression tests that need to drive rule ordering themselves against
 * the un-merged plan shape.
 *
 * @param ppl PPL query text
 * @return the un-merged plan root (its explain output is also printed to stdout)
 */
public RelNode getRelNodeRaw(String ppl) {
  final CalcitePlanContext planContext = createBuilderContext();
  final Query parsed = (Query) plan(pplParser, ppl);
  planTransformer.analyze(parsed.getPlan(), planContext);
  final RelNode rawRoot = planContext.relBuilder.build();
  System.out.println(rawRoot.explain());
  return rawRoot;
}

private RelNode mergeAdjacentFilters(RelNode relNode) {
HepProgram program =
new HepProgramBuilder().addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@

package org.opensearch.sql.ppl.calcite;

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.FilterMergeRule;
import org.apache.calcite.test.CalciteAssert;
import org.junit.Assert;
import org.junit.Test;
import org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule;

public class CalcitePPLDedupTest extends CalcitePPLAbstractTest {

Expand Down Expand Up @@ -353,4 +359,129 @@ public void testSortFieldProjectedAwayBeforeDedup() {
+ " LogicalTableScan(table=[[scott, EMP]])\n";
verifyLogical(root, expectedLogical);
}

/**
 * A user {@code where} in front of {@code dedup} places the user filter next to the
 * bucket-non-null filter emitted for the dedup pattern. {@link PPLSimplifyDedupRule} has to
 * collapse that pattern into a {@link org.opensearch.sql.calcite.plan.rel.LogicalDedup} (so
 * {@code DedupPushdownRule} can match) instead of leaving the {@code ROW_NUMBER} window form in
 * place, and the user predicate has to survive as a filter below the new {@code LogicalDedup}.
 *
 * <p>Rule order exercised here: {@code PPLSimplifyDedupRule}, then {@code FilterMergeRule}.
 */
@Test
public void testDedupAfterWhereProducesLogicalDedup() {
  final RelNode unmerged = getRelNodeRaw("source=EMP | where SAL > 1000 | dedup DEPTNO");

  // Preconditions: the raw plan still holds both filters and the ROW_NUMBER window.
  final String before = unmerged.explain();
  Assert.assertTrue(
      "Raw plan should contain the bucket-non-null filter:\n" + before,
      before.contains("IS NOT NULL"));
  Assert.assertTrue(
      "Raw plan should contain the user where filter:\n" + before,
      before.contains(">($5, 1000)"));
  Assert.assertTrue(
      "Raw plan should contain ROW_NUMBER prior to simplification:\n" + before,
      before.contains("ROW_NUMBER"));

  // Simplify first, merge filters second.
  final HepProgramBuilder steps = new HepProgramBuilder();
  steps.addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE);
  steps.addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule());
  final String after = runHepPlanner(unmerged, steps.build()).explain();

  Assert.assertTrue(
      "Optimized plan should contain LogicalDedup so DedupPushdownRule can match it:\n" + after,
      after.contains("LogicalDedup"));
  Assert.assertFalse(
      "Optimized plan should not retain ROW_NUMBER after simplification:\n" + after,
      after.contains("ROW_NUMBER"));
  Assert.assertTrue(
      "User where predicate must be preserved as a filter below LogicalDedup:\n" + after,
      after.contains(">($5, 1000)"));
}

/**
 * Counterpart of {@link #testDedupAfterWhereProducesLogicalDedup} with the rule order reversed:
 * {@link FilterMergeRule} runs before {@link PPLSimplifyDedupRule}.
 *
 * <p>The test expects the same outcome as the other ordering: a {@code LogicalDedup} is
 * produced, the {@code ROW_NUMBER} window form is gone, and the user {@code where} predicate is
 * still present in the plan even though the merge rule may have folded it into the
 * bucket-non-null filter first.
 */
@Test
public void testDedupAfterWhereProducesLogicalDedupRegardlessOfRuleOrder() {
  final RelNode unmerged = getRelNodeRaw("source=EMP | where SAL > 1000 | dedup DEPTNO");

  // Merge filters first, simplify second.
  final HepProgramBuilder steps = new HepProgramBuilder();
  steps.addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule());
  steps.addRuleInstance(PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE);
  final String after = runHepPlanner(unmerged, steps.build()).explain();

  Assert.assertTrue(
      "Even with FilterMergeRule firing first, PPLSimplifyDedupRule must still produce"
          + " LogicalDedup so DedupPushdownRule can match it:\n"
          + after,
      after.contains("LogicalDedup"));
  Assert.assertFalse(
      "ROW_NUMBER window form should be removed after simplification:\n" + after,
      after.contains("ROW_NUMBER"));
  Assert.assertTrue(
      "User where predicate must be preserved as a filter below LogicalDedup, even when"
          + " FilterMergeRule fired first and folded it into the bucket-non-null filter:\n"
          + after,
      after.contains(">($5, 1000)"));
}

/**
 * Mirrors the exact shape of {@code CalciteToolsHelper.HEP_PROGRAM} used in production ({@code
 * addRuleCollection(List.of(FilterMergeRule, PPLSimplifyDedupRule))}). The two sequenced-{@code
 * addRuleInstance} tests above prove the rule fires under either ordering, but production runs
 * both rules in a single collection instruction. This test pins that exact shape to catch any
 * addRuleCollection-vs-addRuleInstance traversal differences.
 *
 * <p>Like the sequenced tests, it also checks that the user {@code where} predicate is still
 * present in the optimized plan, so a rewrite that produces {@code LogicalDedup} but drops the
 * predicate cannot pass.
 */
@Test
public void testDedupAfterWhereProducesLogicalDedupWithProductionHepProgram() {
  String ppl = "source=EMP | where SAL > 1000 | dedup DEPTNO";
  RelNode raw = getRelNodeRaw(ppl);

  HepProgram program =
      new HepProgramBuilder()
          .addRuleCollection(
              java.util.List.of(
                  FilterMergeRule.Config.DEFAULT.toRule(),
                  PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE))
          .build();
  RelNode optimized = runHepPlanner(raw, program);

  String optimizedExplain = optimized.explain();
  Assert.assertTrue(
      "Production HEP_PROGRAM (addRuleCollection) must still produce LogicalDedup:\n"
          + optimizedExplain,
      optimizedExplain.contains("LogicalDedup"));
  Assert.assertFalse(
      "ROW_NUMBER window form should be removed under production HEP_PROGRAM:\n"
          + optimizedExplain,
      optimizedExplain.contains("ROW_NUMBER"));
  // Same preservation check the sequenced-order tests make; without it this test would accept
  // a plan that silently lost the user filter.
  Assert.assertTrue(
      "User where predicate must be preserved as a filter below LogicalDedup under production"
          + " HEP_PROGRAM:\n"
          + optimizedExplain,
      optimizedExplain.contains(">($5, 1000)"));
}

/**
 * Runs {@code program} over {@code root} with a freshly created {@link HepPlanner}.
 *
 * @param root plan to optimize
 * @param program HEP program to execute
 * @return the planner's best expression for {@code root}
 */
private static RelNode runHepPlanner(RelNode root, HepProgram program) {
  final HepPlanner hep = new HepPlanner(program);
  hep.setRoot(root);
  final RelNode best = hep.findBestExp();
  return best;
}
}
Loading