Skip to content

Commit 45057c5

Browse files
committed
initial spark launcher instrumentation
1 parent daf2c01 commit 45057c5

5 files changed

Lines changed: 273 additions & 0 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ configurations.configureEach {
1010
dependencies {
1111
compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
1212
compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
13+
compileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
1314

1415
testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
1516
testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
@@ -21,6 +22,7 @@ dependencies {
2122
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
2223
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
2324
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
25+
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
2426

2527
testFixturesCompileOnly(libs.bundles.groovy)
2628
testFixturesCompileOnly(libs.bundles.spock)

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkExitAdvice.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ public static void enter(@Advice.Argument(0) int exitCode) {
2121
.getMethod(
2222
"finishApplication", long.class, Throwable.class, int.class, String.class);
2323
method.invoke(datadogListener, System.currentTimeMillis(), null, exitCode, null);
24+
} else {
25+
// No Spark listener means we may be in a launcher-only process (e.g. SparkLauncher on EMR)
26+
Class<?> adviceClass =
27+
Thread.currentThread()
28+
.getContextClassLoader()
29+
.loadClass("datadog.trace.instrumentation.spark.SparkLauncherAdvice");
30+
Method finishMethod = adviceClass.getDeclaredMethod("finishLauncherSpan", int.class);
31+
finishMethod.invoke(null, exitCode);
2432
}
2533
} catch (Exception ignored) {
2634
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package datadog.trace.instrumentation.spark;
2+
3+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
4+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
5+
import net.bytebuddy.asm.Advice;
6+
import org.apache.spark.launcher.SparkAppHandle;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
class SparkLauncherAdvice {
11+
12+
private static final Logger log = LoggerFactory.getLogger(SparkLauncherAdvice.class);
13+
14+
/** The launcher span, accessible from SparkExitAdvice via reflection. */
15+
static volatile AgentSpan launcherSpan;
16+
17+
private static volatile boolean shutdownHookRegistered = false;
18+
19+
private static synchronized void createLauncherSpan(String resource) {
20+
if (launcherSpan != null) {
21+
return;
22+
}
23+
24+
AgentTracer.TracerAPI tracer = AgentTracer.get();
25+
AgentSpan span =
26+
tracer
27+
.buildSpan("spark.launcher")
28+
.withSpanType("spark")
29+
.withResourceName(resource)
30+
.start();
31+
span.setSamplingPriority(
32+
datadog.trace.api.sampling.PrioritySampling.USER_KEEP,
33+
datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS);
34+
launcherSpan = span;
35+
36+
if (!shutdownHookRegistered) {
37+
shutdownHookRegistered = true;
38+
Runtime.getRuntime()
39+
.addShutdownHook(
40+
new Thread(
41+
() -> {
42+
AgentSpan s = launcherSpan;
43+
if (s != null) {
44+
log.info("Finishing spark.launcher span from shutdown hook");
45+
s.finish();
46+
launcherSpan = null;
47+
}
48+
}));
49+
}
50+
}
51+
52+
static synchronized void finishLauncherSpan(int exitCode) {
53+
AgentSpan span = launcherSpan;
54+
if (span == null) {
55+
return;
56+
}
57+
if (exitCode != 0) {
58+
span.setError(true);
59+
span.setTag("error.type", "Launcher process failed with exit code " + exitCode);
60+
}
61+
span.finish();
62+
launcherSpan = null;
63+
}
64+
65+
public static class StartApplicationAdvice {
66+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
67+
public static void exit(
68+
@Advice.Return SparkAppHandle handle, @Advice.Thrown Throwable throwable) {
69+
createLauncherSpan("SparkLauncher.startApplication");
70+
71+
if (throwable != null) {
72+
AgentSpan span = launcherSpan;
73+
if (span != null) {
74+
span.setError(true);
75+
span.addThrowable(throwable);
76+
span.finish();
77+
launcherSpan = null;
78+
}
79+
return;
80+
}
81+
82+
if (handle != null) {
83+
try {
84+
handle.addListener(new AppHandleListener());
85+
} catch (Exception e) {
86+
log.debug("Failed to register SparkAppHandle listener", e);
87+
}
88+
}
89+
}
90+
}
91+
92+
public static class LaunchAdvice {
93+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
94+
public static void exit(@Advice.Thrown Throwable throwable) {
95+
createLauncherSpan("SparkLauncher.launch");
96+
97+
if (throwable != null) {
98+
AgentSpan span = launcherSpan;
99+
if (span != null) {
100+
span.setError(true);
101+
span.addThrowable(throwable);
102+
span.finish();
103+
launcherSpan = null;
104+
}
105+
}
106+
}
107+
}
108+
109+
static class AppHandleListener implements SparkAppHandle.Listener {
110+
@Override
111+
public void stateChanged(SparkAppHandle handle) {
112+
SparkAppHandle.State state = handle.getState();
113+
AgentSpan span = launcherSpan;
114+
if (span != null) {
115+
span.setTag("spark.launcher.app_state", state.toString());
116+
117+
String appId = handle.getAppId();
118+
if (appId != null) {
119+
span.setTag("spark.app_id", appId);
120+
}
121+
122+
if (state.isFinal()) {
123+
if (state == SparkAppHandle.State.FAILED
124+
|| state == SparkAppHandle.State.KILLED
125+
|| state == SparkAppHandle.State.LOST) {
126+
span.setError(true);
127+
span.setTag("error.type", "Spark application " + state);
128+
}
129+
}
130+
}
131+
}
132+
133+
@Override
134+
public void infoChanged(SparkAppHandle handle) {
135+
AgentSpan span = launcherSpan;
136+
if (span != null) {
137+
String appId = handle.getAppId();
138+
if (appId != null) {
139+
span.setTag("spark.app_id", appId);
140+
}
141+
}
142+
}
143+
}
144+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package datadog.trace.instrumentation.spark;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
5+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
6+
7+
import com.google.auto.service.AutoService;
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import datadog.trace.agent.tooling.InstrumenterModule;
10+
import datadog.trace.api.InstrumenterConfig;
11+
12+
@AutoService(InstrumenterModule.class)
13+
public class SparkLauncherInstrumentation extends InstrumenterModule.Tracing
14+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
15+
16+
public SparkLauncherInstrumentation() {
17+
super("spark-launcher");
18+
}
19+
20+
@Override
21+
protected boolean defaultEnabled() {
22+
return InstrumenterConfig.get().isDataJobsEnabled();
23+
}
24+
25+
@Override
26+
public String instrumentedType() {
27+
return "org.apache.spark.launcher.SparkLauncher";
28+
}
29+
30+
@Override
31+
public String[] helperClassNames() {
32+
return new String[] {
33+
packageName + ".SparkLauncherAdvice",
34+
packageName + ".SparkLauncherAdvice$AppHandleListener",
35+
};
36+
}
37+
38+
@Override
39+
public void methodAdvice(MethodTransformer transformer) {
40+
transformer.applyAdvice(
41+
isMethod()
42+
.and(named("startApplication"))
43+
.and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
44+
packageName + ".SparkLauncherAdvice$StartApplicationAdvice");
45+
46+
transformer.applyAdvice(
47+
isMethod()
48+
.and(named("launch"))
49+
.and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
50+
packageName + ".SparkLauncherAdvice$LaunchAdvice");
51+
}
52+
}

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -780,4 +780,71 @@ abstract class AbstractSparkTest extends InstrumentationSpecification {
780780
}
781781
}
782782
}
783+
784+
  // Verifies that a spark.launcher span is reported with error details when the
  // launcher process finishes with a non-zero exit code. The span is constructed
  // manually to simulate what SparkLauncherAdvice does on startApplication.
  def "generate spark.launcher span from startApplication"() {
    setup:
    // Reset any previous launcher span
    SparkLauncherAdvice.launcherSpan = null

    when:
    // Directly invoke the advice to simulate what would happen when startApplication is called
    def tracer = datadog.trace.bootstrap.instrumentation.api.AgentTracer.get()
    def span = tracer
      .buildSpan("spark.launcher")
      .withSpanType("spark")
      .withResourceName("SparkLauncher.startApplication")
      .start()
    // Data-jobs spans are force-kept, mirroring what the advice does
    span.setSamplingPriority(
      datadog.trace.api.sampling.PrioritySampling.USER_KEEP,
      datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS)
    SparkLauncherAdvice.launcherSpan = span

    // Simulate a non-zero exit finishing the launcher span
    SparkLauncherAdvice.finishLauncherSpan(1)

    then:
    assertTraces(1) {
      trace(1) {
        span {
          operationName "spark.launcher"
          spanType "spark"
          resourceName "SparkLauncher.startApplication"
          errored true
          // NOTE(review): inside this closure `span` resolves to the local AgentSpan
          // from the `when:` block, not the trace-assertion DSL — confirm this asserts
          // the tag on the span actually reported in the trace
          assert span.tags["error.type"] == "Launcher process failed with exit code 1"
        }
      }
    }
  }
818+
819+
  // Verifies that a spark.launcher span finished with exit code 0 is reported
  // without error markers. The span is constructed manually to simulate what
  // SparkLauncherAdvice does on launch.
  def "generate spark.launcher span with successful exit"() {
    setup:
    // Clear any launcher span left over from a previous test
    SparkLauncherAdvice.launcherSpan = null

    when:
    def tracer = datadog.trace.bootstrap.instrumentation.api.AgentTracer.get()
    def span = tracer
      .buildSpan("spark.launcher")
      .withSpanType("spark")
      .withResourceName("SparkLauncher.launch")
      .start()
    // Data-jobs spans are force-kept, mirroring what the advice does
    span.setSamplingPriority(
      datadog.trace.api.sampling.PrioritySampling.USER_KEEP,
      datadog.trace.api.sampling.SamplingMechanism.DATA_JOBS)
    SparkLauncherAdvice.launcherSpan = span

    // Simulate a successful exit
    SparkLauncherAdvice.finishLauncherSpan(0)

    then:
    assertTraces(1) {
      trace(1) {
        span {
          operationName "spark.launcher"
          spanType "spark"
          resourceName "SparkLauncher.launch"
          errored false
        }
      }
    }
  }
783850
}

0 commit comments

Comments
 (0)