Skip to content

Commit 8ad01f7

Browse files
committed
Push down sort-then-limit, while only pushdown limit when limit-then-sort
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent bc07aee commit 8ad01f7

11 files changed

Lines changed: 124 additions & 6 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
9+
import static org.opensearch.sql.util.MatcherUtils.rows;
10+
import static org.opensearch.sql.util.MatcherUtils.verifyOrder;
11+
12+
import java.io.IOException;
13+
import org.json.JSONObject;
14+
import org.junit.Test;
815
import org.opensearch.sql.ppl.SortCommandIT;
916

1017
public class CalciteSortCommandIT extends SortCommandIT {
@@ -14,4 +21,12 @@ public void init() throws Exception {
1421
enableCalcite();
1522
disallowCalciteFallback();
1623
}
24+
25+
// TODO: Move this test to SortCommandIT once head-then-sort is fixed in v2.
26+
@Test
27+
public void testHeadThenSort() throws IOException {
28+
JSONObject result =
29+
executeQuery(String.format("source=%s | head 2 | sort age | fields age", TEST_INDEX_BANK));
30+
verifyOrder(result, rows(32), rows(36));
31+
}
1732
}

integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void testSortWithAggregationExplain() throws IOException {
109109

110110
@Test
111111
public void testMultiSortPushDownExplain() throws IOException {
112-
// TODO: Fix the expected output in expectedOutput/ppl/explain_multi_sort_push.json
112+
// TODO: Fix the expected output in expectedOutput/ppl/explain_multi_sort_push.json (v2)
113113
// balance and gender should take precedence over account_number and firstname
114114
String expected =
115115
isCalciteEnabled()
@@ -128,7 +128,7 @@ public void testMultiSortPushDownExplain() throws IOException {
128128
@Test
129129
public void testSortThenAggregatePushDownExplain() throws IOException {
130130
// TODO: Remove pushed-down sort in DSL in expectedOutput/ppl/explain_sort_then_agg_push.json
131-
// existing collations should be eliminated when pushing down aggregations
131+
// existing collations should be eliminated when pushing down aggregations (v2)
132132
String expected =
133133
isCalciteEnabled()
134134
? loadFromFile("expectedOutput/calcite/explain_sort_then_agg_push.json")
@@ -159,6 +159,46 @@ public void testSortWithRenameExplain() throws IOException {
159159
+ "| fields alias"));
160160
}
161161

162+
/**
163+
* Pushdown SORT and LIMIT Sort should be pushed down since DSL process sort before limit when
164+
* they coexist
165+
*/
166+
@Test
167+
public void testSortThenLimitExplain() throws IOException {
168+
String expected =
169+
isCalciteEnabled()
170+
? loadFromFile("expectedOutput/calcite/explain_sort_then_limit_push.json")
171+
: loadFromFile("expectedOutput/ppl/explain_sort_then_limit_push.json");
172+
assertJsonEqualsIgnoreId(
173+
expected,
174+
explainQueryToString(
175+
"source=opensearch-sql_test_index_account"
176+
+ "| sort age "
177+
+ "| head 5 "
178+
+ "| fields age"));
179+
}
180+
181+
/**
182+
* Push down LIMIT only Sort should NOT be pushed down since DSL process limit before sort when
183+
* they coexist
184+
*/
185+
@Test
186+
public void testLimitThenSortExplain() throws IOException {
187+
// TODO: Fix the expected output in expectedOutput/ppl/explain_limit_then_sort_push.json (v2)
188+
// limit-then-sort should not be pushed down.
189+
String expected =
190+
isCalciteEnabled()
191+
? loadFromFile("expectedOutput/calcite/explain_limit_then_sort_push.json")
192+
: loadFromFile("expectedOutput/ppl/explain_limit_then_sort_push.json");
193+
assertJsonEqualsIgnoreId(
194+
expected,
195+
explainQueryToString(
196+
"source=opensearch-sql_test_index_account"
197+
+ "| head 5 "
198+
+ "| sort age "
199+
+ "| fields age"));
200+
}
201+
162202
@Test
163203
public void testLimitPushDownExplain() throws IOException {
164204
String expected =
@@ -310,6 +350,7 @@ public void testTrendlineWithSortPushDownExplain() throws IOException {
310350
? loadFromFile("expectedOutput/calcite/explain_trendline_sort_push.json")
311351
: loadFromFile("expectedOutput/ppl/explain_trendline_sort_push.json");
312352

353+
// Sort will not be pushed down because there's a head before it.
313354
assertJsonEqualsIgnoreId(
314355
expected,
315356
explainQueryToString(

integ-test/src/test/java/org/opensearch/sql/ppl/SortCommandIT.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,4 +154,11 @@ public void testSortMultipleFields() throws IOException {
154154
String.format("source=%s | sort dog_name, age | fields dog_name, age", TEST_INDEX_DOG));
155155
verifyOrder(result, rows("rex", 2), rows("snoopy", 4));
156156
}
157+
158+
@Test
159+
public void testSortThenHead() throws IOException {
160+
JSONObject result =
161+
executeQuery(String.format("source=%s | sort age | head 2 | fields age", TEST_INDEX_BANK));
162+
verifyOrder(result, rows(28), rows(32));
163+
}
157164
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSort(sort0=[$0], dir0=[ASC])\n LogicalProject(age=[$8])\n LogicalSort(sort0=[$8], dir0=[ASC])\n LogicalSort(fetch=[5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSort(sort0=[$0], dir0=[ASC])\n LogicalProject(age=[$8])\n LogicalSort(sort0=[$8], dir0=[ASC], fetch=[5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[SORT->[{\n \"age\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_last\"\n }\n}], LIMIT->5, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_last\"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
33
"logical": "LogicalProject(ageTrend=[CASE(>(COUNT() OVER (ROWS 1 PRECEDING), 1), /(SUM($8) OVER (ROWS 1 PRECEDING), CAST(COUNT($8) OVER (ROWS 1 PRECEDING)):DOUBLE NOT NULL), null:NULL)])\n LogicalFilter(condition=[IS NOT NULL($8)])\n LogicalSort(sort0=[$8], dir0=[ASC])\n LogicalSort(fetch=[5])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableCalc(expr#0..3=[{inputs}], expr#4=[1], expr#5=[>($t1, $t4)], expr#6=[CAST($t3):DOUBLE NOT NULL], expr#7=[/($t2, $t6)], expr#8=[null:NULL], expr#9=[CASE($t5, $t7, $t8)], ageTrend=[$t9])\n EnumerableWindow(window#0=[window(rows between $1 PRECEDING and CURRENT ROW aggs [COUNT(), $SUM0($0), COUNT($0)])])\n EnumerableCalc(expr#0=[{inputs}], expr#1=[IS NOT NULL($t0)], age=[$t0], $condition=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age], SORT->[{\n \"age\" : {\n \"order\" : \"asc\",\n \"missing\" : \"_last\"\n }\n}]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_last\"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
4+
"physical": "EnumerableCalc(expr#0..3=[{inputs}], expr#4=[1], expr#5=[>($t1, $t4)], expr#6=[CAST($t3):DOUBLE NOT NULL], expr#7=[/($t2, $t6)], expr#8=[null:NULL], expr#9=[CASE($t5, $t7, $t8)], ageTrend=[$t9])\n EnumerableWindow(window#0=[window(rows between $1 PRECEDING and CURRENT ROW aggs [COUNT(), $SUM0($0), COUNT($0)])])\n EnumerableCalc(expr#0=[{inputs}], expr#1=[IS NOT NULL($t0)], age=[$t0], $condition=[$t1])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5, PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
55
}
66
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"root": {
3+
"name": "ProjectOperator",
4+
"description": {
5+
"fields": "[age]"
6+
},
7+
"children": [
8+
{
9+
"name": "OpenSearchIndexScan",
10+
"description": {
11+
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean=true, searchDone=false, pitId=null, cursorKeepAlive=null, searchAfter=null, searchResponse=null)"
12+
},
13+
"children": []
14+
}
15+
]
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"root": {
3+
"name": "ProjectOperator",
4+
"description": {
5+
"fields": "[age]"
6+
},
7+
"children": [
8+
{
9+
"name": "OpenSearchIndexScan",
10+
"description": {
11+
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, needClean=true, searchDone=false, pitId=null, cursorKeepAlive=null, searchAfter=null, searchResponse=null)"
12+
},
13+
"children": []
14+
}
15+
]
16+
}
17+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ static boolean isLimitPushed(CalciteLogicalIndexScan scan) {
3434
* @return True if the LogicalSort is a LIMIT, false otherwise.
3535
*/
3636
static boolean isLogicalSortLimit(LogicalSort sort) {
37-
return sort.fetch != null && sort.getCollation().getFieldCollations().isEmpty();
37+
return sort.fetch != null;
3838
}
3939

4040
static boolean sortByFieldsOnly(LogicalSort sort) {

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchSortIndexScanRule.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.sql.opensearch.planner.physical;
77

8+
import java.util.function.Predicate;
89
import org.apache.calcite.plan.RelOptRuleCall;
910
import org.apache.calcite.plan.RelRule;
1011
import org.apache.calcite.rel.logical.LogicalSort;
@@ -40,7 +41,15 @@ public interface Config extends RelRule.Config {
4041
b0 ->
4142
b0.operand(LogicalSort.class)
4243
.predicate(OpenSearchIndexScanRule::sortByFieldsOnly)
43-
.oneInput(b1 -> b1.operand(CalciteLogicalIndexScan.class).noInputs()));
44+
.oneInput(
45+
b1 ->
46+
b1.operand(CalciteLogicalIndexScan.class)
47+
// Skip the rule if a limit has already been pushed down
48+
// because pushing down a sort after a limit will be treated
49+
// as sort-then-limit by OpenSearch DSL.
50+
.predicate(
51+
Predicate.not(OpenSearchIndexScanRule::isLimitPushed))
52+
.noInputs()));
4453

4554
@Override
4655
default OpenSearchSortIndexScanRule toRule() {

0 commit comments

Comments
 (0)