Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,36 @@ case class AnalysisContext(

def getSinglePassResolverBridgeState: Option[AnalyzerBridgeState] =
singlePassResolverBridgeState

/**
* Per-pass memo of the SQL resolution search path (SPARK-57758). Function resolution computes
* the ordered path once per analysis pass and reuses it for every [[UnresolvedFunction]],
* instead of rebuilding and re-iterating it per node -- the cost that, under Spark Connect's
* repeated re-analysis of the growing plan, scaled with plan size x analyze calls.
*
* The memo lives on the context so it shares the context's per-pass lifetime. `SET PATH` /
* `USE` / conf changes all produce a fresh context ([[reset]], [[withNewAnalysisContext]], or
* the `copy` / construction for a view or SQL-function body), and a body-level field is not
* carried over by `copy`, so a new pass automatically starts with an empty memo and the memo is
* collected with the context. It is therefore safe without an identity key or weak reference,
* but only for values derived from this context's immutable fields (the path derives from
* `resolutionPathEntries` / `catalogAndNamespace`); never memoize anything derived from the
* mutable fields above (`relationCache`, `referredTempFunctionNames`, ...).
*
* INVARIANT: keep this a body `var`, never a constructor parameter. `.copy()` (used by
* `withAnalysisContext(function)` and `withOuterPlan`) deliberately does not carry a body
* field, which is what gives a SQL-function-body / outer-plan context a fresh memo. Promoting
* it to a parameter would copy a stale path across that boundary and silently mis-resolve
* (SECTION 17f of `FunctionQualificationSuite` is the regression guard).
*/
private var resolutionPathMemo: Seq[Seq[String]] = _

def memoizedResolutionPath(compute: => Seq[Seq[String]]): Seq[Seq[String]] = {
if (resolutionPathMemo == null) {
resolutionPathMemo = compute
}
resolutionPathMemo
}
}

object AnalysisContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,37 @@ class FunctionResolution(
* directly, matching [[RelationResolution.relationResolutionEntries]] so routine order stays
* aligned with relation order.
*/
private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] =
catalogManager.resolutionPathEntriesForAnalysis(
AnalysisContext.get.resolutionPathEntries,
AnalysisContext.get.catalogAndNamespace)
private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] = {
// Per-analysis-pass memo (SPARK-57758): computing the path (reading the live [[CatalogManager]]
// and several confs, then allocating `Seq`s) used to run once per [[UnresolvedFunction]], and
// under Spark Connect once per node on every re-analysis of the growing plan. The path is
// stable within a pass (`SET PATH` / `USE` / conf changes happen between passes, each under a
// fresh [[AnalysisContext]]), so it is memoized on the current context, which shares the pass's
// lifetime. See [[AnalysisContext.memoizedResolutionPath]] for why this needs no identity key.
val context = AnalysisContext.get
context.memoizedResolutionPath {
catalogManager.resolutionPathEntriesForAnalysis(
context.resolutionPathEntries, context.catalogAndNamespace)
}
}

/**
* True when `system.builtin` is the first entry of the effective resolution path. In that case a
* single-part name that resolves to a built-in cannot be shadowed by any earlier entry -- neither
* a `system.session` entry (a temporary/session function) nor a catalog/schema entry placed
* before `system.builtin` by a custom `SET PATH` -- so the built-in fast-path in
* [[resolveFunction]] / [[resolveTableFunction]] cannot change resolution precedence. A miss
* still falls through to the full candidate loop, so non-built-in names are unaffected.
*
* The default `spark.sql.functionResolution.sessionOrder` modes `second` and `last` put
* `system.builtin` first; only `first` puts `system.session` before it, where the fast-path is
* correctly disabled. Only a custom `SET PATH` can place another entry before `system.builtin`.
*
* Reads the per-pass memoized path ([[sqlResolutionPathEntriesForAnalysis]]), so the check is
* O(1) per [[UnresolvedFunction]].
*/
private def builtinFastPathSafe: Boolean =
CatalogManager.isBuiltinFirstOnPath(sqlResolutionPathEntriesForAnalysis)

