Skip to content

Commit e1ee448

Browse files
committed
SparkLaunchListener
1 parent 7e1bb03 commit e1ee448

8 files changed

Lines changed: 473 additions & 4 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ configurations.configureEach {
1010
dependencies {
1111
compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
1212
compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
13+
compileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
1314

1415
testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
1516
testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
@@ -21,7 +22,12 @@ dependencies {
2122
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
2223
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
2324
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
25+
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
2426

2527
testFixturesCompileOnly(libs.bundles.groovy)
2628
testFixturesCompileOnly(libs.bundles.spock)
29+
30+
testImplementation project(':dd-java-agent:instrumentation-testing')
31+
testImplementation group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
2732
}
33+

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ public static void exit(@Advice.Thrown Throwable throwable) {
106106
if (AbstractDatadogSparkListener.listener != null) {
107107
AbstractDatadogSparkListener.listener.finishApplication(
108108
System.currentTimeMillis(), throwable, 0, null);
109+
} else {
110+
SparkLauncherListener.finishSpanWithThrowable(throwable);
109111
}
110112
}
111113
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
* @see <a href="https://spark.apache.org/docs/latest/configuration.html">Spark Configuration</a>
1818
*/
1919
class SparkConfAllowList {
20+
// Using values from
21+
// https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158
22+
static final String DEFAULT_REDACTION_REGEX = "(?i)secret|password|token|access.key|api.key";
23+
24+
private static final Pattern DEFAULT_REDACTION_PATTERN = Pattern.compile(DEFAULT_REDACTION_REGEX);
25+
2026
/**
2127
* Job-specific parameters that can be used to control job execution or provide metadata about the
2228
* job being executed
@@ -80,11 +86,17 @@ public static boolean canCaptureJobParameter(String parameterName) {
8086
return allowedJobParams.contains(parameterName);
8187
}
8288

89+
/** Redact a value if the key or value matches the default redaction pattern. */
90+
public static String redactValue(String key, String value) {
91+
if (DEFAULT_REDACTION_PATTERN.matcher(key).find()
92+
|| DEFAULT_REDACTION_PATTERN.matcher(value).find()) {
93+
return "[redacted]";
94+
}
95+
return value;
96+
}
97+
8398
public static List<Map.Entry<String, String>> getRedactedSparkConf(SparkConf conf) {
84-
// Using values from
85-
// https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158
86-
String redactionPattern =
87-
conf.get("spark.redaction.regex", "(?i)secret|password|token|access.key|api.key");
99+
String redactionPattern = conf.get("spark.redaction.regex", DEFAULT_REDACTION_REGEX);
88100
List<Map.Entry<String, String>> redacted = new ArrayList<>();
89101
Pattern pattern = Pattern.compile(redactionPattern);
90102

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package datadog.trace.instrumentation.spark;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
5+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
6+
7+
import com.google.auto.service.AutoService;
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import datadog.trace.agent.tooling.InstrumenterModule;
10+
import datadog.trace.api.InstrumenterConfig;
11+
import net.bytebuddy.asm.Advice;
12+
import org.apache.spark.launcher.SparkAppHandle;
13+
14+
@AutoService(InstrumenterModule.class)
15+
public class SparkLauncherInstrumentation extends InstrumenterModule.Tracing
16+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
17+
18+
public SparkLauncherInstrumentation() {
19+
super("spark-launcher");
20+
}
21+
22+
@Override
23+
protected boolean defaultEnabled() {
24+
return InstrumenterConfig.get().isDataJobsEnabled();
25+
}
26+
27+
@Override
28+
public String instrumentedType() {
29+
return "org.apache.spark.launcher.SparkLauncher";
30+
}
31+
32+
@Override
33+
public String[] helperClassNames() {
34+
return new String[] {
35+
packageName + ".SparkConfAllowList",
36+
packageName + ".SparkLauncherListener",
37+
};
38+
}
39+
40+
@Override
41+
public void methodAdvice(MethodTransformer transformer) {
42+
transformer.applyAdvice(
43+
isMethod()
44+
.and(named("startApplication"))
45+
.and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
46+
SparkLauncherInstrumentation.class.getName() + "$StartApplicationAdvice");
47+
}
48+
49+
public static class StartApplicationAdvice {
50+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
51+
public static void exit(
52+
@Advice.This Object launcher,
53+
@Advice.Return SparkAppHandle handle,
54+
@Advice.Thrown Throwable throwable) {
55+
SparkLauncherListener.createLauncherSpan(launcher);
56+
57+
if (throwable != null) {
58+
SparkLauncherListener.finishSpanWithThrowable(throwable);
59+
return;
60+
}
61+
62+
if (handle != null) {
63+
handle.addListener(new SparkLauncherListener());
64+
}
65+
}
66+
}
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package datadog.trace.instrumentation.spark;

import datadog.trace.api.DDTags;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.api.sampling.SamplingMechanism;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.lang.reflect.Field;
import java.util.Map;
import org.apache.spark.launcher.SparkAppHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the lifecycle of an application started through {@code SparkLauncher}: holds a single
 * process-wide {@code spark.launcher.launch} span, tags it from the launcher's configuration
 * (read reflectively, redacted via {@link SparkConfAllowList}), and finishes it when the launched
 * application reaches a final state, on error, or from a JVM shutdown hook as a last resort.
 *
 * <p>All mutations of {@link #launcherSpan} are guarded by the class lock (static synchronized
 * methods and a synchronized block in the shutdown hook lock the same monitor).
 */
public class SparkLauncherListener implements SparkAppHandle.Listener {

  private static final Logger log = LoggerFactory.getLogger(SparkLauncherListener.class);

  // Single launcher span per JVM; null once finished.
  static volatile AgentSpan launcherSpan;

  private static volatile boolean shutdownHookRegistered = false;

  /**
   * Starts the launcher span if none exists yet and registers the shutdown-hook safety net.
   * Idempotent: subsequent calls while a span is live are no-ops.
   */
  public static synchronized void createLauncherSpan(Object launcher) {
    if (launcherSpan != null) {
      return;
    }

    AgentTracer.TracerAPI tracer = AgentTracer.get();
    AgentSpan newSpan =
        tracer
            .buildSpan("spark.launcher.launch")
            .withSpanType("spark")
            .withResourceName("SparkLauncher.startApplication")
            .start();
    // Force-keep: data jobs spans must not be dropped by sampling.
    newSpan.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
    setLauncherConfigTags(newSpan, launcher);
    launcherSpan = newSpan;

    registerShutdownHookOnce();
  }

  /** Installs, at most once, a shutdown hook that closes a still-open launcher span. */
  private static void registerShutdownHookOnce() {
    if (shutdownHookRegistered) {
      return;
    }
    shutdownHookRegistered = true;
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  // Same monitor as the static synchronized methods.
                  synchronized (SparkLauncherListener.class) {
                    AgentSpan s = launcherSpan;
                    if (s != null) {
                      log.info("Finishing spark.launcher span from shutdown hook");
                      s.finish();
                      launcherSpan = null;
                    }
                  }
                }));
  }

  /** Finishes the launcher span, optionally marking it as errored. No-op if already closed. */
  public static synchronized void finishSpan(boolean isError, String errorMessage) {
    AgentSpan current = launcherSpan;
    if (current == null) {
      return;
    }
    if (isError) {
      current.setError(true);
      current.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
      current.setTag(DDTags.ERROR_MSG, errorMessage);
    }
    current.finish();
    launcherSpan = null;
  }

  /** Finishes the launcher span, attaching the throwable (if any). No-op if already closed. */
  public static synchronized void finishSpanWithThrowable(Throwable throwable) {
    AgentSpan current = launcherSpan;
    if (current == null) {
      return;
    }
    if (throwable != null) {
      current.addThrowable(throwable);
    }
    current.finish();
    launcherSpan = null;
  }

  @Override
  public void stateChanged(SparkAppHandle handle) {
    AgentSpan span = launcherSpan;
    if (span == null) {
      return;
    }

    SparkAppHandle.State state = handle.getState();
    span.setTag("spark.launcher.app_state", state.toString());
    tagAppId(span, handle);

    if (!state.isFinal()) {
      return;
    }

    boolean failed =
        state == SparkAppHandle.State.FAILED
            || state == SparkAppHandle.State.KILLED
            || state == SparkAppHandle.State.LOST;
    if (failed) {
      // Set error tags but don't finish yet — RunMainAdvice may add the throwable
      // with the full stack trace. The span will be finished by RunMainAdvice or
      // the shutdown hook.
      span.setError(true);
      span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
      span.setTag(DDTags.ERROR_MSG, "Application " + state);
    } else {
      finishSpan(false, null);
    }
  }

  @Override
  public void infoChanged(SparkAppHandle handle) {
    AgentSpan span = launcherSpan;
    if (span != null) {
      tagAppId(span, handle);
    }
  }

  /** Tags the span with the application id once the handle reports one. */
  private static void tagAppId(AgentSpan span, SparkAppHandle handle) {
    String appId = handle.getAppId();
    if (appId != null) {
      span.setTag("spark.app_id", appId);
      span.setTag("app_id", appId);
    }
  }

  /**
   * Reads the launcher's internal builder reflectively (field layout of
   * AbstractLauncher/SparkSubmitCommandBuilder) and tags the span with master, deploy mode, app
   * name, main class, app resource and allow-listed, redacted spark conf entries. Any reflection
   * failure is swallowed at debug level — tagging is best effort.
   */
  private static void setLauncherConfigTags(AgentSpan span, Object launcher) {
    try {
      Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");
      builderField.setAccessible(true);
      Object builder = builderField.get(launcher);
      if (builder == null) {
        return;
      }

      Class<?> abstractBuilderClass = builder.getClass().getSuperclass();

      setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "deployMode", "deploy_mode");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource");

      try {
        Field confField = abstractBuilderClass.getDeclaredField("conf");
        confField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<String, String> conf = (Map<String, String>) confField.get(builder);
        if (conf != null) {
          for (Map.Entry<String, String> entry : conf.entrySet()) {
            String confKey = entry.getKey();
            if (SparkConfAllowList.canCaptureJobParameter(confKey)) {
              String safeValue = SparkConfAllowList.redactValue(confKey, entry.getValue());
              span.setTag("config." + confKey.replace('.', '_'), safeValue);
            }
          }
        }
      } catch (NoSuchFieldException e) {
        log.debug("Could not find conf field on builder", e);
      }
    } catch (Exception e) {
      log.debug("Failed to extract SparkLauncher configuration", e);
    }
  }

  /** Reads a declared String-ish field reflectively and sets it as a tag when non-null. */
  private static void setStringFieldAsTag(
      AgentSpan span, Object obj, Class<?> clazz, String fieldName, String tagName) {
    try {
      Field field = clazz.getDeclaredField(fieldName);
      field.setAccessible(true);
      Object value = field.get(obj);
      if (value != null) {
        span.setTag(tagName, value.toString());
      }
    } catch (Exception e) {
      log.debug("Could not read field {} from builder", fieldName, e);
    }
  }
}

0 commit comments

Comments
 (0)