Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions fe/fe-catalog/src/main/java/org/apache/doris/catalog/Function.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public enum BinaryType {
protected String runtimeVersion;
@SerializedName("fc")
protected String functionCode;
@SerializedName("vol")
protected FunctionVolatility volatility = FunctionVolatility.IMMUTABLE;

// Only used for serialization
protected Function() {
Expand Down Expand Up @@ -174,6 +176,7 @@ public Function(Function other) {
this.expirationTime = other.expirationTime;
this.runtimeVersion = other.runtimeVersion;
this.functionCode = other.functionCode;
this.volatility = other.getVolatility();
}

public Function clone() {
Expand Down Expand Up @@ -301,6 +304,14 @@ public void setFunctionCode(String functionCode) {
this.functionCode = functionCode;
}

public FunctionVolatility getVolatility() {
return volatility == null ? FunctionVolatility.IMMUTABLE : volatility;
}

public void setVolatility(FunctionVolatility volatility) {
this.volatility = volatility == null ? FunctionVolatility.IMMUTABLE : volatility;
}

// TODO(cmy): Currently we judge whether it is UDF by wheter the 'location' is set.
// Maybe we should use a separate variable to identify,
// but additional variables need to modify the persistence information.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import java.util.Locale;

/** Function volatility controls which optimizer rewrites are safe for a function call. */
public enum FunctionVolatility {
IMMUTABLE,
STABLE,
VOLATILE;

public static FunctionVolatility fromString(String value) {
if (value == null) {
return IMMUTABLE;
}
try {
return FunctionVolatility.valueOf(value.trim().toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid volatility: '" + value
+ "'. Expected one of: immutable, stable, volatile", e);
}
}

public String toSql() {
return name().toLowerCase(Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.doris.analysis.ToSqlParams;
import org.apache.doris.catalog.Function.NullableMode;

import com.google.common.base.Strings;

import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -54,13 +56,13 @@ public static String toSql(ScalarFunction fn, boolean ifNotExists) {
if (fn.isGlobal()) {
sb.append("GLOBAL ");
}
sb.append("FUNCTION ");
sb.append(fn.isUDTFunction() ? "TABLES FUNCTION " : "FUNCTION ");

if (ifNotExists) {
sb.append("IF NOT EXISTS ");
}
sb.append(fn.signatureString())
.append(" RETURNS " + fn.getReturnType())
.append(" RETURNS " + getScalarFunctionReturnTypeSql(fn))
.append(" PROPERTIES (");
sb.append("\n \"SYMBOL\"=").append("\"" + fn.getSymbolName() + "\"");
if (fn.getPrepareFnSymbol() != null) {
Expand All @@ -75,15 +77,39 @@ public static String toSql(ScalarFunction fn, boolean ifNotExists) {
.append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\"");
boolean isReturnNull = fn.getNullableMode() == NullableMode.ALWAYS_NULLABLE;
sb.append(",\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull + "\"");
if (!fn.isUDTFunction()) {
Comment thread
linrrzqqq marked this conversation as resolved.
sb.append(",\n \"VOLATILITY\"=").append("\"" + fn.getVolatility().toSql() + "\"");
}
} else if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF) {
sb.append(",\n \"FILE\"=")
.append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\"");
boolean isReturnNull = fn.getNullableMode() == NullableMode.ALWAYS_NULLABLE;
sb.append(",\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull + "\"");
sb.append(",\n \"RUNTIME_VERSION\"=").append("\"" + Strings.nullToEmpty(fn.getRuntimeVersion()) + "\"");
Comment thread
linrrzqqq marked this conversation as resolved.
if (!fn.isUDTFunction()) {
sb.append(",\n \"VOLATILITY\"=").append("\"" + fn.getVolatility().toSql() + "\"");
}
} else {
sb.append(",\n \"OBJECT_FILE\"=")
.append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\"");
}
sb.append(",\n \"TYPE\"=").append("\"" + fn.getBinaryType() + "\"");
sb.append("\n);");
if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF && !Strings.isNullOrEmpty(fn.getFunctionCode())) {
// Preserve inline Python UDF bodies so SHOW CREATE FUNCTION output can be replayed directly.
sb.append("\n)\nAS $$\n").append(fn.getFunctionCode()).append("\n$$;");
} else {
sb.append("\n);");
}
return sb.toString();
}

private static String getScalarFunctionReturnTypeSql(ScalarFunction fn) {
if (fn.isUDTFunction()) {
return new ArrayType(fn.getReturnType()).toSql();
}
return fn.getReturnType().toSql();
}

/**
* Converts an {@link AggregateFunction} to its SQL representation.
*/
Expand Down Expand Up @@ -125,12 +151,24 @@ public static String toSql(AggregateFunction fn, boolean ifNotExists) {
.append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\",");
boolean isReturnNull = fn.getNullableMode() == NullableMode.ALWAYS_NULLABLE;
sb.append("\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull + "\",");
} else if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF) {
sb.append("\n \"FILE\"=")
.append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\",");
boolean isReturnNull = fn.getNullableMode() == NullableMode.ALWAYS_NULLABLE;
sb.append("\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull + "\",");
sb.append("\n \"RUNTIME_VERSION\"=")
.append("\"" + Strings.nullToEmpty(fn.getRuntimeVersion()) + "\",");
} else {
sb.append("\n \"OBJECT_FILE\"=")
.append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\",");
}
sb.append("\n \"TYPE\"=").append("\"" + fn.getBinaryType() + "\"");
sb.append("\n);");
if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF && !Strings.isNullOrEmpty(fn.getFunctionCode())) {
// Preserve inline Python UDAF bodies so SHOW CREATE FUNCTION output can be replayed directly.
sb.append("\n)\nAS $$\n").append(fn.getFunctionCode()).append("\n$$;");
} else {
sb.append("\n);");
}
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Plan visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, CascadesC
PhysicalProject<? extends Plan> project = (PhysicalProject<? extends Plan>) child;
Map<Slot, Expression> childAlias = project.getAliasToProducer();
if (filter.getInputSlots().stream().map(childAlias::get).filter(Objects::nonNull)
.anyMatch(Expression::containsUniqueFunction)) {
.anyMatch(Expression::containsVolatileExpression)) {
return filter;
}
PhysicalFilter<? extends Plan> newFilter = filter.withConjunctsAndChild(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,7 @@ private List<Expression> bindGroupByUniqueId(List<Expression> groupByExpressions
// 2. for 'group by a + random(), a + random() + 1', the two 'random()' will be different.
int containsUniqueGroupByCount = 0;
for (Expression groupByExpr : groupByExpressions) {
if (groupByExpr.containsUniqueFunction()) {
if (groupByExpr.containsVolatileExpression()) {
containsUniqueGroupByCount++;
}
}
Expand All @@ -1433,7 +1433,7 @@ private List<Expression> bindGroupByUniqueId(List<Expression> groupByExpressions
groupByExpressions.size());
for (Expression groupByExpr : groupByExpressions) {
Expression newGroupByExpr = groupByExpr;
if (groupByExpr.containsUniqueFunction()) {
if (groupByExpr.containsVolatileExpression()) {
Expression ignoreUniqueIdExpr = ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(groupByExpr, true);
Expression previousGroupByExpr = ignoreUniqueIdGroupByExprs.get(ignoreUniqueIdExpr);
if (previousGroupByExpr == null) {
Expand Down Expand Up @@ -1476,7 +1476,7 @@ private <T extends Expression> List<T> bindExprsUniqueIdWithGroupBy(List<T> expr
// c) let E3 = rewrite E2 with enable unique ids. then E3 is the bind unique id expression for E.
private <T extends Expression> T bindExprUniqueIdWithGroupBy(T expression,
Map<Expression, Expression> bindUniqueIdReplaceMap) {
if (!expression.containsUniqueFunction() || bindUniqueIdReplaceMap.isEmpty()) {
if (!expression.containsVolatileExpression() || bindUniqueIdReplaceMap.isEmpty()) {
return expression;
}

Expand Down Expand Up @@ -1522,7 +1522,7 @@ private Map<Expression, Expression> getBelowAggregateGroupByUniqueFuncReplaceMap
private Map<Expression, Expression> getGroupByUniqueFuncReplaceMap(List<Expression> groupByByExpressions) {
Map<Expression, Expression> replaceMap = Maps.newHashMap();
for (Expression expression : groupByByExpressions) {
if (expression.containsUniqueFunction()) {
if (expression.containsVolatileExpression()) {
Expression ignoreUniqueIdExpr = ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(expression, true);
// for sql:
// select distinct a + random(), a + random()
Expand Down Expand Up @@ -1554,7 +1554,7 @@ private Plan bindRepeat(MatchingContext<LogicalRepeat<Plan>> ctx) {
= ImmutableList.builderWithExpectedSize(boundGroupingSet.size());
for (Expression groupBy : boundGroupingSet) {
Expression newGroupBy = groupBy;
if (groupBy.containsUniqueFunction()) {
if (groupBy.containsVolatileExpression()) {
Expression ignoreUniqueIdGroupBy = ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(groupBy, true);
Expression previousGroupBy = ignoreUniqueIdGroupByExpressions.get(ignoreUniqueIdGroupBy);
if (previousGroupBy == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public List<ExpressionPatternMatcher<? extends Expression>> buildRules() {
matchesType(InPredicate.class)
.when(inPredicate ->
inPredicate.getOptions().size() <= InPredicateDedup.REWRITE_OPTIONS_MAX_SIZE
&& !inPredicate.getCompareExpr().containsUniqueFunction())
&& !inPredicate.getCompareExpr().containsVolatileExpression())
.then(this::rewrite)
.toRule(ExpressionRuleType.IN_PREDICATE_EXTRACT_NON_CONSTANT)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private Optional<Expression> tryPushIntoNvl(Expression parent, int childIndex, N
// so there will exist twice 'first' in the rewritten IF expression, which may increase the computation cost.
// if the plan is not filter and not join, then push down action may not have positive effect,
// considering this, we give up the rewrite if the plan is not condition plan or first contains unique function.
if (first.containsUniqueFunction() || !isConditionPlan) {
if (first.containsVolatileExpression() || !isConditionPlan) {
return Optional.empty();
}
If ifExpr = new If(new IsNull(first), second, first);
Expand All @@ -182,7 +182,7 @@ private Optional<Expression> tryPushIntoNullIf(Expression parent, int childIndex
// so there will exist twice 'first' in the rewritten IF expression, which may increase the computation cost.
// if the plan is not filter and not join, then push down action may not have positive effect,
// considering this, we give up the rewrite if the plan is not condition plan or first contains unique function.
if (first.containsUniqueFunction() || !isConditionPlan) {
if (first.containsVolatileExpression() || !isConditionPlan) {
return Optional.empty();
}
If ifExpr = new If(new EqualTo(first, second), new NullLiteral(nullIf.getDataType()), first);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public <T extends Expression> Optional<Pair<List<T>, LogicalProject<Plan>>> rewr
*/
@VisibleForTesting
public List<NamedExpression> tryGenUniqueFunctionAlias(Collection<? extends Expression> targets) {
Map<UniqueFunction, Integer> unqiueFunctionCounter = Maps.newLinkedHashMap();
Map<Expression, Integer> unqiueFunctionCounter = Maps.newLinkedHashMap();
for (Expression target : targets) {
target.foreach(e -> {
Expression expr = (Expression) e;
Expand All @@ -286,10 +286,12 @@ public List<NamedExpression> tryGenUniqueFunctionAlias(Collection<? extends Expr

ImmutableList.Builder<NamedExpression> builder
= ImmutableList.builderWithExpectedSize(unqiueFunctionCounter.size());
for (Entry<UniqueFunction, Integer> entry : unqiueFunctionCounter.entrySet()) {
for (Entry<Expression, Integer> entry : unqiueFunctionCounter.entrySet()) {
if (entry.getValue() > 1) {
ExprId exprId = StatementScopeIdGenerator.newExprId();
String name = "$_" + entry.getKey().getName() + "_" + exprId.asInt() + "_$";
String functionName = entry.getKey() instanceof Function
? ((Function) entry.getKey()).getName() : "volatile";
String name = "$_" + functionName + "_" + exprId.asInt() + "_$";
builder.add(new Alias(exprId, entry.getKey(), name));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public Rule build() {
LogicalCTEConsumer cteConsumer = filter.child();
Set<Expression> exprs = filter.getConjuncts();
for (Expression expr : exprs) {
if (expr.containsUniqueFunction()) {
if (expr.containsVolatileExpression()) {
continue;
}
Expression rewrittenExpr = expr.rewriteUp(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private boolean needRewrite(LogicalJoin<Plan, Plan> join) {

// 1. expr contains slots from both sides;
private boolean isConditionNeedRewrite(Expression expr, Set<Slot> leftSlots, Set<Slot> rightSlots) {
if (expr.containsUniqueFunction()) {
if (expr.containsVolatileExpression()) {
return false;
}
Set<Slot> exprSlots = expr.getInputSlots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Rule build() {
// 2. if the conjunct contains unique function, it should not be pushed down;
// e.g. 'select a, sum(a) from t group by a having a + random() > 10'
// not equals 'select a, sum(a) from t where a + random() > 10 group by a'
if (!conjunct.containsUniqueFunction()
if (!conjunct.containsVolatileExpression()
&& !conjunctSlots.isEmpty() && canPushDownSlots.containsAll(conjunctSlots)) {
pushDownPredicates.add(conjunct);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Rule build() {
filter.getConjuncts().forEach(conjunct -> {
Set<Slot> conjunctSlots = conjunct.getInputSlots();
if (!conjunctSlots.isEmpty() && childOutputs.containsAll(conjunctSlots)
&& !conjunct.containsUniqueFunction()) {
&& !conjunct.containsVolatileExpression()) {
pushDownPredicates.add(conjunct);
} else {
remainPredicates.add(conjunct);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public Rule build() {
Set<Expression> rightPredicates = Sets.newLinkedHashSet();
Set<Expression> remainingPredicates = Sets.newLinkedHashSet();
for (Expression p : filterPredicates) {
if (p.containsUniqueFunction()) {
if (p.containsVolatileExpression()) {
remainingPredicates.add(p);
continue;
}
Expand Down Expand Up @@ -162,7 +162,7 @@ private boolean convertJoinCondition(Expression predicate, Set<Slot> leftOutputs
if (!(predicate instanceof EqualTo)) {
return false;
}
if (predicate.containsUniqueFunction()) {
if (predicate.containsVolatileExpression()) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private static Pair<Set<Expression>, Set<Expression>> splitConjunctsByChildOutpu
// `project(b + random(1, 10) as a) -> filter(b + random(1, 10) > 1)`, it contains two distinct RANDOM.
if (childOutputs.containsAll(conjunctSlots)
&& conjunctSlots.stream().map(childAlias::get).filter(Objects::nonNull)
.noneMatch(Expression::containsUniqueFunction)) {
.noneMatch(Expression::containsVolatileExpression)) {
pushDownPredicates.add(conjunct);
} else {
remainPredicates.add(conjunct);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Rule build() {
List<Expression> otherConditions = Lists.newArrayListWithExpectedSize(
filter.getConjuncts().size() + join.getOtherJoinConjuncts().size());
for (Expression expr : filter.getConjuncts()) {
if (expr.containsUniqueFunction()) {
if (expr.containsVolatileExpression()) {
remainConditions.add(expr);
} else {
otherConditions.add(expr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private boolean canPushProjectIntoUnion(LogicalProject<LogicalUnion> project) {
Set<Slot> uniqueFunctionSlots = Sets.newHashSet();
for (int i = 0; i < constExprs.size(); i++) {
NamedExpression ne = constExprs.get(i);
if (ne.containsUniqueFunction()) {
if (ne.containsVolatileExpression()) {
uniqueFunctionSlots.add(union.getOutput().get(i));
}
}
Expand Down
Loading
Loading