Skip to content

Commit d40737c

Browse files
authored
Fix spark application spans status on sql analysis failure (#10981)
Track Spark SQL analysis failures to mark application spans as ERROR Add lastSqlFailed tracking to AbstractDatadogSparkListener when SQL calls (e.g. SparkSession.sql()) throw exceptions during Catalyst analysis, before any Spark job is submitted. This ensures finishApplication() can mark the application span as ERROR even when no job/stage/task events fire. The error priority in finishApplication() is: throwable (from caller) > exitCode != 0 > lastJobFailed > lastSqlFailed Add unit tests to verify SQL failures mark application spans as ERROR, and that job failures take precedence over SQL failures. Fixes: Spark application traces marked SUCCESS when SQL analysis fails Instrument SparkSession.sql() to capture analysis failures Add SparkSqlFailureAdvice that intercepts SparkSession.sql() method calls and propagates any exceptions (e.g. AnalysisException) to the listener via the new onSqlFailure() callback. This ensures SQL analysis failures that occur before any Spark job is submitted are captured and can be reported as ERROR in the application span. reset on success Test app status when SQL analysis fails fix spotless Co-authored-by: adrien.boitreaud <adrien.boitreaud@datadoghq.com>
1 parent 8ae02b1 commit d40737c

File tree

4 files changed

+145
-1
lines changed

4 files changed

+145
-1
lines changed

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
139139
private boolean lastJobFailed = false;
140140
private String lastJobFailedMessage;
141141
private String lastJobFailedStackTrace;
142+
private boolean lastSqlFailed = false;
143+
private String lastSqlFailedMessage;
144+
private String lastSqlFailedStackTrace;
142145
private int jobCount = 0;
143146
private int currentExecutorCount = 0;
144147
private int maxExecutorCount = 0;
@@ -310,6 +313,23 @@ private void captureOpenlineageContextIfPresent(
310313
builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId());
311314
}
312315

316+
/**
317+
* Called by SparkSqlFailureAdvice when a SQL call (e.g. SparkSession.sql()) throws an exception
318+
* during Catalyst analysis, before any Spark job is submitted. This ensures finishApplication()
319+
* has an error signal even when no job/stage/task events fire.
320+
*/
321+
public synchronized void onSqlFailure(Throwable throwable) {
322+
if (applicationEnded) {
323+
return;
324+
}
325+
lastSqlFailed = true;
326+
lastSqlFailedMessage = throwable.getMessage();
327+
328+
StringWriter sw = new StringWriter();
329+
throwable.printStackTrace(new PrintWriter(sw));
330+
lastSqlFailedStackTrace = sw.toString();
331+
}
332+
313333
@Override
314334
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
315335
log.info(
@@ -356,6 +376,11 @@ public synchronized void finishApplication(
356376
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed");
357377
applicationSpan.setTag(DDTags.ERROR_MSG, lastJobFailedMessage);
358378
applicationSpan.setTag(DDTags.ERROR_STACK, lastJobFailedStackTrace);
379+
} else if (lastSqlFailed) {
380+
applicationSpan.setError(true);
381+
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed");
382+
applicationSpan.setTag(DDTags.ERROR_MSG, lastSqlFailedMessage);
383+
applicationSpan.setTag(DDTags.ERROR_STACK, lastSqlFailedStackTrace);
359384
}
360385

361386
applicationMetrics.setSpanMetrics(applicationSpan);
@@ -544,6 +569,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
544569
}
545570
} else {
546571
lastJobFailed = false;
572+
lastSqlFailed = false;
547573
}
548574

549575
SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId());

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ public String[] knownMatchingTypes() {
3939
"org.apache.spark.util.Utils",
4040
"org.apache.spark.util.SparkClassUtils",
4141
"org.apache.spark.scheduler.LiveListenerBus",
42-
"org.apache.spark.sql.execution.SparkPlanInfo$"
42+
"org.apache.spark.sql.execution.SparkPlanInfo$",
43+
"org.apache.spark.sql.SparkSession"
4344
};
4445
}
4546

@@ -67,6 +68,15 @@ public void methodAdvice(MethodTransformer transformer) {
6768
.and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))),
6869
AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
6970

71+
// SparkSession.sql(String, ...) — catch AnalysisException failures that fire during Catalyst
72+
// analysis before any Spark job is submitted and are invisible to the listener bus
73+
transformer.applyAdvice(
74+
isMethod()
75+
.and(named("sql"))
76+
.and(takesArgument(0, String.class))
77+
.and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))),
78+
AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");
79+
7080
// LiveListenerBus class is used to manage spark listeners
7181
transformer.applyAdvice(
7282
isMethod()
@@ -122,6 +132,15 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
122132
}
123133
}
124134

