Skip to content

Commit 7725e0b

Browse files
RyanL1997claude
andcommitted
Push user where filter into scan when blocking dedup pushdown
PPLSimplifyDedupRule correctly produces Dedup -> Filter(user where) -> Scan when a `where` precedes `dedup`. The Filter between Dedup and Scan blocks DedupPushdownRule's strict Dedup -> Project -> Scan operand chain, so Volcano falls back to PPLDedupConvertRule and the plan ends up with an in-memory ROW_NUMBER window instead of the pushed-down composite + top_hits aggregation. Add a WITH_FILTER operand variant to DedupPushdownRule that matches Dedup -> Filter -> Scan, pushes the filter into the scan, then runs the standard apply() on the resulting Dedup -> Project -> Scan shape. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent ae86ece commit 7725e0b

4 files changed

Lines changed: 86 additions & 9 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2154,10 +2154,10 @@ public void testTransposeExplain() throws IOException {
21542154
}
21552155

21562156
/**
2157-
* Regression test for #5482: a user {@code where} clause adjacent to dedup's bucket-non-null
2158-
* filter must still let {@code PPLSimplifyDedupRule} fire even after {@code FilterMergeRule} has
2159-
* merged the two filters into a single AND. Without the fix, the plan keeps the in-memory {@code
2160-
* ROW_NUMBER} window over a row-fetching scan and dedup pushdown is silently disabled.
2157+
* Regression test for #5482: with a user {@code where} clause preceding {@code dedup}, the
2158+
* physical plan must push both the filter and the dedup-as-aggregation into the OpenSearch
2159+
* scan. Before the fix, the simplify rule didn't match the merged-AND bucket-non-null shape and
2160+
* the plan kept an in-memory {@code ROW_NUMBER} window above a row-fetching scan.
21612161
*/
21622162
@Test
21632163
public void testDedupAfterWherePushDown() throws IOException {
@@ -2166,12 +2166,13 @@ public void testDedupAfterWherePushDown() throws IOException {
21662166
explainQueryToString(
21672167
"source=opensearch-sql_test_index_account | where age > 25 | dedup gender");
21682168
assertTrue(
2169-
"Expected LogicalDedup in the plan (simplify rule should fire after FilterMergeRule):\n"
2170-
+ result,
2171-
result.contains("LogicalDedup"));
2169+
"Expected user where filter pushed down to the scan:\n" + result,
2170+
result.contains("FILTER->>($8, 25)"));
2171+
assertTrue(
2172+
"Expected dedup pushed down as AGGREGATION (composite + top_hits):\n" + result,
2173+
result.contains("AGGREGATION->"));
21722174
assertFalse(
2173-
"Unexpected EnumerableWindow in the plan — dedup fell back to the in-memory ROW_NUMBER"
2174-
+ " form, which means PPLSimplifyDedupRule did not fire:\n"
2175+
"Unexpected EnumerableWindow — dedup fell back to the in-memory ROW_NUMBER form:\n"
21752176
+ result,
21762177
result.contains("EnumerableWindow"));
21772178
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
import java.util.stream.IntStream;
1414
import javax.annotation.Nullable;
1515
import org.apache.calcite.plan.RelOptRuleCall;
16+
import org.apache.calcite.rel.AbstractRelNode;
1617
import org.apache.calcite.rel.RelCollation;
1718
import org.apache.calcite.rel.RelCollations;
1819
import org.apache.calcite.rel.RelFieldCollation;
1920
import org.apache.calcite.rel.logical.LogicalAggregate;
21+
import org.apache.calcite.rel.logical.LogicalFilter;
2022
import org.apache.calcite.rel.logical.LogicalProject;
2123
import org.apache.calcite.rel.rules.SubstitutionRule;
2224
import org.apache.calcite.rex.RexInputRef;
@@ -47,6 +49,24 @@ protected DedupPushdownRule(Config config) {
4749
@Override
4850
protected void onMatchImpl(RelOptRuleCall call) {
4951
final LogicalDedup logicalDedup = call.rel(0);
52+
if (call.rels[1] instanceof LogicalFilter filter) {
53+
// Push the filter into the scan, then synthesize an identity project so the standard
54+
// apply() can run on the resulting Dedup → Project → Scan shape.
55+
final CalciteLogicalIndexScan scan = call.rel(2);
56+
AbstractRelNode scanWithFilter = scan.pushDownFilter(filter);
57+
if (scanWithFilter == null) {
58+
return;
59+
}
60+
CalciteLogicalIndexScan newScan = (CalciteLogicalIndexScan) scanWithFilter;
61+
RelBuilder relBuilder = call.builder();
62+
relBuilder.push(newScan);
63+
// force=true so the identity project is materialized (apply() requires a LogicalProject)
64+
relBuilder.project(
65+
relBuilder.fields(), newScan.getRowType().getFieldNames(), /* force= */ true);
66+
LogicalProject identityProject = (LogicalProject) relBuilder.build();
67+
apply(call, logicalDedup, identityProject, newScan);
68+
return;
69+
}
5070
final LogicalProject projectWithExpr = call.rel(1);
5171
final CalciteLogicalIndexScan scan = call.rel(2);
5272
apply(call, logicalDedup, projectWithExpr, scan);
@@ -226,6 +246,27 @@ public interface Config extends OpenSearchRuleConfig {
226246
.predicate(Config::tableScanChecker)
227247
.noInputs())));
228248

249+
// +- LogicalDedup
250+
// +- LogicalFilter (e.g. a `where` predicate preserved below dedup by
251+
// +- CalciteLogicalIndexScan PPLSimplifyDedupRule when an upstream `where` is folded
252+
// into the bucket-non-null filter)
253+
Config WITH_FILTER =
254+
ImmutableDedupPushdownRule.Config.builder()
255+
.build()
256+
.withDescription("Dedup-to-Aggregate-WithFilter")
257+
.withOperandSupplier(
258+
b0 ->
259+
b0.operand(LogicalDedup.class)
260+
.predicate(dedup -> !dedup.getKeepEmpty())
261+
.oneInput(
262+
b1 ->
263+
b1.operand(LogicalFilter.class)
264+
.oneInput(
265+
b2 ->
266+
b2.operand(CalciteLogicalIndexScan.class)
267+
.predicate(Config::tableScanChecker)
268+
.noInputs())));
269+
229270
/**
230271
* Project must be not pushed since the name of expression would lose after project pushed. E.g.
231272
* in query "eval new_a = a + 1 | dedup b", the "new_a" will lose.

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public class OpenSearchIndexRules {
5050
SortIndexScanRule.Config.DEFAULT.toRule();
5151
private static final DedupPushdownRule DEDUP_PUSH_DOWN =
5252
DedupPushdownRule.Config.DEFAULT.toRule();
53+
private static final DedupPushdownRule DEDUP_PUSH_DOWN_WITH_FILTER =
54+
DedupPushdownRule.Config.WITH_FILTER.toRule();
5355
private static final SortProjectExprTransposeRule SORT_PROJECT_EXPR_TRANSPOSE =
5456
SortProjectExprTransposeRule.Config.DEFAULT.toRule();
5557
private static final ExpandCollationOnProjectExprRule EXPAND_COLLATION_ON_PROJECT_EXPR =
@@ -75,6 +77,7 @@ public class OpenSearchIndexRules {
7577
LIMIT_INDEX_SCAN,
7678
SORT_INDEX_SCAN,
7779
DEDUP_PUSH_DOWN,
80+
DEDUP_PUSH_DOWN_WITH_FILTER,
7881
SORT_PROJECT_EXPR_TRANSPOSE,
7982
SORT_AGGREGATION_METRICS_RULE,
8083
RARE_TOP_PUSH_DOWN,

ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLDedupTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,38 @@ public void testDedupAfterWhereProducesLogicalDedupRegardlessOfRuleOrder() {
449449
optimizedExplain.contains(">($5, 1000)"));
450450
}
451451

452+
/**
453+
* Mirrors the exact shape of {@code CalciteToolsHelper.HEP_PROGRAM} used in production
454+
* ({@code addRuleCollection(List.of(FilterMergeRule, PPLSimplifyDedupRule))}). The two
455+
* sequenced-{@code addRuleInstance} tests above prove the rule fires under either ordering,
456+
* but production runs both rules in a single collection instruction. This test pins that exact
457+
* shape to catch any addRuleCollection-vs-addRuleInstance traversal differences.
458+
*/
459+
@Test
460+
public void testDedupAfterWhereProducesLogicalDedupWithProductionHepProgram() {
461+
String ppl = "source=EMP | where SAL > 1000 | dedup DEPTNO";
462+
RelNode raw = getRelNodeRaw(ppl);
463+
464+
HepProgram program =
465+
new HepProgramBuilder()
466+
.addRuleCollection(
467+
java.util.List.of(
468+
FilterMergeRule.Config.DEFAULT.toRule(),
469+
PPLSimplifyDedupRule.DEDUP_SIMPLIFY_RULE))
470+
.build();
471+
RelNode optimized = runHepPlanner(raw, program);
472+
473+
String optimizedExplain = optimized.explain();
474+
Assert.assertTrue(
475+
"Production HEP_PROGRAM (addRuleCollection) must still produce LogicalDedup:\n"
476+
+ optimizedExplain,
477+
optimizedExplain.contains("LogicalDedup"));
478+
Assert.assertFalse(
479+
"ROW_NUMBER window form should be removed under production HEP_PROGRAM:\n"
480+
+ optimizedExplain,
481+
optimizedExplain.contains("ROW_NUMBER"));
482+
}
483+
452484
private static RelNode runHepPlanner(RelNode root, HepProgram program) {
453485
HepPlanner planner = new HepPlanner(program);
454486
planner.setRoot(root);

0 commit comments

Comments
 (0)