Skip to content

Commit c6a06d1

Browse files
committed
Supports serializing external OpenSearch UDFs
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent a4b8ac1 commit c6a06d1

2 files changed

Lines changed: 63 additions & 11 deletions

File tree

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.sql.opensearch.executor;
77

8+
import com.google.common.base.Suppliers;
89
import java.security.AccessController;
910
import java.security.PrivilegedAction;
1011
import java.sql.PreparedStatement;
@@ -15,16 +16,21 @@
1516
import java.util.LinkedHashMap;
1617
import java.util.List;
1718
import java.util.Map;
19+
import java.util.concurrent.ConcurrentHashMap;
1820
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.function.Supplier;
1922
import org.apache.calcite.plan.RelOptUtil;
2023
import org.apache.calcite.rel.RelNode;
2124
import org.apache.calcite.rel.RelRoot;
2225
import org.apache.calcite.rel.type.RelDataType;
2326
import org.apache.calcite.rel.type.RelDataTypeField;
2427
import org.apache.calcite.runtime.Hook;
2528
import org.apache.calcite.sql.SqlExplainLevel;
29+
import org.apache.calcite.sql.SqlOperator;
30+
import org.apache.calcite.sql.SqlOperatorTable;
2631
import org.apache.calcite.sql.type.ReturnTypes;
2732
import org.apache.calcite.sql.type.SqlTypeName;
33+
import org.apache.calcite.sql.util.ListSqlOperatorTable;
2834
import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
2935
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
3036
import org.apache.logging.log4j.LogManager;
@@ -271,8 +277,9 @@ private void buildResultSet(
271277
private void registerOpenSearchFunctions() {
272278
if (client instanceof OpenSearchNodeClient) {
273279
SqlUserDefinedFunction geoIpFunction =
274-
new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP");
280+
new GeoIpFunction(client.getNodeClient()).toUDF(BuiltinFunctionName.GEOIP.name());
275281
PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction);
282+
OperatorTable.addOperator(BuiltinFunctionName.GEOIP.name(), geoIpFunction);
276283
} else {
277284
logger.info(
278285
"Function [GEOIP] not registered: incompatible client type {}",
@@ -282,10 +289,37 @@ private void registerOpenSearchFunctions() {
282289
SqlUserDefinedAggFunction approxDistinctCountFunction =
283290
UserDefinedFunctionUtils.createUserDefinedAggFunction(
284291
DistinctCountApproxAggFunction.class,
285-
"APPROX_DISTINCT_COUNT",
292+
BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(),
286293
ReturnTypes.BIGINT_FORCE_NULLABLE,
287294
null);
288295
PPLFuncImpTable.INSTANCE.registerExternalAggOperator(
289296
BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCountFunction);
297+
OperatorTable.addOperator(
298+
BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(), approxDistinctCountFunction);
299+
}
300+
301+
/**
302+
* Dynamic SqlOperatorTable that allows adding operators after initialization. Similar to
303+
* PPLBuiltinOperators.instance() or SqlStdOperatorTable.instance().
304+
*/
305+
public static class OperatorTable extends ListSqlOperatorTable {
306+
private static final Supplier<OperatorTable> INSTANCE =
307+
Suppliers.memoize(() -> (OperatorTable) new OperatorTable().init());
308+
// Use a map instead of a list to avoid duplicate elements if the class is initialized multiple
309+
// times
310+
private static final Map<String, SqlOperator> operators = new ConcurrentHashMap<>();
311+
312+
public static SqlOperatorTable instance() {
313+
return INSTANCE.get();
314+
}
315+
316+
private ListSqlOperatorTable init() {
317+
setOperators(buildIndex(operators.values()));
318+
return this;
319+
}
320+
321+
public static synchronized void addOperator(String name, SqlOperator operator) {
322+
operators.put(name, operator);
323+
}
290324
}
291325
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/RelJsonSerializer.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.ByteArrayOutputStream;
1313
import java.io.ObjectInputStream;
1414
import java.io.ObjectOutputStream;
15+
import java.io.Serializable;
1516
import java.util.Base64;
1617
import java.util.LinkedHashMap;
1718
import java.util.Map;
@@ -29,14 +30,15 @@
2930
import org.opensearch.sql.calcite.CalcitePlanContext;
3031
import org.opensearch.sql.data.type.ExprType;
3132
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
33+
import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine;
3234

3335
/**
3436
* A serializer that (de-)serializes Calcite RexNode, RelDataType and OpenSearch field mapping.
3537
*
3638
* <p>This serializer:
3739
* <li>Uses Calcite's RelJson class to convert RexNode and RelDataType to/from JSON string
3840
* <li>Manages required OpenSearch field mapping information. Note: OpenSearch ExprType subclasses
39-
* implement {@link java.io.Serializable} and are handled through standard Java serialization.
41+
* implement {@link Serializable} and are handled through standard Java serialization.
4042
*/
4143
@Getter
4244
public class RelJsonSerializer {
@@ -49,13 +51,7 @@ public class RelJsonSerializer {
4951
private static final ObjectMapper mapper = new ObjectMapper();
5052
private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF =
5153
new TypeReference<>() {};
52-
private static final SqlOperatorTable pplSqlOperatorTable =
53-
SqlOperatorTables.chain(
54-
PPLBuiltinOperators.instance(),
55-
SqlStdOperatorTable.instance(),
56-
// Add a list of necessary SqlLibrary if needed
57-
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(
58-
SqlLibrary.MYSQL, SqlLibrary.BIG_QUERY, SqlLibrary.SPARK, SqlLibrary.POSTGRESQL));
54+
private static volatile SqlOperatorTable pplSqlOperatorTable;
5955

6056
static {
6157
mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
@@ -65,6 +61,27 @@ public RelJsonSerializer(RelOptCluster cluster) {
6561
this.cluster = cluster;
6662
}
6763

64+
private static SqlOperatorTable getPplSqlOperatorTable() {
65+
if (pplSqlOperatorTable == null) {
66+
synchronized (RelJsonSerializer.class) {
67+
if (pplSqlOperatorTable == null) {
68+
pplSqlOperatorTable =
69+
SqlOperatorTables.chain(
70+
PPLBuiltinOperators.instance(),
71+
SqlStdOperatorTable.instance(),
72+
OpenSearchExecutionEngine.OperatorTable.instance(),
73+
// Add a list of necessary SqlLibrary if needed
74+
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(
75+
SqlLibrary.MYSQL,
76+
SqlLibrary.BIG_QUERY,
77+
SqlLibrary.SPARK,
78+
SqlLibrary.POSTGRESQL));
79+
}
80+
}
81+
}
82+
return pplSqlOperatorTable;
83+
}
84+
6885
/**
6986
* Serializes Calcite expressions and field types into a map object string.
7087
*
@@ -125,7 +142,8 @@ public Map<String, Object> deserialize(String struct) {
125142
Map<String, Object> rowTypeMap = mapper.readValue((String) objectMap.get(ROW_TYPE), TYPE_REF);
126143
RelDataType rowType = relJson.toType(cluster.getTypeFactory(), rowTypeMap);
127144
OpenSearchRelInputTranslator inputTranslator = new OpenSearchRelInputTranslator(rowType);
128-
relJson = relJson.withInputTranslator(inputTranslator).withOperatorTable(pplSqlOperatorTable);
145+
relJson =
146+
relJson.withInputTranslator(inputTranslator).withOperatorTable(getPplSqlOperatorTable());
129147
Map<String, Object> exprMap = mapper.readValue((String) objectMap.get(EXPR), TYPE_REF);
130148
RexNode rexNode = relJson.toRex(cluster, exprMap);
131149

0 commit comments

Comments
 (0)