Skip to content

Commit 66cec29

Browse files
committed
Instrument QueryExecution.assertAnalyzed() to catch DataFrame analysis failures
1 parent 6880c80 commit 66cec29

7 files changed

Lines changed: 200 additions & 8 deletions

File tree

dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/muzzle/MuzzleCheck.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,32 @@ public boolean matches(ClassLoader classLoader) {
3131
boolean muzzleMatches = muzzle().matches(classLoader);
3232
if (muzzleMatches) {
3333
InstrumenterState.applyInstrumentation(classLoader, instrumentationId);
34+
if (instrumentationClass.contains("spark") || instrumentationClass.contains("Spark")) {
35+
System.err.println(
36+
"[DD-SPARK-DEBUG] MuzzleCheck PASSED: "
37+
+ InstrumenterState.describe(instrumentationId)
38+
+ " classloader="
39+
+ classLoader);
40+
}
3441
} else {
3542
InstrumenterState.blockInstrumentation(classLoader, instrumentationId);
43+
if (instrumentationClass.contains("spark") || instrumentationClass.contains("Spark")) {
44+
final List<Reference.Mismatch> mismatches =
45+
muzzle.getMismatchedReferenceSources(classLoader);
46+
System.err.println(
47+
"[DD-SPARK-DEBUG] MuzzleCheck FAILED: "
48+
+ InstrumenterState.describe(instrumentationId)
49+
+ " classloader="
50+
+ classLoader);
51+
for (final Reference.Mismatch mismatch : mismatches) {
52+
System.err.println(
53+
"[DD-SPARK-DEBUG] MuzzleCheck mismatch: "
54+
+ InstrumenterState.describe(instrumentationId)
55+
+ " muzzle.mismatch=\""
56+
+ mismatch
57+
+ "\"");
58+
}
59+
}
3660
if (log.isDebugEnabled()) {
3761
final List<Reference.Mismatch> mismatches =
3862
muzzle.getMismatchedReferenceSources(classLoader);

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,17 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
173173
AgentThreadFactory.newAgentThread(
174174
AgentThreadFactory.AgentThread.DATA_JOBS_MONITORING_SHUTDOWN_HOOK,
175175
() -> {
176+
System.err.println(
177+
"[DD-SPARK-DEBUG] shutdownHook: applicationEnded="
178+
+ applicationEnded
179+
+ ", lastJobFailed="
180+
+ lastJobFailed
181+
+ ", lastSqlFailed="
182+
+ lastSqlFailed
183+
+ ", thread="
184+
+ Thread.currentThread().getName());
176185
if (!applicationEnded) {
177-
log.info("Finishing application trace from shutdown hook");
186+
System.err.println("[DD-SPARK-DEBUG] shutdownHook: calling finishApplication");
178187
finishApplication(System.currentTimeMillis(), null, 0, null);
179188
}
180189
}));
@@ -319,7 +328,18 @@ private void captureOpenlineageContextIfPresent(
319328
* has an error signal even when no job/stage/task events fire.
320329
*/
321330
public synchronized void onSqlFailure(Throwable throwable) {
331+
System.err.println(
332+
"[DD-SPARK-DEBUG] onSqlFailure: applicationEnded="
333+
+ applicationEnded
334+
+ ", throwable="
335+
+ throwable.getClass().getName()
336+
+ ": "
337+
+ throwable.getMessage()
338+
+ ", thread="
339+
+ Thread.currentThread().getName());
340+
322341
if (applicationEnded) {
342+
System.err.println("[DD-SPARK-DEBUG] onSqlFailure: skipping because applicationEnded=true");
323343
return;
324344
}
325345
lastSqlFailed = true;
@@ -328,13 +348,19 @@ public synchronized void onSqlFailure(Throwable throwable) {
328348
StringWriter sw = new StringWriter();
329349
throwable.printStackTrace(new PrintWriter(sw));
330350
lastSqlFailedStackTrace = sw.toString();
351+
352+
System.err.println("[DD-SPARK-DEBUG] onSqlFailure: lastSqlFailed set to true");
331353
}
332354

333355
@Override
334356
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
335-
log.info(
336-
"Received spark application end event, finish trace on this event: {}",
337-
finishTraceOnApplicationEnd);
357+
System.err.println(
358+
"[DD-SPARK-DEBUG] onApplicationEnd: finishTraceOnApplicationEnd="
359+
+ finishTraceOnApplicationEnd
360+
+ ", applicationEnded="
361+
+ applicationEnded
362+
+ ", thread="
363+
+ Thread.currentThread().getName());
338364
notifyOl(x -> openLineageSparkListener.onApplicationEnd(x), applicationEnd);
339365

340366
if (finishTraceOnApplicationEnd) {
@@ -346,9 +372,29 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
346372
// the signature of this function is changed
347373
public synchronized void finishApplication(
348374
long time, Throwable throwable, int exitCode, String msg) {
349-
log.info("Finishing spark application trace");
375+
System.err.println(
376+
"[DD-SPARK-DEBUG] finishApplication: thread="
377+
+ Thread.currentThread().getName()
378+
+ ", applicationEnded="
379+
+ applicationEnded
380+
+ ", throwable="
381+
+ (throwable != null
382+
? throwable.getClass().getName() + ": " + throwable.getMessage()
383+
: "null")
384+
+ ", exitCode="
385+
+ exitCode
386+
+ ", msg="
387+
+ msg
388+
+ ", lastJobFailed="
389+
+ lastJobFailed
390+
+ ", lastSqlFailed="
391+
+ lastSqlFailed
392+
+ ", jobCount="
393+
+ jobCount);
350394

351395
if (applicationEnded) {
396+
System.err.println(
397+
"[DD-SPARK-DEBUG] finishApplication: skipping because applicationEnded=true (duplicate call)");
352398
return;
353399
}
354400
applicationEnded = true;
@@ -361,9 +407,12 @@ public synchronized void finishApplication(
361407

362408
initApplicationSpanIfNotInitialized();
363409

410+
String errorDecision;
364411
if (throwable != null) {
412+
errorDecision = "throwable";
365413
applicationSpan.addThrowable(throwable);
366414
} else if (exitCode != 0) {
415+
errorDecision = "exitCode=" + exitCode;
367416
applicationSpan.setError(true);
368417
applicationSpan.setTag(
369418
DDTags.ERROR_TYPE, "Spark Application Failed with exit code " + exitCode);
@@ -372,16 +421,21 @@ public synchronized void finishApplication(
372421
applicationSpan.setTag(DDTags.ERROR_MSG, errorMessage);
373422
applicationSpan.setTag(DDTags.ERROR_STACK, msg);
374423
} else if (lastJobFailed) {
424+
errorDecision = "lastJobFailed";
375425
applicationSpan.setError(true);
376426
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed");
377427
applicationSpan.setTag(DDTags.ERROR_MSG, lastJobFailedMessage);
378428
applicationSpan.setTag(DDTags.ERROR_STACK, lastJobFailedStackTrace);
379429
} else if (lastSqlFailed) {
430+
errorDecision = "lastSqlFailed";
380431
applicationSpan.setError(true);
381432
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed");
382433
applicationSpan.setTag(DDTags.ERROR_MSG, lastSqlFailedMessage);
383434
applicationSpan.setTag(DDTags.ERROR_STACK, lastSqlFailedStackTrace);
435+
} else {
436+
errorDecision = "none (SUCCESS)";
384437
}
438+
System.err.println("[DD-SPARK-DEBUG] finishApplication: errorDecision=" + errorDecision);
385439

386440
applicationMetrics.setSpanMetrics(applicationSpan);
387441
applicationSpan.setMetric("spark.max_executor_count", maxExecutorCount);
@@ -486,6 +540,13 @@ private AgentSpan getOrCreateSqlSpan(
486540

487541
@Override
488542
public synchronized void onJobStart(SparkListenerJobStart jobStart) {
543+
System.err.println(
544+
"[DD-SPARK-DEBUG] onJobStart: jobId="
545+
+ jobStart.jobId()
546+
+ ", stageCount="
547+
+ getStageCount(jobStart)
548+
+ ", thread="
549+
+ Thread.currentThread().getName());
489550
jobCount++;
490551
if (jobSpans.size() > MAX_COLLECTION_SIZE) {
491552
return;
@@ -544,8 +605,22 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
544605

545606
@Override
546607
public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
608+
System.err.println(
609+
"[DD-SPARK-DEBUG] onJobEnd: jobId="
610+
+ jobEnd.jobId()
611+
+ ", result="
612+
+ jobEnd.jobResult().getClass().getSimpleName()
613+
+ ", lastJobFailed="
614+
+ lastJobFailed
615+
+ ", lastSqlFailed="
616+
+ lastSqlFailed
617+
+ ", thread="
618+
+ Thread.currentThread().getName());
619+
547620
AgentSpan jobSpan = jobSpans.remove(jobEnd.jobId());
548621
if (jobSpan == null) {
622+
System.err.println(
623+
"[DD-SPARK-DEBUG] onJobEnd: no span found for jobId=" + jobEnd.jobId() + ", skipping");
549624
return;
550625
}
551626

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public String[] knownMatchingTypes() {
4040
"org.apache.spark.util.SparkClassUtils",
4141
"org.apache.spark.scheduler.LiveListenerBus",
4242
"org.apache.spark.sql.execution.SparkPlanInfo$",
43-
"org.apache.spark.sql.SparkSession"
43+
"org.apache.spark.sql.SparkSession",
44+
"org.apache.spark.sql.execution.QueryExecution"
4445
};
4546
}
4647

@@ -77,6 +78,14 @@ public void methodAdvice(MethodTransformer transformer) {
7778
.and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))),
7879
AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");
7980

81+
// QueryExecution.assertAnalyzed() — catch all Catalyst analysis failures regardless of
82+
// entry point (SparkSession.sql, Dataset.select, Dataset.filter, etc.)
83+
transformer.applyAdvice(
84+
isMethod()
85+
.and(named("assertAnalyzed"))
86+
.and(isDeclaredBy(named("org.apache.spark.sql.execution.QueryExecution"))),
87+
AbstractSparkInstrumentation.class.getName() + "$QueryExecutionFailureAdvice");
88+
8089
// LiveListenerBus class is used to manage spark listeners
8190
transformer.applyAdvice(
8291
isMethod()
@@ -135,8 +144,39 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
135144
public static class SparkSqlFailureAdvice {
136145
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
137146
public static void exit(@Advice.Thrown Throwable throwable) {
138-
if (throwable != null && AbstractDatadogSparkListener.listener != null) {
139-
AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
147+
if (throwable != null) {
148+
System.err.println(
149+
"[DD-SPARK-DEBUG] SparkSqlFailureAdvice.exit: thread="
150+
+ Thread.currentThread().getName()
151+
+ ", throwable="
152+
+ throwable.getClass().getName()
153+
+ ": "
154+
+ throwable.getMessage()
155+
+ ", listenerNull="
156+
+ (AbstractDatadogSparkListener.listener == null));
157+
if (AbstractDatadogSparkListener.listener != null) {
158+
AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
159+
}
160+
}
161+
}
162+
}
163+
164+
public static class QueryExecutionFailureAdvice {
165+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
166+
public static void exit(@Advice.Thrown Throwable throwable) {
167+
if (throwable != null) {
168+
System.err.println(
169+
"[DD-SPARK-DEBUG] QueryExecutionFailureAdvice.exit: thread="
170+
+ Thread.currentThread().getName()
171+
+ ", throwable="
172+
+ throwable.getClass().getName()
173+
+ ": "
174+
+ throwable.getMessage()
175+
+ ", listenerNull="
176+
+ (AbstractDatadogSparkListener.listener == null));
177+
if (AbstractDatadogSparkListener.listener != null) {
178+
AbstractDatadogSparkListener.listener.onSqlFailure(throwable);
179+
}
140180
}
141181
}
142182
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ public static void enter(@Advice.Argument(0) int exitCode) {
1414
.getContextClassLoader()
1515
.loadClass("datadog.trace.instrumentation.spark.AbstractDatadogSparkListener");
1616
Object datadogListener = klass.getDeclaredField("listener").get(null);
17+
System.err.println(
18+
"[DD-SPARK-DEBUG] SparkExitAdvice.enter: exitCode="
19+
+ exitCode
20+
+ ", listenerNull="
21+
+ (datadogListener == null)
22+
+ ", thread="
23+
+ Thread.currentThread().getName());
1724
if (datadogListener != null) {
1825
Method method =
1926
datadogListener

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,38 @@ abstract class AbstractSparkTest extends InstrumentationSpecification {
223223
}
224224
}
225225

226+
def "DataFrame analysis failure on unresolved column marks application span as error"() {
227+
setup:
228+
def sparkSession = SparkSession.builder()
229+
.config("spark.master", "local[2]")
230+
.getOrCreate()
231+
232+
try {
233+
// Triggers AnalysisException via Dataset.select() -> QueryExecution.assertAnalyzed(),
234+
// NOT through SparkSession.sql(). This exercises the QueryExecutionFailureAdvice.
235+
sparkSession.range(1).toDF("id").select("nonexistent_column")
236+
} catch (Exception ignored) {
237+
// Expected: AnalysisException thrown by Catalyst analysis
238+
}
239+
sparkSession.stop()
240+
241+
expect:
242+
assertTraces(1) {
243+
trace(1) {
244+
span {
245+
operationName "spark.application"
246+
resourceName "spark.application"
247+
spanType "spark"
248+
errored true
249+
parent()
250+
assert span.tags["error.type"] == "Spark SQL Failed"
251+
assert span.tags["error.message"] =~ /(?i).*nonexistent_column.*/
252+
assert span.tags["error.stack"] =~ /(?s).*AnalysisException.*/
253+
}
254+
}
255+
}
256+
}
257+
226258
def "capture SparkSubmit.runMain() errors"() {
227259
setup:
228260
def sparkSession = SparkSession.builder()

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ public static void enter(@Advice.This SparkContext sparkContext) {
9494
AbstractDatadogSparkListener.listener =
9595
new DatadogSpark212Listener(
9696
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
97+
System.err.println(
98+
"[DD-SPARK-DEBUG] listener registered: class="
99+
+ AbstractDatadogSparkListener.listener.getClass().getName()
100+
+ ", appId="
101+
+ sparkContext.applicationId()
102+
+ ", thread="
103+
+ Thread.currentThread().getName());
97104
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
98105
}
99106
}

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ public static void enter(@Advice.This SparkContext sparkContext) {
9595
AbstractDatadogSparkListener.listener =
9696
new DatadogSpark213Listener(
9797
sparkContext.getConf(), sparkContext.applicationId(), sparkContext.version());
98+
System.err.println(
99+
"[DD-SPARK-DEBUG] listener registered: class="
100+
+ AbstractDatadogSparkListener.listener.getClass().getName()
101+
+ ", appId="
102+
+ sparkContext.applicationId()
103+
+ ", thread="
104+
+ Thread.currentThread().getName());
98105
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
99106
}
100107
}

0 commit comments

Comments (0)