135+
public static class SparkSqlFailureAdvice {
136+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
137+
public static void exit(@Advice.Thrown Throwable throwable) {
138+
if (throwable != null && AbstractDatadogSparkListener.listener != null) {
139+
AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
140+
}
141+
}
142+
}
143+
125144
public static class LiveListenerBusAdvice {
126145
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
127146
// If OL is disabled in tracer config but user set it up manually don't interfere

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,75 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
522522
}
523523
}
524524

525+
def "test SQL analysis failure marks application span as error"() {
526+
setup:
527+
def listener = getTestDatadogSparkListener()
528+
listener.onApplicationStart(applicationStartEvent(1000L))
529+
530+
// Simulate a SQL failure during Catalyst analysis (before any Spark job is submitted)
531+
def analysisException = new RuntimeException("[TABLE_OR_VIEW_NOT_FOUND] The table or view `missing_table` cannot be found.")
532+
listener.onSqlFailure(analysisException)
533+
534+
listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L))
535+
536+
expect:
537+
assertTraces(1) {
538+
trace(1) {
539+
span {
540+
operationName "spark.application"
541+
spanType "spark"
542+
errored true
543+
parent()
544+
assert span.tags["error.type"] == "Spark SQL Failed"
545+
assert span.tags["error.message"] == "[TABLE_OR_VIEW_NOT_FOUND] The table or view `missing_table` cannot be found."
546+
assert span.tags["error.stack"] != null
547+
}
548+
}
549+
}
550+
}
551+
552+
def "test SQL analysis failure does not override job failure"() {
553+
setup:
554+
def listener = getTestDatadogSparkListener()
555+
listener.onApplicationStart(applicationStartEvent(1000L))
556+
557+
// SQL failure happens first
558+
listener.onSqlFailure(new RuntimeException("SQL error"))
559+
560+
// Then a job runs and fails — job failure should take precedence
561+
listener.onJobStart(jobStartEvent(1, 1500L, [1]))
562+
listener.onStageSubmitted(stageSubmittedEvent(1, 1500L))
563+
listener.onStageCompleted(stageCompletedEvent(1, 1800L))
564+
listener.onJobEnd(jobFailedEvent(1, 2000L, "Job aborted due to NullPointerException"))
565+
566+
listener.onApplicationEnd(new SparkListenerApplicationEnd(3000L))
567+
568+
expect:
569+
assertTraces(1) {
570+
trace(3) {
571+
span {
572+
operationName "spark.application"
573+
spanType "spark"
574+
errored true
575+
parent()
576+
// Job failure should take precedence over SQL failure
577+
assert span.tags["error.type"] == "Spark Application Failed"
578+
}
579+
span {
580+
operationName "spark.job"
581+
spanType "spark"
582+
errored true
583+
childOf(span(0))
584+
}
585+
span {
586+
operationName "spark.stage"
587+
spanType "spark"
588+
childOf(span(1))
589+
}
590+
}
591+
}
592+
}
593+
525594
def "test setupOpenLineage gets service name"(boolean serviceNameSetByUser, String serviceName, String sparkAppName) {
526595
setup:
527596
SparkConf sparkConf = new SparkConf()

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,36 @@ abstract class AbstractSparkTest extends InstrumentationSpecification {
193193
}
194194
}
195195

196+
def "sql analysis failure on missing table marks application span as error"() {
197+
setup:
198+
def sparkSession = SparkSession.builder()
199+
.config("spark.master", "local[2]")
200+
.getOrCreate()
201+
202+
try {
203+
sparkSession.sql("SELECT * FROM missing_table").show()
204+
} catch (Exception ignored) {
205+
// Expected: AnalysisException thrown by Catalyst before any Spark job is submitted
206+
}
207+
sparkSession.stop()
208+
209+
expect:
210+
assertTraces(1) {
211+
trace(1) {
212+
span {
213+
operationName "spark.application"
214+
resourceName "spark.application"
215+
spanType "spark"
216+
errored true
217+
parent()
218+
assert span.tags["error.type"] == "Spark SQL Failed"
219+
assert span.tags["error.message"] =~ /(?i).*missing_table.*/
220+
assert span.tags["error.stack"] =~ /(?s).*AnalysisException.*/
221+
}
222+
}
223+
}
224+
}
225+
196226
def "capture SparkSubmit.runMain() errors"() {
197227
setup:
198228
def sparkSession = SparkSession.builder()

0 commit comments

Comments (0)