private def resolutionCandidates(nameParts: Seq[String]): Seq[Seq[String]] = {
if (nameParts.size == 1) {
Expand All @@ -134,6 +161,12 @@ class FunctionResolution(
private def resolveFunctionCandidate(
nameParts: Seq[String],
unresolvedFunc: UnresolvedFunction): Option[Expression] = {
// NOTE: the `system.builtin.<name>` case here is the same registry lookup the built-in
// fast-path in `resolveFunction` performs directly (both go through
// `identifierFromSystemNameParts` / `builtinFunctionIdentifier` ->
// `resolveScalarFunctionByIdentifier`). The two must stay equivalent; a change to built-in
// scalar resolution has to touch both. `resolveTableFunctionCandidate` / `resolveTableFunction`
// mirror this for table functions.
if (isSystemCatalogQualified(nameParts)) {
v1SessionCatalog.identifierFromSystemNameParts(nameParts).flatMap { ident =>
val expr = v1SessionCatalog.resolveScalarFunctionByIdentifier(
Expand Down Expand Up @@ -189,6 +222,23 @@ class FunctionResolution(
}
}

// Fast-path (SPARK-57758): an unqualified, non-internal name that resolves to a built-in
// is by far the common case. When `system.builtin` is the first entry of the effective path,
// a built-in hit cannot be shadowed by any earlier entry (a session/temporary function, or a
// catalog/schema placed before `system.builtin` via `SET PATH`), so it can be resolved with a
// single registry lookup instead of building and iterating the candidate search path. A miss
// falls through to the full candidate resolution below. This lookup is equivalent to the
// `system.builtin.<name>` branch of `resolveFunctionCandidate`; keep the two in sync.
if (unresolvedFunc.nameParts.size == 1 && !unresolvedFunc.isInternal &&
builtinFastPathSafe) {
val builtin = v1SessionCatalog.resolveScalarFunctionByIdentifier(
FunctionRegistry.builtinFunctionIdentifier(unresolvedFunc.nameParts.head),
unresolvedFunc.arguments)
if (builtin.isDefined) {
return validateFunction(builtin.get, unresolvedFunc.arguments.length, unresolvedFunc)
}
}

val candidates = resolutionCandidates(unresolvedFunc.nameParts)
for (nameParts <- candidates) {
resolveFunctionCandidate(nameParts, unresolvedFunc) match {
Expand Down Expand Up @@ -263,6 +313,16 @@ class FunctionResolution(
def resolveTableFunction(
nameParts: Seq[String],
arguments: Seq[Expression]): Option[LogicalPlan] = {
// Fast-path (SPARK-57758): see `resolveFunction`. Short-circuit a single-part name to a
// built-in table function when `system.builtin` is the first entry of the path; a miss
// (including a built-in scalar of the same name) falls through to the candidate loop, which
// preserves the NOT_A_TABLE_FUNCTION semantics.
if (nameParts.size == 1 && builtinFastPathSafe) {
val builtin = v1SessionCatalog.resolveTableFunctionByIdentifier(
FunctionRegistry.builtinFunctionIdentifier(nameParts.head), arguments)
if (builtin.isDefined) return builtin
}

val candidates = resolutionCandidates(nameParts)
for (nameParts <- candidates) {
resolveTableFunctionCandidate(nameParts, arguments) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,15 @@ private[sql] object CatalogManager extends Logging {
parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) &&
parts(1).equalsIgnoreCase(BUILTIN_NAMESPACE)

/**
* True when `system.builtin` is the first entry of `pathEntries`. This is the path-shape
* condition under which a built-in function found by an unqualified single-part name cannot be
* shadowed by any earlier path entry -- the precise property the function-resolution built-in
* fast-path relies on. Pure predicate over the path shape; callers decide how to use it.
*/
def isBuiltinFirstOnPath(pathEntries: Seq[Seq[String]]): Boolean =
pathEntries.headOption.exists(isSystemBuiltinPathEntry)

/**
* Extract `system.builtin` / `system.session` entries from a resolved PATH, mapped to
* [[SessionCatalog.SessionFunctionKind]] in path order. Pure data conversion -- callers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,30 @@ class CatalogManagerSuite extends SparkFunSuite with SQLHelper {
assert(e.getMessage.contains("default.v_broken"))
}

test("isBuiltinFirstOnPath: SPARK-57758 gate for the built-in function fast-path") {
// The function-resolution built-in fast-path is safe (cannot be shadowed) only when
// `system.builtin` is the FIRST path entry. The fast-path has no behavioral signature -- the
// slow candidate loop yields identical results -- so this pure-predicate test guards the gate
// directly: a regression that silently stopped the fast-path from firing (re-introducing the
// SPARK-57758 perf regression) or fired it when a catalog precedes `system.builtin`
// (re-introducing the precedence bug) would leave the behavioral SQL tests green.
val builtin = Seq("system", "builtin")
val session = Seq("system", "session")
val catalog = Seq("spark_catalog", "some_schema")
// Default `sessionOrder` modes `second` / `last` keep `system.builtin` first -> safe.
assert(CatalogManager.isBuiltinFirstOnPath(Seq(builtin, session)))
assert(CatalogManager.isBuiltinFirstOnPath(Seq(builtin)))
assert(CatalogManager.isBuiltinFirstOnPath(Seq(builtin, catalog, session)))
// Case-insensitive match on the well-known entry.
assert(CatalogManager.isBuiltinFirstOnPath(Seq(Seq("System", "Builtin"), session)))
// `first` mode (session before builtin) and custom `SET PATH` shapes that place any entry
// before `system.builtin` -> not safe.
assert(!CatalogManager.isBuiltinFirstOnPath(Seq(session, builtin)))
assert(!CatalogManager.isBuiltinFirstOnPath(Seq(catalog, builtin)))
assert(!CatalogManager.isBuiltinFirstOnPath(Seq(catalog)))
assert(!CatalogManager.isBuiltinFirstOnPath(Seq.empty))
}

// ---------------------------------------------------------------------------
// Direct unit tests for [[PathElement.validateNoStaticDuplicates]]. The end-to-end
// `SetPathSuite` exercises this via SQL, but the duplicate-detection rules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,154 @@ class FunctionQualificationSuite extends SharedSparkSession {
checkAnswer(sql("SELECT system.builtin.abs(-5)"), Row(5))
}
}

test("SECTION 17a: SPARK-57758 built-in fast-path respects dynamic session order") {
withTempView("t") {
sql("CREATE TEMPORARY VIEW t AS SELECT 1 AS id")
// Register a temp function shadowing builtin `abs` (overrideIfExists allows any order).
spark.udf.register("abs", (x: Int) => x + 100)
try {
// Default order (second): builtin precedes session, so the fast-path returns the builtin
// and the temp is reachable only via `session.` qualification.
withSQLConf("spark.sql.functionResolution.sessionOrder" -> "second") {
checkAnswer(sql("SELECT abs(-5) FROM t"), Row(5))
checkAnswer(sql("SELECT session.abs(-5) FROM t"), Row(95))
}
// first: session precedes builtin, so the fast-path is bypassed and the temp shadows the
// builtin for unqualified names. Same session, so this also exercises per-pass recompute.
withSQLConf("spark.sql.functionResolution.sessionOrder" -> "first") {
checkAnswer(sql("SELECT abs(-5) FROM t"), Row(95))
checkAnswer(sql("SELECT builtin.abs(-5) FROM t"), Row(5))
}
} finally {
spark.sessionState.catalog.dropTempFunction("abs", ignoreIfNotExists = true)
}
}
}

test("SECTION 17b: SPARK-57758 built-in table-function fast-path") {
// Smoke test that built-in / extension table functions resolve unqualified. Under the default
// path `system.builtin` leads, so the slow loop's first candidate is already the built-in --
// this yields the same rows whether or not the fast-path fires, and so does not by itself
// distinguish the gate's on/off behavior. That signal is asserted by
// CatalogManagerSuite.isBuiltinFirstOnPath and by SECTION 17c/17d/17e.
checkAnswer(sql("SELECT * FROM range(3)"), Seq(Row(0), Row(1), Row(2)))
// Extension table functions (stored as builtins) also resolve unqualified.
checkAnswer(sql("SELECT * FROM test_ext_table_func()"), Seq(Row(0L), Row(1L), Row(2L)))
}

test("SECTION 17c: SPARK-57758 fast-path is bypassed when a catalog precedes system.builtin") {
// SPARK-57758 regression guard: the built-in fast-path is safe only when `system.builtin` is
// the FIRST path entry. A custom `SET PATH` can place a catalog/schema before `system.builtin`,
// and an unqualified name found there must win over the built-in -- the fast-path must not
// short-circuit to the built-in. (The default `sessionOrder` modes always keep catalogs after
// `system.builtin`, so only a custom SET PATH exercises this.)
withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
withDatabase("path_abs_shadow") {
sql("CREATE DATABASE path_abs_shadow")
// Persistent function shadowing the built-in `abs`.
sql("CREATE FUNCTION path_abs_shadow.abs(x INT) RETURNS INT RETURN x * 10")
try {
// Catalog before system.builtin: the persistent `abs` must win (50), not the
// built-in (5). Pre-fix, the fast-path returned the built-in here.
sql("SET PATH = spark_catalog.path_abs_shadow, system.builtin")
checkAnswer(sql("SELECT abs(5)"), Row(50))
// system.builtin first: the fast-path is enabled and resolves the built-in (abs(-5) = 5),
// confirming the optimization still applies when builtin leads the path.
sql("SET PATH = system.builtin, spark_catalog.path_abs_shadow")
checkAnswer(sql("SELECT abs(-5)"), Row(5))
} finally {
sql("SET PATH = DEFAULT_PATH")
sql("DROP FUNCTION IF EXISTS path_abs_shadow.abs")
}
}
}
}

test("SECTION 17d: SPARK-57758 table-function fast-path is bypassed when a catalog precedes " +
"system.builtin") {
// Table-function counterpart of SECTION 17c: the table-function fast-path shares the same
// `builtinFastPathSafe` gate, so a persistent table function in a schema placed before
// `system.builtin` via `SET PATH` must win over the built-in TVF of the same name.
withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
withDatabase("path_range_shadow") {
sql("CREATE DATABASE path_range_shadow")
// Persistent table function shadowing the built-in `range` (ignores its argument).
sql("CREATE FUNCTION path_range_shadow.range(n INT) RETURNS TABLE(id INT) RETURN SELECT 99")
try {
// Catalog before system.builtin: the persistent `range` must win (one row [99]), not the
// built-in `range(1)` (one row [0]). Pre-fix, the fast-path returned the built-in here.
sql("SET PATH = spark_catalog.path_range_shadow, system.builtin")
checkAnswer(sql("SELECT * FROM range(1)"), Row(99))
// system.builtin first: the fast-path resolves the built-in `range(1)` (one row [0]).
sql("SET PATH = system.builtin, spark_catalog.path_range_shadow")
checkAnswer(sql("SELECT * FROM range(1)"), Row(0))
} finally {
sql("SET PATH = DEFAULT_PATH")
sql("DROP FUNCTION IF EXISTS path_range_shadow.range")
}
}
}
}

test("SECTION 17e: SPARK-57758 built-in fast-path raises the built-in's argument error rather " +
"than falling through to a same-named session function") {
// Invariant: the fast-path and the slow candidate loop must fail on the SAME candidate.
// `resolveScalarFunctionByIdentifier` returns None only when the built-in is absent; an
// existing built-in invoked with bad arity throws. So with `system.builtin` first, an
// unqualified `abs(1, 2)` must hit the built-in `abs` (1-arg) and raise its argument error --
// it must NOT silently fall through to a compatible 2-arg session `abs`. The slow loop would
// also hit `system.builtin.abs` first and throw, so the two paths stay equivalent.
withTempView("t") {
sql("CREATE TEMPORARY VIEW t AS SELECT 1 AS id")
spark.udf.register("abs", (x: Int, y: Int) => x + y)
try {
withSQLConf("spark.sql.functionResolution.sessionOrder" -> "second") {
// The 2-arg session function is reachable via explicit `session.` qualification...
checkAnswer(sql("SELECT session.abs(1, 2) FROM t"), Row(3))
// ...but the unqualified name resolves to the built-in `abs` first and raises its
// argument error instead of resolving the 2-arg session function.
intercept[AnalysisException](sql("SELECT abs(1, 2) FROM t").collect())
}
} finally {
spark.sessionState.catalog.dropTempFunction("abs", ignoreIfNotExists = true)
}
}
}

test("SECTION 17f: SPARK-57758 per-pass memo recomputes for a SQL-function body whose pinned " +
"path differs from the caller") {
// The subtlest recompute trigger: a SQL-function body runs under a FRESH AnalysisContext
// carrying the function's own stored resolution path (not the caller's). The memo lives on the
// context, so the body computes its own path within the same analysis pass as the outer query.
// Here the body's `abs` is pinned to the built-in (created under a builtin-first path) while
// the caller resolves the unqualified `abs` to a shadowing persistent function (catalog-first)
// -- so a single statement must yield both resolutions, proving neither context reuses the
// other's memo.
withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
withDatabase("path_body") {
sql("CREATE DATABASE path_body")
// Persistent `abs` shadowing the built-in for the caller's catalog-first path.
sql("CREATE FUNCTION path_body.abs(x INT) RETURNS INT RETURN x * 10")
try {
// Create the function body while `system.builtin` leads -> the body's unqualified `abs`
// is pinned to the built-in.
sql("SET PATH = system.builtin, spark_catalog.path_body")
sql("CREATE FUNCTION path_body.use_abs(x INT) RETURNS INT RETURN abs(x)")
// Caller path puts the catalog first -> a top-level unqualified `abs` resolves to the
// persistent `abs` (x * 10), while the body keeps resolving its `abs` to the built-in.
sql("SET PATH = spark_catalog.path_body, system.builtin")
checkAnswer(
sql("SELECT abs(-5) AS top, path_body.use_abs(-5) AS body"),
Row(-50, 5))
} finally {
sql("SET PATH = DEFAULT_PATH")
sql("DROP FUNCTION IF EXISTS path_body.use_abs")
sql("DROP FUNCTION IF EXISTS path_body.abs")
}
}
}
}
}

/**
Expand Down