Skip to content

Commit 774a3a2

Browse files
authored
Support filter push down for Sarg value (opensearch-project#3840)
* Support filter push down for Sarg value Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix push down for time range Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent 74b4de0 commit 774a3a2

5 files changed

Lines changed: 144 additions & 30 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8+
import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId;
9+
810
import java.io.IOException;
911
import org.junit.Ignore;
12+
import org.junit.Test;
1013
import org.opensearch.sql.ppl.ExplainIT;
1114

1215
public class CalciteExplainIT extends ExplainIT {
@@ -21,8 +24,39 @@ public void init() throws Exception {
2124
@Ignore("test only in v2")
2225
public void testExplainModeUnsupportedInV2() throws IOException {}
2326

24-
@Override
25-
public void testExplain() throws IOException {
26-
super.testExplain();
27+
// Only for Calcite
28+
@Test
29+
public void supportSearchSargPushDown_singleRange() throws IOException {
30+
String query =
31+
"source=opensearch-sql_test_index_account | where age >= 1.0 and age < 10 | fields age";
32+
var result = explainQueryToString(query);
33+
String expected =
34+
loadFromFile("expectedOutput/calcite/explain_sarg_filter_push_single_range.json");
35+
assertJsonEqualsIgnoreId(expected, result);
36+
}
37+
38+
// Only for Calcite
39+
@Test
40+
public void supportSearchSargPushDown_multiRange() throws IOException {
41+
String query =
42+
"source=opensearch-sql_test_index_account | where (age > 20 and age < 28) or (age > 25 and"
43+
+ " age < 30) or (age >= 1 and age <= 10) or age = 0 | fields age";
44+
var result = explainQueryToString(query);
45+
String expected =
46+
loadFromFile("expectedOutput/calcite/explain_sarg_filter_push_multi_range.json");
47+
assertJsonEqualsIgnoreId(expected, result);
48+
}
49+
50+
// Only for Calcite
51+
@Test
52+
public void supportSearchSargPushDown_timeRange() throws IOException {
53+
String expected =
54+
loadFromFile("expectedOutput/calcite/explain_sarg_filter_push_time_range.json");
55+
assertJsonEqualsIgnoreId(
56+
expected,
57+
explainQueryToString(
58+
"source=opensearch-sql_test_index_bank"
59+
+ "| where birthdate >= '2016-12-08 00:00:00.000000000' "
60+
+ "and birthdate < '2018-11-09 00:00:00.000000000' "));
2761
}
2862
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalProject(age=[$8])\n LogicalFilter(condition=[SEARCH($8, Sarg[0, [1..10], (20..30)])])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], FILTER->SEARCH($0, Sarg[0, [1..10], (20..30)])], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"bool\":{\"should\":[{\"term\":{\"age\":{\"value\":0,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":1,\"to\":10,\"include_lower\":true,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":20,\"to\":30,\"include_lower\":false,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalProject(age=[$8])\n LogicalFilter(condition=[SEARCH($8, Sarg[[1.0:DECIMAL(11, 1)..10:DECIMAL(11, 1))]:DECIMAL(11, 1))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], FILTER->SEARCH($0, Sarg[[1.0:DECIMAL(11, 1)..10:DECIMAL(11, 1))]:DECIMAL(11, 1))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":1.0,\"to\":10,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12])\n LogicalFilter(condition=[AND(>=($3, TIMESTAMP('2016-12-08 00:00:00.000000000':VARCHAR)), <($3, TIMESTAMP('2018-11-09 00:00:00.000000000':VARCHAR)))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], FILTER->SEARCH($3, Sarg[['2016-12-08 00:00:00':VARCHAR..'2018-11-09 00:00:00':VARCHAR)]:VARCHAR)], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"range\":{\"birthdate\":{\"from\":\"2016-12-08T00:00:00.000Z\",\"to\":\"2018-11-09T00:00:00.000Z\",\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java

Lines changed: 89 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.SINGLE_FIELD_RELEVANCE_FUNCTION_SET;
4242

4343
import com.google.common.base.Throwables;
44+
import com.google.common.collect.BoundType;
4445
import com.google.common.collect.Range;
4546
import java.math.BigDecimal;
4647
import java.util.ArrayList;
@@ -65,6 +66,7 @@
6566
import org.apache.calcite.sql.type.SqlTypeFamily;
6667
import org.apache.calcite.sql.type.SqlTypeName;
6768
import org.apache.calcite.util.NlsString;
69+
import org.apache.calcite.util.RangeSets;
6870
import org.apache.calcite.util.Sarg;
6971
import org.opensearch.index.mapper.DateFieldMapper;
7072
import org.opensearch.index.query.BoolQueryBuilder;
@@ -74,7 +76,9 @@
7476
import org.opensearch.sql.calcite.type.ExprSqlType;
7577
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
7678
import org.opensearch.sql.data.model.ExprTimestampValue;
79+
import org.opensearch.sql.data.type.ExprCoreType;
7780
import org.opensearch.sql.data.type.ExprType;
81+
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
7882
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType.MappingType;
7983
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
8084
import org.opensearch.sql.opensearch.storage.script.filter.lucene.relevance.MatchBoolPrefixQuery;
@@ -230,7 +234,7 @@ private static boolean supportedRexCall(RexCall call) {
230234
case INTERNAL:
231235
switch (call.getKind()) {
232236
case SEARCH:
233-
return canBeTranslatedToTermsQuery(call);
237+
return true;
234238
default:
235239
return false;
236240
}
@@ -241,20 +245,6 @@ private static boolean supportedRexCall(RexCall call) {
241245
}
242246
}
243247

244-
/**
245-
* There are three types of the Sarg included in SEARCH RexCall: 1) Sarg is points (In ('a',
246-
* 'b', 'c' ...)). In this case the search call can be translated to terms Query 2) Sarg is
247-
* complementedPoints (Not in ('a', 'b')). In this case the search call can be translated to
248-
* MustNot terms Query 3) Sarg is real Range( > 1 and <= 10). In this case the search call
249-
* should be translated to rang Query Currently only the 1) and 2) cases are supported.
250-
*
251-
* @param search SEARCH RexCall
252-
* @return true if it isSearchWithPoints or isSearchWithComplementedPoints, other false
253-
*/
254-
static boolean canBeTranslatedToTermsQuery(RexCall search) {
255-
return isSearchWithPoints(search) || isSearchWithComplementedPoints(search);
256-
}
257-
258248
static boolean isSearchWithPoints(RexCall search) {
259249
RexLiteral literal = (RexLiteral) search.getOperands().get(1);
260250
final Sarg<?> sarg = requireNonNull(literal.getValueAs(Sarg.class), "Sarg");
@@ -520,8 +510,28 @@ private QueryExpression binary(RexCall call) {
520510
case SEARCH:
521511
if (isSearchWithComplementedPoints(call)) {
522512
return QueryExpression.create(pair.getKey()).notIn(pair.getValue());
523-
} else {
513+
} else if (isSearchWithPoints(call)) {
524514
return QueryExpression.create(pair.getKey()).in(pair.getValue());
515+
} else {
516+
Sarg<?> sarg = pair.getValue().literal.getValueAs(Sarg.class);
517+
Set<? extends Range<?>> rangeSet = requireNonNull(sarg).rangeSet.asRanges();
518+
boolean isTimeStamp =
519+
(pair.getKey() instanceof NamedFieldExpression namedField)
520+
&& namedField.isTimeStampType();
521+
List<QueryExpression> queryExpressions =
522+
rangeSet.stream()
523+
.map(
524+
range ->
525+
RangeSets.isPoint(range)
526+
? QueryExpression.create(pair.getKey())
527+
.equals(range.lowerEndpoint(), isTimeStamp)
528+
: QueryExpression.create(pair.getKey()).between(range, isTimeStamp))
529+
.toList();
530+
if (queryExpressions.size() == 1) {
531+
return queryExpressions.getFirst();
532+
} else {
533+
return CompoundQueryExpression.or(queryExpressions.toArray(new QueryExpression[0]));
534+
}
525535
}
526536
default:
527537
break;
@@ -706,6 +716,16 @@ public boolean isPartial() {
706716

707717
public abstract QueryExpression notIn(LiteralExpression literal);
708718

719+
public QueryExpression between(Range<?> literal, boolean isTimeStamp) {
720+
throw new PredicateAnalyzer.PredicateAnalyzerException(
721+
"between cannot be applied to " + this.getClass());
722+
}
723+
724+
public QueryExpression equals(Object point, boolean isTimeStamp) {
725+
throw new PredicateAnalyzer.PredicateAnalyzerException(
726+
"equals cannot be applied to " + this.getClass());
727+
}
728+
709729
public abstract QueryExpression notEquals(LiteralExpression literal);
710730

711731
public abstract QueryExpression gt(LiteralExpression literal);
@@ -951,6 +971,11 @@ private SimpleQueryExpression(NamedFieldExpression rel) {
951971
this.rel = rel;
952972
}
953973

974+
/**
 * Creates an expression around an already-built query.
 *
 * <p>No field reference is attached: {@code rel} is set to {@code null}.
 *
 * @param builder the query builder this expression holds
 */
public SimpleQueryExpression(QueryBuilder builder) {
  this.rel = null;
  this.builder = builder;
}
978+
954979
@Override
955980
public QueryBuilder builder() {
956981
if (builder == null) {
@@ -1125,6 +1150,44 @@ public QueryExpression notIn(LiteralExpression literal) {
11251150
builder = boolQuery().mustNot(termsQuery(getFieldReferenceForTermQuery(), collection));
11261151
return this;
11271152
}
1153+
1154+
@Override
1155+
public QueryExpression equals(Object point, boolean isTimeStamp) {
1156+
builder =
1157+
termQuery(getFieldReferenceForTermQuery(), convertEndpointValue(point, isTimeStamp));
1158+
return this;
1159+
}
1160+
1161+
@Override
1162+
public QueryExpression between(Range<?> range, boolean isTimeStamp) {
1163+
Object lowerBound =
1164+
range.hasLowerBound() ? convertEndpointValue(range.lowerEndpoint(), isTimeStamp) : null;
1165+
Object upperBound =
1166+
range.hasUpperBound() ? convertEndpointValue(range.upperEndpoint(), isTimeStamp) : null;
1167+
RangeQueryBuilder rangeQueryBuilder = rangeQuery(getFieldReference());
1168+
rangeQueryBuilder =
1169+
range.lowerBoundType() == BoundType.CLOSED
1170+
? rangeQueryBuilder.gte(lowerBound)
1171+
: rangeQueryBuilder.gt(lowerBound);
1172+
rangeQueryBuilder =
1173+
range.upperBoundType() == BoundType.CLOSED
1174+
? rangeQueryBuilder.lte(upperBound)
1175+
: rangeQueryBuilder.lt(upperBound);
1176+
builder = rangeQueryBuilder;
1177+
return this;
1178+
}
1179+
1180+
private Object convertEndpointValue(Object value, boolean isTimeStamp) {
1181+
value = (value instanceof NlsString nls) ? nls.getValue() : value;
1182+
return isTimeStamp ? timestampValueForPushDown(value.toString()) : value;
1183+
}
1184+
}
1185+
1186+
private static String timestampValueForPushDown(String value) {
1187+
ExprTimestampValue exprTimestampValue = new ExprTimestampValue(value);
1188+
return DateFieldMapper.getDefaultDateTimeFormatter()
1189+
.format(exprTimestampValue.timestampValue());
1190+
// https://github.com/opensearch-project/sql/pull/3442
11281191
}
11291192

11301193
/**
@@ -1210,6 +1273,14 @@ ExprType getExprType() {
12101273
return type;
12111274
}
12121275

1276+
boolean isTimeStampType() {
1277+
return type != null
1278+
&& ExprCoreType.TIMESTAMP.equals(
1279+
type.getOriginalExprType() instanceof OpenSearchDataType osType
1280+
? osType.getExprCoreType()
1281+
: type.getOriginalExprType());
1282+
}
1283+
12131284
boolean isTextType() {
12141285
return type != null && type.getOriginalExprType() instanceof OpenSearchTextType;
12151286
}
@@ -1267,7 +1338,7 @@ Object value() {
12671338
} else if (isBoolean()) {
12681339
return booleanValue();
12691340
} else if (isTimestamp()) {
1270-
return timestampValueForPushDown();
1341+
return timestampValueForPushDown(RexLiteral.stringValue(literal));
12711342
} else if (isString()) {
12721343
return RexLiteral.stringValue(literal);
12731344
} else {
@@ -1318,15 +1389,6 @@ String stringValue() {
13181389
return RexLiteral.stringValue(literal);
13191390
}
13201391

1321-
String timestampValueForPushDown() {
1322-
ExprTimestampValue exprTimestampValue =
1323-
new ExprTimestampValue(RexLiteral.stringValue(literal));
1324-
return DateFieldMapper.getDefaultDateTimeFormatter()
1325-
.format(
1326-
exprTimestampValue.timestampValue()); // format using opensearch default formatter as
1327-
// https://github.com/opensearch-project/sql/pull/3442
1328-
}
1329-
13301392
List<Object> sargValue() {
13311393
final Sarg sarg = requireNonNull(literal.getValueAs(Sarg.class), "Sarg");
13321394
final RelDataType type = literal.getType();
@@ -1376,7 +1438,7 @@ private static void checkForIncompatibleDateTimeOperands(RexCall call) {
13761438
|| (SqlTypeFamily.TIMESTAMP.contains(op2) && !SqlTypeFamily.TIMESTAMP.contains(op1))
13771439
|| (SqlTypeFamily.TIME.contains(op1) && !SqlTypeFamily.TIME.contains(op2))
13781440
|| (SqlTypeFamily.TIME.contains(op2) && !SqlTypeFamily.TIME.contains(op1))) {
1379-
throw new PredicateAnalyzerException(
1441+
throw new PredicateAnalyzer.PredicateAnalyzerException(
13801442
"Cannot handle " + call.getKind() + " expression for _id field, " + call);
13811443
}
13821444
}

0 commit comments

Comments
 (0)