Skip to content

Commit 7a8db58

Browse files
committed
Merge remote-tracking branch 'origin/main' into issues/4201
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
2 parents e4c5266 + fdb09e8 commit 7a8db58

52 files changed

Lines changed: 1789 additions & 478 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

core/src/main/java/org/opensearch/sql/ast/tree/SPath.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.List;
1010
import lombok.AllArgsConstructor;
1111
import lombok.EqualsAndHashCode;
12+
import lombok.Getter;
1213
import lombok.RequiredArgsConstructor;
1314
import lombok.ToString;
1415
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -19,6 +20,7 @@
1920
@EqualsAndHashCode(callSuper = false)
2021
@RequiredArgsConstructor
2122
@AllArgsConstructor
23+
@Getter
2224
public class SPath extends UnresolvedPlan {
2325
private UnresolvedPlan child;
2426

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.calcite.tools.RelBuilder;
2424
import org.opensearch.sql.ast.expression.UnresolvedExpression;
2525
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
26+
import org.opensearch.sql.common.setting.Settings;
2627
import org.opensearch.sql.executor.QueryType;
2728
import org.opensearch.sql.expression.function.FunctionProperties;
2829

@@ -39,6 +40,10 @@ public class CalcitePlanContext {
3940
/** This thread local variable is only used to skip script encoding in script pushdown. */
4041
public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false);
4142

43+
/** Thread-local switch that tells whether the current query prefers legacy behavior. */
44+
private static final ThreadLocal<Boolean> legacyPreferredFlag =
45+
ThreadLocal.withInitial(() -> true);
46+
4247
@Getter @Setter private boolean isResolvingJoinCondition = false;
4348
@Getter @Setter private boolean isResolvingSubquery = false;
4449
@Getter @Setter private boolean inCoalesceFunction = false;
@@ -105,6 +110,27 @@ public static CalcitePlanContext create(
105110
return new CalcitePlanContext(config, querySizeLimit, queryType);
106111
}
107112

113+
/**
114+
* Executes {@code action} with the thread-local legacy flag set according to the supplied
115+
* settings.
116+
*/
117+
public static void run(Runnable action, Settings settings) {
118+
Boolean preferred = settings.getSettingValue(Settings.Key.PPL_SYNTAX_LEGACY_PREFERRED);
119+
legacyPreferredFlag.set(preferred);
120+
try {
121+
action.run();
122+
} finally {
123+
legacyPreferredFlag.remove();
124+
}
125+
}
126+
127+
/**
128+
* @return {@code true} when the current planning prefer legacy behavior.
129+
*/
130+
public static boolean isLegacyPreferred() {
131+
return legacyPreferredFlag.get();
132+
}
133+
108134
public void putRexLambdaRefMap(Map<String, RexLambdaRef> candidateMap) {
109135
this.rexLambdaRefMap.putAll(candidateMap);
110136
}

core/src/main/java/org/opensearch/sql/calcite/ExtendedRexBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ public RexNode makeCast(
126126
// ImmutableList.of(exp, makeZeroLiteral(sourceType)));
127127
}
128128
} else if (OpenSearchTypeFactory.isUserDefinedType(type)) {
129+
if (RexLiteral.isNullLiteral(exp)) {
130+
return super.makeCast(pos, type, exp, matchNullability, safe, format);
131+
}
129132
var udt = ((AbstractExprRelDataType<?>) type).getUdt();
130133
var argExprType = OpenSearchTypeFactory.convertRelDataTypeToExprType(sourceType);
131134
return switch (udt) {

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 67 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -90,68 +90,80 @@ public void executeWithCalcite(
9090
UnresolvedPlan plan,
9191
QueryType queryType,
9292
ResponseListener<ExecutionEngine.QueryResponse> listener) {
93-
try {
94-
AccessController.doPrivileged(
95-
(PrivilegedAction<Void>)
96-
() -> {
97-
CalcitePlanContext context =
98-
CalcitePlanContext.create(
99-
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
100-
RelNode relNode = analyze(plan, context);
101-
RelNode optimized = optimize(relNode, context);
102-
RelNode calcitePlan = convertToCalcitePlan(optimized);
103-
executionEngine.execute(calcitePlan, context, listener);
104-
return null;
105-
});
106-
} catch (Throwable t) {
107-
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
108-
log.warn("Fallback to V2 query engine since got exception", t);
109-
executeWithLegacy(plan, queryType, listener, Optional.of(t));
110-
} else {
111-
if (t instanceof Exception) {
112-
listener.onFailure((Exception) t);
113-
} else if (t instanceof VirtualMachineError) {
114-
// throw and fast fail the VM errors such as OOM (same with v2).
115-
throw t;
116-
} else {
117-
// Calcite may throw AssertError during query execution.
118-
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
119-
}
120-
}
121-
}
93+
CalcitePlanContext.run(
94+
() -> {
95+
try {
96+
AccessController.doPrivileged(
97+
(PrivilegedAction<Void>)
98+
() -> {
99+
CalcitePlanContext context =
100+
CalcitePlanContext.create(
101+
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
102+
RelNode relNode = analyze(plan, context);
103+
RelNode optimized = optimize(relNode, context);
104+
RelNode calcitePlan = convertToCalcitePlan(optimized);
105+
executionEngine.execute(calcitePlan, context, listener);
106+
return null;
107+
});
108+
} catch (Throwable t) {
109+
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
110+
log.warn("Fallback to V2 query engine since got exception", t);
111+
executeWithLegacy(plan, queryType, listener, Optional.of(t));
112+
} else {
113+
if (t instanceof Exception) {
114+
listener.onFailure((Exception) t);
115+
} else if (t instanceof VirtualMachineError) {
116+
// throw and fast fail the VM errors such as OOM (same with v2).
117+
throw t;
118+
} else {
119+
// Calcite may throw AssertError during query execution.
120+
listener.onFailure(new CalciteUnsupportedException(t.getMessage(), t));
121+
}
122+
}
123+
}
124+
},
125+
settings);
122126
}
123127

124128
public void explainWithCalcite(
125129
UnresolvedPlan plan,
126130
QueryType queryType,
127131
ResponseListener<ExecutionEngine.ExplainResponse> listener,
128132
Explain.ExplainFormat format) {
129-
try {
130-
AccessController.doPrivileged(
131-
(PrivilegedAction<Void>)
132-
() -> {
133-
CalcitePlanContext context =
134-
CalcitePlanContext.create(
135-
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
136-
RelNode relNode = analyze(plan, context);
137-
RelNode optimized = optimize(relNode, context);
138-
RelNode calcitePlan = convertToCalcitePlan(optimized);
139-
executionEngine.explain(calcitePlan, format, context, listener);
140-
return null;
141-
});
142-
} catch (Throwable t) {
143-
if (isCalciteFallbackAllowed(t)) {
144-
log.warn("Fallback to V2 query engine since got exception", t);
145-
explainWithLegacy(plan, queryType, listener, format, Optional.of(t));
146-
} else {
147-
if (t instanceof Error) {
148-
// Calcite may throw AssertError during query execution.
149-
listener.onFailure(new CalciteUnsupportedException(t.getMessage()));
150-
} else {
151-
listener.onFailure((Exception) t);
152-
}
153-
}
154-
}
133+
CalcitePlanContext.run(
134+
() -> {
135+
try {
136+
AccessController.doPrivileged(
137+
(PrivilegedAction<Void>)
138+
() -> {
139+
CalcitePlanContext context =
140+
CalcitePlanContext.create(
141+
buildFrameworkConfig(), getQuerySizeLimit(), queryType);
142+
context.run(
143+
() -> {
144+
RelNode relNode = analyze(plan, context);
145+
RelNode optimized = optimize(relNode, context);
146+
RelNode calcitePlan = convertToCalcitePlan(optimized);
147+
executionEngine.explain(calcitePlan, format, context, listener);
148+
},
149+
settings);
150+
return null;
151+
});
152+
} catch (Throwable t) {
153+
if (isCalciteFallbackAllowed(t)) {
154+
log.warn("Fallback to V2 query engine since got exception", t);
155+
explainWithLegacy(plan, queryType, listener, format, Optional.of(t));
156+
} else {
157+
if (t instanceof Error) {
158+
// Calcite may throw AssertError during query execution.
159+
listener.onFailure(new CalciteUnsupportedException(t.getMessage()));
160+
} else {
161+
listener.onFailure((Exception) t);
162+
}
163+
}
164+
}
165+
},
166+
settings);
155167
}
156168

157169
public void executeWithLegacy(

core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public enum BuiltinFunctionName {
6262
/** Collection functions */
6363
ARRAY(FunctionName.of("array")),
6464
ARRAY_LENGTH(FunctionName.of("array_length")),
65+
MVAPPEND(FunctionName.of("mvappend")),
6566
MVJOIN(FunctionName.of("mvjoin")),
6667
FORALL(FunctionName.of("forall")),
6768
EXISTS(FunctionName.of("exists")),
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.expression.function.CollectionUDF;
7+
8+
import static org.apache.calcite.sql.type.SqlTypeUtil.createArrayType;
9+
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
13+
import org.apache.calcite.adapter.enumerable.NullPolicy;
14+
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
15+
import org.apache.calcite.linq4j.tree.Expression;
16+
import org.apache.calcite.linq4j.tree.Expressions;
17+
import org.apache.calcite.linq4j.tree.Types;
18+
import org.apache.calcite.rel.type.RelDataType;
19+
import org.apache.calcite.rel.type.RelDataTypeFactory;
20+
import org.apache.calcite.rex.RexCall;
21+
import org.apache.calcite.sql.SqlOperatorBinding;
22+
import org.apache.calcite.sql.type.SqlReturnTypeInference;
23+
import org.apache.calcite.sql.type.SqlTypeName;
24+
import org.opensearch.sql.expression.function.ImplementorUDF;
25+
import org.opensearch.sql.expression.function.UDFOperandMetadata;
26+
27+
/**
28+
* MVAppend function that appends all elements from arguments to create an array. Always returns an
29+
* array or null for consistent type behavior.
30+
*/
31+
public class MVAppendFunctionImpl extends ImplementorUDF {
32+
33+
public MVAppendFunctionImpl() {
34+
super(new MVAppendImplementor(), NullPolicy.ALL);
35+
}
36+
37+
@Override
38+
public SqlReturnTypeInference getReturnTypeInference() {
39+
return sqlOperatorBinding -> {
40+
RelDataTypeFactory typeFactory = sqlOperatorBinding.getTypeFactory();
41+
42+
if (sqlOperatorBinding.getOperandCount() == 0) {
43+
return typeFactory.createSqlType(SqlTypeName.NULL);
44+
}
45+
46+
RelDataType elementType = determineElementType(sqlOperatorBinding, typeFactory);
47+
return createArrayType(
48+
typeFactory, typeFactory.createTypeWithNullability(elementType, true), true);
49+
};
50+
}
51+
52+
@Override
53+
public UDFOperandMetadata getOperandMetadata() {
54+
return null;
55+
}
56+
57+
private static RelDataType determineElementType(
58+
SqlOperatorBinding sqlOperatorBinding, RelDataTypeFactory typeFactory) {
59+
RelDataType mostGeneralType = null;
60+
61+
for (int i = 0; i < sqlOperatorBinding.getOperandCount(); i++) {
62+
RelDataType operandType = getComponentType(sqlOperatorBinding.getOperandType(i));
63+
64+
mostGeneralType = updateMostGeneralType(mostGeneralType, operandType, typeFactory);
65+
}
66+
67+
return mostGeneralType != null ? mostGeneralType : typeFactory.createSqlType(SqlTypeName.NULL);
68+
}
69+
70+
private static RelDataType getComponentType(RelDataType operandType) {
71+
if (!operandType.isStruct() && operandType.getComponentType() != null) {
72+
return operandType.getComponentType();
73+
}
74+
return operandType;
75+
}
76+
77+
private static RelDataType updateMostGeneralType(
78+
RelDataType current, RelDataType candidate, RelDataTypeFactory typeFactory) {
79+
if (current == null) {
80+
return candidate;
81+
}
82+
83+
if (!current.equals(candidate)) {
84+
return typeFactory.createSqlType(SqlTypeName.ANY);
85+
} else {
86+
return current;
87+
}
88+
}
89+
90+
public static class MVAppendImplementor implements NotNullImplementor {
91+
@Override
92+
public Expression implement(
93+
RexToLixTranslator translator, RexCall call, List<Expression> translatedOperands) {
94+
return Expressions.call(
95+
Types.lookupMethod(MVAppendFunctionImpl.class, "mvappend", Object[].class),
96+
Expressions.newArrayInit(Object.class, translatedOperands));
97+
}
98+
}
99+
100+
public static Object mvappend(Object... args) {
101+
List<Object> elements = collectElements(args);
102+
return elements.isEmpty() ? null : elements;
103+
}
104+
105+
private static List<Object> collectElements(Object... args) {
106+
List<Object> elements = new ArrayList<>();
107+
108+
for (Object arg : args) {
109+
if (arg == null) {
110+
continue;
111+
}
112+
113+
if (arg instanceof List) {
114+
addListElements((List<?>) arg, elements);
115+
} else {
116+
elements.add(arg);
117+
}
118+
}
119+
120+
return elements;
121+
}
122+
123+
private static void addListElements(List<?> list, List<Object> elements) {
124+
for (Object item : list) {
125+
if (item != null) {
126+
elements.add(item);
127+
}
128+
}
129+
}
130+
}

core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.opensearch.sql.expression.function.CollectionUDF.ExistsFunctionImpl;
4747
import org.opensearch.sql.expression.function.CollectionUDF.FilterFunctionImpl;
4848
import org.opensearch.sql.expression.function.CollectionUDF.ForallFunctionImpl;
49+
import org.opensearch.sql.expression.function.CollectionUDF.MVAppendFunctionImpl;
4950
import org.opensearch.sql.expression.function.CollectionUDF.ReduceFunctionImpl;
5051
import org.opensearch.sql.expression.function.CollectionUDF.TransformFunctionImpl;
5152
import org.opensearch.sql.expression.function.jsonUDF.JsonAppendFunctionImpl;
@@ -383,6 +384,7 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
383384
public static final SqlOperator FORALL = new ForallFunctionImpl().toUDF("forall");
384385
public static final SqlOperator EXISTS = new ExistsFunctionImpl().toUDF("exists");
385386
public static final SqlOperator ARRAY = new ArrayFunctionImpl().toUDF("array");
387+
public static final SqlOperator MVAPPEND = new MVAppendFunctionImpl().toUDF("mvappend");
386388
public static final SqlOperator FILTER = new FilterFunctionImpl().toUDF("filter");
387389
public static final SqlOperator TRANSFORM = new TransformFunctionImpl().toUDF("transform");
388390
public static final SqlOperator REDUCE = new ReduceFunctionImpl().toUDF("reduce");

0 commit comments

Comments
 (0)