Skip to content

Commit 275e7fa

Browse files
committed
Support serializing external OpenSearch UDFs at pushdown time (opensearch-project#4618)
* Support serializing external OpenSearch UDFs Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Correct subfield access logic when calling ITEM Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Resolve types of generated structs based on their values because their types are UNDEFINED Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add explain and integration tests for geoip Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> --------- Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> (cherry picked from commit a003e8c)
1 parent d7603d3 commit 275e7fa

9 files changed

Lines changed: 141 additions & 18 deletions

File tree

core/src/main/java/org/opensearch/sql/ast/expression/QualifiedName.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
@Getter
2323
@EqualsAndHashCode(callSuper = false)
2424
public class QualifiedName extends UnresolvedExpression {
25+
public static final String DELIMITER = ".";
2526
private final List<String> parts;
2627

2728
public QualifiedName(String name) {
@@ -94,7 +95,7 @@ public QualifiedName rest() {
9495
}
9596

9697
public String toString() {
97-
return String.join(".", this.parts);
98+
return String.join(DELIMITER, this.parts);
9899
}
99100

100101
@Override

core/src/main/java/org/opensearch/sql/calcite/QualifiedNameResolver.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,10 @@ private static RexNode resolveFieldAccess(
270270
if (length == parts.size() - start) {
271271
return field;
272272
} else {
273-
String itemName = joinParts(parts, length + start, parts.size() - 1 - length);
274-
return createItemAccess(field, itemName, context);
273+
String itemName = joinParts(parts, length + start, parts.size() - length);
274+
return context.relBuilder.alias(
275+
createItemAccess(field, itemName, context),
276+
String.join(QualifiedName.DELIMITER, parts.subList(start, parts.size())));
275277
}
276278
}
277279

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public void init() throws Exception {
3838
loadIndex(Index.LOGS);
3939
loadIndex(Index.WORKER);
4040
loadIndex(Index.WORK_INFORMATION);
41+
loadIndex(Index.WEBLOG);
4142
}
4243

4344
@Override
@@ -1453,4 +1454,15 @@ public void testTopKThenSortExplain() throws IOException {
14531454
+ "| sort age "
14541455
+ "| fields age"));
14551456
}
1457+
1458+
@Test
1459+
public void testGeoIpPushedInAgg() throws IOException {
1460+
// This explain IT verifies that externally registered UDF can be properly pushed down
1461+
assertYamlEqualsIgnoreId(
1462+
loadExpectedPlan("udf_geoip_in_agg_pushed.yaml"),
1463+
explainQueryYaml(
1464+
String.format(
1465+
"source=%s | eval info = geoip('my-datasource', host) | stats count() by info.city",
1466+
TEST_INDEX_WEBLOGS)));
1467+
}
14561468
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteGeoIpFunctionsIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,35 @@ public void testGeoIpEnrichmentWithIpFieldAsInput() throws IOException {
5555
rows("10.0.0.1", Map.of("country", "USA")),
5656
rows("fd12:2345:6789:1:a1b2:c3d4:e5f6:789a", Map.of("country", "India")));
5757
}
58+
59+
@Test
60+
public void testGeoIpInAggregation() throws IOException {
61+
JSONObject result1 =
62+
executeQuery(
63+
String.format(
64+
"source=%s | where method='POST' | eval info = geoip('%s', host) | eval"
65+
+ " date=DATE('2020-12-10') | stats count() by info.city, method, span(date,"
66+
+ " 1month) as month",
67+
TEST_INDEX_WEBLOGS, DATASOURCE_NAME));
68+
verifySchema(
69+
result1,
70+
schema("count()", "bigint"),
71+
schema("month", "date"),
72+
schema("info.city", "string"),
73+
schema("method", "string"));
74+
verifyDataRows(
75+
result1,
76+
rows(1, "2020-12-01", "Seattle", "POST"),
77+
rows(1, "2020-12-01", "Bengaluru", "POST"));
78+
79+
// This case is pushed down into DSL with scripts
80+
JSONObject result2 =
81+
executeQuery(
82+
String.format(
83+
"source=%s | where method='POST' | eval info = geoip('%s', host) | stats count() by"
84+
+ " info.city",
85+
TEST_INDEX_WEBLOGS, DATASOURCE_NAME));
86+
verifySchema(result2, schema("count()", "bigint"), schema("info.city", "string"));
87+
verifyDataRows(result2, rows(1, "Seattle"), rows(1, "Bengaluru"));
88+
}
5889
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(count()=[$1], info.city=[$0])
5+
LogicalAggregate(group=[{0}], count()=[COUNT()])
6+
LogicalProject(info.city=[ITEM(GEOIP('my-datasource':VARCHAR, $0), 'city')])
7+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])
8+
physical: |
9+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), info.city], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"info.city":{"terms":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAknsKICAiZmllbGRzIjogWwogICAgewogICAgICAidWR0IjogIkVYUFJfSVAiLAogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQETnsKICAib3AiOiB7CiAgICAibmFtZSI6ICJJVEVNIiwKICAgICJraW5kIjogIklURU0iLAogICAgInN5bnRheCI6ICJTUEVDSUFMIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiR0VPSVAiLAogICAgICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICAgICAic3ludGF4IjogIkZVTkNUSU9OIgogICAgICB9LAogICAgICAib3BlcmFuZHMiOiBbCiAgICAgICAgewogICAgICAgICAgImxpdGVyYWwiOiAibXktZGF0YXNvdXJjZSIsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgIH0KICAgICAgICB9LAogICAgICAgIHsKICAgICAgICAgICJpbnB1dCI6IDAsCiAgICAgICAgICAibmFtZSI6ICIkMCIKICAgICAgICB9CiAgICAgIF0sCiAgICAgICJjbGFzcyI6ICJvcmcub3BlbnNlYXJjaC5zcWwuZXhwcmVzc2lvbi5mdW5jdGlvbi5Vc2VyRGVmaW5lZEZ1bmN0aW9uQnVpbGRlciQxIiwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiTUFQIiwKICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAia2V5IjogewogICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgIH0sCiAgICAgICAgInZhbHVlIjogewogICAgICAgICAgInR5cGUiOiAiQU5ZIiwKICAgICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICAgInByZWNpc2lvbiI6IC0xLAogICAgICAgICAgInNjYWxlIjogLTIxNDc0ODM2NDgKICAgICAgICB9CiAgICAgIH0sCiAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1
ZSwKICAgICAgImR5bmFtaWMiOiBmYWxzZQogICAgfSwKICAgIHsKICAgICAgImxpdGVyYWwiOiAiY2l0eSIsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIkNIQVIiLAogICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICJwcmVjaXNpb24iOiA0CiAgICAgIH0KICAgIH0KICBdCn10AApmaWVsZFR5cGVzc3IAEWphdmEudXRpbC5IYXNoTWFwBQfawcMWYNEDAAJGAApsb2FkRmFjdG9ySQAJdGhyZXNob2xkeHA/QAAAAAAADHcIAAAAEAAAAAF0AARob3N0fnIAKW9yZy5vcGVuc2VhcmNoLnNxbC5kYXRhLnR5cGUuRXhwckNvcmVUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAACSVB4eA==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0}},"missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(count()=[$1], info.city=[$0])
5+
LogicalAggregate(group=[{0}], count()=[COUNT()])
6+
LogicalProject(info.city=[ITEM(GEOIP('my-datasource':VARCHAR, $0), 'city')])
7+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])
8+
physical: |
9+
EnumerableLimit(fetch=[10000])
10+
EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], info.city=[$t0])
11+
EnumerableAggregate(group=[{0}], count()=[COUNT()])
12+
EnumerableCalc(expr#0..11=[{inputs}], expr#12=['my-datasource':VARCHAR], expr#13=[GEOIP($t12, $t0)], expr#14=['city'], expr#15=[ITEM($t13, $t14)], info.city=[$t15])
13+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])

opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,12 @@ private ExprValue parse(
186186

187187
// Field type may be not defined in mapping if users have disabled dynamic mapping.
188188
// Then try to parse content directly based on the value itself
189-
if (fieldType.isEmpty()) {
189+
// Besides, sub-fields of generated objects are also of type UNDEFINED. We parse the content
190+
// directly on the value itself for this case as well.
191+
// TODO: Remove the second condition once https://github.com/opensearch-project/sql/issues/3751
192+
// is resolved
193+
if (fieldType.isEmpty()
194+
|| fieldType.get().equals(OpenSearchDataType.of(ExprCoreType.UNDEFINED))) {
190195
return parseContent(content);
191196
}
192197

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55

66
package org.opensearch.sql.opensearch.executor;
77

8-
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DISTINCT_COUNT_APPROX;
9-
8+
import com.google.common.base.Suppliers;
109
import java.security.AccessController;
1110
import java.security.PrivilegedAction;
1211
import java.sql.PreparedStatement;
@@ -17,18 +16,23 @@
1716
import java.util.LinkedHashMap;
1817
import java.util.List;
1918
import java.util.Map;
19+
import java.util.concurrent.ConcurrentHashMap;
2020
import java.util.stream.Collectors;
2121

2222
import java.util.concurrent.atomic.AtomicReference;
23+
import java.util.function.Supplier;
2324
import org.apache.calcite.plan.RelOptUtil;
2425
import org.apache.calcite.rel.RelNode;
2526
import org.apache.calcite.rel.RelRoot;
2627
import org.apache.calcite.rel.type.RelDataType;
2728
import org.apache.calcite.rel.type.RelDataTypeField;
2829
import org.apache.calcite.runtime.Hook;
2930
import org.apache.calcite.sql.SqlExplainLevel;
31+
import org.apache.calcite.sql.SqlOperator;
32+
import org.apache.calcite.sql.SqlOperatorTable;
3033
import org.apache.calcite.sql.type.ReturnTypes;
3134
import org.apache.calcite.sql.type.SqlTypeName;
35+
import org.apache.calcite.sql.util.ListSqlOperatorTable;
3236
import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
3337
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
3438
import org.apache.logging.log4j.LogManager;
@@ -283,8 +287,9 @@ private void buildResultSet(
283287
private void registerOpenSearchFunctions() {
284288
if (client instanceof OpenSearchNodeClient) {
285289
SqlUserDefinedFunction geoIpFunction =
286-
new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP");
290+
new GeoIpFunction(client.getNodeClient()).toUDF(BuiltinFunctionName.GEOIP.name());
287291
PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction);
292+
OperatorTable.addOperator(BuiltinFunctionName.GEOIP.name(), geoIpFunction);
288293
} else {
289294
logger.info(
290295
"Function [GEOIP] not registered: incompatible client type {}",
@@ -294,10 +299,37 @@ private void registerOpenSearchFunctions() {
294299
SqlUserDefinedAggFunction approxDistinctCountFunction =
295300
UserDefinedFunctionUtils.createUserDefinedAggFunction(
296301
DistinctCountApproxAggFunction.class,
297-
DISTINCT_COUNT_APPROX.toString(),
302+
BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(),
298303
ReturnTypes.BIGINT_FORCE_NULLABLE,
299304
null);
300305
PPLFuncImpTable.INSTANCE.registerExternalAggOperator(
301-
DISTINCT_COUNT_APPROX, approxDistinctCountFunction);
306+
BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCountFunction);
307+
OperatorTable.addOperator(
308+
BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(), approxDistinctCountFunction);
309+
}
310+
311+
/**
312+
* Dynamic SqlOperatorTable that allows adding operators after initialization. Similar to
313+
* PPLBuiltinOperator.instance() or SqlStdOperatorTable.instance().
314+
*/
315+
public static class OperatorTable extends ListSqlOperatorTable {
316+
private static final Supplier<OperatorTable> INSTANCE =
317+
Suppliers.memoize(() -> (OperatorTable) new OperatorTable().init());
318+
// Use map instead of list to avoid duplicated elements if the class is initialized multiple
319+
// times
320+
private static final Map<String, SqlOperator> operators = new ConcurrentHashMap<>();
321+
322+
public static SqlOperatorTable instance() {
323+
return INSTANCE.get();
324+
}
325+
326+
private ListSqlOperatorTable init() {
327+
setOperators(buildIndex(operators.values()));
328+
return this;
329+
}
330+
331+
public static synchronized void addOperator(String name, SqlOperator operator) {
332+
operators.put(name, operator);
333+
}
302334
}
303335
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/RelJsonSerializer.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.ByteArrayOutputStream;
1313
import java.io.ObjectInputStream;
1414
import java.io.ObjectOutputStream;
15+
import java.io.Serializable;
1516
import java.util.Base64;
1617
import java.util.HashMap;
1718
import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@
3132
import org.opensearch.sql.calcite.CalcitePlanContext;
3233
import org.opensearch.sql.data.type.ExprType;
3334
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
35+
import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine;
3436
import org.opensearch.sql.opensearch.util.OpenSearchRelOptUtil;
3537

3638
/**
@@ -39,7 +41,7 @@
3941
* <p>This serializer:
4042
* <li>Uses Calcite's RelJson class to convert RexNode and RelDataType to/from JSON string
4143
* <li>Manages required OpenSearch field mapping information Note: OpenSearch ExprType subclasses
42-
* implement {@link java.io.Serializable} and are handled through standard Java serialization.
44+
* implement {@link Serializable} and are handled through standard Java serialization.
4345
*/
4446
@Getter
4547
public class RelJsonSerializer {
@@ -52,13 +54,7 @@ public class RelJsonSerializer {
5254
private static final ObjectMapper mapper = new ObjectMapper();
5355
private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF =
5456
new TypeReference<>() {};
55-
private static final SqlOperatorTable pplSqlOperatorTable =
56-
SqlOperatorTables.chain(
57-
PPLBuiltinOperators.instance(),
58-
SqlStdOperatorTable.instance(),
59-
// Add a list of necessary SqlLibrary if needed
60-
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(
61-
SqlLibrary.MYSQL, SqlLibrary.BIG_QUERY, SqlLibrary.SPARK, SqlLibrary.POSTGRESQL));
57+
private static volatile SqlOperatorTable pplSqlOperatorTable;
6258

6359
static {
6460
mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
@@ -68,6 +64,27 @@ public RelJsonSerializer(RelOptCluster cluster) {
6864
this.cluster = cluster;
6965
}
7066

67+
private static SqlOperatorTable getPplSqlOperatorTable() {
68+
if (pplSqlOperatorTable == null) {
69+
synchronized (RelJsonSerializer.class) {
70+
if (pplSqlOperatorTable == null) {
71+
pplSqlOperatorTable =
72+
SqlOperatorTables.chain(
73+
PPLBuiltinOperators.instance(),
74+
SqlStdOperatorTable.instance(),
75+
OpenSearchExecutionEngine.OperatorTable.instance(),
76+
// Add a list of necessary SqlLibrary if needed
77+
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(
78+
SqlLibrary.MYSQL,
79+
SqlLibrary.BIG_QUERY,
80+
SqlLibrary.SPARK,
81+
SqlLibrary.POSTGRESQL));
82+
}
83+
}
84+
}
85+
return pplSqlOperatorTable;
86+
}
87+
7188
/**
7289
* Serializes Calcite expressions and field types into a map object string.
7390
*
@@ -136,7 +153,8 @@ public Map<String, Object> deserialize(String struct) {
136153
Map<String, Object> rowTypeMap = mapper.readValue((String) objectMap.get(ROW_TYPE), TYPE_REF);
137154
RelDataType rowType = relJson.toType(cluster.getTypeFactory(), rowTypeMap);
138155
OpenSearchRelInputTranslator inputTranslator = new OpenSearchRelInputTranslator(rowType);
139-
relJson = relJson.withInputTranslator(inputTranslator).withOperatorTable(pplSqlOperatorTable);
156+
relJson =
157+
relJson.withInputTranslator(inputTranslator).withOperatorTable(getPplSqlOperatorTable());
140158
Map<String, Object> exprMap = mapper.readValue((String) objectMap.get(EXPR), TYPE_REF);
141159
RexNode rexNode = relJson.toRex(cluster, exprMap);
142160

0 commit comments

Comments
 (0)