diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6cd985fd01fa0..b472003f9ab78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -182,6 +182,36 @@ case class AnalysisContext( def getSinglePassResolverBridgeState: Option[AnalyzerBridgeState] = singlePassResolverBridgeState + + /** + * Per-pass memo of the SQL resolution search path (SPARK-57758). Function resolution computes + * the ordered path once per analysis pass and reuses it for every [[UnresolvedFunction]], + * instead of rebuilding and re-iterating it per node -- the cost that, under Spark Connect's + * repeated re-analysis of the growing plan, scaled with plan size x analyze calls. + * + * The memo lives on the context so it shares the context's per-pass lifetime. `SET PATH` / + * `USE` / conf changes all produce a fresh context ([[reset]], [[withNewAnalysisContext]], or + * the `copy` / construction for a view or SQL-function body), and a body-level field is not + * carried over by `copy`, so a new pass automatically starts with an empty memo and the memo is + * collected with the context. It is therefore safe without an identity key or weak reference, + * but only for values derived from this context's immutable fields (the path derives from + * `resolutionPathEntries` / `catalogAndNamespace`); never memoize anything derived from the + * mutable fields above (`relationCache`, `referredTempFunctionNames`, ...). + * + * INVARIANT: keep this a body `var`, never a constructor parameter. `.copy()` (used by + * `withAnalysisContext(function)` and `withOuterPlan`) deliberately does not carry a body + * field, which is what gives a SQL-function-body / outer-plan context a fresh memo. Promoting + * it to a parameter would copy a stale path across that boundary and silently mis-resolve + * (SECTION 17f of `FunctionQualificationSuite` is the regression guard). + */ + private var resolutionPathMemo: Seq[Seq[String]] = _ + + def memoizedResolutionPath(compute: => Seq[Seq[String]]): Seq[Seq[String]] = { + if (resolutionPathMemo == null) { + resolutionPathMemo = compute + } + resolutionPathMemo + } } object AnalysisContext { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala index 4f6aee03967cb..543e9da670a6e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala @@ -110,10 +110,37 @@ class FunctionResolution( * directly, matching [[RelationResolution.relationResolutionEntries]] so routine order stays * aligned with relation order. */ - private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] = - catalogManager.resolutionPathEntriesForAnalysis( - AnalysisContext.get.resolutionPathEntries, - AnalysisContext.get.catalogAndNamespace) + private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] = { + // Per-analysis-pass memo (SPARK-57758): computing the path (reading the live [[CatalogManager]] + // and several confs, then allocating `Seq`s) used to run once per [[UnresolvedFunction]], and + // under Spark Connect once per node on every re-analysis of the growing plan. The path is + // stable within a pass (`SET PATH` / `USE` / conf changes happen between passes, each under a + // fresh [[AnalysisContext]]), so it is memoized on the current context, which shares the pass's + // lifetime. See [[AnalysisContext.memoizedResolutionPath]] for why this needs no identity key. + val context = AnalysisContext.get + context.memoizedResolutionPath { + catalogManager.resolutionPathEntriesForAnalysis( + context.resolutionPathEntries, context.catalogAndNamespace) + } + } + + /** + * True when `system.builtin` is the first entry of the effective resolution path. In that case a + * single-part name that resolves to a built-in cannot be shadowed by any earlier entry -- neither + * a `system.session` entry (a temporary/session function) nor a catalog/schema entry placed + * before `system.builtin` by a custom `SET PATH` -- so the built-in fast-path in + * [[resolveFunction]] / [[resolveTableFunction]] cannot change resolution precedence. A miss + * still falls through to the full candidate loop, so non-built-in names are unaffected. + * + * The default `spark.sql.functionResolution.sessionOrder` modes `second` and `last` put + * `system.builtin` first; only `first` puts `system.session` before it, where the fast-path is + * correctly disabled. Only a custom `SET PATH` can place another entry before `system.builtin`. + * + * Reads the per-pass memoized path ([[sqlResolutionPathEntriesForAnalysis]]), so the check is + * O(1) per [[UnresolvedFunction]]. + */ + private def builtinFastPathSafe: Boolean = + CatalogManager.isBuiltinFirstOnPath(sqlResolutionPathEntriesForAnalysis) private def resolutionCandidates(nameParts: Seq[String]): Seq[Seq[String]] = { if (nameParts.size == 1) { @@ -134,6 +161,12 @@ class FunctionResolution( private def resolveFunctionCandidate( nameParts: Seq[String], unresolvedFunc: UnresolvedFunction): Option[Expression] = { + // NOTE: the `system.builtin.` case here is the same registry lookup the built-in + // fast-path in `resolveFunction` performs directly (both go through + // `identifierFromSystemNameParts` / `builtinFunctionIdentifier` -> + // `resolveScalarFunctionByIdentifier`). The two must stay equivalent; a change to built-in + // scalar resolution has to touch both. `resolveTableFunctionCandidate` / `resolveTableFunction` + // mirror this for table functions. if (isSystemCatalogQualified(nameParts)) { v1SessionCatalog.identifierFromSystemNameParts(nameParts).flatMap { ident => val expr = v1SessionCatalog.resolveScalarFunctionByIdentifier( @@ -189,6 +222,23 @@ class FunctionResolution( } } + // Fast-path (SPARK-57758): an unqualified, non-internal name that resolves to a built-in + // is by far the common case. When `system.builtin` is the first entry of the effective path, + // a built-in hit cannot be shadowed by any earlier entry (a session/temporary function, or a + // catalog/schema placed before `system.builtin` via `SET PATH`), so it can be resolved with a + // single registry lookup instead of building and iterating the candidate search path. A miss + // falls through to the full candidate resolution below. This lookup is equivalent to the + // `system.builtin.` branch of `resolveFunctionCandidate`; keep the two in sync. + if (unresolvedFunc.nameParts.size == 1 && !unresolvedFunc.isInternal && + builtinFastPathSafe) { + val builtin = v1SessionCatalog.resolveScalarFunctionByIdentifier( + FunctionRegistry.builtinFunctionIdentifier(unresolvedFunc.nameParts.head), + unresolvedFunc.arguments) + if (builtin.isDefined) { + return validateFunction(builtin.get, unresolvedFunc.arguments.length, unresolvedFunc) + } + } + val candidates = resolutionCandidates(unresolvedFunc.nameParts) for (nameParts <- candidates) { resolveFunctionCandidate(nameParts, unresolvedFunc) match { @@ -263,6 +313,16 @@ class FunctionResolution( def resolveTableFunction( nameParts: Seq[String], arguments: Seq[Expression]): Option[LogicalPlan] = { + // Fast-path (SPARK-57758): see `resolveFunction`. Short-circuit a single-part name to a + // built-in table function when `system.builtin` is the first entry of the path; a miss + // (including a built-in scalar of the same name) falls through to the candidate loop, which + // preserves the NOT_A_TABLE_FUNCTION semantics. + if (nameParts.size == 1 && builtinFastPathSafe) { + val builtin = v1SessionCatalog.resolveTableFunctionByIdentifier( + FunctionRegistry.builtinFunctionIdentifier(nameParts.head), arguments) + if (builtin.isDefined) return builtin + } + val candidates = resolutionCandidates(nameParts) for (nameParts <- candidates) { resolveTableFunctionCandidate(nameParts, arguments) match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 4dd8af5eb37ef..19b01ffc3b83c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -588,6 +588,15 @@ private[sql] object CatalogManager extends Logging { parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) && parts(1).equalsIgnoreCase(BUILTIN_NAMESPACE) + /** + * True when `system.builtin` is the first entry of `pathEntries`. This is the path-shape + * condition under which a built-in function found by an unqualified single-part name cannot be + * shadowed by any earlier path entry -- the precise property the function-resolution built-in + * fast-path relies on. Pure predicate over the path shape; callers decide how to use it. + */ + def isBuiltinFirstOnPath(pathEntries: Seq[Seq[String]]): Boolean = + pathEntries.headOption.exists(isSystemBuiltinPathEntry) + /** * Extract `system.builtin` / `system.session` entries from a resolved PATH, mapped to * [[SessionCatalog.SessionFunctionKind]] in path order. Pure data conversion -- callers diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala index 199e43d39bbe1..b5f388b8ce870 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala @@ -188,6 +188,30 @@ class CatalogManagerSuite extends SparkFunSuite with SQLHelper { assert(e.getMessage.contains("default.v_broken")) } + test("isBuiltinFirstOnPath: SPARK-57758 gate for the built-in function fast-path") { + // The function-resolution built-in fast-path is safe (cannot be shadowed) only when + // `system.builtin` is the FIRST path entry. The fast-path has no behavioral signature -- the + // slow candidate loop yields identical results -- so this pure-predicate test guards the gate + // directly: a regression that silently stopped the fast-path from firing (re-introducing the + // SPARK-57758 perf regression) or fired it when a catalog precedes `system.builtin` + // (re-introducing the precedence bug) would leave the behavioral SQL tests green. + val builtin = Seq("system", "builtin") + val session = Seq("system", "session") + val catalog = Seq("spark_catalog", "some_schema") + // Default `sessionOrder` modes `second` / `last` keep `system.builtin` first -> safe. + assert(CatalogManager.isBuiltinFirstOnPath(Seq(builtin, session))) + assert(CatalogManager.isBuiltinFirstOnPath(Seq(builtin))) + assert(CatalogManager.isBuiltinFirstOnPath(Seq(builtin, catalog, session))) + // Case-insensitive match on the well-known entry. + assert(CatalogManager.isBuiltinFirstOnPath(Seq(Seq("System", "Builtin"), session))) + // `first` mode (session before builtin) and custom `SET PATH` shapes that place any entry + // before `system.builtin` -> not safe. + assert(!CatalogManager.isBuiltinFirstOnPath(Seq(session, builtin))) + assert(!CatalogManager.isBuiltinFirstOnPath(Seq(catalog, builtin))) + assert(!CatalogManager.isBuiltinFirstOnPath(Seq(catalog))) + assert(!CatalogManager.isBuiltinFirstOnPath(Seq.empty)) + } + // --------------------------------------------------------------------------- // Direct unit tests for [[PathElement.validateNoStaticDuplicates]]. The end-to-end // `SetPathSuite` exercises this via SQL, but the duplicate-detection rules diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FunctionQualificationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FunctionQualificationSuite.scala index 3aade2ddf09a8..185956229699b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FunctionQualificationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FunctionQualificationSuite.scala @@ -1290,6 +1290,154 @@ class FunctionQualificationSuite extends SharedSparkSession { checkAnswer(sql("SELECT system.builtin.abs(-5)"), Row(5)) } } + + test("SECTION 17a: SPARK-57758 built-in fast-path respects dynamic session order") { + withTempView("t") { + sql("CREATE TEMPORARY VIEW t AS SELECT 1 AS id") + // Register a temp function shadowing builtin `abs` (overrideIfExists allows any order). + spark.udf.register("abs", (x: Int) => x + 100) + try { + // Default order (second): builtin precedes session, so the fast-path returns the builtin + // and the temp is reachable only via `session.` qualification. + withSQLConf("spark.sql.functionResolution.sessionOrder" -> "second") { + checkAnswer(sql("SELECT abs(-5) FROM t"), Row(5)) + checkAnswer(sql("SELECT session.abs(-5) FROM t"), Row(95)) + } + // first: session precedes builtin, so the fast-path is bypassed and the temp shadows the + // builtin for unqualified names. Same session, so this also exercises per-pass recompute. + withSQLConf("spark.sql.functionResolution.sessionOrder" -> "first") { + checkAnswer(sql("SELECT abs(-5) FROM t"), Row(95)) + checkAnswer(sql("SELECT builtin.abs(-5) FROM t"), Row(5)) + } + } finally { + spark.sessionState.catalog.dropTempFunction("abs", ignoreIfNotExists = true) + } + } + } + + test("SECTION 17b: SPARK-57758 built-in table-function fast-path") { + // Smoke test that built-in / extension table functions resolve unqualified. Under the default + // path `system.builtin` leads, so the slow loop's first candidate is already the built-in -- + // this yields the same rows whether or not the fast-path fires, and so does not by itself + // distinguish the gate's on/off behavior. That signal is asserted by + // CatalogManagerSuite.isBuiltinFirstOnPath and by SECTION 17c/17d/17e. + checkAnswer(sql("SELECT * FROM range(3)"), Seq(Row(0), Row(1), Row(2))) + // Extension table functions (stored as builtins) also resolve unqualified. + checkAnswer(sql("SELECT * FROM test_ext_table_func()"), Seq(Row(0L), Row(1L), Row(2L))) + } + + test("SECTION 17c: SPARK-57758 fast-path is bypassed when a catalog precedes system.builtin") { + // SPARK-57758 regression guard: the built-in fast-path is safe only when `system.builtin` is + // the FIRST path entry. A custom `SET PATH` can place a catalog/schema before `system.builtin`, + // and an unqualified name found there must win over the built-in -- the fast-path must not + // short-circuit to the built-in. (The default `sessionOrder` modes always keep catalogs after + // `system.builtin`, so only a custom SET PATH exercises this.) + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + withDatabase("path_abs_shadow") { + sql("CREATE DATABASE path_abs_shadow") + // Persistent function shadowing the built-in `abs`. + sql("CREATE FUNCTION path_abs_shadow.abs(x INT) RETURNS INT RETURN x * 10") + try { + // Catalog before system.builtin: the persistent `abs` must win (50), not the + // built-in (5). Pre-fix, the fast-path returned the built-in here. + sql("SET PATH = spark_catalog.path_abs_shadow, system.builtin") + checkAnswer(sql("SELECT abs(5)"), Row(50)) + // system.builtin first: the fast-path is enabled and resolves the built-in (abs(-5) = 5), + // confirming the optimization still applies when builtin leads the path. + sql("SET PATH = system.builtin, spark_catalog.path_abs_shadow") + checkAnswer(sql("SELECT abs(-5)"), Row(5)) + } finally { + sql("SET PATH = DEFAULT_PATH") + sql("DROP FUNCTION IF EXISTS path_abs_shadow.abs") + } + } + } + } + + test("SECTION 17d: SPARK-57758 table-function fast-path is bypassed when a catalog precedes " + + "system.builtin") { + // Table-function counterpart of SECTION 17c: the table-function fast-path shares the same + // `builtinFastPathSafe` gate, so a persistent table function in a schema placed before + // `system.builtin` via `SET PATH` must win over the built-in TVF of the same name. + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + withDatabase("path_range_shadow") { + sql("CREATE DATABASE path_range_shadow") + // Persistent table function shadowing the built-in `range` (ignores its argument). + sql("CREATE FUNCTION path_range_shadow.range(n INT) RETURNS TABLE(id INT) RETURN SELECT 99") + try { + // Catalog before system.builtin: the persistent `range` must win (one row [99]), not the + // built-in `range(1)` (one row [0]). Pre-fix, the fast-path returned the built-in here. + sql("SET PATH = spark_catalog.path_range_shadow, system.builtin") + checkAnswer(sql("SELECT * FROM range(1)"), Row(99)) + // system.builtin first: the fast-path resolves the built-in `range(1)` (one row [0]). + sql("SET PATH = system.builtin, spark_catalog.path_range_shadow") + checkAnswer(sql("SELECT * FROM range(1)"), Row(0)) + } finally { + sql("SET PATH = DEFAULT_PATH") + sql("DROP FUNCTION IF EXISTS path_range_shadow.range") + } + } + } + } + + test("SECTION 17e: SPARK-57758 built-in fast-path raises the built-in's argument error rather " + + "than falling through to a same-named session function") { + // Invariant: the fast-path and the slow candidate loop must fail on the SAME candidate. + // `resolveScalarFunctionByIdentifier` returns None only when the built-in is absent; an + // existing built-in invoked with bad arity throws. So with `system.builtin` first, an + // unqualified `abs(1, 2)` must hit the built-in `abs` (1-arg) and raise its argument error -- + // it must NOT silently fall through to a compatible 2-arg session `abs`. The slow loop would + // also hit `system.builtin.abs` first and throw, so the two paths stay equivalent. + withTempView("t") { + sql("CREATE TEMPORARY VIEW t AS SELECT 1 AS id") + spark.udf.register("abs", (x: Int, y: Int) => x + y) + try { + withSQLConf("spark.sql.functionResolution.sessionOrder" -> "second") { + // The 2-arg session function is reachable via explicit `session.` qualification... + checkAnswer(sql("SELECT session.abs(1, 2) FROM t"), Row(3)) + // ...but the unqualified name resolves to the built-in `abs` first and raises its + // argument error instead of resolving the 2-arg session function. + intercept[AnalysisException](sql("SELECT abs(1, 2) FROM t").collect()) + } + } finally { + spark.sessionState.catalog.dropTempFunction("abs", ignoreIfNotExists = true) + } + } + } + + test("SECTION 17f: SPARK-57758 per-pass memo recomputes for a SQL-function body whose pinned " + + "path differs from the caller") { + // The subtlest recompute trigger: a SQL-function body runs under a FRESH AnalysisContext + // carrying the function's own stored resolution path (not the caller's). The memo lives on the + // context, so the body computes its own path within the same analysis pass as the outer query. + // Here the body's `abs` is pinned to the built-in (created under a builtin-first path) while + // the caller resolves the unqualified `abs` to a shadowing persistent function (catalog-first) + // -- so a single statement must yield both resolutions, proving neither context reuses the + // other's memo. + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + withDatabase("path_body") { + sql("CREATE DATABASE path_body") + // Persistent `abs` shadowing the built-in for the caller's catalog-first path. + sql("CREATE FUNCTION path_body.abs(x INT) RETURNS INT RETURN x * 10") + try { + // Create the function body while `system.builtin` leads -> the body's unqualified `abs` + // is pinned to the built-in. + sql("SET PATH = system.builtin, spark_catalog.path_body") + sql("CREATE FUNCTION path_body.use_abs(x INT) RETURNS INT RETURN abs(x)") + // Caller path puts the catalog first -> a top-level unqualified `abs` resolves to the + // persistent `abs` (x * 10), while the body keeps resolving its `abs` to the built-in. + sql("SET PATH = spark_catalog.path_body, system.builtin") + checkAnswer( + sql("SELECT abs(-5) AS top, path_body.use_abs(-5) AS body"), + Row(-50, 5)) + } finally { + sql("SET PATH = DEFAULT_PATH") + sql("DROP FUNCTION IF EXISTS path_body.use_abs") + sql("DROP FUNCTION IF EXISTS path_body.abs") + } + } + } + } } /**