Skip to content

Commit fb380de

Browse files
committed
Support Sort pushdown
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> Copy traits to logical index scan after pushing down sort Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> Allow pushing down multiple times Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent 270aa0d commit fb380de

8 files changed

Lines changed: 129 additions & 13 deletions

File tree

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
33
"logical": "LogicalSort(sort0=[$0], dir0=[ASC])\n LogicalProject(age=[$8])\n LogicalFilter(condition=[>($8, 30)])\n LogicalSort(sort0=[$8], dir0=[ASC])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableCalc(expr#0=[{inputs}], expr#1=[30], expr#2=[>($t0, $t1)], age=[$t0], $condition=[$t2])\n EnumerableSort(sort0=[$0], dir0=[ASC])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
4+
"physical":"EnumerableCalc(expr#0..16=[{inputs}], age=[$t8])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[SORT->[{\n \"age\" : {\n \"order\" : \"asc\"\n }\n}], FILTER->>($8, 30)], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"age\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
55
}
6-
}
6+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/EnumerableIndexScanRule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public RelNode convert(RelNode rel) {
4343
final CalciteLogicalIndexScan scan = (CalciteLogicalIndexScan) rel;
4444
return new CalciteEnumerableIndexScan(
4545
scan.getCluster(),
46+
// Retains RelDistribution and RelCollation but replaces Convention
47+
scan.getTraitSet().plus(EnumerableConvention.INSTANCE),
4648
scan.getHints(),
4749
scan.getTable(),
4850
scan.getOsIndex(),

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,16 @@ public class OpenSearchIndexRules {
1818
OpenSearchAggregateIndexScanRule.Config.DEFAULT.toRule();
1919
private static final OpenSearchLimitIndexScanRule LIMIT_INDEX_SCAN =
2020
OpenSearchLimitIndexScanRule.Config.DEFAULT.toRule();
21+
private static final OpenSearchSortIndexScanRule SORT_INDEX_SCAN =
22+
OpenSearchSortIndexScanRule.Config.DEFAULT.toRule();
2123

2224
public static final List<RelOptRule> OPEN_SEARCH_INDEX_SCAN_RULES =
2325
ImmutableList.of(
24-
PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN, LIMIT_INDEX_SCAN);
26+
PROJECT_INDEX_SCAN,
27+
FILTER_INDEX_SCAN,
28+
AGGREGATE_INDEX_SCAN,
29+
LIMIT_INDEX_SCAN,
30+
SORT_INDEX_SCAN);
2531

2632
// prevent instantiation
2733
private OpenSearchIndexRules() {}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexScanRule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,8 @@ static boolean isLimitPushed(CalciteLogicalIndexScan scan) {
3636
static boolean isLogicalSortLimit(LogicalSort sort) {
3737
return sort.fetch != null && sort.getCollation().getFieldCollations().isEmpty();
3838
}
39+
40+
static boolean sortByFieldsOnly(LogicalSort sort) {
41+
return !sort.getCollation().getFieldCollations().isEmpty() && sort.fetch == null;
42+
}
3943
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.opensearch.planner.physical;
7+
8+
import org.apache.calcite.plan.RelOptRuleCall;
9+
import org.apache.calcite.plan.RelRule;
10+
import org.apache.calcite.rel.logical.LogicalSort;
11+
import org.immutables.value.Value;
12+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
13+
14+
@Value.Enclosing
15+
public class OpenSearchSortIndexScanRule extends RelRule<OpenSearchSortIndexScanRule.Config> {
16+
17+
protected OpenSearchSortIndexScanRule(Config config) {
18+
super(config);
19+
}
20+
21+
@Override
22+
public void onMatch(RelOptRuleCall call) {
23+
final LogicalSort sort = call.rel(0);
24+
final CalciteLogicalIndexScan scan = call.rel(1);
25+
26+
var collations = sort.collation.getFieldCollations();
27+
CalciteLogicalIndexScan newScan = scan.pushDownSort(collations);
28+
if (newScan != null) {
29+
call.transformTo(newScan);
30+
}
31+
}
32+
33+
/** Rule configuration. */
34+
@Value.Immutable
35+
public interface Config extends RelRule.Config {
36+
OpenSearchSortIndexScanRule.Config DEFAULT =
37+
ImmutableOpenSearchSortIndexScanRule.Config.builder()
38+
.build()
39+
.withOperandSupplier(
40+
b0 ->
41+
b0.operand(LogicalSort.class)
42+
.predicate(OpenSearchIndexScanRule::sortByFieldsOnly)
43+
.oneInput(
44+
b1 ->
45+
b1.operand(CalciteLogicalIndexScan.class)
46+
.predicate(OpenSearchIndexScanRule::test)
47+
.noInputs()));
48+
49+
@Override
50+
default OpenSearchSortIndexScanRule toRule() {
51+
return new OpenSearchSortIndexScanRule(this);
52+
}
53+
}
54+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ protected enum PushDownType {
133133
FILTER,
134134
PROJECT,
135135
AGGREGATION,
136-
// SORT,
136+
SORT,
137137
LIMIT,
138138
// HIGHLIGHT,
139139
// NESTED

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package org.opensearch.sql.opensearch.storage.scan;
77

88
import java.util.List;
9-
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
109
import org.apache.calcite.adapter.enumerable.EnumerableRel;
1110
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
1211
import org.apache.calcite.adapter.enumerable.PhysType;
@@ -21,6 +20,7 @@
2120
import org.apache.calcite.plan.RelOptPlanner;
2221
import org.apache.calcite.plan.RelOptRule;
2322
import org.apache.calcite.plan.RelOptTable;
23+
import org.apache.calcite.plan.RelTraitSet;
2424
import org.apache.calcite.rel.hint.RelHint;
2525
import org.apache.calcite.rel.rules.CoreRules;
2626
import org.apache.calcite.rel.type.RelDataType;
@@ -44,19 +44,13 @@ public class CalciteEnumerableIndexScan extends AbstractCalciteIndexScan impleme
4444
*/
4545
public CalciteEnumerableIndexScan(
4646
RelOptCluster cluster,
47+
RelTraitSet traitSet,
4748
List<RelHint> hints,
4849
RelOptTable table,
4950
OpenSearchIndex osIndex,
5051
RelDataType schema,
5152
PushDownContext pushDownContext) {
52-
super(
53-
cluster,
54-
cluster.traitSetOf(EnumerableConvention.INSTANCE),
55-
hints,
56-
table,
57-
osIndex,
58-
schema,
59-
pushDownContext);
53+
super(cluster, traitSet, hints, table, osIndex, schema, pushDownContext);
6054
}
6155

6256
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.sql.opensearch.storage.scan;
77

88
import com.google.common.collect.ImmutableList;
9+
import java.util.ArrayList;
910
import java.util.List;
1011
import java.util.Map;
1112
import java.util.stream.Collectors;
@@ -16,6 +17,8 @@
1617
import org.apache.calcite.plan.RelOptRule;
1718
import org.apache.calcite.plan.RelOptTable;
1819
import org.apache.calcite.plan.RelTraitSet;
20+
import org.apache.calcite.rel.RelCollations;
21+
import org.apache.calcite.rel.RelFieldCollation;
1922
import org.apache.calcite.rel.core.Aggregate;
2023
import org.apache.calcite.rel.core.Filter;
2124
import org.apache.calcite.rel.hint.RelHint;
@@ -27,6 +30,10 @@
2730
import org.apache.logging.log4j.Logger;
2831
import org.opensearch.index.query.QueryBuilder;
2932
import org.opensearch.search.aggregations.AggregationBuilder;
33+
import org.opensearch.search.sort.ScoreSortBuilder;
34+
import org.opensearch.search.sort.SortBuilder;
35+
import org.opensearch.search.sort.SortBuilders;
36+
import org.opensearch.search.sort.SortOrder;
3037
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
3138
import org.opensearch.sql.common.setting.Settings;
3239
import org.opensearch.sql.data.type.ExprType;
@@ -189,4 +196,53 @@ public CalciteLogicalIndexScan pushDownLimit(Integer limit, Integer offset) {
189196
}
190197
return null;
191198
}
199+
200+
public CalciteLogicalIndexScan pushDownSort(List<RelFieldCollation> collations) {
201+
try {
202+
// Propagate the sort to the new scan
203+
RelTraitSet newTraitSet = getTraitSet().plus(RelCollations.of(collations));
204+
CalciteLogicalIndexScan newScan =
205+
new CalciteLogicalIndexScan(
206+
getCluster(),
207+
newTraitSet,
208+
hints,
209+
table,
210+
osIndex,
211+
getRowType(),
212+
pushDownContext.clone());
213+
214+
List<SortBuilder<?>> builders = new ArrayList<>();
215+
for (RelFieldCollation collation : collations) {
216+
int index = collation.getFieldIndex();
217+
String fieldName = this.getRowType().getFieldNames().get(index);
218+
RelFieldCollation.Direction direction = collation.getDirection();
219+
// Default sort order is ASCENDING
220+
SortOrder order =
221+
RelFieldCollation.Direction.DESCENDING.equals(direction)
222+
? SortOrder.DESC
223+
: SortOrder.ASC;
224+
// TODO: support script sort and distance sort
225+
SortBuilder<?> sortBuilder;
226+
if (ScoreSortBuilder.NAME.equals(fieldName)) {
227+
sortBuilder = SortBuilders.scoreSort();
228+
} else {
229+
sortBuilder = SortBuilders.fieldSort(fieldName);
230+
}
231+
builders.add(sortBuilder.order(order));
232+
}
233+
newScan.pushDownContext.add(
234+
PushDownAction.of(
235+
PushDownType.SORT,
236+
builders.toString(),
237+
requestBuilder -> requestBuilder.pushDownSort(builders)));
238+
return newScan;
239+
} catch (Exception e) {
240+
if (LOG.isDebugEnabled()) {
241+
LOG.debug("Cannot pushdown the sort {}", collations, e);
242+
} else {
243+
LOG.warn("Cannot pushdown the sort {}, ", collations);
244+
}
245+
}
246+
return null;
247+
}
192248
}

0 commit comments

Comments
 (0)