Skip to content

Commit a3e6a54

Browse files
committed
refactor code
Signed-off-by: xinyual <xinyual@amazon.com>
1 parent 04d081a commit a3e6a54

5 files changed

Lines changed: 136 additions & 117 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/udf/udaf/AggHandler.java

Lines changed: 0 additions & 17 deletions
This file was deleted.

core/src/main/java/org/opensearch/sql/calcite/udf/udaf/AggTransferFunctionMap.java

Lines changed: 0 additions & 86 deletions
This file was deleted.

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import static org.apache.calcite.rex.RexWindowBounds.UNBOUNDED_PRECEDING;
1111
import static org.apache.calcite.rex.RexWindowBounds.following;
1212
import static org.apache.calcite.rex.RexWindowBounds.preceding;
13-
import static org.opensearch.sql.calcite.udf.udaf.AggTransferFunctionMap.AGG_FUNCTION_MAP;
1413

1514
import com.google.common.collect.ImmutableList;
1615
import java.util.ArrayList;
@@ -28,8 +27,8 @@
2827
import org.opensearch.sql.ast.expression.WindowBound;
2928
import org.opensearch.sql.ast.expression.WindowFrame;
3029
import org.opensearch.sql.calcite.CalcitePlanContext;
31-
import org.opensearch.sql.calcite.udf.udaf.AggHandler;
3230
import org.opensearch.sql.expression.function.BuiltinFunctionName;
31+
import org.opensearch.sql.expression.function.PPLFuncImpTable;
3332

