Skip to content

Commit f520a2f

Browse files
committed
[SPARK-57525][CONNECT] Declarative Pipelines should not throw NoSuchElementException when a run fails without an attached cause
### What changes were proposed in this pull request? `PipelinesHandler.startRun` rethrows a failed pipeline run to the Spark Connect client via `runFailureEvent.foreach { event => throw event.error.get }`. But `event.error` is `None` for run termination reasons that carry no cause - `UnexpectedRunFailure` and `FailureStoppingFlow` both have `cause = None` - so `event.error.get` raised a `NoSuchElementException`, crashing the handler and hiding the real failure from the client. This PR extracts the rethrow into `throwRunFailure`: when the failure has a cause it is rethrown unchanged; when it does not, a `SparkException` with a new `PIPELINE_RUN_FAILED` error condition is thrown, carrying the run's termination message. `PIPELINE_RUN_FAILED` (rather than `INTERNAL_ERROR`) is used so that operational outcomes such as `FailureStoppingFlow` are not mislabeled as Spark bugs. ### Why are the changes needed? A run that fails without an attached cause (e.g. `UnexpectedRunFailure`, or a flow that fails to stop) currently surfaces to the Connect client as an opaque `NoSuchElementException` ("None.get") instead of the actual run-failure message. That masks the real problem and looks like an internal error. These reasons reach this code via the asynchronous `onCompletion` path, where `PipelineExecution.runPipeline`'s own catch never fires. ### Does this PR introduce _any_ user-facing change? Yes. When a pipeline run fails without an attached cause, the Spark Connect client now receives a `PIPELINE_RUN_FAILED` error carrying the run's termination message (e.g. "Run failed unexpectedly.") instead of a `NoSuchElementException`. ### How was this patch tested? New `PipelinesHandlerSuite` unit-tests `throwRunFailure` for both cases: the cause-present case rethrows the original cause, and the no-cause case throws a `PIPELINE_RUN_FAILED` `SparkException` carrying the termination message (verified with `checkError`, using the real `UnexpectedRunFailure` and `FailureStoppingFlow` messages). The cause-less termination reasons cannot be triggered deterministically through the end-to-end run path, so the rethrow is unit-tested directly. `SparkThrowableSuite` validates the new error condition. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Claude Opus 4.8) Closes #56594 from LuciferYang/sdp-run-failure-no-cause. Authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com>
1 parent 2f1536d commit f520a2f

4 files changed

Lines changed: 101 additions & 3 deletions

