Commit c8415bc

[SPARK-57600][4.1][SQL] Declarative Pipelines should isolate per-flow SQL confs during parallel flow resolution
### What changes were proposed in this pull request?

Declarative Pipelines resolves flows in parallel on a shared `SparkSession` (`DataflowGraphTransformer`, parallelism 10). `FlowAnalysis.createFlowFunctionFromLogicalPlan` applied each flow's per-flow SQL confs by mutating that shared session's conf and restoring it afterwards. Because the session is shared, concurrent flows interleave those set/restore operations, so a flow can be analyzed under another flow's confs or have its own conf restored out from under it.

This gives each flow a private `SQLConf` instead: clone the session's conf, apply the flow's overrides to the clone, and install it for the analyzing thread with `SQLConf.withExistingConf` while that flow is analyzed (the analyzer reads conf through `SQLConf.get`). Analysis still runs on the shared session, so the catalog, current catalog/database, temp views, and the resolved DataFrames are all left on that session; only the confs the analyzer reads are isolated per flow.

### Why are the changes needed?

When more than one flow sets per-flow confs, parallel resolution can analyze a flow under another flow's confs, producing non-deterministic and occasionally incorrect analysis and schema inference.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added three tests in `ConnectValidPipelineSuite`:

- one checks that a flow's per-flow conf is what the analyzer reads during analysis, but does not leak onto the session the pipeline is run from;
- one resolves several flows in parallel, each setting a different value for the same conf, and asserts every flow's analysis observes its own value;
- one resolves a graph through `resolveToDataflowGraph()` with a per-flow `spark.sql.caseSensitive` override and checks that analysis actually honors it: a column reference resolves under the default but fails for the flow that turns on case sensitivity.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.8)

Closes #56861 from LuciferYang/SPARK-57600-4.1.

Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
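
The isolation idea described in this commit message can be illustrated without Spark. The sketch below is not the code from this commit: `ToyConf`, `sessionConf`, `current`, `copy`, and `resolveInParallel` are hypothetical stand-ins, and the thread-local install only mimics the role that `SQLConf.withExistingConf` and `SQLConf.get` play here. Each simulated flow copies the shared conf, applies its override to the copy, and installs the copy for its own thread. A barrier holds every flow at the point where its override is already applied, which is when a shared mutable conf would let one flow read another flow's value.

```scala
import java.util.concurrent.{ConcurrentHashMap, CyclicBarrier, TimeUnit}
import scala.collection.mutable

// Hypothetical stand-in for a session conf: a mutable key/value bag that can be copied.
final class ToyConf(initial: Map[String, String] = Map.empty) {
  private val settings = mutable.HashMap[String, String]() ++= initial
  def set(key: String, value: String): Unit = synchronized { settings(key) = value }
  def get(key: String): Option[String] = synchronized { settings.get(key) }
  def copy(): ToyConf = synchronized { new ToyConf(settings.toMap) }
}

object ToyConf {
  // Conf installed for the current thread, if any.
  private val installed = new ThreadLocal[ToyConf]

  // The shared conf that every thread falls back to when nothing is installed.
  val sessionConf = new ToyConf()

  // What readers see: the thread's installed conf, else the shared one.
  def current: ToyConf = Option(installed.get()).getOrElse(sessionConf)

  // Installs `conf` for the calling thread while `body` runs, then restores the previous state.
  def withExistingConf[T](conf: ToyConf)(body: => T): T = {
    val previous = installed.get()
    installed.set(conf)
    try body
    finally {
      if (previous == null) installed.remove() else installed.set(previous)
    }
  }
}

object FlowConfIsolationDemo {
  // Runs `numFlows` simulated flows concurrently and returns the value each one read for `key`.
  def resolveInParallel(numFlows: Int, key: String): Map[Int, String] = {
    val observed = new ConcurrentHashMap[Int, String]()
    val barrier = new CyclicBarrier(numFlows)
    val threads = (0 until numFlows).map { i =>
      val t = new Thread(() => {
        // Private copy of the shared conf for this flow only.
        val flowConf = ToyConf.sessionConf.copy()
        val seen = ToyConf.withExistingConf(flowConf) {
          flowConf.set(key, s"flowValue_$i")
          // Wait until every flow has applied its own override.
          barrier.await(30, TimeUnit.SECONDS)
          ToyConf.current.get(key).getOrElse("<unset>")
        }
        observed.put(i, seen)
      })
      t.start()
      t
    }
    threads.foreach(_.join())
    (0 until numFlows).map(i => i -> observed.get(i)).toMap
  }

  def main(args: Array[String]): Unit = {
    val seen = resolveInParallel(4, "demo.key")
    println(seen.toSeq.sortBy(_._1).mkString(", "))
    println(s"shared conf value: ${ToyConf.sessionConf.get("demo.key")}")
  }
}
```

In this sketch every flow reads back its own value and the shared conf never sees the key, because writes go only to the per-thread copy. Replacing `flowConf` with `ToyConf.sessionConf` in the thread body would reintroduce the interleaving that the commit message describes.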
1 parent cf05617 commit c8415bc

3 files changed

Lines changed: 175 additions & 23 deletions

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala

Lines changed: 18 additions & 8 deletions
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.classic.{DataFrame, Dataset, DataStreamReader, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.pipelines.AnalysisWarning
 import org.apache.spark.sql.pipelines.graph.GraphIdentifierManager.{ExternalDatasetIdentifier, InternalDatasetIdentifier}
 import org.apache.spark.sql.pipelines.util.{BatchReadOptions, InputReadOptions, StreamingReadOptions}
@@ -46,17 +47,23 @@ object FlowAnalysis {
       confs: Map[String, String],
       queryContext: QueryContext,
       queryOrigin: QueryOrigin) => {
+      // Flows are resolved in parallel on a shared session, so applying per-flow confs by mutating
+      // that session's conf would race across flows. Instead, give each flow a private SQLConf
+      // (a clone of the session's conf plus this flow's overrides) and install it for the analyzing
+      // thread via SQLConf.withExistingConf. Analysis still runs on the shared session, so its
+      // catalog and the resolved DataFrames are unaffected; only the confs the analyzer reads are
+      // isolated per flow.
+      val spark = SparkSession.active
       val ctx = FlowAnalysisContext(
         allInputs = allInputs,
         availableInputs = availableInputs,
         queryContext = queryContext,
-        spark = SparkSession.active
+        spark = spark,
+        flowConf = spark.sessionState.conf.clone()
       )
-      val df = try {
+      val df = SQLConf.withExistingConf(ctx.flowConf) {
         confs.foreach { case (k, v) => ctx.setConf(k, v) }
         Try(FlowAnalysis.analyze(ctx, plan))
-      } finally {
-        ctx.restoreOriginalConf()
       }
       FlowFunctionResult(
         requestedInputs = ctx.requestedInputs.toSet,
@@ -74,9 +81,12 @@ object FlowAnalysis {
    * Constructs an analyzed [[DataFrame]] from a [[LogicalPlan]] by resolving Pipelines specific
    * TVFs and datasets that cannot be resolved directly by Catalyst.
    *
-   * This function shouldn't call any singleton as it will break concurrent access to graph
-   * analysis; or any thread local variables as graph analysis and this function will use
-   * different threads in python repl.
+   * This runs on the flow-resolution thread pool, which may differ from the thread that defined
+   * the flow (e.g. in a Python REPL), so it must not depend on ambient singletons or thread-locals
+   * carried over from that defining thread. The one piece of per-flow state it relies on - the
+   * flow's SQL confs - is installed on the analyzing thread by
+   * [[createFlowFunctionFromLogicalPlan]] via `SQLConf.withExistingConf`, so the Catalyst analysis
+   * this triggers reads them through `SQLConf.get`.
    *
    * @param plan The [[LogicalPlan]] defining a flow.
    * @return An analyzed [[DataFrame]].
@@ -236,7 +246,7 @@ object FlowAnalysis {
       }
 
       val incompatibleViewReadCheck =
-        ctx.spark.conf.get("pipelines.incompatibleViewCheck.enabled", "true").toBoolean
+        ctx.flowConf.getConfString("pipelines.incompatibleViewCheck.enabled", "true").toBoolean
 
       // Wrap the DF in an alias so that columns in the DF can be referenced with
       // the following in the query:

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysisContext.scala

Lines changed: 14 additions & 15 deletions
@@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.classic.SparkSession
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.pipelines.AnalysisWarning
 
 /**
@@ -32,7 +33,12 @@ import org.apache.spark.sql.pipelines.AnalysisWarning
  * @param queryContext The context of the query being evaluated.
  * @param requestedInputs A mutable buffer populated with names of all inputs that were
  *                        requested.
- * @param spark the spark session to be used.
+ * @param spark The (shared) spark session to be used.
+ * @param flowConf A private [[SQLConf]] holding this flow's per-flow confs. It is
+ *                 installed for the analyzing thread via `SQLConf.withExistingConf`
+ *                 (see `FlowAnalysis.createFlowFunctionFromLogicalPlan`) so per-flow
+ *                 confs stay isolated from concurrently resolving flows and from the
+ *                 shared session, without cloning the session.
  * @param externalInputs The names of external inputs that were used to evaluate
  *                       the flow's query.
  */
@@ -46,27 +52,20 @@ private[pipelines] case class FlowAnalysisContext(
     shouldLowerCaseNames: Boolean = false,
     analysisWarnings: mutable.Buffer[AnalysisWarning] = new ListBuffer[AnalysisWarning],
     spark: SparkSession,
+    flowConf: SQLConf,
     externalInputs: mutable.HashSet[TableIdentifier] = mutable.HashSet.empty
 ) {
 
   /** Map from `Input` name to the actual `Input` */
   val availableInput: Map[TableIdentifier, Input] =
     availableInputs.map(i => i.identifier -> i).toMap
 
-  /** The confs set in this context that should be undone when exiting this context. */
-  private val confsToRestore = mutable.HashMap[String, Option[String]]()
-
-  /** Sets a Spark conf within this context that will be undone by `restoreOriginalConf`. */
+  /**
+   * Sets a Spark conf for this flow's analysis. It is set on the per-flow [[flowConf]], which is
+   * active for the analyzing thread only, so it does not leak to other flows or to the shared
+   * session.
+   */
   def setConf(key: String, value: String): Unit = {
-    if (!confsToRestore.contains(key)) {
-      confsToRestore.put(key, spark.conf.getOption(key))
-    }
-    spark.conf.set(key, value)
-  }
-
-  /** Restores the Spark conf to its state when this context was creating by undoing confs set. */
-  def restoreOriginalConf(): Unit = confsToRestore.foreach {
-    case (k, Some(v)) => spark.conf.set(k, v)
-    case (k, None) => spark.conf.unset(k)
+    flowConf.setConfString(key, value)
   }
 }

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala

Lines changed: 143 additions & 0 deletions
@@ -20,7 +20,10 @@ package org.apache.spark.sql.pipelines.graph
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.Union
+import org.apache.spark.sql.classic.DataFrame
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.util.InputReadOptions
 import org.apache.spark.sql.pipelines.utils.{PipelineTest, TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -528,4 +531,144 @@ class ConnectValidPipelineSuite extends PipelineTest with SharedSparkSession {
         s"Flow ${identifier.unquotedString} has the wrong schema"
       )
   }
+
+  test("per-flow confs are visible to the analyzer but do not leak onto the run session") {
+    val key = "pipelines.test.flowConfIsolation"
+    assert(spark.conf.getOption(key).isEmpty)
+
+    val inputId = TableIdentifier("conf_observer")
+    // (conf the analyzer reads via SQLConf.get, conf on the run session) captured during load().
+    var observed: (Option[String], Option[String]) = null
+    val runSession = spark
+    val observingInput = new Input {
+      override def identifier: TableIdentifier = inputId
+      override def origin: QueryOrigin = QueryOrigin()
+      override def load(readOptions: InputReadOptions): DataFrame = {
+        observed = (SQLConf.get.getAllConfs.get(key), runSession.conf.getOption(key))
+        runSession.range(1).toDF()
+      }
+    }
+
+    val result = FlowAnalysis
+      .createFlowFunctionFromLogicalPlan(UnresolvedRelation(Seq("conf_observer")))
+      .call(
+        allInputs = Set(inputId),
+        availableInputs = Seq(observingInput),
+        configuration = Map(key -> "flowValue"),
+        queryContext = QueryContext(currentCatalog = None, currentDatabase = None),
+        queryOrigin = QueryOrigin())
+
+    assert(result.dataFrame.isSuccess, s"flow analysis failed: ${result.dataFrame}")
+    assert(observed != null, "input.load was not invoked during analysis")
+    val (analyzerConf, runConf) = observed
+    // The per-flow conf is what the analyzer reads ...
+    assert(analyzerConf.contains("flowValue"))
+    // ... but it must not leak onto the session the pipeline is run from.
+    assert(
+      !runConf.contains("flowValue"),
+      "per-flow conf leaked onto the run session during flow analysis")
+    // ... and nothing is left behind on the run session afterwards.
+    assert(spark.conf.getOption(key).isEmpty)
+  }
+
+  test("per-flow confs stay isolated when flows are resolved in parallel") {
+    val key = "pipelines.test.flowConfIsolation"
+    assert(spark.conf.getOption(key).isEmpty)
+
+    val numFlows = 8
+    val runSession = spark
+    // The conf value each flow's analyzer reads for `key`.
+    val observed = new java.util.concurrent.ConcurrentHashMap[Int, String]()
+    val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+    // Rendezvous so every flow is mid-analysis - its per-flow conf already applied - at the same
+    // time. That is exactly when applying confs to a shared session would let one flow observe
+    // another flow's value.
+    val barrier = new java.util.concurrent.CyclicBarrier(numFlows)
+
+    def observingInput(i: Int): Input = new Input {
+      override def identifier: TableIdentifier = TableIdentifier(s"conf_observer_$i")
+      override def origin: QueryOrigin = QueryOrigin()
+      override def load(readOptions: InputReadOptions): DataFrame = {
+        barrier.await(60, java.util.concurrent.TimeUnit.SECONDS)
+        observed.put(i, SQLConf.get.getConfString(key, "<unset>"))
+        runSession.range(1).toDF()
+      }
+    }
+
+    val threads = (0 until numFlows).map { i =>
+      val t = new Thread(() => {
+        try {
+          val result = FlowAnalysis
+            .createFlowFunctionFromLogicalPlan(UnresolvedRelation(Seq(s"conf_observer_$i")))
+            .call(
+              allInputs = Set(TableIdentifier(s"conf_observer_$i")),
+              availableInputs = Seq(observingInput(i)),
+              configuration = Map(key -> s"flowValue_$i"),
+              queryContext = QueryContext(currentCatalog = None, currentDatabase = None),
+              queryOrigin = QueryOrigin())
+          result.dataFrame.failed.foreach(errors.add)
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      })
+      t.setName(s"flow-conf-isolation-$i")
+      t.start()
+      t
+    }
+    threads.foreach(_.join(120000))
+
+    assert(errors.isEmpty, s"flow analysis threads failed: ${errors.toArray.mkString(", ")}")
+    assert(
+      observed.size() == numFlows,
+      s"only ${observed.size()} of $numFlows flows recorded a conf")
+    (0 until numFlows).foreach { i =>
+      assert(
+        observed.get(i) == s"flowValue_$i",
+        s"flow $i observed '${observed.get(i)}' instead of its own per-flow conf")
+    }
+    // Nothing leaks onto the run session.
+    assert(spark.conf.getOption(key).isEmpty)
+  }
+
+  test("per-flow confs reach the analyzer through the full resolveToDataflowGraph() path") {
+    val caseSensitiveKey = SQLConf.CASE_SENSITIVE.key
+    // Pin the session default so the test is self-contained under the shared session. The per-flow
+    // override below is applied to the flow's own conf, never to this session conf.
+    withSQLConf(caseSensitiveKey -> "false") {
+      // With case-insensitive resolution `SELECT Foo FROM src` matches the `foo` column. Setting
+      // spark.sql.caseSensitive=true on the consumer flow makes that flow's analysis
+      // case-sensitive, so `Foo` no longer matches `foo`. Driving this through
+      // resolveToDataflowGraph() exercises a per-flow conf on the full resolution path (not just a
+      // direct FlowAnalysis call) and shows it is consumed by Catalyst analysis, not merely stored
+      // where SQLConf.get can read it. Cross-flow isolation under concurrency is covered by the
+      // parallel test above.
+
+      // Baseline: no per-flow conf, so `Foo` matches `foo` and the graph resolves.
+      val resolved = new TestGraphRegistrationContext(spark) {
+        registerPersistedView("src", query = dfFlowFunc(spark.range(1).toDF("foo")))
+        registerPersistedView("consumer", query = sqlFlowFunc(spark, "SELECT Foo FROM src"))
+      }.resolveToDataflowGraph()
+      assert(resolved.resolved, "pipeline should resolve under the default case-insensitive conf")
+
+      // Same query, but the consumer flow sets spark.sql.caseSensitive=true, so `Foo` no longer
+      // matches `foo` and analysis of that flow fails.
+      val unresolved = new TestGraphRegistrationContext(spark) {
+        registerPersistedView("src", query = dfFlowFunc(spark.range(1).toDF("foo")))
+        registerPersistedView(
+          "consumer",
+          query = sqlFlowFunc(spark, "SELECT Foo FROM src"),
+          sqlConf = Map(caseSensitiveKey -> "true"))
+      }.resolveToDataflowGraph()
+      assert(!unresolved.resolved, "case-sensitive consumer flow should fail to resolve")
+      val ex = intercept[UnresolvedPipelineException] {
+        unresolved.validate()
+      }
+      assertAnalysisException(
+        ex.directFailures(fullyQualifiedIdentifier("consumer")),
+        "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+
+      // The per-flow conf must not leak onto the run session.
+      assert(spark.conf.get(caseSensitiveKey) == "false")
+    }
+  }
 }
