Skip to content

Commit b1066f8

Browse files
committed
Disable limit-then-filter pushdown with Calcite
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent bf19c6e commit b1066f8

9 files changed

Lines changed: 102 additions & 2 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,36 @@ public void testLimitPushDownExplain() throws Exception {
109109
+ "| fields ageMinus"));
110110
}
111111

112+
@Test
113+
public void testLimitWithFilterPushdownExplain() throws Exception {
114+
// TODO: Fix limit-then-filter pushdown without Calcite
115+
// Currently both limit-then-filter and filter-then-limit have the
116+
// limit-then-filter effect
117+
String expectedFilterThenLimit =
118+
isCalciteEnabled()
119+
? loadFromFile("expectedOutput/calcite/explain_filter_then_limit_push.json")
120+
: loadFromFile("expectedOutput/ppl/explain_filter_then_limit_push.json");
121+
assertJsonEqualsIgnoreRelId(
122+
expectedFilterThenLimit,
123+
explainQueryToString(
124+
"source=opensearch-sql_test_index_account"
125+
+ "| where age > 30 "
126+
+ "| head 5 "
127+
+ "| fields age"));
128+
129+
String expectedLimitThenFilter =
130+
isCalciteEnabled()
131+
? loadFromFile("expectedOutput/calcite/explain_limit_then_filter_push.json")
132+
: loadFromFile("expectedOutput/ppl/explain_limit_then_filter_push.json");
133+
assertJsonEqualsIgnoreRelId(
134+
expectedLimitThenFilter,
135+
explainQueryToString(
136+
"source=opensearch-sql_test_index_account"
137+
+ "| head 5 "
138+
+ "| where age > 30 "
139+
+ "| fields age"));
140+
}
141+
112142
@Test
113143
public void testFillNullPushDownExplain() throws Exception {
114144
String expected = loadFromFile("expectedOutput/ppl/explain_fillnull_push.json");
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalProject(age=[$8])\n LogicalSort(fetch=[5])\n LogicalFilter(condition=[>($8, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[FILTER->>($8, 30), LIMIT->5, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalProject(age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n LogicalSort(fetch=[5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableCalc(expr#0=[{inputs}], expr#1=[30], expr#2=[>($t0, $t1)], age=[$t0], $condition=[$t2])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"root": {
3+
"name": "ProjectOperator",
4+
"description": {
5+
"fields": "[age]"
6+
},
7+
"children": [
8+
{
9+
"name": "OpenSearchIndexScan",
10+
"description": {
11+
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean=true, searchDone=false, pitId=null, cursorKeepAlive=null, searchAfter=null, searchResponse=null)"
12+
},
13+
"children": []
14+
}
15+
]
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"root": {
3+
"name": "ProjectOperator",
4+
"description": {
5+
"fields": "[age]"
6+
},
7+
"children": [
8+
{
9+
"name": "OpenSearchIndexScan",
10+
"description": {
11+
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, needClean=true, searchDone=false, pitId=null, cursorKeepAlive=null, searchAfter=null, searchResponse=null)"
12+
},
13+
"children": []
14+
}
15+
]
16+
}
17+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55
package org.opensearch.sql.opensearch.planner.physical;
66

7+
import java.util.function.Predicate;
78
import org.apache.calcite.plan.RelOptRuleCall;
89
import org.apache.calcite.plan.RelRule;
910
import org.apache.calcite.rel.core.Filter;
@@ -55,7 +56,14 @@ public interface Config extends RelRule.Config {
5556
.oneInput(
5657
b1 ->
5758
b1.operand(CalciteLogicalIndexScan.class)
58-
.predicate(OpenSearchIndexScanRule::test)
59+
.predicate(
60+
// Filter pushdown is skipped if a limit has already been
61+
// pushed down because the current DSL cannot correctly
62+
// handle filter pushdown after limit. Both "limit after
63+
// filter" and "filter after limit" result in the same
64+
// limit-after-filter DSL.
65+
Predicate.not(OpenSearchIndexScanRule::isLimitPushed)
66+
.and(OpenSearchIndexScanRule::test))
5967
.noInputs()));
6068

6169
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,8 @@ static boolean test(CalciteLogicalIndexScan scan) {
1818
final RelOptTable table = scan.getTable();
1919
return table.unwrap(OpenSearchIndex.class) != null;
2020
}
21+
22+
static boolean isLimitPushed(CalciteLogicalIndexScan scan) {
23+
return scan.getPushDownContext().isLimitPushed();
24+
}
2125
}

opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,11 @@ public void pushDownSort(List<SortBuilder<?>> sortBuilders) {
197197

198198
/** Pushdown size (limit) and from (offset) to DSL request. */
199199
public void pushDownLimit(Integer limit, Integer offset) {
200-
requestedTotalSize = limit;
200+
// If there are multiple limit, we take the minimum among them
201+
// E.g. for `source=t | head 10 | head 5`, we take 5
202+
// This also ensures that the limit won't exceed the initial default value. (set to
203+
// Settings.Key.QUERY_SIZE_LIMIT in OpenSearchIndex)
204+
requestedTotalSize = Math.min(limit, requestedTotalSize);
201205
startFrom = offset;
202206
sourceBuilder.from(offset).size(limit);
203207
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteIndexScan.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public double estimateRowCount(RelMetadataQuery mq) {
9898
public static class PushDownContext extends ArrayDeque<PushDownAction> {
9999

100100
private boolean isAggregatePushed = false;
101+
private boolean isLimitPushed = false;
101102

102103
@Override
103104
public PushDownContext clone() {
@@ -111,6 +112,9 @@ public boolean add(PushDownAction pushDownAction) {
111112
if (pushDownAction.type == PushDownType.AGGREGATION) {
112113
isAggregatePushed = true;
113114
}
115+
if (pushDownAction.type == PushDownType.LIMIT) {
116+
isLimitPushed = true;
117+
}
114118
return super.add(pushDownAction);
115119
}
116120

@@ -119,6 +123,10 @@ public boolean isAggregatePushed() {
119123
isAggregatePushed = !isEmpty() && super.peekLast().type == PushDownType.AGGREGATION;
120124
return isAggregatePushed;
121125
}
126+
127+
public boolean isLimitPushed() {
128+
return isLimitPushed;
129+
}
122130
}
123131

124132
protected enum PushDownType {

0 commit comments

Comments
 (0)