Skip to content

Commit 9c973ac

Browse files
committed
extract span building in SparkLaunchListener
1 parent 95c6c74 commit 9c973ac

6 files changed

Lines changed: 125 additions & 134 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public static void exit(@Advice.Thrown Throwable throwable) {
107107
AbstractDatadogSparkListener.listener.finishApplication(
108108
System.currentTimeMillis(), throwable, 0, null);
109109
} else {
110-
SparkLauncherAdvice.finishSpanWithThrowable(throwable);
110+
SparkLauncherListener.finishSpanWithThrowable(throwable);
111111
}
112112
}
113113
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import datadog.trace.agent.tooling.Instrumenter;
99
import datadog.trace.agent.tooling.InstrumenterModule;
1010
import datadog.trace.api.InstrumenterConfig;
11+
import net.bytebuddy.asm.Advice;
12+
import org.apache.spark.launcher.SparkAppHandle;
1113

1214
@AutoService(InstrumenterModule.class)
1315
public class SparkLauncherInstrumentation extends InstrumenterModule.Tracing
@@ -31,8 +33,7 @@ public String instrumentedType() {
3133
public String[] helperClassNames() {
3234
return new String[] {
3335
packageName + ".SparkConfAllowList",
34-
packageName + ".SparkLauncherAdvice",
35-
packageName + ".SparkLauncherAdvice$AppHandleListener",
36+
packageName + ".SparkLauncherListener",
3637
};
3738
}
3839

@@ -42,6 +43,25 @@ public void methodAdvice(MethodTransformer transformer) {
4243
isMethod()
4344
.and(named("startApplication"))
4445
.and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
45-
packageName + ".SparkLauncherAdvice$StartApplicationAdvice");
46+
SparkLauncherInstrumentation.class.getName() + "$StartApplicationAdvice");
47+
}
48+
49+
public static class StartApplicationAdvice {
50+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
51+
public static void exit(
52+
@Advice.This Object launcher,
53+
@Advice.Return SparkAppHandle handle,
54+
@Advice.Thrown Throwable throwable) {
55+
SparkLauncherListener.createLauncherSpan(launcher);
56+
57+
if (throwable != null) {
58+
SparkLauncherListener.finishSpanWithThrowable(throwable);
59+
return;
60+
}
61+
62+
if (handle != null) {
63+
handle.addListener(new SparkLauncherListener());
64+
}
65+
}
4666
}
4767
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherAdvice.java renamed to dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java

Lines changed: 76 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -7,76 +7,18 @@
77
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
88
import java.lang.reflect.Field;
99
import java.util.Map;
10-
import net.bytebuddy.asm.Advice;
1110
import org.apache.spark.launcher.SparkAppHandle;
1211
import org.slf4j.Logger;
1312
import org.slf4j.LoggerFactory;
1413