File tree

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6226,6 +6226,12 @@
62266226
],
62276227
"sqlState" : "0A000"
62286228
},
6229+
"PIPELINE_RUN_FAILED" : {
6230+
"message" : [
6231+
"<message>"
6232+
],
6233+
"sqlState" : "42K0S"
6234+
},
62296235
"PIPELINE_SQL_GRAPH_ELEMENT_REGISTRATION_ERROR" : {
62306236
"message" : [
62316237
"<message>",

common/utils/src/main/resources/error/error-states.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4655,6 +4655,12 @@
46554655
"standard": "N",
46564656
"usedBy": ["Spark"]
46574657
},
4658+
"42K0S": {
4659+
"description": "Pipeline run failed.",
4660+
"origin": "Spark",
4661+
"standard": "N",
4662+
"usedBy": ["Spark"]
4663+
},
46584664
"42KD0": {
46594665
"description": "Ambiguous name reference.",
46604666
"origin": "Databricks",

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.util.Using
2222

2323
import io.grpc.stub.StreamObserver
2424

25+
import org.apache.spark.SparkException
2526
import org.apache.spark.connect.proto
2627
import org.apache.spark.connect.proto.{ExecutePlanResponse, PipelineCommandResult, Relation, ResolvedIdentifier}
2728
import org.apache.spark.connect.proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails
@@ -563,12 +564,27 @@ private[connect] object PipelinesHandler extends Logging {
563564

564565
// Rethrow any exceptions that caused the pipeline run to fail so that the exception is
565566
// propagated back to the SC client / CLI.
566-
runFailureEvent.foreach { event =>
567-
throw event.error.get
568-
}
567+
runFailureEvent.foreach(throwRunFailure)
569568
}
570569
}
571570

571+
/**
572+
* Rethrows the failure behind a terminal run-failure event so it reaches the Spark Connect
573+
* client. Most failures carry the underlying cause (e.g. a flow's QueryExecutionFailure), but
574+
* some termination reasons (UnexpectedRunFailure, FailureStoppingFlow) have none. When the
575+
* cause is absent, throw a PIPELINE_RUN_FAILED error built from the event message rather than
576+
* calling Option.get, which would throw a NoSuchElementException and hide the real failure.
577+
* Using PIPELINE_RUN_FAILED instead of INTERNAL_ERROR avoids mislabeling operational failures
578+
* as bugs.
579+
*/
580+
private[connect] def throwRunFailure(failureEvent: PipelineEvent): Nothing = {
581+
throw failureEvent.error.getOrElse(
582+
new SparkException(
583+
errorClass = "PIPELINE_RUN_FAILED",
584+
messageParameters = Map("message" -> failureEvent.message),
585+
cause = null))
586+
}
587+
572588
/**
573589
* Creates the table filters for the full refresh and refresh operations based on the StartRun
574590
* command user provided. Also validates the command parameters to ensure that they are
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connect.pipelines
19+
20+
import org.apache.spark.{SparkException, SparkFunSuite}
21+
import org.apache.spark.sql.catalyst.TableIdentifier
22+
import org.apache.spark.sql.pipelines.common.RunState
23+
import org.apache.spark.sql.pipelines.graph.{FailureStoppingFlow, UnexpectedRunFailure}
24+
import org.apache.spark.sql.pipelines.logging.{ConstructPipelineEvent, EventLevel, PipelineEventOrigin, RunProgress}
25+
26+
class PipelinesHandlerSuite extends SparkFunSuite {
27+
28+
private def runFailedEvent(message: String, error: Option[Throwable]) =
29+
ConstructPipelineEvent(
30+
origin =
31+
PipelineEventOrigin(datasetName = None, flowName = None, sourceCodeLocation = None),
32+
// throwRunFailure only reads message and exception; the remaining fields are filled with
33+
// valid placeholder values to construct the event.
34+
level = EventLevel.INFO,
35+
message = message,
36+
details = RunProgress(RunState.FAILED),
37+
exception = error)
38+
39+
// Use the real no-cause termination-reason messages so the tests break if their wording drifts.
40+
private val unexpectedRunFailureMessage = UnexpectedRunFailure().message
41+
42+
private val failureStoppingFlowMessage =
43+
FailureStoppingFlow(
44+
Seq(TableIdentifier("t1", Some("db")), TableIdentifier("t2", Some("db")))).message
45+
46+
// Regression guard rather than a fix-validation test: the old buggy code (throw error.get) also
47+
// rethrew the cause unchanged, so this case passes against both implementations. The no-cause
48+
// test below is the one that genuinely exercises this PR's fix.
49+
test("throwRunFailure rethrows the underlying cause when present") {
50+
val cause = new RuntimeException("boom")
51+
val thrown = intercept[RuntimeException] {
52+
PipelinesHandler.throwRunFailure(runFailedEvent("Run failed.", Some(cause)))
53+
}
54+
assert(thrown eq cause)
55+
}
56+
57+
test("throwRunFailure surfaces the message when the failure has no cause") {
58+
// No-cause reasons must fall back to a PIPELINE_RUN_FAILED error built from the event message
59+
// rather than raising NoSuchElementException; the message is forwarded verbatim.
60+
Seq(unexpectedRunFailureMessage, failureStoppingFlowMessage).foreach { message =>
61+
val thrown = intercept[SparkException] {
62+
PipelinesHandler.throwRunFailure(runFailedEvent(message, None))
63+
}
64+
checkError(
65+
thrown,
66+
condition = "PIPELINE_RUN_FAILED",
67+
parameters = Map("message" -> message))
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)