Skip to content

Commit e634020

Browse files
committed
Support Limit pushdown
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent e51fcd9 commit e634020

6 files changed

Lines changed: 114 additions & 4 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ public void testSortPushDownExplain() throws Exception {
9696

9797
@Test
9898
public void testLimitPushDownExplain() throws Exception {
99-
// TODO fix after https://github.com/opensearch-project/sql/issues/3381
10099
String expected =
101100
isCalciteEnabled()
102101
? loadFromFile("expectedOutput/calcite/explain_limit_push.json")
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite":{
33
"logical":"LogicalProject(ageMinus=[$17])\n LogicalSort(fetch=[5])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], ageMinus=[-($8, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical":"EnumerableCalc(expr#0..16=[{inputs}], expr#17=[30], expr#18=[-($t8, $t17)], ageMinus=[$t18])\n EnumerableLimit(fetch=[5])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
4+
"physical":"EnumerableCalc(expr#0..16=[{inputs}], expr#17=[30], expr#18=[-($t8, $t17)], ageMinus=[$t18])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
55
}
66
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ public class OpenSearchIndexRules {
1616
OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule();
1717
private static final OpenSearchAggregateIndexScanRule AGGREGATE_INDEX_SCAN =
1818
OpenSearchAggregateIndexScanRule.Config.DEFAULT.toRule();
19+
private static final OpenSearchLimitIndexScanRule LIMIT_INDEX_SCAN =
20+
OpenSearchLimitIndexScanRule.Config.DEFAULT.toRule();
1921

2022
public static final List<RelOptRule> OPEN_SEARCH_INDEX_SCAN_RULES =
21-
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN);
23+
ImmutableList.of(
24+
PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN, LIMIT_INDEX_SCAN);
2225

2326
// prevent instantiation
2427
private OpenSearchIndexRules() {}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package org.opensearch.sql.opensearch.planner.physical;
2+
3+
import java.util.Objects;
4+
import org.apache.calcite.plan.RelOptRuleCall;
5+
import org.apache.calcite.plan.RelRule;
6+
import org.apache.calcite.rel.logical.LogicalSort;
7+
import org.apache.calcite.rex.RexLiteral;
8+
import org.apache.calcite.rex.RexNode;
9+
import org.immutables.value.Value;
10+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
11+
12+
@Value.Enclosing
13+
public class OpenSearchLimitIndexScanRule extends RelRule<OpenSearchLimitIndexScanRule.Config> {
14+
15+
protected OpenSearchLimitIndexScanRule(Config config) {
16+
super(config);
17+
}
18+
19+
@Override
20+
public void onMatch(RelOptRuleCall call) {
21+
final LogicalSort sort = call.rel(0);
22+
final CalciteLogicalIndexScan scan = call.rel(1);
23+
24+
// The LogicalSort is a LIMIT that should be pushed down when its fetch field is not null and
25+
// its collation is empty.
26+
// For example: `sort name | head 5` should not be pushed down because it has a field collation.
27+
if (sort.fetch != null && sort.getCollation().getFieldCollations().isEmpty()) {
28+
Integer limitValue = extractLimitValue(sort.fetch);
29+
Integer offsetValue = extractOffsetValue(sort.offset);
30+
if (limitValue != null) {
31+
CalciteLogicalIndexScan newScan = scan.pushDownLimit(limitValue, offsetValue);
32+
if (newScan != null) {
33+
call.transformTo(newScan);
34+
}
35+
}
36+
}
37+
}
38+
39+
private static Integer extractLimitValue(RexNode fetch) {
40+
if (fetch instanceof RexLiteral) {
41+
return ((RexLiteral) fetch).getValueAs(Integer.class);
42+
}
43+
return null;
44+
}
45+
46+
/**
47+
* Extracts the offset value from the given `RexNode`. If the offset is `null`, it defaults to 0.
48+
* For example:
49+
*
50+
* <ul>
51+
* <li><code>source=people | head 1</code> will have a <code>null</code> offset, which is
52+
* converted to 0.
53+
* <li><code>source=people | head 1 from 2</code> will have an offset of 2.
54+
* </ul>
55+
*
56+
* @param offset The `RexNode` representing the offset.
57+
* @return The extracted offset value, or `null` if it cannot be determined.
58+
*/
59+
private static Integer extractOffsetValue(RexNode offset) {
60+
if (Objects.isNull(offset)) {
61+
return 0;
62+
}
63+
if (offset instanceof RexLiteral) {
64+
return ((RexLiteral) offset).getValueAs(Integer.class);
65+
}
66+
return null;
67+
}
68+
69+
/** Rule configuration. */
70+
@Value.Immutable
71+
public interface Config extends RelRule.Config {
72+
OpenSearchLimitIndexScanRule.Config DEFAULT =
73+
ImmutableOpenSearchLimitIndexScanRule.Config.builder()
74+
.build()
75+
.withOperandSupplier(
76+
b0 ->
77+
b0.operand(LogicalSort.class)
78+
.oneInput(
79+
b1 ->
80+
b1.operand(CalciteLogicalIndexScan.class)
81+
.predicate(OpenSearchIndexScanRule::test)
82+
.noInputs()));
83+
84+
@Override
85+
default OpenSearchLimitIndexScanRule toRule() {
86+
return new OpenSearchLimitIndexScanRule(this);
87+
}
88+
}
89+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteIndexScan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ protected enum PushDownType {
9393
PROJECT,
9494
AGGREGATION,
9595
// SORT,
96-
// LIMIT,
96+
LIMIT,
9797
// HIGHLIGHT,
9898
// NESTED
9999
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,4 +163,23 @@ public CalciteLogicalIndexScan pushDownAggregate(Aggregate aggregate) {
163163
}
164164
return null;
165165
}
166+
167+
public CalciteLogicalIndexScan pushDownLimit(Integer limit, Integer offset) {
168+
try {
169+
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(getRowType());
170+
newScan.pushDownContext.add(
171+
PushDownAction.of(
172+
PushDownType.LIMIT,
173+
limit,
174+
requestBuilder -> requestBuilder.pushDownLimit(limit, offset)));
175+
return newScan;
176+
} catch (Exception e) {
177+
if (LOG.isDebugEnabled()) {
178+
LOG.debug("Cannot pushdown the limit {}", limit, e);
179+
} else {
180+
LOG.warn("Cannot pushdown the limit {}, ", limit);
181+
}
182+
}
183+
return null;
184+
}
166185
}

0 commit comments

Comments
 (0)