3433
public interface PlanUtils {
3534

@@ -220,13 +219,7 @@ static RelBuilder.AggCall makeAggCall(
220219
boolean distinct,
221220
RexNode field,
222221
List<RexNode> argList) {
223-
AggHandler handler = AGG_FUNCTION_MAP.get(functionName);
224-
225-
if (handler == null) {
226-
throw new UnsupportedOperationException("Unexpected aggregation: " + functionName);
227-
}
228-
229-
return handler.apply(distinct, field, argList, context);
222+
return PPLFuncImpTable.INSTANCE.resolveAgg(functionName, distinct, field, argList, context);
230223
}
231224

232225
/** Get all uniq input references from a RexNode. */

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 133 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@
55

66
package org.opensearch.sql.expression.function;
77

8+
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.*;
9+
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_POP_NULLABLE;
810
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;
911
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.getLegacyTypeName;
12+
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.TransferUserDefinedAggFunction;
1013
import static org.opensearch.sql.expression.function.BuiltinFunctionName.*;
1114

15+
import com.google.common.collect.ImmutableList;
1216
import com.google.common.collect.ImmutableMap;
1317
import java.math.BigDecimal;
1418
import java.util.ArrayList;
@@ -24,11 +28,23 @@
2428
import org.apache.calcite.sql.fun.SqlLibraryOperators;
2529
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
2630
import org.apache.calcite.sql.fun.SqlTrimFunction.Flag;
31+
import org.apache.calcite.sql.type.ReturnTypes;
2732
import org.apache.calcite.sql.type.SqlTypeName;
33+
import org.apache.calcite.tools.RelBuilder;
2834
import org.apache.commons.lang3.tuple.Pair;
35+
import org.opensearch.sql.calcite.CalcitePlanContext;
36+
import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction;
37+
import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction;
38+
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
2939
import org.opensearch.sql.executor.QueryType;
3040

3141
public class PPLFuncImpTable {
42+
/** A lambda function interface which could apply parameters to get AggCall. */
43+
@FunctionalInterface
44+
public interface AggHandler {
45+
RelBuilder.AggCall apply(
46+
boolean distinct, RexNode field, List<RexNode> argList, CalcitePlanContext context);
47+
}
3248

3349
public interface FunctionImp {
3450
RelDataType ANY_TYPE = TYPE_FACTORY.createSqlType(SqlTypeName.ANY);
@@ -88,7 +104,9 @@ default List<RelDataType> getParams() {
88104
static {
89105
final Builder builder = new Builder();
90106
builder.populate();
91-
INSTANCE = new PPLFuncImpTable(builder);
107+
final AggBuilder aggBuilder = new AggBuilder();
108+
aggBuilder.populate();
109+
INSTANCE = new PPLFuncImpTable(builder, aggBuilder);
92110
}
93111

94112
/**
@@ -107,12 +125,32 @@ default List<RelDataType> getParams() {
107125
private final Map<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>>
108126
externalFunctionRegistry;
109127

110-
private PPLFuncImpTable(Builder builder) {
128+
/**
129+
* The registry for built-in agg functions. Agg Functions defined by the PPL specification, whose
130+
* implementations are independent of any specific data storage, should be registered here
131+
* internally.
132+
*/
133+
private final ImmutableMap<BuiltinFunctionName, AggHandler> aggFunctionRegistry;
134+
135+
/**
136+
* The external agg function registry. Agg Functions whose implementations depend on a specific
137+
* data engine should be registered here. This reduces coupling between the core module and
138+
* particular storage backends.
139+
*/
140+
private final Map<BuiltinFunctionName, AggHandler> aggExternalFunctionRegistry;
141+
142+
private PPLFuncImpTable(Builder builder, AggBuilder aggBuilder) {
111143
final ImmutableMap.Builder<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>>
112144
mapBuilder = ImmutableMap.builder();
113145
builder.map.forEach((k, v) -> mapBuilder.put(k, List.copyOf(v)));
114146
this.functionRegistry = ImmutableMap.copyOf(mapBuilder.build());
115147
this.externalFunctionRegistry = new HashMap<>();
148+
149+
final ImmutableMap.Builder<BuiltinFunctionName, AggHandler> aggMapBuilder =
150+
ImmutableMap.builder();
151+
aggBuilder.map.forEach(aggMapBuilder::put);
152+
this.aggFunctionRegistry = ImmutableMap.copyOf(aggMapBuilder.build());
153+
this.aggExternalFunctionRegistry = new HashMap<>();
116154
}
117155

118156
/**
@@ -132,6 +170,33 @@ public void registerExternalFunction(BuiltinFunctionName functionName, FunctionI
132170
}
133171
}
134172

173+
/**
174+
* Register a function implementation from external services dynamically.
175+
*
176+
* @param functionName the name of the function, has to be defined in BuiltinFunctionName
177+
* @param functionImp the implementation of the agg function
178+
*/
179+
public void registerExternalAggFunction(
180+
BuiltinFunctionName functionName, AggHandler functionImp) {
181+
aggExternalFunctionRegistry.put(functionName, functionImp);
182+
}
183+
184+
public RelBuilder.AggCall resolveAgg(
185+
BuiltinFunctionName functionName,
186+
boolean distinct,
187+
RexNode field,
188+
List<RexNode> argList,
189+
CalcitePlanContext context) {
190+
AggHandler handler = aggExternalFunctionRegistry.get(functionName);
191+
if (handler == null) {
192+
handler = aggFunctionRegistry.get(functionName);
193+
}
194+
if (handler == null) {
195+
throw new IllegalStateException(String.format("Cannot resolve function: %s", functionName));
196+
}
197+
return handler.apply(distinct, field, argList, context);
198+
}
199+
135200
public RexNode resolve(final RexBuilder builder, final String functionName, RexNode... args) {
136201
Optional<BuiltinFunctionName> funcNameOpt = BuiltinFunctionName.of(functionName);
137202
if (funcNameOpt.isEmpty()) {
@@ -462,4 +527,70 @@ public List<RelDataType> getParams() {
462527
return List.of(boolType, boolType);
463528
}
464529
}
530+
531+
private static class AggBuilder {
532+
private final Map<BuiltinFunctionName, AggHandler> map = new HashMap<>();
533+
534+
void register(BuiltinFunctionName functionName, AggHandler aggHandler) {
535+
map.put(functionName, aggHandler);
536+
}
537+
538+
void populate() {
539+
register(MAX, (distinct, field, argList, ctx) -> ctx.relBuilder.max(field));
540+
register(MIN, (distinct, field, argList, ctx) -> ctx.relBuilder.min(field));
541+
542+
register(AVG, (distinct, field, argList, ctx) -> ctx.relBuilder.avg(distinct, null, field));
543+
544+
register(
545+
COUNT,
546+
(distinct, field, argList, ctx) ->
547+
ctx.relBuilder.count(
548+
distinct, null, field == null ? ImmutableList.of() : ImmutableList.of(field)));
549+
register(SUM, (distinct, field, argList, ctx) -> ctx.relBuilder.sum(distinct, null, field));
550+
551+
register(
552+
VARSAMP,
553+
(distinct, field, argList, ctx) ->
554+
ctx.relBuilder.aggregateCall(VAR_SAMP_NULLABLE, field));
555+
556+
register(
557+
VARPOP,
558+
(distinct, field, argList, ctx) -> ctx.relBuilder.aggregateCall(VAR_POP_NULLABLE, field));
559+
560+
register(
561+
STDDEV_SAMP,
562+
(distinct, field, argList, ctx) ->
563+
ctx.relBuilder.aggregateCall(STDDEV_SAMP_NULLABLE, field));
564+
565+
register(
566+
STDDEV_POP,
567+
(distinct, field, argList, ctx) ->
568+
ctx.relBuilder.aggregateCall(STDDEV_POP_NULLABLE, field));
569+
570+
register(
571+
TAKE,
572+
(distinct, field, argList, ctx) ->
573+
TransferUserDefinedAggFunction(
574+
TakeAggFunction.class,
575+
"TAKE",
576+
UserDefinedFunctionUtils.getReturnTypeInferenceForArray(),
577+
List.of(field),
578+
argList,
579+
ctx.relBuilder));
580+
581+
register(
582+
PERCENTILE_APPROX,
583+
(distinct, field, argList, ctx) -> {
584+
List<RexNode> newArgList = new ArrayList<>(argList);
585+
newArgList.add(ctx.rexBuilder.makeFlag(field.getType().getSqlTypeName()));
586+
return TransferUserDefinedAggFunction(
587+
PercentileApproxFunction.class,
588+
"percentile_approx",
589+
ReturnTypes.ARG0_FORCE_NULLABLE,
590+
List.of(field),
591+
newArgList,
592+
ctx.relBuilder);
593+
});
594+
}
595+
}
465596
}

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
import org.apache.calcite.sql.type.ReturnTypes;
3131
import org.opensearch.sql.ast.statement.Explain.ExplainFormat;
3232
import org.opensearch.sql.calcite.CalcitePlanContext;
33-
import org.opensearch.sql.calcite.udf.udaf.AggTransferFunctionMap;
34-
import org.opensearch.sql.calcite.udf.udaf.AggTransferFunctionMap.*;
3533
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners;
3634
import org.opensearch.sql.common.response.ResponseListener;
3735
import org.opensearch.sql.data.model.ExprTupleValue;
@@ -253,7 +251,7 @@ private void registerOpenSearchFunctions() {
253251
builder.makeCall(new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP"), args);
254252
PPLFuncImpTable.INSTANCE.registerExternalFunction(BuiltinFunctionName.GEOIP, geoIpImpl);
255253

256-
AggTransferFunctionMap.AGG_FUNCTION_MAP.put(
254+
PPLFuncImpTable.INSTANCE.registerExternalAggFunction(
257255
DISTINCT_COUNT_APPROX,
258256
(distinct, field, argList, ctx) ->
259257
TransferUserDefinedAggFunction(

0 commit comments

Comments
 (0)