Skip to content

Commit 47a5d7a

Browse files
committed
Move geoip function to opensearch package & register it in OpenSearchExecutionEngine
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent e1dcaa0 commit 47a5d7a

10 files changed

Lines changed: 52 additions & 47 deletions

File tree

core/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ dependencies {
6363
api 'org.apache.calcite:calcite-linq4j:1.38.0'
6464
api project(':common')
6565
implementation "com.github.seancfoley:ipaddress:5.4.2"
66-
implementation group: 'org.opensearch', name:'geospatial-client', version: "${opensearch_build}"
67-
implementation group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}"
6866

6967
annotationProcessor('org.immutables:value:2.8.8')
7068
compileOnly('org.immutables:value-annotations:2.8.8')

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
2222
import org.opensearch.sql.executor.QueryType;
2323
import org.opensearch.sql.expression.function.FunctionProperties;
24-
import org.opensearch.transport.client.node.NodeClient;
2524

2625
public class CalcitePlanContext {
2726

@@ -45,17 +44,13 @@ public class CalcitePlanContext {
4544

4645
private final Stack<RexCorrelVariable> correlVar = new Stack<>();
4746

48-
/** Certain functions (e.g., GEOIP) depend on NodeClient for performing RPC calls. * */
49-
@Getter private final NodeClient client;
50-
51-
private CalcitePlanContext(FrameworkConfig config, QueryType queryType, NodeClient client) {
47+
private CalcitePlanContext(FrameworkConfig config, QueryType queryType) {
5248
this.config = config;
5349
this.queryType = queryType;
5450
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
5551
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
5652
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
5753
this.functionProperties = new FunctionProperties(QueryType.PPL);
58-
this.client = client;
5954
}
6055

6156
public RexNode resolveJoinCondition(
@@ -87,12 +82,7 @@ public Optional<RexCorrelVariable> peekCorrelVar() {
8782
}
8883
}
8984

90-
public static CalcitePlanContext create(
91-
FrameworkConfig config, QueryType queryType, NodeClient client) {
92-
return new CalcitePlanContext(config, queryType, client);
93-
}
94-
9585
public static CalcitePlanContext create(FrameworkConfig config, QueryType queryType) {
96-
return new CalcitePlanContext(config, queryType, null);
86+
return new CalcitePlanContext(config, queryType);
9787
}
9888
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import org.opensearch.sql.exception.SemanticCheckException;
6666
import org.opensearch.sql.expression.function.BuiltinFunctionName;
6767
import org.opensearch.sql.expression.function.PPLFuncImpTable;
68-
import org.opensearch.sql.expression.function.udf.ip.GeoIpFunction;
6968

7069
@RequiredArgsConstructor
7170
public class CalciteRexNodeVisitor extends AbstractNodeVisitor<RexNode, CalcitePlanContext> {
@@ -349,13 +348,6 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
349348
List<RexNode> arguments =
350349
node.getFuncArgs().stream().map(arg -> analyze(arg, context)).collect(Collectors.toList());
351350

352-
// GEOIP needs NodeClient to perform RPC calls. Therefore, we handle it separately.
353-
if (BuiltinFunctionName.GEOIP.equals(BuiltinFunctionName.of(node.getFuncName()).orElse(null))) {
354-
PPLFuncImpTable.FunctionImp geoIpImpl =
355-
(builder, args) ->
356-
builder.makeCall(new GeoIpFunction(context.getClient()).toUDF("GEOIP"), args);
357-
return geoIpImpl.resolve(context.rexBuilder, arguments.toArray(new RexNode[0]));
358-
}
359351
RexNode resolvedNode =
360352
PPLFuncImpTable.INSTANCE.resolveSafe(
361353
context.rexBuilder, node.getFuncName(), arguments.toArray(new RexNode[0]));

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.opensearch.sql.planner.Planner;
4545
import org.opensearch.sql.planner.logical.LogicalPlan;
4646
import org.opensearch.sql.planner.physical.PhysicalPlan;
47-
import org.opensearch.transport.client.node.NodeClient;
4847

4948
/** The low level interface of core engine. */
5049
@RequiredArgsConstructor
@@ -60,7 +59,6 @@ public class QueryService {
6059

6160
private DataSourceService dataSourceService;
6261
private Settings settings;
63-
private NodeClient nodeClient;
6462

6563
/** Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.<br> */
6664
public void execute(
@@ -96,7 +94,7 @@ public void executeWithCalcite(
9694
(PrivilegedAction<Void>)
9795
() -> {
9896
CalcitePlanContext context =
99-
CalcitePlanContext.create(buildFrameworkConfig(), queryType, nodeClient);
97+
CalcitePlanContext.create(buildFrameworkConfig(), queryType);
10098
RelNode relNode = analyze(plan, context);
10199
RelNode optimized = optimize(relNode);
102100
RelNode calcitePlan = convertToCalcitePlan(optimized);
@@ -128,7 +126,7 @@ public void explainWithCalcite(
128126
(PrivilegedAction<Void>)
129127
() -> {
130128
CalcitePlanContext context =
131-
CalcitePlanContext.create(buildFrameworkConfig(), queryType, nodeClient);
129+
CalcitePlanContext.create(buildFrameworkConfig(), queryType);
132130
RelNode relNode = analyze(plan, context);
133131
RelNode optimized = optimize(relNode);
134132
RelNode calcitePlan = convertToCalcitePlan(optimized);

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,29 @@ default List<RelDataType> getParams() {
152152
INSTANCE = new PPLFuncImpTable(builder);
153153
}
154154

155-
private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map;
155+
private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
156+
functionRegistry;
157+
private final Map<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
158+
externalFunctionRegistry;
156159

157160
private PPLFuncImpTable(Builder builder) {
158161
final ImmutableMap.Builder<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
159162
mapBuilder = ImmutableMap.builder();
160163
builder.map.forEach((k, v) -> mapBuilder.put(k, v.immutable()));
161-
this.map = ImmutableMap.copyOf(mapBuilder.build());
164+
this.functionRegistry = ImmutableMap.copyOf(mapBuilder.build());
165+
this.externalFunctionRegistry = new HashMap<>();
166+
}
167+
168+
/**
169+
* Register a function implementation from external services dynamically.
170+
*
171+
* @param functionName the name of the function, has to be defined in BuiltinFunctionName
172+
* @param functionImp the implementation of the function
173+
*/
174+
public void registerExternalFunction(BuiltinFunctionName functionName, FunctionImp functionImp) {
175+
CalciteFuncSignature signature =
176+
new CalciteFuncSignature(functionName.getName(), functionImp.getParams());
177+
externalFunctionRegistry.put(functionName, PairList.of(signature, functionImp));
162178
}
163179

164180
public @Nullable RexNode resolveSafe(
@@ -180,7 +196,11 @@ public RexNode resolve(final RexBuilder builder, final String functionName, RexN
180196

181197
public RexNode resolve(
182198
final RexBuilder builder, final BuiltinFunctionName functionName, RexNode... args) {
183-
final PairList<CalciteFuncSignature, FunctionImp> implementList = map.get(functionName);
199+
PairList<CalciteFuncSignature, FunctionImp> implementList = functionRegistry.get(functionName);
200+
// If the function is not part of the built-in registry, check the external registry
201+
if (implementList == null) {
202+
implementList = externalFunctionRegistry.get(functionName);
203+
}
184204
if (implementList == null || implementList.isEmpty()) {
185205
throw new IllegalStateException(String.format("Cannot resolve function: %s", functionName));
186206
}

datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.*;
1212
import java.util.stream.Collectors;
1313

14-
import lombok.Getter;
1514
import org.opensearch.sql.datasource.DataSourceService;
1615
import org.opensearch.sql.datasource.RequestContext;
1716
import org.opensearch.sql.datasource.model.DataSource;
@@ -20,9 +19,7 @@
2019
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
2120
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
2221
import org.opensearch.sql.datasources.exceptions.DatasourceDisabledException;
23-
import org.opensearch.sql.legacy.esdomain.OpenSearchClient;
2422
import org.opensearch.sql.storage.DataSourceFactory;
25-
import org.opensearch.transport.client.node.NodeClient;
2623

2724
/**
2825
* Default implementation of {@link DataSourceService}. It is per-jvm single instance.

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ public QueryPlanFactory queryPlanFactory(ExecutionEngine executionEngine) {
372372
Planner planner = new Planner(LogicalPlanOptimizer.create());
373373
// NodeClient is not used in integration test, so we pass null
374374
QueryService queryService =
375-
new QueryService(analyzer, executionEngine, planner, dataSourceService, settings, null);
375+
new QueryService(analyzer, executionEngine, planner, dataSourceService, settings);
376376
return new QueryPlanFactory(queryService);
377377
}
378378
}

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.List;
1919
import java.util.Map;
2020
import java.util.concurrent.atomic.AtomicReference;
21-
import lombok.RequiredArgsConstructor;
2221
import org.apache.calcite.plan.RelOptUtil;
2322
import org.apache.calcite.rel.RelNode;
2423
import org.apache.calcite.rel.RelRoot;
@@ -38,21 +37,33 @@
3837
import org.opensearch.sql.executor.ExecutionEngine.Schema.Column;
3938
import org.opensearch.sql.executor.Explain;
4039
import org.opensearch.sql.executor.pagination.PlanSerializer;
40+
import org.opensearch.sql.expression.function.BuiltinFunctionName;
41+
import org.opensearch.sql.expression.function.PPLFuncImpTable;
4142
import org.opensearch.sql.opensearch.client.OpenSearchClient;
4243
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
44+
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
4345
import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor;
4446
import org.opensearch.sql.planner.physical.PhysicalPlan;
4547
import org.opensearch.sql.storage.TableScanOperator;
4648

4749
/** OpenSearch execution engine implementation. */
48-
@RequiredArgsConstructor
4950
public class OpenSearchExecutionEngine implements ExecutionEngine {
5051

5152
private final OpenSearchClient client;
5253

5354
private final ExecutionProtector executionProtector;
5455
private final PlanSerializer planSerializer;
5556

57+
public OpenSearchExecutionEngine(
58+
OpenSearchClient client,
59+
ExecutionProtector executionProtector,
60+
PlanSerializer planSerializer) {
61+
this.client = client;
62+
this.executionProtector = executionProtector;
63+
this.planSerializer = planSerializer;
64+
registerOpenSearchFunctions();
65+
}
66+
5667
@Override
5768
public void execute(PhysicalPlan physicalPlan, ResponseListener<QueryResponse> listener) {
5869
execute(physicalPlan, ExecutionContext.emptyExecutionContext(), listener);
@@ -224,4 +235,12 @@ private void buildResultSet(
224235
QueryResponse response = new QueryResponse(schema, values, null);
225236
listener.onResponse(response);
226237
}
238+
239+
/** Registers opensearch-dependent functions */
240+
private void registerOpenSearchFunctions() {
241+
PPLFuncImpTable.FunctionImp geoIpImpl =
242+
(builder, args) ->
243+
builder.makeCall(new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP"), args);
244+
PPLFuncImpTable.INSTANCE.registerExternalFunction(BuiltinFunctionName.GEOIP, geoIpImpl);
245+
}
227246
}

core/src/main/java/org/opensearch/sql/expression/function/udf/ip/GeoIpFunction.java renamed to opensearch/src/main/java/org/opensearch/sql/opensearch/functions/GeoIpFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.sql.expression.function.udf.ip;
6+
package org.opensearch.sql.opensearch.functions;
77

88
import java.util.*;
99
import java.util.stream.Collectors;

plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,22 +98,13 @@ public SQLService sqlService(QueryManager queryManager, QueryPlanFactory queryPl
9898
/** {@link QueryPlanFactory}. */
9999
@Provides
100100
public QueryPlanFactory queryPlanFactory(
101-
DataSourceService dataSourceService,
102-
ExecutionEngine executionEngine,
103-
Settings settings,
104-
OpenSearchClient client) {
101+
DataSourceService dataSourceService, ExecutionEngine executionEngine, Settings settings) {
105102
Analyzer analyzer =
106103
new Analyzer(
107104
new ExpressionAnalyzer(functionRepository), dataSourceService, functionRepository);
108105
Planner planner = new Planner(LogicalPlanOptimizer.create());
109106
QueryService queryService =
110-
new QueryService(
111-
analyzer,
112-
executionEngine,
113-
planner,
114-
dataSourceService,
115-
settings,
116-
client.getNodeClient());
107+
new QueryService(analyzer, executionEngine, planner, dataSourceService, settings);
117108
return new QueryPlanFactory(queryService);
118109
}
119110
}

0 commit comments

Comments
 (0)