Skip to content

Commit a82be15

Browse files
aboitreauddevflow.devflow-routing-intake
andauthored
initial spark launcher instrumentation (#10629)
initial spark launcher instrumentation use ddtags Fix tests move test to the right /test dir advice should be public finish launcher span with error via RunMainAdvice spotless synchronize shutdown hook Capture more spark relevant attrs Update tests with new attrs fix spotBugsMain and muzzle remove SparkLauncher.launch() instrumentation share common config key redaction method make public to avoid IllegalAccessError error type and error message Add appId and stack trace extract span building in SparkLaunchListener wait for throwable and let the span be finished by shutdown hook spotless apply fix span lifecycle in the launcher listener Merge branch 'master' into adrien.boitreaud/spark-launcher-instrumentation Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent 6b1e694 commit a82be15

8 files changed

Lines changed: 487 additions & 4 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ configurations.configureEach {
1010
dependencies {
1111
compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
1212
compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
13+
compileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
1314

1415
testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
1516
testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
@@ -21,7 +22,12 @@ dependencies {
2122
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
2223
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
2324
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
25+
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
2426

2527
testFixturesCompileOnly(libs.bundles.groovy)
2628
testFixturesCompileOnly(libs.bundles.spock)
29+
30+
testImplementation project(':dd-java-agent:instrumentation-testing')
31+
testImplementation group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
2732
}
33+

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ public static void exit(@Advice.Thrown Throwable throwable) {
106106
if (AbstractDatadogSparkListener.listener != null) {
107107
AbstractDatadogSparkListener.listener.finishApplication(
108108
System.currentTimeMillis(), throwable, 0, null);
109+
} else {
110+
SparkLauncherListener.finishSpanWithThrowable(throwable);
109111
}
110112
}
111113
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
* @see <a href="https://spark.apache.org/docs/latest/configuration.html">Spark Configuration</a>
1818
*/
1919
class SparkConfAllowList {
20+
// Using values from
21+
// https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158
22+
static final String DEFAULT_REDACTION_REGEX = "(?i)secret|password|token|access.key|api.key";
23+
24+
private static final Pattern DEFAULT_REDACTION_PATTERN = Pattern.compile(DEFAULT_REDACTION_REGEX);
25+
2026
/**
2127
* Job-specific parameters that can be used to control job execution or provide metadata about the
2228
* job being executed
@@ -80,11 +86,17 @@ public static boolean canCaptureJobParameter(String parameterName) {
8086
return allowedJobParams.contains(parameterName);
8187
}
8288

89+
/** Redact a value if the key or value matches the default redaction pattern. */
90+
public static String redactValue(String key, String value) {
91+
if (DEFAULT_REDACTION_PATTERN.matcher(key).find()
92+
|| DEFAULT_REDACTION_PATTERN.matcher(value).find()) {
93+
return "[redacted]";
94+
}
95+
return value;
96+
}
97+
8398
public static List<Map.Entry<String, String>> getRedactedSparkConf(SparkConf conf) {
84-
// Using values from
85-
// https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158
86-
String redactionPattern =
87-
conf.get("spark.redaction.regex", "(?i)secret|password|token|access.key|api.key");
99+
String redactionPattern = conf.get("spark.redaction.regex", DEFAULT_REDACTION_REGEX);
88100
List<Map.Entry<String, String>> redacted = new ArrayList<>();
89101
Pattern pattern = Pattern.compile(redactionPattern);
90102

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package datadog.trace.instrumentation.spark;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
5+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
6+
7+
import com.google.auto.service.AutoService;
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import datadog.trace.agent.tooling.InstrumenterModule;
10+
import datadog.trace.api.InstrumenterConfig;
11+
import net.bytebuddy.asm.Advice;
12+
import org.apache.spark.launcher.SparkAppHandle;
13+
14+
@AutoService(InstrumenterModule.class)
15+
public class SparkLauncherInstrumentation extends InstrumenterModule.Tracing
16+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
17+
18+
public SparkLauncherInstrumentation() {
19+
super("spark-launcher");
20+
}
21+
22+
@Override
23+
protected boolean defaultEnabled() {
24+
return InstrumenterConfig.get().isDataJobsEnabled();
25+
}
26+
27+
@Override
28+
public String instrumentedType() {
29+
return "org.apache.spark.launcher.SparkLauncher";
30+
}
31+
32+
@Override
33+
public String[] helperClassNames() {
34+
return new String[] {
35+
packageName + ".SparkConfAllowList", packageName + ".SparkLauncherListener",
36+
};
37+
}
38+
39+
@Override
40+
public void methodAdvice(MethodTransformer transformer) {
41+
transformer.applyAdvice(
42+
isMethod()
43+
.and(named("startApplication"))
44+
.and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
45+
SparkLauncherInstrumentation.class.getName() + "$StartApplicationAdvice");
46+
}
47+
48+
public static class StartApplicationAdvice {
49+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
50+
public static void exit(
51+
@Advice.This Object launcher,
52+
@Advice.Return SparkAppHandle handle,
53+
@Advice.Thrown Throwable throwable) {
54+
SparkLauncherListener.createLauncherSpan(launcher);
55+
56+
if (throwable != null) {
57+
SparkLauncherListener.finishSpanWithThrowable(throwable);
58+
return;
59+
}
60+
61+
if (handle != null) {
62+
handle.addListener(new SparkLauncherListener());
63+
}
64+
}
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package datadog.trace.instrumentation.spark;

import datadog.trace.api.DDTags;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.api.sampling.SamplingMechanism;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.lang.reflect.Field;
import java.util.Map;
import org.apache.spark.launcher.SparkAppHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listener for SparkLauncher spans. Tracks the lifecycle of a Spark application submitted via
 * SparkLauncher.startApplication(). Only a single launcher span can be active at a time. Subsequent
 * calls to startApplication() from the same or different launcher instances will not create spans;
 * only the first launch in the JVM is traced.
 *
 * <p>Thread-safety: all access to the shared static state ({@link #launcherSpan}) is guarded by the
 * {@code SparkLauncherListener.class} monitor — the {@code static synchronized} methods and the
 * explicit {@code synchronized (SparkLauncherListener.class)} blocks lock the same object.
 */
public class SparkLauncherListener implements SparkAppHandle.Listener {

  private static final Logger log = LoggerFactory.getLogger(SparkLauncherListener.class);

  // Single active launcher span for the whole JVM; null when no span is open.
  static volatile AgentSpan launcherSpan;

  private static volatile boolean shutdownHookRegistered = false;

  /**
   * Creates the singleton {@code spark.launcher.launch} span (no-op if one is already active),
   * tags it with the launcher configuration, and registers a shutdown hook that finishes any span
   * still open at JVM exit (e.g. when the error state was tagged but the throwable never arrived).
   */
  public static synchronized void createLauncherSpan(Object launcher) {
    if (launcherSpan != null) {
      // Only the first launch in the JVM is traced.
      return;
    }

    AgentTracer.TracerAPI tracer = AgentTracer.get();
    AgentSpan span =
        tracer
            .buildSpan("spark.launcher.launch")
            .withSpanType("spark")
            .withResourceName("SparkLauncher.startApplication")
            .start();
    // Force-keep: data jobs spans must not be dropped by sampling.
    span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
    setLauncherConfigTags(span, launcher);
    launcherSpan = span;

    if (!shutdownHookRegistered) {
      shutdownHookRegistered = true;
      Runtime.getRuntime()
          .addShutdownHook(
              new Thread(
                  () -> {
                    // Same monitor as the static synchronized methods.
                    synchronized (SparkLauncherListener.class) {
                      AgentSpan s = launcherSpan;
                      if (s != null) {
                        log.info("Finishing spark.launcher span from shutdown hook");
                        s.finish();
                        launcherSpan = null;
                      }
                    }
                  }));
    }
  }

  /**
   * Finishes the active launcher span, optionally marking it as an error with the given message.
   * No-op if no span is active.
   */
  public static synchronized void finishSpan(boolean isError, String errorMessage) {
    AgentSpan span = launcherSpan;
    if (span == null) {
      return;
    }
    if (isError) {
      span.setError(true);
      span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
      span.setTag(DDTags.ERROR_MSG, errorMessage);
    }
    span.finish();
    launcherSpan = null;
  }

  /**
   * Finishes the active launcher span with the given throwable (full stack trace) attached.
   * No-op if no span is active; a null throwable finishes the span without error details.
   */
  public static synchronized void finishSpanWithThrowable(Throwable throwable) {
    AgentSpan span = launcherSpan;
    if (span == null) {
      return;
    }
    if (throwable != null) {
      span.addThrowable(throwable);
    }
    span.finish();
    launcherSpan = null;
  }

  /** Copies the application id from the handle onto the span, if known yet. */
  private static void setAppIdTags(AgentSpan span, SparkAppHandle handle) {
    String appId = handle.getAppId();
    if (appId != null) {
      span.setTag("spark.app_id", appId);
      span.setTag("app_id", appId);
    }
  }

  @Override
  public void stateChanged(SparkAppHandle handle) {
    synchronized (SparkLauncherListener.class) {
      SparkAppHandle.State state = handle.getState();
      AgentSpan span = launcherSpan;
      if (span != null) {
        span.setTag("spark.launcher.app_state", state.toString());

        setAppIdTags(span, handle);

        if (state.isFinal()) {
          if (state == SparkAppHandle.State.FAILED
              || state == SparkAppHandle.State.KILLED
              || state == SparkAppHandle.State.LOST) {
            // Set error tags but don't finish yet — RunMainAdvice may add the throwable
            // with the full stack trace. The span will be finished by RunMainAdvice or
            // the shutdown hook.
            span.setError(true);
            span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
            span.setTag(DDTags.ERROR_MSG, "Application " + state);
          } else {
            finishSpan(false, null);
          }
        }
      }
    }
  }

  @Override
  public void infoChanged(SparkAppHandle handle) {
    synchronized (SparkLauncherListener.class) {
      AgentSpan span = launcherSpan;
      if (span != null) {
        setAppIdTags(span, handle);
      }
    }
  }

  /**
   * Extract launcher configuration via reflection and set as span tags. Secret redaction uses the
   * default pattern only (not spark.redaction.regex) because the SparkLauncher conf map is a plain
   * Map, not a SparkConf, so there is no way to read the user's custom redaction regex at this
   * point.
   */
  private static void setLauncherConfigTags(AgentSpan span, Object launcher) {
    try {
      // SparkLauncher inherits the "builder" field from AbstractLauncher.
      Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");
      builderField.setAccessible(true);
      Object builder = builderField.get(launcher);
      if (builder == null) {
        return;
      }

      Class<?> builderClass = builder.getClass();
      Class<?> abstractBuilderClass = builderClass.getSuperclass();

      setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "deployMode", "deploy_mode");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource");

      try {
        Field confField = abstractBuilderClass.getDeclaredField("conf");
        confField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<String, String> conf = (Map<String, String>) confField.get(builder);
        if (conf != null) {
          for (Map.Entry<String, String> entry : conf.entrySet()) {
            // Only capture allow-listed keys, and redact anything that looks like a secret.
            if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) {
              String value = SparkConfAllowList.redactValue(entry.getKey(), entry.getValue());
              span.setTag("config." + entry.getKey().replace('.', '_'), value);
            }
          }
        }
      } catch (NoSuchFieldException e) {
        log.debug("Could not find conf field on builder", e);
      }
    } catch (Exception e) {
      // Best-effort: tagging must never break the launch itself.
      log.debug("Failed to extract SparkLauncher configuration", e);
    }
  }

  /** Reads a declared String field from {@code obj} reflectively and sets it as a span tag. */
  private static void setStringFieldAsTag(
      AgentSpan span, Object obj, Class<?> clazz, String fieldName, String tagName) {
    try {
      Field field = clazz.getDeclaredField(fieldName);
      field.setAccessible(true);
      Object value = field.get(obj);
      if (value != null) {
        span.setTag(tagName, value.toString());
      }
    } catch (Exception e) {
      log.debug("Could not read field {} from builder", fieldName, e);
    }
  }
}

0 commit comments

Comments
 (0)