Skip to content

Commit 75cfe07

Browse files
committed
Support Sort pushdown
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent e51fcd9 commit 75cfe07

4 files changed

Lines changed: 118 additions & 2 deletions

File tree

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ public class OpenSearchIndexRules {
1616
OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule();
1717
private static final OpenSearchAggregateIndexScanRule AGGREGATE_INDEX_SCAN =
1818
OpenSearchAggregateIndexScanRule.Config.DEFAULT.toRule();
19+
private static final OpenSearchSortIndexScanRule SORT_INDEX_SCAN =
20+
OpenSearchSortIndexScanRule.Config.DEFAULT.toRule();
1921

2022
public static final List<RelOptRule> OPEN_SEARCH_INDEX_SCAN_RULES =
21-
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN);
23+
ImmutableList.of(
24+
PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN, SORT_INDEX_SCAN);
2225

2326
// prevent instantiation
2427
private OpenSearchIndexRules() {}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.opensearch.planner.physical;
7+
8+
import org.apache.calcite.plan.RelOptRuleCall;
9+
import org.apache.calcite.plan.RelRule;
10+
import org.apache.calcite.rel.logical.LogicalSort;
11+
import org.immutables.value.Value;
12+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
13+
14+
@Value.Enclosing
15+
public class OpenSearchSortIndexScanRule extends RelRule<OpenSearchSortIndexScanRule.Config> {
16+
17+
protected OpenSearchSortIndexScanRule(Config config) {
18+
super(config);
19+
}
20+
21+
@Override
22+
public void onMatch(RelOptRuleCall call) {
23+
final LogicalSort sort = call.rel(0);
24+
final CalciteLogicalIndexScan scan = call.rel(1);
25+
26+
if (!sort.getCollation().getFieldCollations().isEmpty() && sort.fetch == null) {
27+
var collations = sort.collation.getFieldCollations();
28+
CalciteLogicalIndexScan newScan = scan.pushDownSort(collations);
29+
if (newScan != null) {
30+
call.transformTo(newScan);
31+
}
32+
}
33+
}
34+
35+
/** Rule configuration. */
36+
@Value.Immutable
37+
public interface Config extends RelRule.Config {
38+
OpenSearchSortIndexScanRule.Config DEFAULT =
39+
ImmutableOpenSearchSortIndexScanRule.Config.builder()
40+
.build()
41+
.withOperandSupplier(
42+
b0 ->
43+
b0.operand(LogicalSort.class)
44+
.oneInput(
45+
b1 ->
46+
b1.operand(CalciteLogicalIndexScan.class)
47+
.predicate(OpenSearchIndexScanRule::test)
48+
.noInputs()));
49+
50+
@Override
51+
default OpenSearchSortIndexScanRule toRule() {
52+
return new OpenSearchSortIndexScanRule(this);
53+
}
54+
}
55+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteIndexScan.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public RelWriter explainTerms(RelWriter pw) {
6565
// TODO: should we consider equivalent among PushDownContexts with different push down sequence?
6666
public static class PushDownContext extends ArrayDeque<PushDownAction> {
6767
private boolean isAggregatePushed = false;
68+
private boolean isSortPushed = false;
6869

6970
@Override
7071
public PushDownContext clone() {
@@ -75,9 +76,13 @@ public PushDownContext clone() {
7576
public boolean add(PushDownAction pushDownAction) {
7677
// Defense check. It should never do push down to this context after aggregate push-down.
7778
assert !isAggregatePushed : "Aggregate has already been pushed!";
79+
assert !isSortPushed : "Sort has already been pushed!";
7880
if (pushDownAction.type == PushDownType.AGGREGATION) {
7981
isAggregatePushed = true;
8082
}
83+
if (pushDownAction.type == PushDownType.SORT) {
84+
isSortPushed = true;
85+
}
8186
return super.add(pushDownAction);
8287
}
8388

@@ -86,13 +91,19 @@ public boolean isAggregatePushed() {
8691
isAggregatePushed = !isEmpty() && super.peekLast().type == PushDownType.AGGREGATION;
8792
return isAggregatePushed;
8893
}
94+
95+
public boolean isSortPushed() {
96+
if (isSortPushed) return true;
97+
isSortPushed = !isEmpty() && super.peekLast().type == PushDownType.SORT;
98+
return isSortPushed;
99+
}
89100
}
90101

91102
protected enum PushDownType {
92103
FILTER,
93104
PROJECT,
94105
AGGREGATION,
95-
// SORT,
106+
SORT,
96107
// LIMIT,
97108
// HIGHLIGHT,
98109
// NESTED

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.sql.opensearch.storage.scan;
77

88
import com.google.common.collect.ImmutableList;
9+
import java.util.ArrayList;
910
import java.util.List;
1011
import java.util.Map;
1112
import java.util.stream.Collectors;
@@ -16,6 +17,7 @@
1617
import org.apache.calcite.plan.RelOptRule;
1718
import org.apache.calcite.plan.RelOptTable;
1819
import org.apache.calcite.plan.RelTraitSet;
20+
import org.apache.calcite.rel.RelFieldCollation;
1921
import org.apache.calcite.rel.core.Aggregate;
2022
import org.apache.calcite.rel.core.Filter;
2123
import org.apache.calcite.rel.hint.RelHint;
@@ -27,6 +29,10 @@
2729
import org.apache.logging.log4j.Logger;
2830
import org.opensearch.index.query.QueryBuilder;
2931
import org.opensearch.search.aggregations.AggregationBuilder;
32+
import org.opensearch.search.sort.ScoreSortBuilder;
33+
import org.opensearch.search.sort.SortBuilder;
34+
import org.opensearch.search.sort.SortBuilders;
35+
import org.opensearch.search.sort.SortOrder;
3036
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
3137
import org.opensearch.sql.common.setting.Settings;
3238
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
@@ -163,4 +169,45 @@ public CalciteLogicalIndexScan pushDownAggregate(Aggregate aggregate) {
163169
}
164170
return null;
165171
}
172+
173+
public CalciteLogicalIndexScan pushDownSort(List<RelFieldCollation> collations) {
174+
if (getPushDownContext().isSortPushed()) {
175+
return null;
176+
}
177+
try {
178+
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(this.getRowType());
179+
List<SortBuilder<?>> builders = new ArrayList<>();
180+
for (RelFieldCollation collation : collations) {
181+
int index = collation.getFieldIndex();
182+
String fieldName = this.getRowType().getFieldNames().get(index);
183+
RelFieldCollation.Direction direction = collation.getDirection();
184+
// Default sort order is ASCENDING
185+
SortOrder order =
186+
RelFieldCollation.Direction.DESCENDING.equals(direction)
187+
? SortOrder.DESC
188+
: SortOrder.ASC;
189+
// TODO: support script sort and distance sort
190+
SortBuilder<?> sortBuilder;
191+
if (ScoreSortBuilder.NAME.equals(fieldName)) {
192+
sortBuilder = SortBuilders.scoreSort();
193+
} else {
194+
sortBuilder = SortBuilders.fieldSort(fieldName);
195+
}
196+
builders.add(sortBuilder.order(order));
197+
}
198+
newScan.pushDownContext.add(
199+
PushDownAction.of(
200+
PushDownType.SORT,
201+
builders.toString(),
202+
requestBuilder -> requestBuilder.pushDownSort(builders)));
203+
return newScan;
204+
} catch (Exception e) {
205+
if (LOG.isDebugEnabled()) {
206+
LOG.debug("Cannot pushdown the sort {}", collations, e);
207+
} else {
208+
LOG.warn("Cannot pushdown the sort {}, ", collations);
209+
}
210+
}
211+
return null;
212+
}
166213
}

0 commit comments

Comments
 (0)