diff --git a/fe/fe-catalog/src/main/java/org/apache/doris/catalog/Function.java b/fe/fe-catalog/src/main/java/org/apache/doris/catalog/Function.java index 5c7c80f4535ef1..93783a47725d73 100644 --- a/fe/fe-catalog/src/main/java/org/apache/doris/catalog/Function.java +++ b/fe/fe-catalog/src/main/java/org/apache/doris/catalog/Function.java @@ -114,6 +114,8 @@ public enum BinaryType { protected String runtimeVersion; @SerializedName("fc") protected String functionCode; + @SerializedName("vol") + protected FunctionVolatility volatility = FunctionVolatility.IMMUTABLE; // Only used for serialization protected Function() { @@ -174,6 +176,7 @@ public Function(Function other) { this.expirationTime = other.expirationTime; this.runtimeVersion = other.runtimeVersion; this.functionCode = other.functionCode; + this.volatility = other.getVolatility(); } public Function clone() { @@ -301,6 +304,14 @@ public void setFunctionCode(String functionCode) { this.functionCode = functionCode; } + public FunctionVolatility getVolatility() { + return volatility == null ? FunctionVolatility.IMMUTABLE : volatility; + } + + public void setVolatility(FunctionVolatility volatility) { + this.volatility = volatility == null ? FunctionVolatility.IMMUTABLE : volatility; + } + // TODO(cmy): Currently we judge whether it is UDF by wheter the 'location' is set. // Maybe we should use a separate variable to identify, // but additional variables need to modify the persistence information. diff --git a/fe/fe-catalog/src/main/java/org/apache/doris/catalog/FunctionVolatility.java b/fe/fe-catalog/src/main/java/org/apache/doris/catalog/FunctionVolatility.java new file mode 100644 index 00000000000000..eefbbc4e79bb2d --- /dev/null +++ b/fe/fe-catalog/src/main/java/org/apache/doris/catalog/FunctionVolatility.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import java.util.Locale; + +/** Function volatility controls which optimizer rewrites are safe for a function call. */ +public enum FunctionVolatility { + IMMUTABLE, + STABLE, + VOLATILE; + + public static FunctionVolatility fromString(String value) { + if (value == null) { + return IMMUTABLE; + } + try { + return FunctionVolatility.valueOf(value.trim().toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid volatility: '" + value + + "'. Expected one of: immutable, stable, volatile", e); + } + } + + public String toSql() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionToSqlConverter.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionToSqlConverter.java index 8709eb5b6de8ed..4a2c39484f36f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionToSqlConverter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionToSqlConverter.java @@ -23,6 +23,8 @@ import org.apache.doris.analysis.ToSqlParams; import org.apache.doris.catalog.Function.NullableMode; +import com.google.common.base.Strings; + import java.util.List; import java.util.stream.Collectors; @@ -54,13 +56,13 @@ public static String toSql(ScalarFunction fn, boolean ifNotExists) { if (fn.isGlobal()) { sb.append("GLOBAL "); } - sb.append("FUNCTION "); + sb.append(fn.isUDTFunction() ? "TABLES FUNCTION " : "FUNCTION "); if (ifNotExists) { sb.append("IF NOT EXISTS "); } sb.append(fn.signatureString()) - .append(" RETURNS " + fn.getReturnType()) + .append(" RETURNS " + getScalarFunctionReturnTypeSql(fn)) .append(" PROPERTIES ("); sb.append("\n \"SYMBOL\"=").append("\"" + fn.getSymbolName() + "\""); if (fn.getPrepareFnSymbol() != null) { @@ -75,15 +77,39 @@ public static String toSql(ScalarFunction fn, boolean ifNotExists) { .append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\""); boolean isReturnNull = fn.getNullableMode() == NullableMode.ALWAYS_NULLABLE; sb.append(",\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull + "\""); + if (!fn.isUDTFunction()) { + sb.append(",\n \"VOLATILITY\"=").append("\"" + fn.getVolatility().toSql() + "\""); + } + } else if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF) { + sb.append(",\n \"FILE\"=") + .append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\""); + boolean isReturnNull = fn.getNullableMode() == NullableMode.ALWAYS_NULLABLE; + sb.append(",\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull + "\""); + sb.append(",\n \"RUNTIME_VERSION\"=").append("\"" + Strings.nullToEmpty(fn.getRuntimeVersion()) + "\""); + if (!fn.isUDTFunction()) { + sb.append(",\n \"VOLATILITY\"=").append("\"" + fn.getVolatility().toSql() + "\""); + } } else { sb.append(",\n \"OBJECT_FILE\"=") .append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\""); } sb.append(",\n \"TYPE\"=").append("\"" + fn.getBinaryType() + "\""); - sb.append("\n);"); + if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF && !Strings.isNullOrEmpty(fn.getFunctionCode())) { + // Preserve inline Python UDF bodies so SHOW CREATE FUNCTION output can be replayed directly. + sb.append("\n)\nAS $$\n").append(fn.getFunctionCode()).append("\n$$;"); + } else { + sb.append("\n);"); + } return sb.toString(); } + private static String getScalarFunctionReturnTypeSql(ScalarFunction fn) { + if (fn.isUDTFunction()) { + return new ArrayType(fn.getReturnType()).toSql(); + } + return fn.getReturnType().toSql(); + } + /** * Converts an {@link AggregateFunction} to its SQL representation. */ @@ -125,12 +151,24 @@ public static String toSql(AggregateFunction fn, boolean ifNotExists) { .append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\","); boolean isReturnNull = fn.getNullableMode() == NullableMode.ALWAYS_NULLABLE; sb.append("\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull + "\","); + } else if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF) { + sb.append("\n \"FILE\"=") + .append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\","); + boolean isReturnNull = fn.getNullableMode() == NullableMode.ALWAYS_NULLABLE; + sb.append("\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull + "\","); + sb.append("\n \"RUNTIME_VERSION\"=") + .append("\"" + Strings.nullToEmpty(fn.getRuntimeVersion()) + "\","); } else { sb.append("\n \"OBJECT_FILE\"=") .append("\"" + (fn.getLocation() == null ? "" : fn.getLocation().toString()) + "\","); } sb.append("\n \"TYPE\"=").append("\"" + fn.getBinaryType() + "\""); - sb.append("\n);"); + if (fn.getBinaryType() == Function.BinaryType.PYTHON_UDF && !Strings.isNullOrEmpty(fn.getFunctionCode())) { + // Preserve inline Python UDAF bodies so SHOW CREATE FUNCTION output can be replayed directly. + sb.append("\n)\nAS $$\n").append(fn.getFunctionCode()).append("\n$$;"); + } else { + sb.append("\n);"); + } return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java index 55cf9d64314efa..9b4e262d6a1853 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java @@ -45,7 +45,7 @@ public Plan visitPhysicalFilter(PhysicalFilter filter, CascadesC PhysicalProject project = (PhysicalProject) child; Map childAlias = project.getAliasToProducer(); if (filter.getInputSlots().stream().map(childAlias::get).filter(Objects::nonNull) - .anyMatch(Expression::containsUniqueFunction)) { + .anyMatch(Expression::containsVolatileExpression)) { return filter; } PhysicalFilter newFilter = filter.withConjunctsAndChild( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java index d5cd32d6b8534b..a38fdb3edbd780 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindExpression.java @@ -1419,7 +1419,7 @@ private List bindGroupByUniqueId(List groupByExpressions // 2. for 'group by a + random(), a + random() + 1', the two 'random()' will be different. int containsUniqueGroupByCount = 0; for (Expression groupByExpr : groupByExpressions) { - if (groupByExpr.containsUniqueFunction()) { + if (groupByExpr.containsVolatileExpression()) { containsUniqueGroupByCount++; } } @@ -1433,7 +1433,7 @@ private List bindGroupByUniqueId(List groupByExpressions groupByExpressions.size()); for (Expression groupByExpr : groupByExpressions) { Expression newGroupByExpr = groupByExpr; - if (groupByExpr.containsUniqueFunction()) { + if (groupByExpr.containsVolatileExpression()) { Expression ignoreUniqueIdExpr = ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(groupByExpr, true); Expression previousGroupByExpr = ignoreUniqueIdGroupByExprs.get(ignoreUniqueIdExpr); if (previousGroupByExpr == null) { @@ -1476,7 +1476,7 @@ private List bindExprsUniqueIdWithGroupBy(List expr // c) let E3 = rewrite E2 with enable unique ids. then E3 is the bind unique id expression for E. private T bindExprUniqueIdWithGroupBy(T expression, Map bindUniqueIdReplaceMap) { - if (!expression.containsUniqueFunction() || bindUniqueIdReplaceMap.isEmpty()) { + if (!expression.containsVolatileExpression() || bindUniqueIdReplaceMap.isEmpty()) { return expression; } @@ -1522,7 +1522,7 @@ private Map getBelowAggregateGroupByUniqueFuncReplaceMap private Map getGroupByUniqueFuncReplaceMap(List groupByByExpressions) { Map replaceMap = Maps.newHashMap(); for (Expression expression : groupByByExpressions) { - if (expression.containsUniqueFunction()) { + if (expression.containsVolatileExpression()) { Expression ignoreUniqueIdExpr = ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(expression, true); // for sql: // select distinct a + random(), a + random() @@ -1554,7 +1554,7 @@ private Plan bindRepeat(MatchingContext> ctx) { = ImmutableList.builderWithExpectedSize(boundGroupingSet.size()); for (Expression groupBy : boundGroupingSet) { Expression newGroupBy = groupBy; - if (groupBy.containsUniqueFunction()) { + if (groupBy.containsVolatileExpression()) { Expression ignoreUniqueIdGroupBy = ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(groupBy, true); Expression previousGroupBy = ignoreUniqueIdGroupByExpressions.get(ignoreUniqueIdGroupBy); if (previousGroupBy == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/InPredicateExtractNonConstant.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/InPredicateExtractNonConstant.java index 6b35d31303bde1..48ca394733deb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/InPredicateExtractNonConstant.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/InPredicateExtractNonConstant.java @@ -50,7 +50,7 @@ public List> buildRules() { matchesType(InPredicate.class) .when(inPredicate -> inPredicate.getOptions().size() <= InPredicateDedup.REWRITE_OPTIONS_MAX_SIZE - && !inPredicate.getCompareExpr().containsUniqueFunction()) + && !inPredicate.getCompareExpr().containsVolatileExpression()) .then(this::rewrite) .toRule(ExpressionRuleType.IN_PREDICATE_EXTRACT_NON_CONSTANT) ); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PushIntoCaseWhenBranch.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PushIntoCaseWhenBranch.java index 9421aec15ce630..4f400eb54a5698 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PushIntoCaseWhenBranch.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PushIntoCaseWhenBranch.java @@ -166,7 +166,7 @@ private Optional tryPushIntoNvl(Expression parent, int childIndex, N // so there will exist twice 'first' in the rewritten IF expression, which may increase the computation cost. // if the plan is not filter and not join, then push down action may not have positive effect, // considering this, we give up the rewrite if the plan is not condition plan or first contains unique function. - if (first.containsUniqueFunction() || !isConditionPlan) { + if (first.containsVolatileExpression() || !isConditionPlan) { return Optional.empty(); } If ifExpr = new If(new IsNull(first), second, first); @@ -182,7 +182,7 @@ private Optional tryPushIntoNullIf(Expression parent, int childIndex // so there will exist twice 'first' in the rewritten IF expression, which may increase the computation cost. // if the plan is not filter and not join, then push down action may not have positive effect, // considering this, we give up the rewrite if the plan is not condition plan or first contains unique function. - if (first.containsUniqueFunction() || !isConditionPlan) { + if (first.containsVolatileExpression() || !isConditionPlan) { return Optional.empty(); } If ifExpr = new If(new EqualTo(first, second), new NullLiteral(nullIf.getDataType()), first); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AddProjectForUniqueFunction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AddProjectForUniqueFunction.java index 2c2537ac476d0a..97fc2a671c4198 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AddProjectForUniqueFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AddProjectForUniqueFunction.java @@ -274,7 +274,7 @@ public Optional, LogicalProject>> rewr */ @VisibleForTesting public List tryGenUniqueFunctionAlias(Collection targets) { - Map unqiueFunctionCounter = Maps.newLinkedHashMap(); + Map unqiueFunctionCounter = Maps.newLinkedHashMap(); for (Expression target : targets) { target.foreach(e -> { Expression expr = (Expression) e; @@ -286,10 +286,12 @@ public List tryGenUniqueFunctionAlias(Collection builder = ImmutableList.builderWithExpectedSize(unqiueFunctionCounter.size()); - for (Entry entry : unqiueFunctionCounter.entrySet()) { + for (Entry entry : unqiueFunctionCounter.entrySet()) { if (entry.getValue() > 1) { ExprId exprId = StatementScopeIdGenerator.newExprId(); - String name = "$_" + entry.getKey().getName() + "_" + exprId.asInt() + "_$"; + String functionName = entry.getKey() instanceof Function + ? ((Function) entry.getKey()).getName() : "volatile"; + String name = "$_" + functionName + "_" + exprId.asInt() + "_$"; builder.add(new Alias(exprId, entry.getKey(), name)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java index 62f29f40441626..25fa5d2ccc8a68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumer.java @@ -38,7 +38,7 @@ public Rule build() { LogicalCTEConsumer cteConsumer = filter.child(); Set exprs = filter.getConjuncts(); for (Expression expr : exprs) { - if (expr.containsUniqueFunction()) { + if (expr.containsVolatileExpression()) { continue; } Expression rewrittenExpr = expr.rewriteUp(e -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/JoinExtractOrFromCaseWhen.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/JoinExtractOrFromCaseWhen.java index 87fdf7a48feb2a..81317f20903ea6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/JoinExtractOrFromCaseWhen.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/JoinExtractOrFromCaseWhen.java @@ -108,7 +108,7 @@ private boolean needRewrite(LogicalJoin join) { // 1. expr contains slots from both sides; private boolean isConditionNeedRewrite(Expression expr, Set leftSlots, Set rightSlots) { - if (expr.containsUniqueFunction()) { + if (expr.containsVolatileExpression()) { return false; } Set exprSlots = expr.getInputSlots(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughAggregation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughAggregation.java index 2f993fade5786b..0945162f6d0d1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughAggregation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughAggregation.java @@ -69,7 +69,7 @@ public Rule build() { // 2. if the conjunct contains unique function, it should not be pushed down; // e.g. 'select a, sum(a) from t group by a having a + random() > 10' // not equals 'select a, sum(a) from t where a + random() > 10 group by a' - if (!conjunct.containsUniqueFunction() + if (!conjunct.containsVolatileExpression() && !conjunctSlots.isEmpty() && canPushDownSlots.containsAll(conjunctSlots)) { pushDownPredicates.add(conjunct); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java index 851aba0e21ae07..85de47ab1274b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughGenerate.java @@ -50,7 +50,7 @@ public Rule build() { filter.getConjuncts().forEach(conjunct -> { Set conjunctSlots = conjunct.getInputSlots(); if (!conjunctSlots.isEmpty() && childOutputs.containsAll(conjunctSlots) - && !conjunct.containsUniqueFunction()) { + && !conjunct.containsVolatileExpression()) { pushDownPredicates.add(conjunct); } else { remainPredicates.add(conjunct); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java index b16abf35250ce6..ddcff70759fb7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java @@ -120,7 +120,7 @@ public Rule build() { Set rightPredicates = Sets.newLinkedHashSet(); Set remainingPredicates = Sets.newLinkedHashSet(); for (Expression p : filterPredicates) { - if (p.containsUniqueFunction()) { + if (p.containsVolatileExpression()) { remainingPredicates.add(p); continue; } @@ -162,7 +162,7 @@ private boolean convertJoinCondition(Expression predicate, Set leftOutputs if (!(predicate instanceof EqualTo)) { return false; } - if (predicate.containsUniqueFunction()) { + if (predicate.containsVolatileExpression()) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java index d9581d919a7e80..1a46b51a2468b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java @@ -125,7 +125,7 @@ private static Pair, Set> splitConjunctsByChildOutpu // `project(b + random(1, 10) as a) -> filter(b + random(1, 10) > 1)`, it contains two distinct RANDOM. if (childOutputs.containsAll(conjunctSlots) && conjunctSlots.stream().map(childAlias::get).filter(Objects::nonNull) - .noneMatch(Expression::containsUniqueFunction)) { + .noneMatch(Expression::containsVolatileExpression)) { pushDownPredicates.add(conjunct); } else { remainPredicates.add(conjunct); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java index d3ba907c5bde7d..7c529ee6669acb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java @@ -59,7 +59,7 @@ public Rule build() { List otherConditions = Lists.newArrayListWithExpectedSize( filter.getConjuncts().size() + join.getOtherJoinConjuncts().size()); for (Expression expr : filter.getConjuncts()) { - if (expr.containsUniqueFunction()) { + if (expr.containsVolatileExpression()) { remainConditions.add(expr); } else { otherConditions.add(expr); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushProjectIntoUnion.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushProjectIntoUnion.java index afef7be254901e..53deff85eb6beb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushProjectIntoUnion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushProjectIntoUnion.java @@ -104,7 +104,7 @@ private boolean canPushProjectIntoUnion(LogicalProject project) { Set uniqueFunctionSlots = Sets.newHashSet(); for (int i = 0; i < constExprs.size(); i++) { NamedExpression ne = constExprs.get(i); - if (ne.containsUniqueFunction()) { + if (ne.containsVolatileExpression()) { uniqueFunctionSlots.add(union.getOutput().get(i)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java index 6b3b540477c8c4..3f5c520b27fa3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java @@ -100,7 +100,7 @@ public Rule build() { for (Expression conjunct : filter.getConjuncts()) { // after reorder and push down the random() down to lower join, // the rewritten sql may have less rows() than the origin sql - if (conjunct.containsUniqueFunction()) { + if (conjunct.containsVolatileExpression()) { uniqueExprConjuncts.add(conjunct); } else { nonUniqueExprConjuncts.add(conjunct); @@ -153,7 +153,7 @@ public Plan joinToMultiJoin(Plan plan, Map planToHintType) // (t1 join t2) join t3 where t1.a = t3.x + random() // if reorder, then may have ((t1 join t3) on t1.a = t3.x + random()) join t2, // then the reorder result will less rows than origin. - if (conjunct.containsUniqueFunction()) { + if (conjunct.containsVolatileExpression()) { return plan; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/eageraggregation/EagerAggRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/eageraggregation/EagerAggRewriter.java index 217854d6033ce2..867ba560bc3ad1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/eageraggregation/EagerAggRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/eageraggregation/EagerAggRewriter.java @@ -506,7 +506,7 @@ public Plan visitLogicalFilter(LogicalFilter filter, PushDownAgg if (filter.child() instanceof LogicalRelation) { return genAggregate(filter, context); } - if (filter.getConjuncts().stream().anyMatch(Expression::containsUniqueFunction)) { + if (filter.getConjuncts().stream().anyMatch(Expression::containsVolatileExpression)) { return genAggregate(filter, context); } List filterInputSlots = filter.getInputSlots().stream() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java index 70619c34007dd0..ce3edfbeb3262f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java @@ -30,7 +30,6 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.generator.TableGeneratingFunction; import org.apache.doris.nereids.trees.expressions.functions.scalar.Lambda; -import org.apache.doris.nereids.trees.expressions.functions.scalar.UniqueFunction; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; import org.apache.doris.nereids.trees.expressions.shape.LeafExpression; @@ -388,8 +387,9 @@ public boolean isKeyColumnFromTable() { && ((SlotReference) this).getOriginalColumn().get().isKey(); } - public boolean containsUniqueFunction() { - return containsType(UniqueFunction.class); + public boolean containsVolatileExpression() { + return anyMatch(expr -> expr instanceof VolatileExpression + && ((VolatileExpression) expr).isVolatile()); } /** containsNullLiteralChildren */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileExpression.java new file mode 100644 index 00000000000000..d43d326b6a38e0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileExpression.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions; + +/** Expression that may carry per-call volatile identity for optimizer rewrite safety. */ +public interface VolatileExpression { + VolatileIdentity getVolatileIdentity(); + + Expression withIgnoreUniqueId(boolean ignoreUniqueId); + + default boolean isVolatile() { + return getVolatileIdentity().isVolatile(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileIdentity.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileIdentity.java new file mode 100644 index 00000000000000..26bd32caa8b5fd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/VolatileIdentity.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions; + +import com.google.common.base.Preconditions; + +import java.util.Optional; + +/** Value object for volatile per-call identity and temporary group-by binding comparison. */ +public class VolatileIdentity { + public static final VolatileIdentity NON_VOLATILE = new VolatileIdentity(Optional.empty(), false); + + private final Optional uniqueId; + private final boolean ignoreUniqueId; + + public VolatileIdentity(ExprId uniqueId, boolean ignoreUniqueId) { + this(Optional.of(uniqueId), ignoreUniqueId); + } + + private VolatileIdentity(Optional uniqueId, boolean ignoreUniqueId) { + Preconditions.checkArgument(!ignoreUniqueId || uniqueId.isPresent(), + "ignoreUniqueId is meaningful only for volatile expressions"); + this.uniqueId = uniqueId; + this.ignoreUniqueId = ignoreUniqueId; + } + + public static VolatileIdentity newVolatileIdentity() { + return new VolatileIdentity(StatementScopeIdGenerator.newExprId(), false); + } + + public static VolatileIdentity of(ExprId uniqueId, boolean ignoreUniqueId) { + return new VolatileIdentity(uniqueId, ignoreUniqueId); + } + + public boolean isVolatile() { + return uniqueId.isPresent(); + } + + public ExprId getUniqueId() { + return uniqueId.get(); + } + + public Optional getUniqueIdOptional() { + return uniqueId; + } + + public boolean ignoreUniqueId() { + return ignoreUniqueId; + } + + public VolatileIdentity withIgnoreUniqueId(boolean ignoreUniqueId) { + Preconditions.checkState(isVolatile(), "Only volatile expressions can ignore unique id"); + return new VolatileIdentity(uniqueId, ignoreUniqueId); + } + + /** Compare volatile expressions by identity unless either side temporarily ignores it. */ + public boolean equalsByIdentity(VolatileIdentity other, boolean fallbackEquals) { + if ((!isVolatile() && !other.isVolatile()) + || (ignoreUniqueId && other.ignoreUniqueId())) { + return fallbackEquals; + } + if ((!isVolatile() || !other.isVolatile()) + || (ignoreUniqueId || other.ignoreUniqueId())) { + return false; + } + return uniqueId.equals(other.getUniqueIdOptional()); + } + + public int hashCodeByIdentity(int fallbackHash) { + if (!isVolatile() || ignoreUniqueId) { + return fallbackHash; + } + return getUniqueId().asInt(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ExpressionTrait.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ExpressionTrait.java index 7ad16a5d77ce99..fea6220a78341c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ExpressionTrait.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ExpressionTrait.java @@ -76,7 +76,7 @@ default String toSql() throws UnboundException { /** * foldable() mainly use in fold expression. Udf and UniqueFunction are not foldable. * But if want to check an expression contains non-idempotent, such as `rand()`, `uuid()`, etc., - * you should use Expression::containsUniqueFunction instead. + * you should use Expression::containsVolatileExpression instead. */ default boolean foldable() { return true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunction.java index c29d83e043c3d6..83875fd2dce231 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunction.java @@ -19,6 +19,8 @@ import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.VolatileExpression; +import org.apache.doris.nereids.trees.expressions.VolatileIdentity; import java.util.List; @@ -95,8 +97,9 @@ * the same. * */ -public abstract class UniqueFunction extends ScalarFunction { +public abstract class UniqueFunction extends ScalarFunction implements VolatileExpression { + protected final VolatileIdentity volatileIdentity; protected final ExprId uniqueId; // when compare and bind unique id with group by expressions, should ignore the unique id @@ -105,22 +108,30 @@ public abstract class UniqueFunction extends ScalarFunction { /** constructor for withChildren and reuse signature */ public UniqueFunction(UniqueFunctionParams functionParams) { super(functionParams); + this.volatileIdentity = VolatileIdentity.of(functionParams.uniqueId, functionParams.ignoreUniqueId); this.uniqueId = functionParams.uniqueId; this.ignoreUniqueId = functionParams.ignoreUniqueId; } public UniqueFunction(String name, ExprId uniqueId, boolean ignoreUniqueId, Expression... arguments) { super(name, arguments); + this.volatileIdentity = VolatileIdentity.of(uniqueId, ignoreUniqueId); this.uniqueId = uniqueId; this.ignoreUniqueId = ignoreUniqueId; } public UniqueFunction(String name, ExprId uniqueId, boolean ignoreUniqueId, List arguments) { super(name, arguments); + this.volatileIdentity = VolatileIdentity.of(uniqueId, ignoreUniqueId); this.uniqueId = uniqueId; this.ignoreUniqueId = ignoreUniqueId; } + @Override + public VolatileIdentity getVolatileIdentity() { + return volatileIdentity; + } + public abstract UniqueFunction withIgnoreUniqueId(boolean ignoreUniqueId); @Override @@ -149,19 +160,13 @@ public boolean equals(Object o) { UniqueFunction other = (UniqueFunction) o; // in BindExpression phase, when compare two expression equals except the unique id, // will set ignoreUniqueId = true temporarily, after bind expression, will recover ignoreUniqueId = false - if (ignoreUniqueId || other.ignoreUniqueId) { - return super.equals(other); - } - return uniqueId.equals(other.uniqueId); + return volatileIdentity.equalsByIdentity(other.volatileIdentity, super.equals(other)); } // The contains method needs to use hashCode, so similar to equals, it only compares exprId @Override public int computeHashCode() { // direct return exprId to speed up - if (ignoreUniqueId) { - return super.computeHashCode(); - } - return uniqueId.asInt(); + return volatileIdentity.hashCodeByIdentity(super.computeHashCode()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java index 07cd4556324f21..3e53200fa9cdce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java @@ -22,11 +22,14 @@ import org.apache.doris.catalog.Function.NullableMode; import org.apache.doris.catalog.FunctionName; import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.catalog.FunctionVolatility; import org.apache.doris.catalog.Type; import org.apache.doris.common.util.URI; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.VolatileExpression; +import org.apache.doris.nereids.trees.expressions.VolatileIdentity; import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; import org.apache.doris.nereids.trees.expressions.functions.Udf; import org.apache.doris.nereids.trees.expressions.functions.scalar.ScalarFunction; @@ -43,12 +46,14 @@ /** * Java UDF for Nereids */ -public class JavaUdf extends ScalarFunction implements ExplicitlyCastableSignature, Udf { +public class JavaUdf extends ScalarFunction implements ExplicitlyCastableSignature, Udf, VolatileExpression { private final String dbName; private final long functionId; private final Function.BinaryType binaryType; private final FunctionSignature signature; private final NullableMode nullableMode; + private final FunctionVolatility volatility; + private final VolatileIdentity volatileIdentity; private final String objectFile; private final String symbol; private final String prepareFn; @@ -62,7 +67,8 @@ public class JavaUdf extends ScalarFunction implements ExplicitlyCastableSignatu */ public JavaUdf(String name, long functionId, String dbName, Function.BinaryType binaryType, FunctionSignature signature, - NullableMode nullableMode, String objectFile, String symbol, String prepareFn, String closeFn, + NullableMode nullableMode, FunctionVolatility volatility, VolatileIdentity volatileIdentity, + String objectFile, String symbol, String prepareFn, String closeFn, String checkSum, boolean isStaticLoad, long expirationTime, Expression... args) { super(name, args); this.dbName = dbName; @@ -70,6 +76,8 @@ public JavaUdf(String name, long functionId, String dbName, Function.BinaryType this.binaryType = binaryType; this.signature = signature; this.nullableMode = nullableMode; + this.volatility = volatility; + this.volatileIdentity = volatileIdentity; this.objectFile = objectFile; this.symbol = symbol; this.prepareFn = prepareFn; @@ -106,10 +114,55 @@ public NullableMode getNullableMode() { public JavaUdf withChildren(List children) { Preconditions.checkArgument(children.size() == this.children.size()); return new JavaUdf(getName(), functionId, dbName, binaryType, signature, nullableMode, + volatility, volatileIdentity, objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad, expirationTime, children.toArray(new Expression[0])); } + @Override + public VolatileIdentity getVolatileIdentity() { + return volatileIdentity; + } + + @Override + public JavaUdf withIgnoreUniqueId(boolean ignoreUniqueId) { + Preconditions.checkState(isVolatile(), "Only volatile Java UDF can ignore unique id"); + return new JavaUdf(getName(), functionId, dbName, binaryType, signature, nullableMode, + volatility, volatileIdentity.withIgnoreUniqueId(ignoreUniqueId), + objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad, expirationTime, + children.toArray(new Expression[0])); + } + + /** Return a copy with a new per-call identity when this UDF is VOLATILE. */ + public JavaUdf withFreshVolatileIdentity() { + if (volatility != FunctionVolatility.VOLATILE) { + return this; + } + return new JavaUdf(getName(), functionId, dbName, binaryType, signature, nullableMode, + volatility, VolatileIdentity.newVolatileIdentity(), + objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad, expirationTime, + children.toArray(new Expression[0])); + } + + @Override + public boolean isDeterministic() { + return volatility == FunctionVolatility.IMMUTABLE; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof JavaUdf)) { + return false; + } + JavaUdf other = (JavaUdf) o; + return volatileIdentity.equalsByIdentity(other.volatileIdentity, super.equals(o)); + } + + @Override + public int computeHashCode() { + return volatileIdentity.hashCodeByIdentity(super.computeHashCode()); + } + /** * translate catalog java udf to nereids java udf */ @@ -130,7 +183,7 @@ public static void translateToNereidsFunction(String dbName, org.apache.doris.ca .toArray(SlotReference[]::new); JavaUdf udf = new JavaUdf(fnName, scalar.getId(), dbName, scalar.getBinaryType(), sig, - scalar.getNullableMode(), + scalar.getNullableMode(), scalar.getVolatility(), createVolatileIdentity(scalar.getVolatility()), scalar.getLocation() == null ? null : scalar.getLocation().getLocation(), scalar.getSymbolName(), scalar.getPrepareFnSymbol(), @@ -166,9 +219,15 @@ public Function getCatalogFunction() { expr.setId(functionId); expr.setStaticLoad(isStaticLoad); expr.setExpirationTime(expirationTime); + expr.setVolatility(volatility); return expr; } catch (Exception e) { throw new AnalysisException(e.getMessage(), e.getCause()); } } + + private static VolatileIdentity createVolatileIdentity(FunctionVolatility volatility) { + return volatility == FunctionVolatility.VOLATILE + ? VolatileIdentity.newVolatileIdentity() : VolatileIdentity.NON_VOLATILE; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdfBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdfBuilder.java index 6ddfdab8a15a38..6ab90cb42cddac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdfBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdfBuilder.java @@ -88,7 +88,7 @@ public Pair build(String name, List arguments) { for (int i = 0; i < exprs.size(); ++i) { processedExprs.add(TypeCoercionUtils.castIfNotSameType(exprs.get(i), argTypes.get(i))); } - return Pair.ofSame(udf.withChildren(processedExprs)); + return Pair.ofSame(udf.withFreshVolatileIdentity().withChildren(processedExprs)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdf.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdf.java index 98a9e161308417..65c8f2f14d7ddb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdf.java @@ -22,11 +22,14 @@ import org.apache.doris.catalog.Function.NullableMode; import org.apache.doris.catalog.FunctionName; import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.catalog.FunctionVolatility; import org.apache.doris.catalog.Type; import org.apache.doris.common.util.URI; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.VolatileExpression; +import org.apache.doris.nereids.trees.expressions.VolatileIdentity; import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; import org.apache.doris.nereids.trees.expressions.functions.Udf; import org.apache.doris.nereids.trees.expressions.functions.scalar.ScalarFunction; @@ -43,12 +46,14 @@ /** * Python UDF for Nereids */ -public class PythonUdf extends ScalarFunction implements ExplicitlyCastableSignature, Udf { +public class PythonUdf extends ScalarFunction implements ExplicitlyCastableSignature, Udf, VolatileExpression { private final String dbName; private final long functionId; private final Function.BinaryType binaryType; private final FunctionSignature signature; private final NullableMode nullableMode; + private final FunctionVolatility volatility; + private final VolatileIdentity volatileIdentity; private final String objectFile; private final String symbol; private final String prepareFn; @@ -64,7 +69,8 @@ public class PythonUdf extends ScalarFunction implements ExplicitlyCastableSigna */ public PythonUdf(String name, long functionId, String dbName, Function.BinaryType binaryType, FunctionSignature signature, - NullableMode nullableMode, String objectFile, String symbol, String prepareFn, String closeFn, + NullableMode nullableMode, FunctionVolatility volatility, VolatileIdentity volatileIdentity, + String objectFile, String symbol, String prepareFn, String closeFn, String checkSum, boolean isStaticLoad, long expirationTime, String runtimeVersion, String functionCode, Expression... args) { super(name, args); @@ -73,6 +79,8 @@ public PythonUdf(String name, long functionId, String dbName, Function.BinaryTyp this.binaryType = binaryType; this.signature = signature; this.nullableMode = nullableMode; + this.volatility = volatility; + this.volatileIdentity = volatileIdentity; this.objectFile = objectFile; this.symbol = symbol; this.prepareFn = prepareFn; @@ -111,10 +119,55 @@ public NullableMode getNullableMode() { public PythonUdf withChildren(List children) { Preconditions.checkArgument(children.size() == this.children.size()); return new PythonUdf(getName(), functionId, dbName, binaryType, signature, nullableMode, + volatility, volatileIdentity, objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad, expirationTime, runtimeVersion, functionCode, children.toArray(new Expression[0])); } + @Override + public VolatileIdentity getVolatileIdentity() { + return volatileIdentity; + } + + @Override + public PythonUdf withIgnoreUniqueId(boolean ignoreUniqueId) { + Preconditions.checkState(isVolatile(), "Only volatile Python UDF can ignore unique id"); + return new PythonUdf(getName(), functionId, dbName, binaryType, signature, nullableMode, + volatility, volatileIdentity.withIgnoreUniqueId(ignoreUniqueId), + objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad, expirationTime, + runtimeVersion, functionCode, children.toArray(new Expression[0])); + } + + /** Return a copy with a new per-call identity when this UDF is VOLATILE. */ + public PythonUdf withFreshVolatileIdentity() { + if (volatility != FunctionVolatility.VOLATILE) { + return this; + } + return new PythonUdf(getName(), functionId, dbName, binaryType, signature, nullableMode, + volatility, VolatileIdentity.newVolatileIdentity(), + objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad, expirationTime, + runtimeVersion, functionCode, children.toArray(new Expression[0])); + } + + @Override + public boolean isDeterministic() { + return volatility == FunctionVolatility.IMMUTABLE; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof PythonUdf)) { + return false; + } + PythonUdf other = (PythonUdf) o; + return volatileIdentity.equalsByIdentity(other.volatileIdentity, super.equals(o)); + } + + @Override + public int computeHashCode() { + return volatileIdentity.hashCodeByIdentity(super.computeHashCode()); + } + /** * translate catalog java udf to nereids java udf */ @@ -135,7 +188,7 @@ public static void translateToNereidsFunction(String dbName, org.apache.doris.ca .toArray(SlotReference[]::new); PythonUdf udf = new PythonUdf(fnName, scalar.getId(), dbName, scalar.getBinaryType(), sig, - scalar.getNullableMode(), + scalar.getNullableMode(), scalar.getVolatility(), createVolatileIdentity(scalar.getVolatility()), scalar.getLocation() == null ? null : scalar.getLocation().getLocation(), scalar.getSymbolName(), scalar.getPrepareFnSymbol(), @@ -175,9 +228,15 @@ public Function getCatalogFunction() { expr.setExpirationTime(expirationTime); expr.setRuntimeVersion(runtimeVersion); expr.setFunctionCode(functionCode); + expr.setVolatility(volatility); return expr; } catch (Exception e) { throw new AnalysisException(e.getMessage(), e.getCause()); } } + + private static VolatileIdentity createVolatileIdentity(FunctionVolatility volatility) { + return volatility == FunctionVolatility.VOLATILE + ? VolatileIdentity.newVolatileIdentity() : VolatileIdentity.NON_VOLATILE; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdfBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdfBuilder.java index 7185594099b87c..85ff1035c3d574 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdfBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/PythonUdfBuilder.java @@ -60,7 +60,7 @@ public List getSignatures() { @Override public Class functionClass() { - return JavaUdf.class; + return PythonUdf.class; } @Override @@ -88,7 +88,7 @@ public Pair build(String name, List arguments) { for (int i = 0; i < exprs.size(); ++i) { processedExprs.add(TypeCoercionUtils.castIfNotSameType(exprs.get(i), argTypes.get(i))); } - return Pair.ofSame(udf.withChildren(processedExprs)); + return Pair.ofSame(udf.withFreshVolatileIdentity().withChildren(processedExprs)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateFunctionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateFunctionCommand.java index eaacae8aaa62e3..c5c2e2ba99e4d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateFunctionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateFunctionCommand.java @@ -32,6 +32,7 @@ import org.apache.doris.catalog.Function.NullableMode; import org.apache.doris.catalog.FunctionName; import org.apache.doris.catalog.FunctionUtil; +import org.apache.doris.catalog.FunctionVolatility; import org.apache.doris.catalog.MapType; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarFunction; @@ -147,6 +148,7 @@ public class CreateFunctionCommand extends Command implements ForwardWithSync { public static final String IS_STATIC_LOAD = "static_load"; public static final String EXPIRATION_TIME = "expiration_time"; public static final String RUNTIME_VERSION = "runtime_version"; + public static final String VOLATILITY = "volatility"; private static final Pattern PYTHON_VERSION_PATTERN = Pattern.compile("^3\\.\\d{1,2}(?:\\.\\d{1,2})?$"); private static final Logger LOG = LogManager.getLogger(CreateFunctionCommand.class); @@ -177,6 +179,7 @@ public class CreateFunctionCommand extends Command implements ForwardWithSync { // if not, will core dump when input is not null column, but need return null // like https://github.com/apache/doris/pull/14002/files private NullableMode returnNullMode = NullableMode.ALWAYS_NULLABLE; + private FunctionVolatility volatility = FunctionVolatility.IMMUTABLE; private String runtimeVersion; private String functionCode; @@ -315,6 +318,9 @@ private void analyzeCommon(ConnectContext ctx) throws AnalysisException { throw new AnalysisException("do not support 'NATIVE' udf type after doris version 1.2.0," + "please use JAVA_UDF or RPC instead"); } + if (properties.containsKey(VOLATILITY) && (isAggregate || isTableFunction)) { + throw new AnalysisException("volatility property only supports scalar JAVA_UDF and PYTHON_UDF"); + } userFile = properties.getOrDefault(FILE_KEY, properties.get(OBJECT_FILE_KEY)); originalUserFile = userFile; // Keep original jar name for BE @@ -335,6 +341,9 @@ private void analyzeCommon(ConnectContext ctx) throws AnalysisException { } if (binaryType == Function.BinaryType.JAVA_UDF) { FunctionUtil.checkEnableJavaUdf(); + if (!isAggregate && !isTableFunction) { + volatility = analyzeVolatility(); + } // always_nullable the default value is true, equal null means true Boolean isReturnNull = parseBooleanFromProperties(IS_RETURN_NULL); @@ -349,6 +358,9 @@ private void analyzeCommon(ConnectContext ctx) throws AnalysisException { extractExpirationTime(); } else if (binaryType == Function.BinaryType.PYTHON_UDF) { FunctionUtil.checkEnablePythonUdf(); + if (!isAggregate && !isTableFunction) { + volatility = analyzeVolatility(); + } // always_nullable the default value is true, equal null means true Boolean isReturnNull = parseBooleanFromProperties(IS_RETURN_NULL); @@ -365,6 +377,19 @@ private void analyzeCommon(ConnectContext ctx) throws AnalysisException { + "'3.X.X' or '3.XX.XX' (e.g. '3.10.2').", runtimeVersionString)); } runtimeVersion = runtimeVersionString; + } else if (properties.containsKey(VOLATILITY)) { + throw new AnalysisException("volatility property only supports JAVA_UDF and PYTHON_UDF"); + } + } + + private FunctionVolatility analyzeVolatility() throws AnalysisException { + if (!properties.containsKey(VOLATILITY)) { + return FunctionVolatility.VOLATILE; + } + try { + return FunctionVolatility.fromString(properties.get(VOLATILITY)); + } catch (IllegalArgumentException e) { + throw new AnalysisException(e.getMessage()); } } @@ -476,6 +501,7 @@ private void analyzeUdtf() throws AnalysisException { function.setUDTFunction(true); function.setRuntimeVersion(runtimeVersion); function.setFunctionCode(functionCode); + function.setVolatility(volatility); // Todo: maybe in create tables function, need register two function, one is // normal and one is outer as those have different result when result is NULL. } @@ -550,6 +576,7 @@ private void analyzeUdaf() throws AnalysisException { function.setExpirationTime(expirationTime); function.setRuntimeVersion(runtimeVersion); function.setFunctionCode(functionCode); + function.setVolatility(volatility); } private void analyzeUdf() throws AnalysisException { @@ -587,6 +614,7 @@ private void analyzeUdf() throws AnalysisException { function.setExpirationTime(expirationTime); function.setRuntimeVersion(runtimeVersion); function.setFunctionCode(functionCode); + function.setVolatility(volatility); } private void analyzeJavaUdaf(String clazz) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommand.java index f64f201a7ae61b..081d482308a1c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommand.java @@ -293,9 +293,13 @@ private String buildProperties(Function function) { if (!Strings.isNullOrEmpty(function.getRuntimeVersion())) { properties.put("RUNTIME_VERSION", function.getRuntimeVersion()); } - if (function instanceof ScalarFunction) { ScalarFunction scalarFunction = (ScalarFunction) function; + if (!scalarFunction.isUDTFunction() + && (function.getBinaryType() == Function.BinaryType.JAVA_UDF + || function.getBinaryType() == Function.BinaryType.PYTHON_UDF)) { + properties.put("VOLATILITY", function.getVolatility().toSql()); + } properties.put("SYMBOL", Strings.nullToEmpty(scalarFunction.getSymbolName())); if (scalarFunction.getPrepareFnSymbol() != null) { properties.put("PREPARE_FN", scalarFunction.getPrepareFnSymbol()); @@ -348,4 +352,9 @@ private String buildProperties(Function function) { .collect(Collectors.joining(", ")); } + @VisibleForTesting + String buildPropertiesForTest(Function function) { + return buildProperties(function); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java index 7dba523df42f3b..6eb00636a50f83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java @@ -473,7 +473,7 @@ public void computeUnique(DataTrait.Builder builder) { } DataTrait childFd = child(0).getLogicalProperties().getTrait(); - if (groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) { + if (groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) { return; } @@ -517,7 +517,7 @@ public void computeUniform(DataTrait.Builder builder) { return; } - if (groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) { + if (groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLoadProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLoadProject.java index 207d80495f0663..5c2118e60e5bc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLoadProject.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLoadProject.java @@ -287,7 +287,7 @@ public void computeFd(DataTrait.Builder builder) { continue; } // a+random(1,10) should continue, otherwise the a(determinant), a+random(1,10) (dependency) will be added. - if (expr.containsUniqueFunction()) { + if (expr.containsVolatileExpression()) { continue; } builder.addDeps(expr.getInputSlots(), ImmutableSet.of(expr.toSlot())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java index fe076a7f8fc6f2..28823b0be8a528 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java @@ -319,7 +319,7 @@ public void computeFd(DataTrait.Builder builder) { continue; } // a+random(1,10) should continue, otherwise the a(determinant), a+random(1,10) (dependency) will be added. - if (expr.containsUniqueFunction()) { + if (expr.containsVolatileExpression()) { continue; } builder.addDeps(expr.getInputSlots(), ImmutableSet.of(expr.toSlot())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java index 11d04dc03483c4..09565bdc820af2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java @@ -208,7 +208,7 @@ public PhysicalBucketedHashAggregate resetLogicalProperties() { public void computeUnique(DataTrait.Builder builder) { DataTrait childFd = child(0).getLogicalProperties().getTrait(); - if (groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) { + if (groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) { return; } @@ -240,7 +240,7 @@ public void computeUniform(DataTrait.Builder builder) { DataTrait childFd = child(0).getLogicalProperties().getTrait(); builder.addUniformSlot(childFd); - if (groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) { + if (groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java index 2230f6281f50fc..395ff52119880c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java @@ -416,7 +416,7 @@ private boolean isUniformGroupByUnique(NamedExpression namedExpression) { public void computeUnique(DataTrait.Builder builder) { DataTrait childFd = child(0).getLogicalProperties().getTrait(); - if (groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) { + if (groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) { return; } @@ -455,7 +455,7 @@ public void computeUniform(DataTrait.Builder builder) { DataTrait childFd = child(0).getLogicalProperties().getTrait(); builder.addUniformSlot(childFd); - if (groupByExpressions.stream().anyMatch(Expression::containsUniqueFunction)) { + if (groupByExpressions.stream().anyMatch(Expression::containsVolatileExpression)) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java index 5376f31b10fd7e..bd1eb02ab98c28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java @@ -52,6 +52,7 @@ import org.apache.doris.nereids.trees.expressions.Or; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.VolatileExpression; import org.apache.doris.nereids.trees.expressions.WhenClause; import org.apache.doris.nereids.trees.expressions.WindowExpression; import org.apache.doris.nereids.trees.expressions.functions.BoundFunction; @@ -72,7 +73,6 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Length; import org.apache.doris.nereids.trees.expressions.functions.scalar.NullIf; import org.apache.doris.nereids.trees.expressions.functions.scalar.Nvl; -import org.apache.doris.nereids.trees.expressions.functions.scalar.UniqueFunction; import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; import org.apache.doris.nereids.trees.expressions.literal.ComparableLiteral; import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; @@ -647,7 +647,8 @@ public static List replaceNamedExpressions(List - e instanceof UniqueFunction ? ((UniqueFunction) e).withIgnoreUniqueId(ignoreUniqueId) : e); + e instanceof VolatileExpression && ((VolatileExpression) e).isVolatile() + ? ((VolatileExpression) e).withIgnoreUniqueId(ignoreUniqueId) : e); } public static List rewriteDownShortCircuit( @@ -1346,10 +1347,12 @@ public static boolean hasNonWindowAggregateFunction(Expression expression) { * check if the expressions contain a unique function which exists multiple times */ public static boolean containUniqueFunctionExistMultiple(Collection expressions) { - Set counterSet = Sets.newHashSet(); + Set counterSet = Sets.newHashSet(); for (Expression expression : expressions) { if (expression.anyMatch( - expr -> expr instanceof UniqueFunction && !counterSet.add((UniqueFunction) expr))) { + expr -> expr instanceof VolatileExpression + && ((VolatileExpression) expr).isVolatile() + && !counterSet.add((Expression) expr))) { return true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java index a635c2b1fd0b8e..0f8bb5e5ed1f7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java @@ -195,7 +195,7 @@ public static boolean canMergeWithProjections(List ch List targetExpressions) { Set uniqueFunctionSlots = Sets.newHashSet(); for (Entry kv : ExpressionUtils.generateReplaceMap(childProjects).entrySet()) { - if (kv.getValue().containsUniqueFunction()) { + if (kv.getValue().containsVolatileExpression()) { uniqueFunctionSlots.add(kv.getKey()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateFunctionTest.java index fc309de75b0c90..426a45074b85c3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateFunctionTest.java @@ -114,6 +114,20 @@ public void test() throws Exception { Assert.assertTrue(containsIgnoreCase(dorisAssert.query(queryStr).explainQuery(), "concat(left(CAST(CAST(k1 as BIGINT) AS VARCHAR(65533)), 3), '****'," + " right(CAST(CAST(k1 AS BIGINT) AS VARCHAR(65533)), 4))")); + + String pythonUdfSql = "create function db1.py_stable(int) returns int " + + "properties('type'='PYTHON_UDF', 'symbol'='evaluate', " + + "'runtime_version'='3.10.2', 'volatility'='stable');"; + createFunction(pythonUdfSql, ctx); + Assert.assertEquals(2, db.getFunctions().size()); + Function pythonFn = findFunction(db, "py_stable"); + Assert.assertEquals(FunctionVolatility.STABLE, pythonFn.getVolatility()); + Assert.assertTrue(FunctionToSqlConverter.toSql(pythonFn, false).contains("\"VOLATILITY\"=\"stable\"")); + + String defaultVolatileSql = "create function db1.py_default(int) returns int " + + "properties('type'='PYTHON_UDF', 'symbol'='evaluate', 'runtime_version'='3.10.2');"; + createFunction(defaultVolatileSql, ctx); + Assert.assertEquals(FunctionVolatility.VOLATILE, findFunction(db, "py_default").getVolatility()); } @Test @@ -204,4 +218,13 @@ private void createFunction(String sql, ConnectContext connectContext) throws Ex private boolean containsIgnoreCase(String str, String sub) { return str.toLowerCase().contains(sub.toLowerCase()); } + + private Function findFunction(Database db, String functionName) { + for (Function function : db.getFunctions()) { + if (functionName.equals(function.functionName())) { + return function; + } + } + throw new AssertionError("function not found: " + functionName); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FunctionToSqlConverterTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FunctionToSqlConverterTest.java index 26b22fa1baed1b..091610f2bf2755 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FunctionToSqlConverterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FunctionToSqlConverterTest.java @@ -19,6 +19,8 @@ import org.apache.doris.catalog.Function.BinaryType; import org.apache.doris.catalog.Function.NullableMode; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.URI; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -35,6 +37,7 @@ void testScalarFunction_javaUdf_basicSql() { Type[] argTypes = {Type.INT, Type.INT}; ScalarFunction fn = ScalarFunction.createUdf(BinaryType.JAVA_UDF, name, argTypes, Type.INT, false, null, "com.example.MyAdd", null, null); + fn.setVolatility(FunctionVolatility.VOLATILE); String sql = FunctionToSqlConverter.toSql(fn, false); @@ -44,6 +47,7 @@ void testScalarFunction_javaUdf_basicSql() { Assertions.assertTrue(sql.contains("\"SYMBOL\"=\"com.example.MyAdd\"")); Assertions.assertTrue(sql.contains("\"FILE\"=\"\"")); Assertions.assertTrue(sql.contains("\"TYPE\"=\"JAVA_UDF\"")); + Assertions.assertTrue(sql.contains("\"VOLATILITY\"=\"volatile\"")); Assertions.assertTrue(sql.contains("\"ALWAYS_NULLABLE\"=")); Assertions.assertFalse(sql.contains("OBJECT_FILE")); Assertions.assertFalse(sql.contains("IF NOT EXISTS")); @@ -102,6 +106,83 @@ void testScalarFunction_javaUdf_withoutPrepareFnAndCloseFn() { Assertions.assertFalse(sql.contains("CLOSE_FN")); } + @Test + void testScalarFunction_pythonUdf_inlineReplaySql() { + FunctionName name = new FunctionName("testDb", "py_inline"); + Type[] argTypes = {Type.INT}; + ScalarFunction fn = ScalarFunction.createUdf(BinaryType.PYTHON_UDF, name, argTypes, + Type.INT, false, null, "evaluate", null, null); + fn.setRuntimeVersion("3.10.2"); + fn.setFunctionCode("def evaluate(x):\n return x + 1"); + fn.setVolatility(FunctionVolatility.IMMUTABLE); + + String sql = FunctionToSqlConverter.toSql(fn, false); + + Assertions.assertTrue(sql.contains("\"RUNTIME_VERSION\"=\"3.10.2\"")); + Assertions.assertTrue(sql.contains("\"VOLATILITY\"=\"immutable\"")); + Assertions.assertTrue(sql.contains("\"TYPE\"=\"PYTHON_UDF\"")); + Assertions.assertTrue(sql.contains("AS $$\ndef evaluate(x):\n return x + 1\n$$;")); + Assertions.assertFalse(sql.endsWith(");")); + } + + @Test + void testScalarFunction_pythonUdf_moduleReplaySql() throws AnalysisException { + FunctionName name = new FunctionName("testDb", "py_module"); + Type[] argTypes = {Type.INT}; + ScalarFunction fn = ScalarFunction.createUdf(BinaryType.PYTHON_UDF, name, argTypes, + Type.INT, false, URI.create("file:///tmp/pyudf.zip"), "pkg.mod.evaluate", null, null); + fn.setRuntimeVersion("3.10.2"); + fn.setVolatility(FunctionVolatility.STABLE); + + String sql = FunctionToSqlConverter.toSql(fn, false); + + Assertions.assertTrue(sql.contains("\"FILE\"=\"file:///tmp/pyudf.zip\"")); + Assertions.assertTrue(sql.contains("\"RUNTIME_VERSION\"=\"3.10.2\"")); + Assertions.assertTrue(sql.contains("\"VOLATILITY\"=\"stable\"")); + Assertions.assertTrue(sql.endsWith(");")); + Assertions.assertFalse(sql.contains("AS $$")); + } + + @Test + void testScalarFunction_javaUdtfReplaySql() { + FunctionName name = new FunctionName("testDb", "java_table_fn"); + Type[] argTypes = {Type.INT}; + ScalarFunction fn = ScalarFunction.createUdf(BinaryType.JAVA_UDF, name, argTypes, + Type.INT, false, null, "com.example.TableFn", null, null); + fn.setUDTFunction(true); + fn.setVolatility(FunctionVolatility.IMMUTABLE); + + String sql = FunctionToSqlConverter.toSql(fn, true); + + Assertions.assertTrue(sql.startsWith("CREATE TABLES FUNCTION IF NOT EXISTS ")); + Assertions.assertTrue(sql.contains("java_table_fn(int)")); + Assertions.assertTrue(sql.contains("RETURNS array")); + Assertions.assertTrue(sql.contains("\"TYPE\"=\"JAVA_UDF\"")); + Assertions.assertFalse(sql.contains("VOLATILITY")); + } + + @Test + void testScalarFunction_pythonUdtfReplaySql() { + FunctionName name = new FunctionName("testDb", "py_table_fn"); + Type[] argTypes = {Type.INT}; + ScalarFunction fn = ScalarFunction.createUdf(BinaryType.PYTHON_UDF, name, argTypes, + Type.INT, false, null, "evaluate", null, null); + fn.setUDTFunction(true); + fn.setRuntimeVersion("3.10.2"); + fn.setFunctionCode("def evaluate(x):\n yield x"); + fn.setVolatility(FunctionVolatility.IMMUTABLE); + + String sql = FunctionToSqlConverter.toSql(fn, false); + + Assertions.assertTrue(sql.startsWith("CREATE TABLES FUNCTION ")); + Assertions.assertTrue(sql.contains("py_table_fn(int)")); + Assertions.assertTrue(sql.contains("RETURNS array")); + Assertions.assertTrue(sql.contains("\"RUNTIME_VERSION\"=\"3.10.2\"")); + Assertions.assertTrue(sql.contains("\"TYPE\"=\"PYTHON_UDF\"")); + Assertions.assertTrue(sql.contains("AS $$\ndef evaluate(x):\n yield x\n$$;")); + Assertions.assertFalse(sql.contains("VOLATILITY")); + } + // ======================== ScalarFunction — IF NOT EXISTS ======================== @Test @@ -210,6 +291,30 @@ void testAggregateFunction_javaUdf_ifNotExists() { Assertions.assertTrue(sql.contains("CREATE AGGREGATE FUNCTION IF NOT EXISTS ")); } + @Test + void testAggregateFunction_pythonUdf_inlineReplaySql() { + FunctionName name = new FunctionName("testDb", "py_agg"); + Type[] argTypes = {Type.INT}; + AggregateFunction fn = AggregateFunction.AggregateFunctionBuilder.createUdfBuilder() + .binaryType(BinaryType.PYTHON_UDF) + .name(name) + .argsType(argTypes) + .retType(Type.INT) + .intermediateType(Type.INT) + .hasVarArgs(false) + .symbolName("SumState") + .build(); + fn.setRuntimeVersion("3.10.2"); + fn.setFunctionCode("class SumState:\n pass"); + + String sql = FunctionToSqlConverter.toSql(fn, false); + + Assertions.assertTrue(sql.contains("\"RUNTIME_VERSION\"=\"3.10.2\"")); + Assertions.assertTrue(sql.contains("\"TYPE\"=\"PYTHON_UDF\"")); + Assertions.assertTrue(sql.contains("AS $$\nclass SumState:\n pass\n$$;")); + Assertions.assertFalse(sql.endsWith(");")); + } + // ======================== AggregateFunction — NATIVE ======================== @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java index a12704bd238284..87e9ea0739622b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectFilterAboveConsumerTest.java @@ -81,7 +81,7 @@ void testDoNotCollectUniqueFunctionConjunct() { "exactly one conjunct (the deterministic one) should be collected, " + "unique-function conjunct must NOT be collected"); Expression onlyCollected = filters.iterator().next(); - Assertions.assertFalse(onlyCollected.containsUniqueFunction(), + Assertions.assertFalse(onlyCollected.containsVolatileExpression(), "collected conjunct must not contain a unique function"); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunctionTest.java index fd40a7701649a5..c0dd9dc0d14ad2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/scalar/UniqueFunctionTest.java @@ -1044,7 +1044,7 @@ private Map getExprIdToOriginExpressionMap(Plan root) { if (output instanceof Alias) { Alias alias = (Alias) output; exprIdToExpressionMap.put(alias.getExprId(), alias.child()); - if (output.containsUniqueFunction()) { + if (output.containsVolatileExpression()) { boolean notExists = uniqueOutputExpressions.add(alias.child()); Assertions.assertTrue(notExists); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/udf/UdfVolatilityTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/udf/UdfVolatilityTest.java new file mode 100644 index 00000000000000..89fd24821bad40 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/functions/udf/UdfVolatilityTest.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.udf; + +import org.apache.doris.catalog.Function; +import org.apache.doris.catalog.Function.NullableMode; +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.catalog.FunctionVolatility; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.VolatileIdentity; +import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; +import org.apache.doris.nereids.types.IntegerType; +import org.apache.doris.nereids.util.ExpressionUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class UdfVolatilityTest { + + @Test + void testImmutablePythonUdfIsNotVolatileExpression() { + PythonUdf udf = pythonUdf(FunctionVolatility.IMMUTABLE, VolatileIdentity.NON_VOLATILE); + + Assertions.assertTrue(udf.isDeterministic()); + Assertions.assertFalse(udf.containsVolatileExpression()); + Assertions.assertEquals(PythonUdf.class, new PythonUdfBuilder(udf).functionClass()); + } + + @Test + void testVolatilePythonUdfUsesUniqueIdentity() { + PythonUdf first = pythonUdf(FunctionVolatility.VOLATILE, VolatileIdentity.newVolatileIdentity()); + PythonUdf second = pythonUdf(FunctionVolatility.VOLATILE, VolatileIdentity.newVolatileIdentity()); + + Assertions.assertFalse(first.isDeterministic()); + Assertions.assertTrue(first.containsVolatileExpression()); + Assertions.assertNotEquals(first, second); + + Expression ignoredFirst = ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(first, true); + Expression ignoredSecond = ExpressionUtils.setIgnoreUniqueIdForUniqueFunc(second, true); + Assertions.assertEquals(ignoredFirst, ignoredSecond); + } + + @Test + void testVolatileAndImmutableUdfAreNotEqual() { + PythonUdf immutable = pythonUdf(FunctionVolatility.IMMUTABLE, VolatileIdentity.NON_VOLATILE); + PythonUdf volatileUdf = pythonUdf(FunctionVolatility.VOLATILE, VolatileIdentity.newVolatileIdentity()); + + Assertions.assertNotEquals(immutable, volatileUdf); + Assertions.assertNotEquals(volatileUdf, immutable); + } + + @Test + void testJavaUdfVolatility() { + JavaUdf udf = javaUdf(FunctionVolatility.STABLE, VolatileIdentity.NON_VOLATILE); + + Assertions.assertFalse(udf.isDeterministic()); + Assertions.assertFalse(udf.containsVolatileExpression()); + } + + private PythonUdf pythonUdf(FunctionVolatility volatility, VolatileIdentity volatileIdentity) { + return new PythonUdf("py_fn", 1, "db1", Function.BinaryType.PYTHON_UDF, signature(), + NullableMode.ALWAYS_NULLABLE, volatility, volatileIdentity, + null, "evaluate", null, null, "", false, 360, "3.10.2", "", + new IntegerLiteral(1)); + } + + private JavaUdf javaUdf(FunctionVolatility volatility, VolatileIdentity volatileIdentity) { + return new JavaUdf("java_fn", 1, "db1", Function.BinaryType.JAVA_UDF, signature(), + NullableMode.ALWAYS_NULLABLE, volatility, volatileIdentity, + null, "evaluate", null, null, "", false, 360, new IntegerLiteral(1)); + } + + private FunctionSignature signature() { + return FunctionSignature.ret(IntegerType.INSTANCE).args(IntegerType.INSTANCE); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommandTest.java index ba42ef9c6e241c..716b9fa24da4db 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ShowFunctionsCommandTest.java @@ -24,6 +24,10 @@ import org.apache.doris.catalog.AccessPrivilegeWithCols; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Function; +import org.apache.doris.catalog.FunctionName; +import org.apache.doris.catalog.FunctionVolatility; +import org.apache.doris.catalog.ScalarFunction; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.Auth; @@ -111,6 +115,52 @@ void testLike() { Assertions.assertTrue(sf.like("test_for_create_function", "test_for_create_function%")); } + @Test + void testBuildProperties_scalarUdfEmitsVolatility() { + ShowFunctionsCommand sf = new ShowFunctionsCommand("test", true, null); + ScalarFunction fn = ScalarFunction.createUdf(Function.BinaryType.JAVA_UDF, + new FunctionName("test", "java_scalar_fn"), new Type[] {Type.INT}, + Type.INT, false, null, "com.example.ScalarFn", null, null); + fn.setVolatility(FunctionVolatility.IMMUTABLE); + + String properties = sf.buildPropertiesForTest(fn); + + Assertions.assertTrue(properties.contains("SYMBOL=com.example.ScalarFn")); + Assertions.assertTrue(properties.contains("VOLATILITY=immutable")); + } + + @Test + void testBuildProperties_javaUdtfDoesNotEmitVolatility() { + ShowFunctionsCommand sf = new ShowFunctionsCommand("test", true, null); + ScalarFunction fn = ScalarFunction.createUdf(Function.BinaryType.JAVA_UDF, + new FunctionName("test", "java_table_fn"), new Type[] {Type.INT}, + Type.INT, false, null, "com.example.TableFn", null, null); + fn.setUDTFunction(true); + fn.setVolatility(FunctionVolatility.IMMUTABLE); + + String properties = sf.buildPropertiesForTest(fn); + + Assertions.assertTrue(properties.contains("SYMBOL=com.example.TableFn")); + Assertions.assertFalse(properties.contains("VOLATILITY")); + } + + @Test + void testBuildProperties_pythonUdtfDoesNotEmitVolatility() { + ShowFunctionsCommand sf = new ShowFunctionsCommand("test", true, null); + ScalarFunction fn = ScalarFunction.createUdf(Function.BinaryType.PYTHON_UDF, + new FunctionName("test", "py_table_fn"), new Type[] {Type.INT}, + Type.INT, false, null, "evaluate", null, null); + fn.setUDTFunction(true); + fn.setRuntimeVersion("3.10.2"); + fn.setVolatility(FunctionVolatility.IMMUTABLE); + + String properties = sf.buildPropertiesForTest(fn); + + Assertions.assertTrue(properties.contains("RUNTIME_VERSION=3.10.2")); + Assertions.assertTrue(properties.contains("SYMBOL=evaluate")); + Assertions.assertFalse(properties.contains("VOLATILITY")); + } + @Test void testAuth() throws Exception { auth = Env.getCurrentEnv().getAuth(); diff --git a/regression-test/suites/javaudf_p0/test_javaudf_float.groovy b/regression-test/suites/javaudf_p0/test_javaudf_float.groovy index 5372bda71c4f3e..c67488acadc889 100644 --- a/regression-test/suites/javaudf_p0/test_javaudf_float.groovy +++ b/regression-test/suites/javaudf_p0/test_javaudf_float.groovy @@ -56,7 +56,8 @@ suite("test_javaudf_float") { sql """ CREATE FUNCTION java_udf_float_test(FLOAT,FLOAT) RETURNS FLOAT PROPERTIES ( "file"="file://${jarPath}", "symbol"="org.apache.doris.udf.FloatTest", - "type"="JAVA_UDF" + "type"="JAVA_UDF", + "volatility"="immutable" ); """ qt_select """ SELECT java_udf_float_test(cast(2.83645 as float),cast(111.1111111 as float)) as result; """ diff --git a/regression-test/suites/mtmv_p0/test_expand_star_mtmv.groovy b/regression-test/suites/mtmv_p0/test_expand_star_mtmv.groovy index f550dc78c3db33..034765a0a45be5 100644 --- a/regression-test/suites/mtmv_p0/test_expand_star_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_expand_star_mtmv.groovy @@ -62,7 +62,8 @@ suite("test_expand_star_mtmv","mtmv") { sql """ CREATE FUNCTION ${functionName}(date, date) RETURNS boolean PROPERTIES ( "file"="file://${jarPath}", "symbol"="org.apache.doris.udf.DateTest1", - "type"="JAVA_UDF" + "type"="JAVA_UDF", + "volatility"="immutable" ); """ sql """ diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_aggregate.groovy b/regression-test/suites/pythonudf_p0/test_pythonudf_aggregate.groovy index 6897e42ee24fe2..59940e4ed417f9 100644 --- a/regression-test/suites/pythonudf_p0/test_pythonudf_aggregate.groovy +++ b/regression-test/suites/pythonudf_p0/test_pythonudf_aggregate.groovy @@ -28,7 +28,8 @@ suite("test_pythonudf_aggregate") { PROPERTIES ( "type" = "PYTHON_UDF", "symbol" = "evaluate", - "runtime_version" = "${runtime_version}" + "runtime_version" = "${runtime_version}", + "volatility" = "immutable" ) AS \$\$ def evaluate(score): @@ -120,7 +121,8 @@ def evaluate(score): PROPERTIES ( "type" = "PYTHON_UDF", "symbol" = "evaluate", - "runtime_version" = "${runtime_version}" + "runtime_version" = "${runtime_version}", + "volatility" = "immutable" ) AS \$\$ def evaluate(age): diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_float.groovy b/regression-test/suites/pythonudf_p0/test_pythonudf_float.groovy index 7a26136ed2d41c..0d621a5abf381d 100644 --- a/regression-test/suites/pythonudf_p0/test_pythonudf_float.groovy +++ b/regression-test/suites/pythonudf_p0/test_pythonudf_float.groovy @@ -53,7 +53,8 @@ suite("test_pythonudf_float") { "symbol"="float_test.evaluate", "type"="PYTHON_UDF", "runtime_version" = "${runtime_version}", - "always_nullable" = "true" + "always_nullable" = "true", + "volatility" = "immutable" ); """ qt_select """ SELECT python_udf_float_test(cast(2.83645 as float),cast(111.1111111 as float)) as result; """ diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_volatility.groovy b/regression-test/suites/pythonudf_p0/test_pythonudf_volatility.groovy new file mode 100644 index 00000000000000..7cc2f0f89b400c --- /dev/null +++ b/regression-test/suites/pythonudf_p0/test_pythonudf_volatility.groovy @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert + +suite("test_pythonudf_volatility") { + def runtimeVersion = getPythonUdfRuntimeVersion() + def functions = [ + "py_vol_immutable", + "py_vol_stable", + "py_vol_volatile", + "py_vol_default" + ] + def materializedViews = [ + "py_vol_immutable_mv", + "py_vol_stable_mv", + "py_vol_volatile_mv", + "py_vol_default_mv" + ] + + materializedViews.each { mv -> + sql """ DROP MATERIALIZED VIEW IF EXISTS ${mv}; """ + } + sql """ DROP TABLE IF EXISTS py_vol_tbl; """ + functions.each { fn -> + sql """ DROP FUNCTION IF EXISTS ${fn}(INT); """ + } + + sql """ + CREATE TABLE py_vol_tbl ( + k INT + ) + DISTRIBUTED BY HASH(k) BUCKETS 1 + PROPERTIES("replication_num" = "1"); + """ + sql """ INSERT INTO py_vol_tbl VALUES (1), (2); """ + + sql """ + CREATE FUNCTION py_vol_immutable(INT) + RETURNS INT + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "evaluate", + "runtime_version" = "${runtimeVersion}", + "volatility" = "immutable" + ) +AS \$\$ +def evaluate(x): + if x is None: + return None + return x + 1 +\$\$; + """ + + sql """ + CREATE FUNCTION py_vol_stable(INT) + RETURNS INT + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "evaluate", + "runtime_version" = "${runtimeVersion}", + "volatility" = "stable" + ) +AS \$\$ +def evaluate(x): + if x is None: + return None + return x + 2 +\$\$; + """ + + sql """ + CREATE FUNCTION py_vol_volatile(INT) + RETURNS INT + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "evaluate", + "runtime_version" = "${runtimeVersion}", + "volatility" = "volatile" + ) +AS \$\$ +def evaluate(x): + if x is None: + return None + return x + 3 +\$\$; + """ + + sql """ + CREATE FUNCTION py_vol_default(INT) + RETURNS INT + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "evaluate", + "runtime_version" = "${runtimeVersion}" + ) +AS \$\$ +def evaluate(x): + if x is None: + return None + return x + 4 +\$\$; + """ + + def result = sql """ + SELECT + py_vol_immutable(1), + py_vol_stable(1), + py_vol_volatile(1), + py_vol_default(1); + """ + Assert.assertEquals([[2, 3, 4, 5]], result) + + def showCreateResult = sql """ SHOW CREATE FUNCTION py_vol_immutable(INT); """ + assertTrue(showCreateResult.size() == 1) + def replaySql = showCreateResult[0][1].toString() + assertTrue(replaySql.contains("\"RUNTIME_VERSION\"=\"${runtimeVersion}\"")) + assertTrue(replaySql.contains("\"VOLATILITY\"=\"immutable\"")) + assertTrue(replaySql.contains("AS \$\$")) + assertTrue(replaySql.contains("return x + 1")) + + sql """ DROP FUNCTION py_vol_immutable(INT); """ + sql replaySql + result = sql """ SELECT py_vol_immutable(1); """ + Assert.assertEquals([[2]], result) + + explain { + sql "logical plan SELECT * FROM py_vol_tbl WHERE py_vol_immutable(k) IN (1, k + 1)" + contains "OR[" + notContains " IN " + } + + explain { + sql "logical plan SELECT * FROM py_vol_tbl WHERE py_vol_stable(k) IN (1, k + 2)" + contains "OR[" + notContains " IN " + } + + explain { + sql "logical plan SELECT * FROM py_vol_tbl WHERE py_vol_volatile(k) IN (1, k + 3)" + contains " IN " + notContains "OR[" + } + + explain { + sql "logical plan SELECT * FROM py_vol_tbl WHERE py_vol_default(k) IN (1, k + 4)" + contains " IN " + notContains "OR[" + } + + result = sql """ + SELECT py_vol_volatile(k), COUNT(*) + FROM py_vol_tbl + GROUP BY py_vol_volatile(k) + ORDER BY 1; + """ + Assert.assertEquals("[[4, 1], [5, 1]]", result.toString()) + + createMV(""" + CREATE MATERIALIZED VIEW py_vol_immutable_mv + AS SELECT py_vol_immutable(k) AS v FROM py_vol_tbl; + """) + + test { + sql """ + CREATE MATERIALIZED VIEW py_vol_stable_mv + AS SELECT py_vol_stable(k) AS v_stable FROM py_vol_tbl; + """ + exception "can not contain nonDeterministic expression or unnest" + } + + test { + sql """ + CREATE MATERIALIZED VIEW py_vol_volatile_mv + AS SELECT py_vol_volatile(k) AS v_volatile FROM py_vol_tbl; + """ + exception "can not contain nonDeterministic expression or unnest" + } + + test { + sql """ + CREATE MATERIALIZED VIEW py_vol_default_mv + AS SELECT py_vol_default(k) AS v_default FROM py_vol_tbl; + """ + exception "can not contain nonDeterministic expression or unnest" + } +}