Commit 4e5f33e

Instrument QueryExecution.assertAnalyzed() to catch DataFrame analysis failures
1 parent da36c18 commit 4e5f33e

Showing 3 changed files with 61 additions and 53 deletions.

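Background, as a minimal sketch (illustrative only; the class name AnalysisFailureExample, the local-mode config, and the app name are assumptions, not part of this commit): Catalyst runs analysis eagerly for every Dataset transformation via QueryExecution.assertAnalyzed(), so an unresolved-column error can surface without SparkSession.sql() ever being called. The existing SparkSqlFailureAdvice on SparkSession.sql() misses such failures, which is why this commit also instruments assertAnalyzed().

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AnalysisFailureExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[2]").appName("analysis-failure-example").getOrCreate();
    Dataset<Row> df = spark.range(1).toDF("id");
    try {
      // Analysis happens eagerly inside select(): QueryExecution.assertAnalyzed() throws
      // org.apache.spark.sql.AnalysisException for the unresolved column; SparkSession.sql()
      // is never involved.
      df.select("nonexistent_column");
    } catch (Exception e) {
      System.err.println("Analysis failed: " + e.getMessage());
    } finally {
      spark.stop();
    }
  }
}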

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 1 addition & 3 deletions
@@ -644,9 +644,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
       }
     } else {
       lastJobFailed = false;
-      // Note: we intentionally do NOT reset lastSqlFailed here. A successful Spark job
-      // should not erase a prior SQL failure (e.g. from cleanup operations after a failed
-      // spark.sql() call). The SQL failure is only relevant at finishApplication time.
+      lastSqlFailed = false;
     }
 
     SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId());

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 28 additions & 50 deletions
@@ -40,7 +40,8 @@ public String[] knownMatchingTypes() {
       "org.apache.spark.util.SparkClassUtils",
       "org.apache.spark.scheduler.LiveListenerBus",
       "org.apache.spark.sql.execution.SparkPlanInfo$",
-      "org.apache.spark.sql.SparkSession"
+      "org.apache.spark.sql.SparkSession",
+      "org.apache.spark.sql.execution.QueryExecution"
     };
   }
 
@@ -77,6 +78,14 @@ public void methodAdvice(MethodTransformer transformer) {
             .and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))),
         AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");
 
+    // QueryExecution.assertAnalyzed() — catch all Catalyst analysis failures regardless of
+    // entry point (SparkSession.sql, Dataset.select, Dataset.filter, etc.)
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("assertAnalyzed"))
+            .and(isDeclaredBy(named("org.apache.spark.sql.execution.QueryExecution"))),
+        AbstractSparkInstrumentation.class.getName() + "$QueryExecutionFailureAdvice");
+
     // LiveListenerBus class is used to manage spark listeners
     transformer.applyAdvice(
         isMethod()
@@ -89,11 +98,6 @@ public void methodAdvice(MethodTransformer transformer) {
   public static class PrepareSubmitEnvAdvice {
     @Advice.OnMethodEnter(suppress = Throwable.class)
     public static void enter(@Advice.Argument(0) SparkSubmitArguments submitArgs) {
-      System.err.println(
-          "[DD-SPARK-DEBUG] PrepareSubmitEnvAdvice.enter: primaryResource="
-              + submitArgs.primaryResource()
-              + ", thread="
-              + Thread.currentThread().getName());
 
       // Using pyspark `python script.py`, spark JVM is launched as PythonGatewayServer, which is
       // exited using System.exit(0), leading to the exit advice not being called
@@ -104,36 +108,20 @@ public static void enter(@Advice.Argument(0) SparkSubmitArguments submitArgs) {
 
         // prepareSubmitEnvironment might be called before/after runMain depending on spark version
         AbstractDatadogSparkListener.finishTraceOnApplicationEnd = true;
-        System.err.println(
-            "[DD-SPARK-DEBUG] PrepareSubmitEnvAdvice: detected pyspark-shell, set isPysparkShell=true, finishTraceOnApplicationEnd=true");
       }
     }
   }
 
   public static class RunMainAdvice {
     @Advice.OnMethodEnter(suppress = Throwable.class)
     public static void enter() {
-      System.err.println(
-          "[DD-SPARK-DEBUG] RunMainAdvice.enter: isPysparkShell="
-              + AbstractDatadogSparkListener.isPysparkShell
-              + ", thread="
-              + Thread.currentThread().getName());
       if (!AbstractDatadogSparkListener.isPysparkShell) {
         AbstractDatadogSparkListener.finishTraceOnApplicationEnd = false;
       }
     }
 
     @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
     public static void exit(@Advice.Thrown Throwable throwable) {
-      System.err.println(
-          "[DD-SPARK-DEBUG] RunMainAdvice.exit: throwable="
-              + (throwable != null
-                  ? throwable.getClass().getName() + ": " + throwable.getMessage()
-                  : "null")
-              + ", listenerNull="
-              + (AbstractDatadogSparkListener.listener == null)
-              + ", thread="
-              + Thread.currentThread().getName());
       if (AbstractDatadogSparkListener.listener != null) {
         AbstractDatadogSparkListener.listener.finishApplication(
             System.currentTimeMillis(), throwable, 0, null);
@@ -146,15 +134,6 @@ public static void exit(@Advice.Thrown Throwable throwable) {
   public static class YarnFinishAdvice {
     @Advice.OnMethodEnter(suppress = Throwable.class)
     public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) String msg) {
-      System.err.println(
-          "[DD-SPARK-DEBUG] YarnFinishAdvice.enter: exitCode="
-              + exitCode
-              + ", msg="
-              + msg
-              + ", listenerNull="
-              + (AbstractDatadogSparkListener.listener == null)
-              + ", thread="
-              + Thread.currentThread().getName());
       if (AbstractDatadogSparkListener.listener != null) {
         AbstractDatadogSparkListener.listener.finishApplication(
             System.currentTimeMillis(), null, exitCode, msg);
@@ -163,19 +142,6 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
   }
 
   public static class SparkSqlFailureAdvice {
-    @Advice.OnMethodEnter(suppress = Throwable.class)
-    public static void enter(@Advice.Argument(0) String sqlText) {
-      System.err.println(
-          "[DD-SPARK-DEBUG] SparkSqlFailureAdvice.enter: thread="
-              + Thread.currentThread().getName()
-              + ", sql="
-              + (sqlText != null && sqlText.length() > 200
-                  ? sqlText.substring(0, 200) + "..."
-                  : sqlText)
-              + ", listenerNull="
-              + (AbstractDatadogSparkListener.listener == null));
-    }
-
     @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
     public static void exit(@Advice.Thrown Throwable throwable) {
       if (throwable != null) {
@@ -190,15 +156,27 @@ public static void exit(@Advice.Thrown Throwable throwable) {
                 + (AbstractDatadogSparkListener.listener == null));
         if (AbstractDatadogSparkListener.listener != null) {
           AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
-        } else {
-          System.err.println(
-              "[DD-SPARK-DEBUG] SparkSqlFailureAdvice.exit: listener is null, cannot record SQL failure");
         }
-      } else {
+      }
+    }
+  }
+
+  public static class QueryExecutionFailureAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    public static void exit(@Advice.Thrown Throwable throwable) {
+      if (throwable != null) {
         System.err.println(
-            "[DD-SPARK-DEBUG] SparkSqlFailureAdvice.exit: thread="
+            "[DD-SPARK-DEBUG] QueryExecutionFailureAdvice.exit: thread="
                 + Thread.currentThread().getName()
-                + ", success (no throwable)");
+                + ", throwable="
+                + throwable.getClass().getName()
+                + ": "
+                + throwable.getMessage()
+                + ", listenerNull="
+                + (AbstractDatadogSparkListener.listener == null));
+        if (AbstractDatadogSparkListener.listener != null) {
+          AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
+        }
       }
     }
   }
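For readers less familiar with ByteBuddy advice, a conceptual sketch of what QueryExecutionFailureAdvice amounts to at runtime (illustrative Java only; ByteBuddy inlines the advice into assertAnalyzed(), and no wrapper method like this exists in the codebase): the exit advice observes the analysis error and notifies the listener, but does not swallow it, so callers still see the original AnalysisException.

// Rough runtime equivalent of the instrumented QueryExecution.assertAnalyzed().
static void assertAnalyzedInstrumented(org.apache.spark.sql.execution.QueryExecution qe) throws Throwable {
  try {
    qe.assertAnalyzed();
  } catch (Throwable t) {
    // Mirror of the exit advice body: report the failure if the Datadog listener is installed...
    if (AbstractDatadogSparkListener.listener != null) {
      AbstractDatadogSparkListener.listener.onSqlFailure(t);
    }
    // ...then let the exception propagate unchanged (the advice never clears @Advice.Thrown).
    throw t;
  }
}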

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 32 additions & 0 deletions
@@ -223,6 +223,38 @@ abstract class AbstractSparkTest extends InstrumentationSpecification {
     }
   }
 
+  def "DataFrame analysis failure on unresolved column marks application span as error"() {
+    setup:
+    def sparkSession = SparkSession.builder()
+      .config("spark.master", "local[2]")
+      .getOrCreate()
+
+    try {
+      // Triggers AnalysisException via Dataset.select() -> QueryExecution.assertAnalyzed(),
+      // NOT through SparkSession.sql(). This exercises the QueryExecutionFailureAdvice.
+      sparkSession.range(1).toDF("id").select("nonexistent_column")
+    } catch (Exception ignored) {
+      // Expected: AnalysisException thrown by Catalyst analysis
+    }
+    sparkSession.stop()
+
+    expect:
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.application"
+          resourceName "spark.application"
+          spanType "spark"
+          errored true
+          parent()
+          assert span.tags["error.type"] == "Spark SQL Failed"
+          assert span.tags["error.message"] =~ /(?i).*nonexistent_column.*/
+          assert span.tags["error.stack"] =~ /(?s).*AnalysisException.*/
+        }
+      }
+    }
+  }
+
   def "capture SparkSubmit.runMain() errors"() {
     setup:
     def sparkSession = SparkSession.builder()