
Commit a7ad565

Instrument QueryExecution.assertAnalyzed() to catch DataFrame analysis failures (#11033)
## What Does This Do

Adds instrumentation on `QueryExecution.assertAnalyzed()` to catch Catalyst analysis failures from any entry point: `SparkSession.sql()`, `Dataset.select()`, `Dataset.filter()`, etc. Previously, only `SparkSession.sql()` failures were caught (PR #10981); analysis failures raised through DataFrame API operations such as `df.select()` were invisible to the tracer.

## Motivation

Addresses a case where logs showed the `spark.application` span marked SUCCESS even though the EMR step had failed. The `AnalysisException` (`UNRESOLVED_COLUMN`) was thrown from `Dataset.select()` → `QueryExecution.assertAnalyzed()`, a path not covered by the existing `SparkSqlFailureAdvice` on `SparkSession.sql()`.

## Notes

- `QueryExecution.assertAnalyzed()` is stable (`public void assertAnalyzed()`) across Spark 3.5.x, 4.0.x, and 4.1.x.
- The existing `SparkSqlFailureAdvice` on `SparkSession.sql()` is kept.
- A new integration test exercises the DataFrame API path specifically (`sparkSession.range(1).toDF("id").select("nonexistent_column")`).

Co-authored-by: adrien.boitreaud <adrien.boitreaud@datadoghq.com>
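For context, the failure path described above can be reproduced with a few lines of driver code. The sketch below is illustrative rather than part of the commit (the class name and local-mode config are assumptions, and it expects a `spark-sql` dependency on the classpath): `Dataset.select()` throws the `AnalysisException` eagerly, during analysis, without `SparkSession.sql()` ever being called.

```java
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;

// Hypothetical reproduction of the UNRESOLVED_COLUMN failure path described above.
public class UnresolvedColumnRepro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("unresolved-column-repro")
        .getOrCreate();
    try {
      // DataFrame API path: Dataset.select() runs Catalyst analysis through
      // QueryExecution.assertAnalyzed(); SparkSession.sql() is never involved.
      spark.range(1).toDF("id").select("nonexistent_column");
    } catch (AnalysisException e) {
      // Before this commit, only SparkSession.sql() was instrumented, so this
      // failure never reached the tracer and the application span stayed SUCCESS.
      System.out.println("Caught: " + e.getMessage());
    } finally {
      spark.stop();
    }
  }
}
```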
Parent commit: 2b95dcf

2 files changed: 51 additions & 1 deletion


dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 19 additions & 1 deletion
```diff
@@ -40,7 +40,8 @@ public String[] knownMatchingTypes() {
       "org.apache.spark.util.SparkClassUtils",
       "org.apache.spark.scheduler.LiveListenerBus",
       "org.apache.spark.sql.execution.SparkPlanInfo$",
-      "org.apache.spark.sql.SparkSession"
+      "org.apache.spark.sql.SparkSession",
+      "org.apache.spark.sql.execution.QueryExecution"
     };
   }
@@ -77,6 +78,14 @@ public void methodAdvice(MethodTransformer transformer) {
             .and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))),
         AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");
 
+    // QueryExecution.assertAnalyzed() — catch all Catalyst analysis failures regardless of
+    // entry point (SparkSession.sql, Dataset.select, Dataset.filter, etc.)
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("assertAnalyzed"))
+            .and(isDeclaredBy(named("org.apache.spark.sql.execution.QueryExecution"))),
+        AbstractSparkInstrumentation.class.getName() + "$QueryExecutionFailureAdvice");
+
     // LiveListenerBus class is used to manage spark listeners
     transformer.applyAdvice(
         isMethod()
@@ -141,6 +150,15 @@ public static void exit(@Advice.Thrown Throwable throwable) {
     }
   }
 
+  public static class QueryExecutionFailureAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    public static void exit(@Advice.Thrown Throwable throwable) {
+      if (throwable != null && AbstractDatadogSparkListener.listener != null) {
+        AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
+      }
+    }
+  }
+
   public static class LiveListenerBusAdvice {
     @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
     // If OL is disabled in tracer config but user set it up manually don't interfere
```
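To make the advice's behavior concrete, here is a conceptual model of what the woven `assertAnalyzed()` does after instrumentation. This is a sketch, not the actual ByteBuddy transformation; `SqlFailureListener` and `assertAnalyzedInstrumented` are illustrative stand-ins rather than real tracer types:

```java
// Conceptual model of @Advice.OnMethodExit(onThrowable = Throwable.class):
// the original method body runs first; if it throws, the advice observes the
// Throwable and the exception still propagates to the caller unchanged.
final class ExitAdviceSketch {
  // Stand-in for AbstractDatadogSparkListener.listener.onSqlFailure(Throwable).
  interface SqlFailureListener {
    void onSqlFailure(Throwable t);
  }

  static SqlFailureListener listener; // null when no tracer is active

  // Stand-in for QueryExecution.assertAnalyzed() after weaving.
  static void assertAnalyzedInstrumented(Runnable originalBody) {
    try {
      originalBody.run(); // the original assertAnalyzed() logic
    } catch (Throwable t) {
      if (listener != null) {
        listener.onSqlFailure(t); // what QueryExecutionFailureAdvice.exit() does
      }
      throw t; // rethrow: the AnalysisException still reaches Dataset.select()
    }
  }
}
```

Because the advice only observes the `Throwable` on exit and never swallows it, user-visible behavior is unchanged; the tracer simply learns about the failure.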

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 32 additions & 0 deletions
```diff
@@ -223,6 +223,38 @@ abstract class AbstractSparkTest extends InstrumentationSpecification {
     }
   }
 
+  def "DataFrame analysis failure on unresolved column marks application span as error"() {
+    setup:
+    def sparkSession = SparkSession.builder()
+      .config("spark.master", "local[2]")
+      .getOrCreate()
+
+    try {
+      // Triggers AnalysisException via Dataset.select() -> QueryExecution.assertAnalyzed(),
+      // NOT through SparkSession.sql(). This exercises the QueryExecutionFailureAdvice.
+      sparkSession.range(1).toDF("id").select("nonexistent_column")
+    } catch (Exception ignored) {
+      // Expected: AnalysisException thrown by Catalyst analysis
+    }
+    sparkSession.stop()
+
+    expect:
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.application"
+          resourceName "spark.application"
+          spanType "spark"
+          errored true
+          parent()
+          assert span.tags["error.type"] == "Spark SQL Failed"
+          assert span.tags["error.message"] =~ /(?i).*nonexistent_column.*/
+          assert span.tags["error.stack"] =~ /(?s).*AnalysisException.*/
+        }
+      }
+    }
+  }
+
   def "capture SparkSubmit.runMain() errors"() {
     setup:
     def sparkSession = SparkSession.builder()
```