Skip to content

Commit c2011ea

Browse files
committed
Push down IP comparison as range query with Calcite (opensearch-project#3959)
* Add reverse op for compare ip to support pushdown Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Pushdown ip comparison Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Refactor CompareIpFunction to use SqlKind directly Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Simplify the overriding of reverse() for IP comparators Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> --------- Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> (cherry picked from commit e2375fe)
1 parent fe2e83c commit c2011ea

11 files changed

Lines changed: 162 additions & 41 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/utils/PPLReturnTypes.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ private PPLReturnTypes() {}
2424
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_TIME_UDT);
2525
public static final SqlReturnTypeInference TIMESTAMP_FORCE_NULLABLE =
2626
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_TIMESTAMP_UDT);
27+
public static final SqlReturnTypeInference IP_FORCE_NULLABLE =
28+
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_IP_UDT);
2729
public static SqlReturnTypeInference INTEGER_FORCE_NULLABLE =
2830
ReturnTypes.INTEGER.andThen(SqlTypeTransforms.FORCE_NULLABLE);
2931
public static SqlReturnTypeInference STRING_FORCE_NULLABLE =

core/src/main/java/org/opensearch/sql/calcite/utils/UserDefinedFunctionUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class UserDefinedFunctionUtils {
5757
TYPE_FACTORY.createUDT(ExprUDT.EXPR_TIMESTAMP, true);
5858
public static final RelDataType NULLABLE_STRING =
5959
TYPE_FACTORY.createTypeWithNullability(TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR), true);
60+
public static final RelDataType NULLABLE_IP_UDT = TYPE_FACTORY.createUDT(EXPR_IP, true);
6061

