Skip to content

Commit 43b5222

Browse files
committed
Support Limit pushdown
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent e51fcd9 commit 43b5222

6 files changed

Lines changed: 118 additions & 4 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ public void testSortPushDownExplain() throws Exception {
9696

9797
@Test
9898
public void testLimitPushDownExplain() throws Exception {
99-
// TODO fix after https://github.com/opensearch-project/sql/issues/3381
10099
String expected =
101100
isCalciteEnabled()
102101
? loadFromFile("expectedOutput/calcite/explain_limit_push.json")
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite":{
33
"logical":"LogicalProject(ageMinus=[$17])\n LogicalSort(fetch=[5])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], ageMinus=[-($8, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical":"EnumerableCalc(expr#0..16=[{inputs}], expr#17=[30], expr#18=[-($t8, $t17)], ageMinus=[$t18])\n EnumerableLimit(fetch=[5])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
4+
"physical":"EnumerableCalc(expr#0..16=[{inputs}], expr#17=[30], expr#18=[-($t8, $t17)], ageMinus=[$t18])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, requestedTotalSize=5, pageSize=null, startFrom=0)])\n"
55
}
66
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchIndexRules.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ public class OpenSearchIndexRules {
1616
OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule();
1717
private static final OpenSearchAggregateIndexScanRule AGGREGATE_INDEX_SCAN =
1818
OpenSearchAggregateIndexScanRule.Config.DEFAULT.toRule();
19+
private static final OpenSearchLimitIndexScanRule LIMIT_INDEX_SCAN =
20+
OpenSearchLimitIndexScanRule.Config.DEFAULT.toRule();
1921

2022
public static final List<RelOptRule> OPEN_SEARCH_INDEX_SCAN_RULES =
21-
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN);
23+
ImmutableList.of(
24+
PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, AGGREGATE_INDEX_SCAN, LIMIT_INDEX_SCAN);
2225

2326
// prevent instantiation
2427
private OpenSearchIndexRules() {}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package org.opensearch.sql.opensearch.planner.physical;
2+
3+
import java.util.Objects;
4+
import org.apache.calcite.plan.RelOptRuleCall;
5+
import org.apache.calcite.plan.RelRule;
6+
import org.apache.calcite.rel.logical.LogicalSort;
7+
import org.apache.calcite.rex.RexLiteral;
8+
import org.apache.calcite.rex.RexNode;
9+
import org.immutables.value.Value;
10+
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
11+
12+
/**
13+
* Planner rule that push a {@link LogicalSort} with a semantic meaning of LIMIT ... [OFFSET ...]
14+
* down to {@link CalciteLogicalIndexScan}
15+
*/
16+
@Value.Enclosing
17+
public class OpenSearchLimitIndexScanRule extends RelRule<OpenSearchLimitIndexScanRule.Config> {
18+
19+
protected OpenSearchLimitIndexScanRule(Config config) {
20+
super(config);
21+
}
22+
23+
@Override
24+
public void onMatch(RelOptRuleCall call) {
25+
final LogicalSort sort = call.rel(0);
26+
final CalciteLogicalIndexScan scan = call.rel(1);
27+
28+
// The LogicalSort is a LIMIT that should be pushed down when its fetch field is not null and
29+
// its collation is empty.
30+
// For example: `sort name | head 5` should not be pushed down because it has a field collation.
31+
if (sort.fetch != null && sort.getCollation().getFieldCollations().isEmpty()) {
32+
Integer limitValue = extractLimitValue(sort.fetch);
33+
Integer offsetValue = extractOffsetValue(sort.offset);
34+
if (limitValue != null && offsetValue != null) {
35+
CalciteLogicalIndexScan newScan = scan.pushDownLimit(limitValue, offsetValue);
36+
if (newScan != null) {
37+
call.transformTo(newScan);
38+
}
39+
}
40+
}
41+
}
42+
43+
private static Integer extractLimitValue(RexNode fetch) {
44+
if (fetch instanceof RexLiteral) {
45+
return ((RexLiteral) fetch).getValueAs(Integer.class);
46+
}
47+
return null;
48+
}
49+
50+
/**
51+
* Extracts the offset value from the given `RexNode`. If the offset is `null`, it defaults to 0.
52+
* For example:
53+
*
54+
* <ul>
55+
* <li><code>source=people | head 1</code> will have a <code>null</code> offset, which is
56+
* converted to 0.
57+
* <li><code>source=people | head 1 from 2</code> will have an offset of 2.
58+
* </ul>
59+
*
60+
* @param offset The `RexNode` representing the offset.
61+
* @return The extracted offset value, or `null` if it cannot be determined.
62+
*/
63+
private static Integer extractOffsetValue(RexNode offset) {
64+
if (Objects.isNull(offset)) {
65+
return 0;
66+
}
67+
if (offset instanceof RexLiteral) {
68+
return ((RexLiteral) offset).getValueAs(Integer.class);
69+
}
70+
return null;
71+
}
72+
73+
/** Rule configuration. */
74+
@Value.Immutable
75+
public interface Config extends RelRule.Config {
76+
OpenSearchLimitIndexScanRule.Config DEFAULT =
77+
ImmutableOpenSearchLimitIndexScanRule.Config.builder()
78+
.build()
79+
.withOperandSupplier(
80+
b0 ->
81+
b0.operand(LogicalSort.class)
82+
.oneInput(
83+
b1 ->
84+
b1.operand(CalciteLogicalIndexScan.class)
85+
.predicate(OpenSearchIndexScanRule::test)
86+
.noInputs()));
87+
88+
@Override
89+
default OpenSearchLimitIndexScanRule toRule() {
90+
return new OpenSearchLimitIndexScanRule(this);
91+
}
92+
}
93+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteIndexScan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ protected enum PushDownType {
9393
PROJECT,
9494
AGGREGATION,
9595
// SORT,
96-
// LIMIT,
96+
LIMIT,
9797
// HIGHLIGHT,
9898
// NESTED
9999
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,4 +163,23 @@ public CalciteLogicalIndexScan pushDownAggregate(Aggregate aggregate) {
163163
}
164164
return null;
165165
}
166+
167+
public CalciteLogicalIndexScan pushDownLimit(Integer limit, Integer offset) {
168+
try {
169+
CalciteLogicalIndexScan newScan = this.copyWithNewSchema(getRowType());
170+
newScan.pushDownContext.add(
171+
PushDownAction.of(
172+
PushDownType.LIMIT,
173+
limit,
174+
requestBuilder -> requestBuilder.pushDownLimit(limit, offset)));
175+
return newScan;
176+
} catch (Exception e) {
177+
if (LOG.isDebugEnabled()) {
178+
LOG.debug("Cannot pushdown the limit {}", limit, e);
179+
} else {
180+
LOG.warn("Cannot pushdown the limit {}, ", limit);
181+
}
182+
}
183+
return null;
184+
}
166185
}

0 commit comments

Comments
 (0)