Skip to content

Commit 559ce6c

Browse files
committed
remove SparkLauncher.launch() instrumentation
1 parent 3f0d8a0 commit 559ce6c

5 files changed

Lines changed: 73 additions & 93 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,6 @@ public static void exit(@Advice.Thrown Throwable throwable) {
106106
if (AbstractDatadogSparkListener.listener != null) {
107107
AbstractDatadogSparkListener.listener.finishApplication(
108108
System.currentTimeMillis(), throwable, 0, null);
109-
} else {
110-
try {
111-
Class<?> adviceClass =
112-
Thread.currentThread()
113-
.getContextClassLoader()
114-
.loadClass("datadog.trace.instrumentation.spark.SparkLauncherAdvice");
115-
java.lang.reflect.Method finishMethod =
116-
adviceClass.getMethod("finishLauncherSpan", Throwable.class);
117-
finishMethod.invoke(null, throwable);
118-
} catch (Exception ignored) {
119-
}
120109
}
121110
}
122111
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,6 @@ public static void enter(@Advice.Argument(0) int exitCode) {
2121
.getMethod(
2222
"finishApplication", long.class, Throwable.class, int.class, String.class);
2323
method.invoke(datadogListener, System.currentTimeMillis(), null, exitCode, null);
24-
} else {
25-
// No Spark listener means we may be in a launcher-only process (e.g. SparkLauncher on EMR)
26-
Class<?> adviceClass =
27-
Thread.currentThread()
28-
.getContextClassLoader()
29-
.loadClass("datadog.trace.instrumentation.spark.SparkLauncherAdvice");
30-
Method finishMethod = adviceClass.getDeclaredMethod("finishLauncherSpan", int.class);
31-
finishMethod.invoke(null, exitCode);
3224
}
3325
} catch (Exception ignored) {
3426
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import datadog.trace.api.sampling.SamplingMechanism;
66
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
77
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
8-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
98
import java.lang.reflect.Field;
109
import java.util.Map;
1110
import java.util.regex.Pattern;
@@ -22,9 +21,7 @@ public class SparkLauncherAdvice {
2221
private static final Pattern CONF_REDACTION_PATTERN =
2322
Pattern.compile("(?i)secret|password|token|access.key|api.key");
2423

25-
/** The launcher span, accessible from SparkExitAdvice via reflection. */
26-
@SuppressFBWarnings("PA_PUBLIC_PRIMITIVE_ATTRIBUTE")
27-
public static volatile AgentSpan launcherSpan;
24+
static volatile AgentSpan launcherSpan;
2825

2926
private static volatile boolean shutdownHookRegistered = false;
3027

@@ -89,7 +86,7 @@ private static void setStringFieldAsTag(
8986
}
9087
}
9188

92-
public static synchronized void createLauncherSpan(String resource, Object launcher) {
89+
static synchronized void createLauncherSpan(Object launcher) {
9390
if (launcherSpan != null) {
9491
return;
9592
}
@@ -99,14 +96,10 @@ public static synchronized void createLauncherSpan(String resource, Object launc
9996
tracer
10097
.buildSpan("spark.launcher.launch")
10198
.withSpanType("spark")
102-
.withResourceName(resource)
99+
.withResourceName("SparkLauncher.startApplication")
103100
.start();
104101
span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
105-
106-
if (launcher != null) {
107-
setLauncherConfigTags(span, launcher);
108-
}
109-
102+
setLauncherConfigTags(span, launcher);
110103
launcherSpan = span;
111104

112105
if (!shutdownHookRegistered) {
@@ -127,20 +120,20 @@ public static synchronized void createLauncherSpan(String resource, Object launc
127120
}
128121
}
129122

130-
public static synchronized void finishLauncherSpan(int exitCode) {
123+
static synchronized void finishSpan(boolean isError, String errorType) {
131124
AgentSpan span = launcherSpan;
132125
if (span == null) {
133126
return;
134127
}
135-
if (exitCode != 0) {
128+
if (isError) {
136129
span.setError(true);
137-
span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed with exit code " + exitCode);
130+
span.setTag(DDTags.ERROR_TYPE, errorType);
138131
}
139132
span.finish();
140133
launcherSpan = null;
141134
}
142135

143-
public static synchronized void finishLauncherSpan(Throwable throwable) {
136+
static synchronized void finishSpanWithThrowable(Throwable throwable) {
144137
AgentSpan span = launcherSpan;
145138
if (span == null) {
146139
return;
@@ -158,15 +151,10 @@ public static void exit(
158151
@Advice.This Object launcher,
159152
@Advice.Return SparkAppHandle handle,
160153
@Advice.Thrown Throwable throwable) {
161-
createLauncherSpan("SparkLauncher.startApplication", launcher);
154+
createLauncherSpan(launcher);
162155

163156
if (throwable != null) {
164-
AgentSpan span = launcherSpan;
165-
if (span != null) {
166-
span.addThrowable(throwable);
167-
span.finish();
168-
launcherSpan = null;
169-
}
157+
finishSpanWithThrowable(throwable);
170158
return;
171159
}
172160

@@ -180,22 +168,6 @@ public static void exit(
180168
}
181169
}
182170

183-
public static class LaunchAdvice {
184-
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
185-
public static void exit(@Advice.This Object launcher, @Advice.Thrown Throwable throwable) {
186-
createLauncherSpan("SparkLauncher.launch", launcher);
187-
188-
if (throwable != null) {
189-
AgentSpan span = launcherSpan;
190-
if (span != null) {
191-
span.addThrowable(throwable);
192-
span.finish();
193-
launcherSpan = null;
194-
}
195-
}
196-
}
197-
}
198-
199171
static class AppHandleListener implements SparkAppHandle.Listener {
200172
@Override
201173
public void stateChanged(SparkAppHandle handle) {
@@ -213,8 +185,9 @@ public void stateChanged(SparkAppHandle handle) {
213185
if (state == SparkAppHandle.State.FAILED
214186
|| state == SparkAppHandle.State.KILLED
215187
|| state == SparkAppHandle.State.LOST) {
216-
span.setError(true);
217-
span.setTag(DDTags.ERROR_TYPE, "Spark Application " + state);
188+
finishSpan(true, "Spark Application " + state);
189+
} else {
190+
finishSpan(false, null);
218191
}
219192
}
220193
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,5 @@ public void methodAdvice(MethodTransformer transformer) {
4343
.and(named("startApplication"))
4444
.and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
4545
packageName + ".SparkLauncherAdvice$StartApplicationAdvice");
46-
47-
transformer.applyAdvice(
48-
isMethod()
49-
.and(named("launch"))
50-
.and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
51-
packageName + ".SparkLauncherAdvice$LaunchAdvice");
5246
}
5347
}

dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import datadog.trace.agent.test.InstrumentationSpecification
44
import datadog.trace.api.sampling.PrioritySampling
55
import datadog.trace.api.sampling.SamplingMechanism
66
import datadog.trace.bootstrap.instrumentation.api.AgentTracer
7+
import org.apache.spark.launcher.SparkAppHandle
78
import org.apache.spark.launcher.SparkLauncher
89

910
class SparkLauncherTest extends InstrumentationSpecification {
1011

11-
def "createLauncherSpan extracts config tags from SparkLauncher"() {
12+
def "StartApplicationAdvice extracts config tags from SparkLauncher"() {
1213
setup:
1314
SparkLauncherAdvice.launcherSpan = null
1415

@@ -21,8 +22,10 @@ class SparkLauncherTest extends InstrumentationSpecification {
2122
.setAppResource("/path/to/app.jar")
2223
.setConf("spark.executor.memory", "4g")
2324
.setConf("spark.executor.instances", "10")
24-
SparkLauncherAdvice.createLauncherSpan("SparkLauncher.startApplication", launcher)
25-
SparkLauncherAdvice.finishLauncherSpan(0)
25+
// Call the advice directly (handle=null, throwable=null simulates a successful call
26+
// where no SparkAppHandle is returned — span stays open for shutdown hook)
27+
SparkLauncherAdvice.createLauncherSpan(launcher)
28+
SparkLauncherAdvice.finishSpan(false, null)
2629

2730
then:
2831
assertTraces(1) {
@@ -44,7 +47,7 @@ class SparkLauncherTest extends InstrumentationSpecification {
4447
}
4548
}
4649

47-
def "createLauncherSpan redacts sensitive conf values"() {
50+
def "StartApplicationAdvice redacts sensitive conf values"() {
4851
setup:
4952
SparkLauncherAdvice.launcherSpan = null
5053

@@ -54,8 +57,8 @@ class SparkLauncherTest extends InstrumentationSpecification {
5457
.setConf("spark.app.name", "my-secret-app")
5558
// spark.master is allowlisted; its value is harmless so should pass through
5659
.setConf("spark.master", "yarn")
57-
SparkLauncherAdvice.createLauncherSpan("SparkLauncher.startApplication", launcher)
58-
SparkLauncherAdvice.finishLauncherSpan(0)
60+
SparkLauncherAdvice.createLauncherSpan(launcher)
61+
SparkLauncherAdvice.finishSpan(false, null)
5962

6063
then:
6164
assertTraces(1) {
@@ -72,64 +75,93 @@ class SparkLauncherTest extends InstrumentationSpecification {
7275
}
7376
}
7477

75-
def "generate spark.launcher span from startApplication"() {
78+
def "finishSpanWithThrowable finishes span with error"() {
7679
setup:
7780
SparkLauncherAdvice.launcherSpan = null
7881

7982
when:
83+
def launcher = new SparkLauncher().setAppName("test-app")
84+
SparkLauncherAdvice.createLauncherSpan(launcher)
85+
SparkLauncherAdvice.finishSpanWithThrowable(new RuntimeException("startApplication failed"))
86+
87+
then:
88+
SparkLauncherAdvice.launcherSpan == null
89+
assertTraces(1) {
90+
trace(1) {
91+
span {
92+
operationName "spark.launcher.launch"
93+
spanType "spark"
94+
errored true
95+
}
96+
}
97+
}
98+
}
99+
100+
def "AppHandleListener finishes span on final state FINISHED"() {
101+
setup:
102+
SparkLauncherAdvice.launcherSpan = null
80103
def tracer = AgentTracer.get()
81-
def launcherSpan = tracer
82-
.buildSpan("spark.launcher")
104+
SparkLauncherAdvice.launcherSpan = tracer
105+
.buildSpan("spark.launcher.launch")
83106
.withSpanType("spark")
84107
.withResourceName("SparkLauncher.startApplication")
85108
.start()
86-
launcherSpan.setSamplingPriority(
109+
SparkLauncherAdvice.launcherSpan.setSamplingPriority(
87110
PrioritySampling.USER_KEEP,
88111
SamplingMechanism.DATA_JOBS)
89-
SparkLauncherAdvice.launcherSpan = launcherSpan
112+
def listener = new SparkLauncherAdvice.AppHandleListener()
113+
def handle = Mock(SparkAppHandle)
90114

91-
SparkLauncherAdvice.finishLauncherSpan(1)
115+
when:
116+
handle.getState() >> SparkAppHandle.State.FINISHED
117+
handle.getAppId() >> "app-123"
118+
listener.stateChanged(handle)
92119

93120
then:
121+
SparkLauncherAdvice.launcherSpan == null
94122
assertTraces(1) {
95123
trace(1) {
96124
span {
97-
operationName "spark.launcher"
125+
operationName "spark.launcher.launch"
98126
spanType "spark"
99-
resourceName "SparkLauncher.startApplication"
100-
errored true
101-
assert span.tags["error.type"] == "Spark Launcher Failed with exit code 1"
127+
errored false
128+
assert span.tags["spark.app_id"] == "app-123"
129+
assert span.tags["spark.launcher.app_state"] == "FINISHED"
102130
}
103131
}
104132
}
105133
}
106134

107-
def "generate spark.launcher span with successful exit"() {
135+
def "AppHandleListener finishes span with error on FAILED state"() {
108136
setup:
109137
SparkLauncherAdvice.launcherSpan = null
110-
111-
when:
112138
def tracer = AgentTracer.get()
113-
def launcherSpan = tracer
114-
.buildSpan("spark.launcher")
139+
SparkLauncherAdvice.launcherSpan = tracer
140+
.buildSpan("spark.launcher.launch")
115141
.withSpanType("spark")
116-
.withResourceName("SparkLauncher.launch")
142+
.withResourceName("SparkLauncher.startApplication")
117143
.start()
118-
launcherSpan.setSamplingPriority(
144+
SparkLauncherAdvice.launcherSpan.setSamplingPriority(
119145
PrioritySampling.USER_KEEP,
120146
SamplingMechanism.DATA_JOBS)
121-
SparkLauncherAdvice.launcherSpan = launcherSpan
147+
def listener = new SparkLauncherAdvice.AppHandleListener()
148+
def handle = Mock(SparkAppHandle)
122149

123-
SparkLauncherAdvice.finishLauncherSpan(0)
150+
when:
151+
handle.getState() >> SparkAppHandle.State.FAILED
152+
handle.getAppId() >> "app-456"
153+
listener.stateChanged(handle)
124154

125155
then:
156+
SparkLauncherAdvice.launcherSpan == null
126157
assertTraces(1) {
127158
trace(1) {
128159
span {
129-
operationName "spark.launcher"
160+
operationName "spark.launcher.launch"
130161
spanType "spark"
131-
resourceName "SparkLauncher.launch"
132-
errored false
162+
errored true
163+
assert span.tags["error.type"] == "Spark Application FAILED"
164+
assert span.tags["spark.app_id"] == "app-456"
133165
}
134166
}
135167
}

0 commit comments

Comments
 (0)