Skip to content

Commit f16570f

Browse files
srielaucloud-fan
authored andcommitted
[SPARK-54807][SQL] Allow qualified names for built-in and session functions
[design_sketch.md](https://github.com/user-attachments/files/25671695/design_sketch.md) ### What changes were proposed in this pull request? - Allow reference of built in functions with qualifiers builtin or system.builtin and temporary functions as session or system.session. Functions registered as extensionz can be qualified with system.extension or extension. - Cleaned up APIs to resolve functions to prep for configurable path - Register builtin extension, and session functions with qualified names, so they can co-exist - Fix a bug that allowed session functions with the same name to co-exist as table and scalar functions [design_sketch.md](https://github.com/user-attachments/files/25671695/design_sketch.md) ### Why are the changes needed? This portion of work allows users to excplicitly pick a builtin or temporary function, the same way they would pick a persisted function by fully qualifying it. This increases security. WIth this we now have a fixed order: extension -> builtin -> session -> current schema for function resolution. In follow on work we plan to allow the priority of function resolution to be configurable, for example to push temporary functions after built-ins or even after persisted functions. Ultimately we aim for proper SQL Standard PATH support where a user can add "libraries" of functions to the path. ### Does this PR introduce _any_ user-facing change? You can now reference builtin functions such as concat with builtin.concat or system.builtin.concat. Teh same for temporary functions which can be qualified as session, or system.session. ### How was this patch tested? A new suite: functionQualificationSuite.scala has been added ### Was this patch authored or co-authored using generative AI tooling? Yes: Claude Sonnet Closes #53570 from srielau/search-path. Lead-authored-by: Serge Rielau <serge@rielau.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Co-authored-by: Serge Rielau <srielau@users.noreply.github.com> Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent 4ebdc4a commit f16570f

39 files changed

Lines changed: 2583 additions & 442 deletions

File tree

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4116,6 +4116,12 @@
41164116
},
41174117
"sqlState" : "42823"
41184118
},
4119+
"INVALID_TEMP_OBJ_QUALIFIER" : {
4120+
"message" : [
4121+
"Temporary <objectType> <objectName> cannot be qualified with <qualifier>. Temporary objects can only be qualified with SESSION or SYSTEM.SESSION."
4122+
],
4123+
"sqlState" : "42602"
4124+
},
41194125
"INVALID_TEMP_OBJ_REFERENCE" : {
41204126
"message" : [
41214127
"Cannot create the persistent object <objName> of the type <obj> because it references to the temporary object <tempObjName> of the type <tempObj>. Please make the temporary object <tempObjName> persistent, or make the persistent object <objName> temporary."
@@ -4186,6 +4192,12 @@
41864192
],
41874193
"sqlState" : "42000"
41884194
},
4195+
"INVALID_USAGE_OF_STAR_WITH_TABLE_IDENTIFIER_IN_COUNT" : {
4196+
"message" : [
4197+
"count(<tableName>.*) is not allowed. Use count(*) or expand the columns manually, e.g. count(col1, col2)."
4198+
],
4199+
"sqlState" : "42000"
4200+
},
41894201
"INVALID_UTF8_STRING" : {
41904202
"message" : [
41914203
"Invalid UTF8 byte sequence found in string: <str>."

mllib/src/main/scala/org/apache/spark/sql/ml/InternalFunctionRegistration.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.apache.spark.ml.linalg.{SparseVector, Vector, Vectors}
2020
import org.apache.spark.ml.stat._
2121
import org.apache.spark.mllib.linalg.{SparseVector => OldSparseVector, Vector => OldVector}
2222
import org.apache.spark.sql.{SparkSessionExtensions, SparkSessionExtensionsProvider}
23+
import org.apache.spark.sql.catalyst.FunctionIdentifier
2324
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
2425
import org.apache.spark.sql.catalyst.expressions.{Expression, StringLiteral}
2526
import org.apache.spark.sql.classic.UserDefinedFunctionUtils.toScalaUDF
@@ -41,7 +42,8 @@ object InternalFunctionRegistration {
4142
}
4243

4344
private def registerFunction(name: String)(builder: Seq[Expression] => Expression): Unit = {
44-
FunctionRegistry.internal.createOrReplaceTempFunction(name, builder, "internal")
45+
FunctionRegistry.internal.registerFunction(
46+
FunctionIdentifier(name), builder, "internal")
4547
}
4648

4749
private val vectorToArrayUdf = udf { vec: Any =>

sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,20 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
674674
new ParseException(errorClass = "INVALID_SQL_SYNTAX.CREATE_TEMP_FUNC_WITH_IF_NOT_EXISTS", ctx)
675675
}
676676

677+
def invalidTempObjQualifierError(
678+
objectType: String,
679+
objectName: String,
680+
qualifier: String,
681+
ctx: ParserRuleContext): Throwable = {
682+
new ParseException(
683+
"INVALID_TEMP_OBJ_QUALIFIER",
684+
Map(
685+
"objectType" -> objectType,
686+
"objectName" -> toSQLId(objectName),
687+
"qualifier" -> toSQLId(qualifier)),
688+
ctx)
689+
}
690+
677691
def unsupportedFunctionNameError(funcName: Seq[String], ctx: ParserRuleContext): Throwable = {
678692
new ParseException(
679693
errorClass = "INVALID_SQL_SYNTAX.MULTI_PART_NAME",

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 95 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,23 +1823,46 @@ class Analyzer(
18231823
case _ => false
18241824
}
18251825

1826+
/**
1827+
* Checks if the given function name parts match the expected builtin function name.
1828+
* This is used for special syntax transformations (e.g., COUNT(*) -> COUNT(1)) that
1829+
* should only apply to builtin functions, not to user-defined functions.
1830+
*
1831+
* In legacy mode (sessionOrder="first"), temp functions shadow builtins, so an
1832+
* unqualified name that matches a temp function should NOT be treated as builtin.
1833+
*/
1834+
private def matchesFunctionName(nameParts: Seq[String], expectedName: String): Boolean = {
1835+
if (!FunctionResolution.isUnqualifiedOrBuiltinFunctionName(nameParts, expectedName)) {
1836+
return false
1837+
}
1838+
if (nameParts.size == 1 && conf.sessionFunctionResolutionOrder == "first") {
1839+
val v1Catalog = catalogManager.v1SessionCatalog
1840+
!v1Catalog.isTemporaryFunction(FunctionIdentifier(nameParts.head))
1841+
} else {
1842+
true
1843+
}
1844+
}
1845+
18261846
/**
18271847
* Expands the matching attribute.*'s in `child`'s output.
18281848
*/
18291849
def expandStarExpression(expr: Expression, child: LogicalPlan): Expression = {
18301850
expr.transformUp {
18311851
case f0: UnresolvedFunction if !f0.isDistinct &&
1832-
f0.nameParts.map(_.toLowerCase(Locale.ROOT)) == Seq("count") &&
1852+
matchesFunctionName(f0.nameParts, "count") &&
18331853
isCountStarExpansionAllowed(f0.arguments) =>
18341854
// Transform COUNT(*) into COUNT(1).
1835-
f0.copy(nameParts = Seq("count"), arguments = Seq(Literal(1)))
1855+
// We do not normalize the name to "count"; we keep the original name parts
1856+
// (e.g. builtin.count, system.builtin.count) so that resolution still sees
1857+
// the same qualification.
1858+
f0.copy(arguments = Seq(Literal(1)))
18361859
case f1: UnresolvedFunction if containsStar(f1.arguments) =>
18371860
// SPECIAL CASE: We want to block count(tblName.*) because in spark, count(tblName.*) will
18381861
// be expanded while count(*) will be converted to count(1). They will produce different
18391862
// results and confuse users if there are any null values. For count(t1.*, t2.*), it is
18401863
// still allowed, since it's well-defined in spark.
18411864
if (!conf.allowStarWithSingleTableIdentifierInCount &&
1842-
f1.nameParts == Seq("count") &&
1865+
matchesFunctionName(f1.nameParts, "count") &&
18431866
f1.arguments.length == 1) {
18441867
f1.arguments.foreach {
18451868
case u: UnresolvedStar if u.isQualifiedByTable(child.output, resolver) =>
@@ -1987,8 +2010,13 @@ class Analyzer(
19872010
* only performs simple existence check according to the function identifier to quickly identify
19882011
* undefined functions without triggering relation resolution, which may incur potentially
19892012
* expensive partition/schema discovery process in some cases.
1990-
* In order to avoid duplicate external functions lookup, the external function identifier will
1991-
* store in the local hash set externalFunctionNameSet.
2013+
*
2014+
* To avoid duplicate external catalog lookups, this rule maintains a per-plan cache of
2015+
* persistent function names (externalFunctionNameSet). Builtin and temporary functions are
2016+
* validated on every occurrence since they're fast in-memory lookups, but persistent functions
2017+
* are cached after the first validation to avoid repeated external catalog calls for the same
2018+
* function within a single plan.
2019+
*
19922020
* @see [[ResolveFunctions]]
19932021
* @see https://issues.apache.org/jira/browse/SPARK-19737
19942022
*/
@@ -1998,24 +2026,57 @@ class Analyzer(
19982026

19992027
plan.resolveExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
20002028
case f @ UnresolvedFunction(nameParts, _, _, _, _, _, _) =>
2001-
if (functionResolution.lookupBuiltinOrTempFunction(nameParts, Some(f)).isDefined) {
2029+
// For builtin/temp functions, we can do a quick check without catalog lookup
2030+
val quickCheck = if (nameParts.size == 1 ||
2031+
FunctionResolution.sessionNamespaceKind(nameParts).isDefined) {
2032+
functionResolution.lookupBuiltinOrTempFunction(nameParts, Some(f))
2033+
} else {
2034+
None
2035+
}
2036+
2037+
if (quickCheck.isDefined) {
2038+
// It's a builtin or temp function - no need for catalog lookup or caching
20022039
f
20032040
} else {
2041+
// Might be a persistent function - compute full name and check cache first
20042042
val CatalogAndIdentifier(catalog, ident) =
20052043
relationResolution.expandIdentifier(nameParts)
2006-
val fullName =
2007-
normalizeFuncName((catalog.name +: ident.namespace :+ ident.name).toImmutableArraySeq)
2044+
2045+
val fullName = normalizeFuncName(
2046+
(catalog.name +: ident.namespace :+ ident.name).toImmutableArraySeq)
2047+
20082048
if (externalFunctionNameSet.contains(fullName)) {
2009-
f
2010-
} else if (catalog.asFunctionCatalog.functionExists(ident)) {
2011-
externalFunctionNameSet.add(fullName)
2049+
// Already validated this function exists - skip lookup
20122050
f
20132051
} else {
2014-
val catalogPath = (catalog.name() +: catalogManager.currentNamespace).mkString(".")
2015-
throw QueryCompilationErrors.unresolvedRoutineError(
2016-
nameParts,
2017-
Seq("system.builtin", "system.session", catalogPath),
2018-
f.origin)
2052+
// Not in cache - do full lookup to determine type
2053+
val functionType =
2054+
functionResolution.lookupFunctionType(nameParts, Some(f))
2055+
2056+
functionType match {
2057+
case FunctionType.Local =>
2058+
throw SparkException.internalError(
2059+
s"Logic inconsistency: Function ${nameParts.mkString(".")} was " +
2060+
s"classified as $functionType by full lookup but not found by quick check. " +
2061+
s"Check sessionNamespaceKind logic.")
2062+
2063+
case FunctionType.Persistent =>
2064+
externalFunctionNameSet.add(fullName)
2065+
f
2066+
2067+
case FunctionType.TableOnly =>
2068+
throw QueryCompilationErrors.notAScalarFunctionError(nameParts.mkString("."), f)
2069+
2070+
case FunctionType.NotFound =>
2071+
val catalogPath =
2072+
catalogManager.currentCatalog.name +: catalogManager.currentNamespace
2073+
val searchPath = SQLConf.get.resolutionSearchPath(catalogPath.toSeq)
2074+
.map(_.quoted)
2075+
throw QueryCompilationErrors.unresolvedRoutineError(
2076+
nameParts,
2077+
searchPath,
2078+
f.origin)
2079+
}
20192080
}
20202081
}
20212082
}
@@ -2057,38 +2118,26 @@ class Analyzer(
20572118
// Resolve table-valued function references.
20582119
case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
20592120
withPosition(u) {
2060-
try {
2061-
val resolvedFunc = functionResolution.resolveBuiltinOrTempTableFunction(
2062-
u.name, u.functionArgs).getOrElse {
2063-
val CatalogAndIdentifier(catalog, ident) =
2064-
relationResolution.expandIdentifier(u.name)
2065-
if (CatalogV2Util.isSessionCatalog(catalog)) {
2066-
v1SessionCatalog.resolvePersistentTableFunction(
2067-
ident.asFunctionIdentifier, u.functionArgs)
2068-
} else {
2069-
throw QueryCompilationErrors.missingCatalogTableValuedFunctionsAbilityError(
2070-
catalog)
2071-
}
2072-
}
2073-
resolvedFunc.transformAllExpressionsWithPruning(
2074-
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION)) {
2075-
case t: FunctionTableSubqueryArgumentExpression =>
2076-
resolvedFunc match {
2077-
case Generate(_: PythonUDTF, _, _, _, _, _) =>
2078-
case Generate(_: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
2079-
case _ =>
2080-
assert(!t.hasRepartitioning,
2081-
"Cannot evaluate the table-valued function call because it included the " +
2082-
"PARTITION BY clause, but only Python table functions support this " +
2083-
"clause")
2084-
}
2085-
t
2086-
}
2087-
} catch {
2088-
case _: NoSuchFunctionException =>
2121+
val resolvedFunc = functionResolution.resolveTableFunction(u.name, u.functionArgs)
2122+
.getOrElse {
20892123
u.failAnalysis(
20902124
errorClass = "UNRESOLVABLE_TABLE_VALUED_FUNCTION",
20912125
messageParameters = Map("name" -> toSQLId(u.name)))
2126+
}
2127+
2128+
resolvedFunc.transformAllExpressionsWithPruning(
2129+
_.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION)) {
2130+
case t: FunctionTableSubqueryArgumentExpression =>
2131+
resolvedFunc match {
2132+
case Generate(_: PythonUDTF, _, _, _, _, _) =>
2133+
case Generate(_: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
2134+
case _ =>
2135+
assert(!t.hasRepartitioning,
2136+
"Cannot evaluate the table-valued function call because it included the " +
2137+
"PARTITION BY clause, but only Python table functions support this " +
2138+
"clause")
2139+
}
2140+
t
20922141
}
20932142
}
20942143

@@ -2167,18 +2216,12 @@ class Analyzer(
21672216
ruleId) {
21682217
case u @ UnresolvedFunction(nameParts, arguments, _, _, _, _, _)
21692218
if functionResolution.hasLambdaAndResolvedArguments(arguments) => withPosition(u) {
2170-
functionResolution.resolveBuiltinOrTempFunction(nameParts, arguments, u).map {
2219+
functionResolution.resolveFunction(u) match {
21712220
case func: HigherOrderFunction => func
21722221
case other => other.failAnalysis(
21732222
errorClass = "INVALID_LAMBDA_FUNCTION_CALL.NON_HIGHER_ORDER_FUNCTION",
21742223
messageParameters = Map(
21752224
"class" -> other.getClass.getCanonicalName))
2176-
}.getOrElse {
2177-
throw QueryCompilationErrors.unresolvedRoutineError(
2178-
nameParts,
2179-
// We don't support persistent high-order functions yet.
2180-
Seq("system.builtin", "system.session"),
2181-
u.origin)
21822225
}
21832226
}
21842227

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,12 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
303303
u.tableNotFound(u.multipartIdentifier)
304304

305305
case u: UnresolvedFunctionName =>
306-
val catalogPath = (currentCatalog.name +: catalogManager.currentNamespace).mkString(".")
306+
val catalogPath = currentCatalog.name +: catalogManager.currentNamespace
307+
val searchPath = SQLConf.get.resolutionSearchPath(catalogPath.toSeq)
308+
.map(_.quoted)
307309
throw QueryCompilationErrors.unresolvedRoutineError(
308310
u.multipartIdentifier,
309-
Seq("system.builtin", "system.session", catalogPath),
311+
searchPath,
310312
u.origin)
311313

312314
case u: UnresolvedHint =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.variant._
3535
import org.apache.spark.sql.catalyst.expressions.xml._
3636
import org.apache.spark.sql.catalyst.plans.logical.{FunctionBuilderBase, Generate, LogicalPlan, OneRowRelation, PythonWorkerLogs, Range}
3737
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
38+
import org.apache.spark.sql.connector.catalog.CatalogManager
3839
import org.apache.spark.sql.errors.QueryCompilationErrors
3940
import org.apache.spark.sql.internal.SQLConf
4041
import org.apache.spark.sql.types._
@@ -78,10 +79,13 @@ trait FunctionRegistryBase[T] {
7879
/* Create or replace a temporary function. */
7980
final def createOrReplaceTempFunction(
8081
name: String, builder: FunctionBuilder, source: String): Unit = {
81-
registerFunction(
82-
FunctionIdentifier(name),
83-
builder,
84-
source)
82+
// Regular temporary functions are qualified with CatalogManager.SESSION_NAMESPACE
83+
// to enable coexistence with builtin functions of the same name
84+
val identifier = FunctionIdentifier(
85+
name,
86+
Some(CatalogManager.SESSION_NAMESPACE),
87+
Some(CatalogManager.SYSTEM_CATALOG_NAME))
88+
registerFunction(identifier, builder, source)
8589
}
8690

8791
@throws[AnalysisException]("If function does not exist")

0 commit comments

Comments
 (0)