15-
public class SparkLauncherAdvice {
14+
public class SparkLauncherListener implements SparkAppHandle.Listener {
1615

17-
private static final Logger log = LoggerFactory.getLogger(SparkLauncherAdvice.class);
16+
private static final Logger log = LoggerFactory.getLogger(SparkLauncherListener.class);
1817

1918
static volatile AgentSpan launcherSpan;
2019

2120
private static volatile boolean shutdownHookRegistered = false;
2221

23-
/** Extract SparkLauncher configuration via reflection and set as span tags. */
24-
private static void setLauncherConfigTags(AgentSpan span, Object launcher) {
25-
try {
26-
// SparkLauncher extends AbstractLauncher which has a 'builder' field
27-
Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");
28-
builderField.setAccessible(true);
29-
Object builder = builderField.get(launcher);
30-
if (builder == null) {
31-
return;
32-
}
33-
34-
Class<?> builderClass = builder.getClass();
35-
// Fields are on AbstractCommandBuilder (parent of SparkSubmitCommandBuilder)
36-
Class<?> abstractBuilderClass = builderClass.getSuperclass();
37-
38-
setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master");
39-
setStringFieldAsTag(span, builder, abstractBuilderClass, "deployMode", "deploy_mode");
40-
setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name");
41-
setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class");
42-
setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource");
43-
44-
// Extract spark conf entries and redact sensitive values
45-
try {
46-
Field confField = abstractBuilderClass.getDeclaredField("conf");
47-
confField.setAccessible(true);
48-
@SuppressWarnings("unchecked")
49-
Map<String, String> conf = (Map<String, String>) confField.get(builder);
50-
if (conf != null) {
51-
for (Map.Entry<String, String> entry : conf.entrySet()) {
52-
if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) {
53-
String value = SparkConfAllowList.redactValue(entry.getKey(), entry.getValue());
54-
span.setTag("config." + entry.getKey().replace('.', '_'), value);
55-
}
56-
}
57-
}
58-
} catch (NoSuchFieldException e) {
59-
log.debug("Could not find conf field on builder", e);
60-
}
61-
} catch (Exception e) {
62-
log.debug("Failed to extract SparkLauncher configuration", e);
63-
}
64-
}
65-
66-
private static void setStringFieldAsTag(
67-
AgentSpan span, Object obj, Class<?> clazz, String fieldName, String tagName) {
68-
try {
69-
Field field = clazz.getDeclaredField(fieldName);
70-
field.setAccessible(true);
71-
Object value = field.get(obj);
72-
if (value != null) {
73-
span.setTag(tagName, value.toString());
74-
}
75-
} catch (Exception e) {
76-
log.debug("Could not read field {} from builder", fieldName, e);
77-
}
78-
}
79-
8022
public static synchronized void createLauncherSpan(Object launcher) {
8123
if (launcherSpan != null) {
8224
return;
@@ -93,13 +35,14 @@ public static synchronized void createLauncherSpan(Object launcher) {
9335
setLauncherConfigTags(span, launcher);
9436
launcherSpan = span;
9537

38+
9639
if (!shutdownHookRegistered) {
9740
shutdownHookRegistered = true;
9841
Runtime.getRuntime()
9942
.addShutdownHook(
10043
new Thread(
10144
() -> {
102-
synchronized (SparkLauncherAdvice.class) {
45+
synchronized (SparkLauncherListener.class) {
10346
AgentSpan s = launcherSpan;
10447
if (s != null) {
10548
log.info("Finishing spark.launcher span from shutdown hook");
@@ -137,65 +80,93 @@ public static synchronized void finishSpanWithThrowable(Throwable throwable) {
13780
launcherSpan = null;
13881
}
13982

140-
public static class StartApplicationAdvice {
141-
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
142-
public static void exit(
143-
@Advice.This Object launcher,
144-
@Advice.Return SparkAppHandle handle,
145-
@Advice.Thrown Throwable throwable) {
146-
createLauncherSpan(launcher);
83+
@Override
84+
public void stateChanged(SparkAppHandle handle) {
85+
SparkAppHandle.State state = handle.getState();
86+
AgentSpan span = launcherSpan;
87+
if (span != null) {
88+
span.setTag("spark.launcher.app_state", state.toString());
14789

148-
if (throwable != null) {
149-
finishSpanWithThrowable(throwable);
150-
return;
90+
String appId = handle.getAppId();
91+
if (appId != null) {
92+
span.setTag("spark.app_id", appId);
93+
span.setTag("app_id", appId);
15194
}
15295

153-
if (handle != null) {
154-
try {
155-
handle.addListener(new AppHandleListener());
156-
} catch (Exception e) {
157-
log.debug("Failed to register SparkAppHandle listener", e);
96+
if (state.isFinal()) {
97+
if (state == SparkAppHandle.State.FAILED
98+
|| state == SparkAppHandle.State.KILLED
99+
|| state == SparkAppHandle.State.LOST) {
100+
finishSpan(true, "Application " + state);
101+
} else {
102+
finishSpan(false, null);
158103
}
159104
}
160105
}
161106
}
162107

163-
public static class AppHandleListener implements SparkAppHandle.Listener {
164-
@Override
165-
public void stateChanged(SparkAppHandle handle) {
166-
SparkAppHandle.State state = handle.getState();
167-
AgentSpan span = launcherSpan;
168-
if (span != null) {
169-
span.setTag("spark.launcher.app_state", state.toString());
170-
171-
String appId = handle.getAppId();
172-
if (appId != null) {
173-
span.setTag("spark.app_id", appId);
174-
span.setTag("app_id", appId);
175-
}
108+
@Override
109+
public void infoChanged(SparkAppHandle handle) {
110+
AgentSpan span = launcherSpan;
111+
if (span != null) {
112+
String appId = handle.getAppId();
113+
if (appId != null) {
114+
span.setTag("spark.app_id", appId);
115+
span.setTag("app_id", appId);
116+
}
117+
}
118+
}
176119

177-
if (state.isFinal()) {
178-
if (state == SparkAppHandle.State.FAILED
179-
|| state == SparkAppHandle.State.KILLED
180-
|| state == SparkAppHandle.State.LOST) {
181-
finishSpan(true, "Application " + state);
182-
} else {
183-
finishSpan(false, null);
120+
private static void setLauncherConfigTags(AgentSpan span, Object launcher) {
121+
try {
122+
Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");
123+
builderField.setAccessible(true);
124+
Object builder = builderField.get(launcher);
125+
if (builder == null) {
126+
return;
127+
}
128+
129+
Class<?> builderClass = builder.getClass();
130+
Class<?> abstractBuilderClass = builderClass.getSuperclass();
131+
132+
setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master");
133+
setStringFieldAsTag(span, builder, abstractBuilderClass, "deployMode", "deploy_mode");
134+
setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name");
135+
setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class");
136+
setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource");
137+
138+
try {
139+
Field confField = abstractBuilderClass.getDeclaredField("conf");
140+
confField.setAccessible(true);
141+
@SuppressWarnings("unchecked")
142+
Map<String, String> conf = (Map<String, String>) confField.get(builder);
143+
if (conf != null) {
144+
for (Map.Entry<String, String> entry : conf.entrySet()) {
145+
if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) {
146+
String value = SparkConfAllowList.redactValue(entry.getKey(), entry.getValue());
147+
span.setTag("config." + entry.getKey().replace('.', '_'), value);
148+
}
184149
}
185150
}
151+
} catch (NoSuchFieldException e) {
152+
log.debug("Could not find conf field on builder", e);
186153
}
154+
} catch (Exception e) {
155+
log.debug("Failed to extract SparkLauncher configuration", e);
187156
}
157+
}
188158

189-
@Override
190-
public void infoChanged(SparkAppHandle handle) {
191-
AgentSpan span = launcherSpan;
192-
if (span != null) {
193-
String appId = handle.getAppId();
194-
if (appId != null) {
195-
span.setTag("spark.app_id", appId);
196-
span.setTag("app_id", appId);
197-
}
159+
private static void setStringFieldAsTag(
160+
AgentSpan span, Object obj, Class<?> clazz, String fieldName, String tagName) {
161+
try {
162+
Field field = clazz.getDeclaredField(fieldName);
163+
field.setAccessible(true);
164+
Object value = field.get(obj);
165+
if (value != null) {
166+
span.setTag(tagName, value.toString());
198167
}
168+
} catch (Exception e) {
169+
log.debug("Could not read field {} from builder", fieldName, e);
199170
}
200171
}
201172
}

0 commit comments

Comments
 (0)