Skip to content

Commit 4b9b225

Browse files
committed
wait for throwable and let the span be finished by shutdown hook
1 parent 9c973ac commit 4b9b225

2 files changed

Lines changed: 44 additions & 4 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,12 @@ public void stateChanged(SparkAppHandle handle) {
9797
if (state == SparkAppHandle.State.FAILED
9898
|| state == SparkAppHandle.State.KILLED
9999
|| state == SparkAppHandle.State.LOST) {
100-
finishSpan(true, "Application " + state);
100+
// Set error tags but don't finish yet — RunMainAdvice may add the throwable
101+
// with the full stack trace. The span will be finished by RunMainAdvice or
102+
// the shutdown hook.
103+
span.setError(true);
104+
span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
105+
span.setTag(DDTags.ERROR_MSG, "Application " + state);
101106
} else {
102107
finishSpan(false, null);
103108
}

dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class SparkLauncherTest extends InstrumentationSpecification {
130130
}
131131
}
132132

133-
def "SparkLauncherListener finishes span with error on FAILED state"() {
133+
def "SparkLauncherListener sets error tags on FAILED state but does not finish span"() {
134134
setup:
135135
SparkLauncherListener.launcherSpan = null
136136
def tracer = AgentTracer.get()
@@ -150,6 +150,40 @@ class SparkLauncherTest extends InstrumentationSpecification {
150150
handle.getAppId() >> "app-456"
151151
listener.stateChanged(handle)
152152

153+
then:
154+
// Span stays open so RunMainAdvice can add the throwable
155+
SparkLauncherListener.launcherSpan != null
156+
SparkLauncherListener.launcherSpan.isError()
157+
SparkLauncherListener.launcherSpan.getTags()["error.type"] == "Spark Launcher Failed"
158+
SparkLauncherListener.launcherSpan.getTags()["error.message"] == "Application FAILED"
159+
SparkLauncherListener.launcherSpan.getTags()["spark.app_id"] == "app-456"
160+
161+
cleanup:
162+
SparkLauncherListener.finishSpan(false, null)
163+
}
164+
165+
def "finishSpanWithThrowable adds stack trace after FAILED state"() {
166+
setup:
167+
SparkLauncherListener.launcherSpan = null
168+
def tracer = AgentTracer.get()
169+
SparkLauncherListener.launcherSpan = tracer
170+
.buildSpan("spark.launcher.launch")
171+
.withSpanType("spark")
172+
.withResourceName("SparkLauncher.startApplication")
173+
.start()
174+
SparkLauncherListener.launcherSpan.setSamplingPriority(
175+
PrioritySampling.USER_KEEP,
176+
SamplingMechanism.DATA_JOBS)
177+
def listener = new SparkLauncherListener()
178+
def handle = Mock(SparkAppHandle)
179+
180+
when:
181+
// Simulate: listener sets error tags, then RunMainAdvice finishes with throwable
182+
handle.getState() >> SparkAppHandle.State.FAILED
183+
handle.getAppId() >> "app-456"
184+
listener.stateChanged(handle)
185+
SparkLauncherListener.finishSpanWithThrowable(new RuntimeException("job crashed"))
186+
153187
then:
154188
SparkLauncherListener.launcherSpan == null
155189
assertTraces(1) {
@@ -158,8 +192,9 @@ class SparkLauncherTest extends InstrumentationSpecification {
158192
operationName "spark.launcher.launch"
159193
spanType "spark"
160194
errored true
161-
assert span.tags["error.type"] == "Spark Launcher Failed"
162-
assert span.tags["error.message"] == "Application FAILED"
195+
assert span.tags["error.type"] == "java.lang.RuntimeException"
196+
assert span.tags["error.message"] == "job crashed"
197+
assert span.tags["error.stack"] != null
163198
assert span.tags["spark.app_id"] == "app-456"
164199
}
165200
}

0 commit comments

Comments
 (0)