6162
public static RelDataType nullablePatternAggList =
6263
createArrayType(
@@ -79,6 +80,7 @@ public class UserDefinedFunctionUtils {
7980
ImmutableSet.of("match", "match_phrase", "match_bool_prefix", "match_phrase_prefix");
8081
public static Set<String> MULTI_FIELDS_RELEVANCE_FUNCTION_SET =
8182
ImmutableSet.of("simple_query_string", "query_string", "multi_match");
83+
public static String IP_FUNCTION_NAME = "IP";
8284

8385
/**
8486
* Creates a SqlUserDefinedAggFunction that wraps a Java class implementing an aggregate function.

core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,8 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
299299
.toUDF("TIME");
300300

301301
// IP cast function
302-
public static final SqlOperator IP = new IPFunction().toUDF("IP");
302+
public static final SqlOperator IP =
303+
new IPFunction().toUDF(UserDefinedFunctionUtils.IP_FUNCTION_NAME);
303304
public static final SqlOperator TIME_TO_SEC =
304305
adaptExprMethodToUDF(
305306
DateTimeFunctions.class,

core/src/main/java/org/opensearch/sql/expression/function/udf/ip/CompareIpFunction.java

Lines changed: 79 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,30 @@
55

66
package org.opensearch.sql.expression.function.udf.ip;
77

8+
import java.util.Collections;
89
import java.util.List;
10+
import java.util.Locale;
911
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
1012
import org.apache.calcite.adapter.enumerable.NullPolicy;
1113
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
1214
import org.apache.calcite.linq4j.tree.ConstantExpression;
1315
import org.apache.calcite.linq4j.tree.Expression;
1416
import org.apache.calcite.linq4j.tree.Expressions;
1517
import org.apache.calcite.rex.RexCall;
18+
import org.apache.calcite.sql.SqlIdentifier;
19+
import org.apache.calcite.sql.SqlKind;
20+
import org.apache.calcite.sql.SqlOperator;
21+
import org.apache.calcite.sql.SqlSyntax;
22+
import org.apache.calcite.sql.parser.SqlParserPos;
23+
import org.apache.calcite.sql.type.InferTypes;
1624
import org.apache.calcite.sql.type.ReturnTypes;
1725
import org.apache.calcite.sql.type.SqlReturnTypeInference;
26+
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
27+
import org.checkerframework.checker.nullness.qual.Nullable;
1828
import org.opensearch.sql.data.model.ExprIpValue;
1929
import org.opensearch.sql.data.type.ExprCoreType;
2030
import org.opensearch.sql.expression.function.ImplementorUDF;
31+
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
2132
import org.opensearch.sql.expression.function.UDFOperandMetadata;
2233

2334
/**
@@ -32,33 +43,80 @@
3243
* </ul>
3344
*/
3445
public class CompareIpFunction extends ImplementorUDF {
46+
private final SqlKind kind;
3547

36-
private CompareIpFunction(ComparisonType comparisonType) {
37-
super(new CompareImplementor(comparisonType), NullPolicy.ANY);
48+
private CompareIpFunction(SqlKind kind) {
49+
super(new CompareImplementor(kind), NullPolicy.ANY);
50+
this.kind = kind;
3851
}
3952

4053
public static CompareIpFunction less() {
41-
return new CompareIpFunction(ComparisonType.LESS);
54+
return new CompareIpFunction(SqlKind.LESS_THAN);
4255
}
4356

4457
public static CompareIpFunction greater() {
45-
return new CompareIpFunction(ComparisonType.GREATER);
58+
return new CompareIpFunction(SqlKind.GREATER_THAN);
4659
}
4760

4861
public static CompareIpFunction lessOrEquals() {
49-
return new CompareIpFunction(ComparisonType.LESS_OR_EQUAL);
62+
return new CompareIpFunction(SqlKind.LESS_THAN_OR_EQUAL);
5063
}
5164

5265
public static CompareIpFunction greaterOrEquals() {
53-
return new CompareIpFunction(ComparisonType.GREATER_OR_EQUAL);
66+
return new CompareIpFunction(SqlKind.GREATER_THAN_OR_EQUAL);
5467
}
5568

5669
public static CompareIpFunction equals() {
57-
return new CompareIpFunction(ComparisonType.EQUALS);
70+
return new CompareIpFunction(SqlKind.EQUALS);
5871
}
5972

6073
public static CompareIpFunction notEquals() {
61-
return new CompareIpFunction(ComparisonType.NOT_EQUALS);
74+
return new CompareIpFunction(SqlKind.NOT_EQUALS);
75+
}
76+
77+
@Override
78+
public SqlUserDefinedFunction toUDF(String functionName, boolean isDeterministic) {
79+
SqlIdentifier udfIdentifier =
80+
new SqlIdentifier(Collections.singletonList(functionName), null, SqlParserPos.ZERO, null);
81+
return new SqlUserDefinedFunction(
82+
udfIdentifier,
83+
kind,
84+
getReturnTypeInference(),
85+
InferTypes.ANY_NULLABLE,
86+
getOperandMetadata(),
87+
getFunction()) {
88+
@Override
89+
public boolean isDeterministic() {
90+
return isDeterministic;
91+
}
92+
93+
@Override
94+
public @Nullable SqlOperator reverse() {
95+
switch (kind) {
96+
case LESS_THAN:
97+
return PPLBuiltinOperators.GREATER_IP;
98+
case GREATER_THAN:
99+
return PPLBuiltinOperators.LESS_IP;
100+
case LESS_THAN_OR_EQUAL:
101+
return PPLBuiltinOperators.GTE_IP;
102+
case GREATER_THAN_OR_EQUAL:
103+
return PPLBuiltinOperators.LTE_IP;
104+
case EQUALS:
105+
return PPLBuiltinOperators.EQUALS_IP;
106+
case NOT_EQUALS:
107+
return PPLBuiltinOperators.NOT_EQUALS_IP;
108+
default:
109+
throw new IllegalArgumentException(
110+
String.format(
111+
Locale.ROOT, "CompareIpFunction is not supposed to be of kind: %s", kind));
112+
}
113+
}
114+
115+
@Override
116+
public SqlSyntax getSyntax() {
117+
return SqlSyntax.BINARY;
118+
}
119+
};
62120
}
63121

64122
@Override
@@ -72,10 +130,10 @@ public UDFOperandMetadata getOperandMetadata() {
72130
}
73131

74132
public static class CompareImplementor implements NotNullImplementor {
75-
private final ComparisonType comparisonType;
133+
private final SqlKind compareType;
76134

77-
public CompareImplementor(ComparisonType comparisonType) {
78-
this.comparisonType = comparisonType;
135+
public CompareImplementor(SqlKind compareType) {
136+
this.compareType = compareType;
79137
}
80138

81139
@Override
@@ -88,27 +146,27 @@ public Expression implement(
88146
translatedOperands.get(0),
89147
translatedOperands.get(1));
90148

91-
return generateComparisonExpression(compareResult, comparisonType);
149+
return evalCompareResult(compareResult, compareType);
92150
}
93151

94-
private static Expression generateComparisonExpression(
95-
Expression compareResult, ComparisonType comparisonType) {
96-
final ConstantExpression zero = Expressions.constant(0);
97-
switch (comparisonType) {
152+
private static Expression evalCompareResult(Expression compareResult, SqlKind compareType) {
153+
final ConstantExpression zero = Expressions.constant(0);
154+
switch (compareType) {
98155
case EQUALS:
99156
return Expressions.equal(compareResult, zero);
100157
case NOT_EQUALS:
101158
return Expressions.notEqual(compareResult, zero);
102-
case LESS:
159+
case LESS_THAN:
103160
return Expressions.lessThan(compareResult, zero);
104-
case LESS_OR_EQUAL:
161+
case LESS_THAN_OR_EQUAL:
105162
return Expressions.lessThanOrEqual(compareResult, zero);
106-
case GREATER:
163+
case GREATER_THAN:
107164
return Expressions.greaterThan(compareResult, zero);
108-
case GREATER_OR_EQUAL:
165+
case GREATER_THAN_OR_EQUAL:
109166
return Expressions.greaterThanOrEqual(compareResult, zero);
110167
default:
111-
throw new IllegalArgumentException("Unexpected comparison type: " + comparisonType);
168+
throw new UnsupportedOperationException(
169+
String.format(Locale.ROOT, "Unsupported compare type: %s", compareType));
112170
}
113171
}
114172

@@ -128,13 +186,4 @@ private static ExprIpValue toExprIpValue(Object obj) {
128186
throw new IllegalArgumentException("Invalid IP type: " + obj);
129187
}
130188
}
131-
132-
public enum ComparisonType {
133-
EQUALS,
134-
NOT_EQUALS,
135-
LESS,
136-
LESS_OR_EQUAL,
137-
GREATER,
138-
GREATER_OR_EQUAL
139-
}
140189
}

core/src/main/java/org/opensearch/sql/expression/function/udf/ip/IPFunction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
import org.apache.calcite.linq4j.tree.Expression;
1313
import org.apache.calcite.linq4j.tree.Expressions;
1414
import org.apache.calcite.rex.RexCall;
15-
import org.apache.calcite.sql.type.ReturnTypes;
1615
import org.apache.calcite.sql.type.SqlReturnTypeInference;
1716
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
17+
import org.opensearch.sql.calcite.utils.PPLReturnTypes;
1818
import org.opensearch.sql.data.model.ExprIpValue;
1919
import org.opensearch.sql.data.type.ExprCoreType;
2020
import org.opensearch.sql.data.type.ExprType;
@@ -46,8 +46,7 @@ public UDFOperandMetadata getOperandMetadata() {
4646

4747
@Override
4848
public SqlReturnTypeInference getReturnTypeInference() {
49-
return ReturnTypes.explicit(
50-
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(OpenSearchTypeFactory.ExprUDT.EXPR_IP, true));
49+
return PPLReturnTypes.IP_FORCE_NULLABLE;
5150
}
5251

5352
public static class CastImplementor

integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.regex.Matcher;
1717
import java.util.regex.Pattern;
1818

19+
import org.junit.Assume;
1920
import org.junit.Ignore;
2021
import org.junit.jupiter.api.Test;
2122
import org.opensearch.client.ResponseException;
@@ -96,18 +97,31 @@ public void testFilterByCompareStringTimePushDownExplain() throws IOException {
9697

9798
@Test
9899
public void testFilterByCompareIPCoercion() throws IOException {
99-
// Should automatically cast the string literal to IP.
100-
String expected = loadExpectedPlan("explain_filter_compare_ip.json");
101-
// The index of host is flaky (different from test to test)
100+
// Should automatically cast the string literal to IP and pushdown it as a range query
102101
assertJsonEqualsIgnoreFieldIndex(
103-
expected,
102+
loadExpectedPlan("explain_filter_compare_ip.json"),
104103
explainQueryToString(
105104
String.format(
106105
Locale.ROOT,
107106
"source=%s | where host > '1.1.1.1' | fields host",
108107
TEST_INDEX_WEBLOGS)));
109108
}
110109

110+
@Test
111+
public void testFilterByCompareIpv6Swapped() throws IOException {
112+
// Ignored in v2: the serialized string is unstable because of function properties
113+
Assume.assumeTrue(isCalciteEnabled());
114+
// Test swapping ip and string. In v2, this is pushed down as script;
115+
// with Calcite, it will still be pushed down as a range query
116+
assertJsonEqualsIgnoreFieldIndex(
117+
loadExpectedPlan("explain_filter_compare_ipv6_swapped.json"),
118+
explainQueryToString(
119+
String.format(
120+
Locale.ROOT,
121+
"source=%s | where '::ffff:1234' <= host | fields host",
122+
TEST_INDEX_WEBLOGS)));
123+
}
124+
111125
private static void assertJsonEqualsIgnoreFieldIndex(String expected, String actual) throws IOException {
112126
String reorderedExpected = maskIndexAndReorderProject(expected);
113127
String reorderedActual = maskIndexAndReorderProject(actual);
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
3-
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$3])\n LogicalFilter(condition=[GREATER_IP($3, IP('1.1.1.1':VARCHAR))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
4-
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], SCRIPT->GREATER_IP($0, IP('1.1.1.1':VARCHAR)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"script\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAensKICAiZmllbGRzIjogWwogICAgewogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQDfXsKICAib3AiOiB7CiAgICAibmFtZSI6ICJHUkVBVEVSX0lQIiwKICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICJzeW50YXgiOiAiRlVOQ1RJT04iCiAgfSwKICAib3BlcmFuZHMiOiBbCiAgICB7CiAgICAgICJpbnB1dCI6IDAsCiAgICAgICJuYW1lIjogIiQwIgogICAgfSwKICAgIHsKICAgICAgIm9wIjogewogICAgICAgICJuYW1lIjogIklQIiwKICAgICAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAgICAgInN5bnRheCI6ICJGVU5DVElPTiIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJsaXRlcmFsIjogIjEuMS4xLjEiLAogICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICJ0eXBlIjogIlZBUkNIQVIiLAogICAgICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICBdLAogICAgICAiY2xhc3MiOiAib3JnLm9wZW5zZWFyY2guc3FsLmV4cHJlc3Npb24uZnVuY3Rpb24uVXNlckRlZmluZWRGdW5jdGlvbkJ1aWxkZXIkMSIsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIk9USEVSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlCiAgICAgIH0sCiAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1ZSwKICAgICAgImR5bmFtaWMiOiBmYWxzZQogICAgfQogIF0sCiAgImNsYXNzIjogIm9yZy5vcGVuc2VhcmNoLnNxbC5leHByZXNzaW9uLmZ1bmN0aW9uLlVzZXJEZWZpbmVkRnVuY3Rpb25CdWlsZGVyJDEiLAogICJ0eXBlIjogewogICAgInR5cGUiOiAiQk9PTEVBTiIsCiAgICAibnVsbGFibGUiOiB0cnVlCiAgfSwKICAiZGV0ZXJtaW5pc3RpYyI6IHRydWUsCiAgImR5bmFtaWMiOiBmYWxzZQp9dAAKZmllbGRUeXBlc3NyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAABdAAEaG9zdH5yAClvcmcub3BlbnNlYXJjaC5zcWwuZGF0YS50eXBlLkV4cHJDb3JlVHlwZQAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQAAklQeHg=\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":1754322819643187000}},\"boost\":1.0}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[GREATER_IP($0, IP('1.1.1.1':VARCHAR))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], FILTER->GREATER_IP($0, IP('1.1.1.1':VARCHAR)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"host\":{\"from\":\"1.1.1.1\",\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
55
}
66
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[LTE_IP(IP('::ffff:1234':VARCHAR), $0)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], FILTER->LTE_IP(IP('::ffff:1234':VARCHAR), $0), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"host\":{\"from\":\"::ffff:1234\",\"to\":null,\"include_lower\":true,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[LTE_IP(IP('::ffff:1234':VARCHAR), $0)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..11=[{inputs}], expr#12=['::ffff:1234':VARCHAR], expr#13=[IP($t12)], expr#14=[LTE_IP($t13, $t0)], host=[$t0], $condition=[$t14])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n"
5+
}
6+
}

0 commit comments

Comments
 (0)