Skip to content

Commit 985007c

Browse files
aboitreauddevflow.devflow-routing-intake
andauthored
Spark instrumentation: capture emr step id when spark runs as EMR step (#10670)
make span attr for emr step id captured from workdir capture step id for launcher spans extract helper in shared location step id extraction error resilience add class as helper debug log rename to EmrUtils Add unit tests Gate emr_step_id attribute to emr-detected runtimes spotless apply switch to keys contain .emr. check remove isRunningOnEMR regex is specific enough to avoid collision early returns to avoid deep nesting Merge branch 'master' into adrien.boitreaud/capture-emr-step-id Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent c780493 commit 985007c

7 files changed

Lines changed: 104 additions & 1 deletion

File tree

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ private void initApplicationSpanIfNotInitialized() {
276276
}
277277

278278
captureApplicationParameters(builder);
279+
captureEmrStepId(builder);
279280

280281
Optional<OpenlineageParentContext> openlineageParentContext =
281282
OpenlineageParentContext.from(sparkConf);
@@ -1210,6 +1211,13 @@ private void captureApplicationParameters(AgentTracer.SpanBuilder builder) {
12101211
builder.withTag("config.spark_version", sparkVersion);
12111212
}
12121213

1214+
private static void captureEmrStepId(AgentTracer.SpanBuilder builder) {
1215+
String stepId = EmrUtils.getEmrStepId();
1216+
if (stepId != null) {
1217+
builder.withTag("emr_step_id", stepId);
1218+
}
1219+
}
1220+
12131221
private void captureJobParameters(AgentTracer.SpanBuilder builder, Properties properties) {
12141222
for (Tuple2<String, String> conf : sparkConf.getAll()) {
12151223
if (SparkConfAllowList.canCaptureJobParameter(conf._1)) {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package datadog.trace.instrumentation.spark;
2+
3+
import java.nio.file.Path;
4+
import java.nio.file.Paths;
5+
import java.util.regex.Matcher;
6+
import java.util.regex.Pattern;
7+
import javax.annotation.Nullable;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
/** Extracts the AWS EMR Step ID from the working directory name (e.g. s-07767992IY7VC5NVV854). */
12+
class EmrUtils {
13+
14+
private static final Logger log = LoggerFactory.getLogger(EmrUtils.class);
15+
16+
/** EMR step ID is a 20 character string with numbers and uppercase letters only */
17+
private static final Pattern EMR_STEP_ID_PATTERN = Pattern.compile("^(s-[0-9A-Z]{20})$");
18+
19+
@Nullable
20+
static String getEmrStepId() {
21+
try {
22+
String userDir = System.getProperty("user.dir");
23+
if (userDir == null) {
24+
return null;
25+
}
26+
Path workDir = Paths.get(userDir).getFileName();
27+
if (workDir == null) {
28+
return null;
29+
}
30+
Matcher matcher = EMR_STEP_ID_PATTERN.matcher(workDir.toString());
31+
if (matcher.matches()) {
32+
log.debug("EMR step ID extracted: {}", matcher.group(1));
33+
return matcher.group(1);
34+
}
35+
} catch (Throwable t) {
36+
log.debug("Unable to extract EMR step ID from working directory", t);
37+
}
38+
return null;
39+
}
40+
41+
private EmrUtils() {}
42+
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ public String instrumentedType() {
3232
@Override
3333
public String[] helperClassNames() {
3434
return new String[] {
35-
packageName + ".SparkConfAllowList", packageName + ".SparkLauncherListener",
35+
packageName + ".EmrUtils",
36+
packageName + ".SparkConfAllowList",
37+
packageName + ".SparkLauncherListener",
3638
};
3739
}
3840

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public static synchronized void createLauncherSpan(Object launcher) {
3939
.start();
4040
span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
4141
setLauncherConfigTags(span, launcher);
42+
captureEmrStepId(span);
4243
launcherSpan = span;
4344

4445
if (!shutdownHookRegistered) {
@@ -131,6 +132,13 @@ public void infoChanged(SparkAppHandle handle) {
131132
}
132133
}
133134

135+
private static void captureEmrStepId(AgentSpan span) {
136+
String stepId = EmrUtils.getEmrStepId();
137+
if (stepId != null) {
138+
span.setTag("emr_step_id", stepId);
139+
}
140+
}
141+
134142
/**
135143
* Extract launcher configuration via reflection and set as span tags. Secret redaction uses the
136144
* default pattern only (not spark.redaction.regex) because the SparkLauncher conf map is a plain
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package datadog.trace.instrumentation.spark;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNull;
5+
6+
import org.junit.jupiter.api.AfterEach;
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.Test;
9+
10+
class EmrUtilsTest {
11+
12+
private String originalUserDir;
13+
14+
@BeforeEach
15+
void saveUserDir() {
16+
originalUserDir = System.getProperty("user.dir");
17+
}
18+
19+
@AfterEach
20+
void restoreUserDir() {
21+
System.setProperty("user.dir", originalUserDir);
22+
}
23+
24+
@Test
25+
void returnsStepIdWhenWorkdirMatchesEmrPattern() {
26+
System.setProperty("user.dir", "/mnt/var/lib/hadoop/steps/s-07767992IY7VC5NVV854");
27+
assertEquals("s-07767992IY7VC5NVV854", EmrUtils.getEmrStepId());
28+
}
29+
30+
@Test
31+
void returnsNullWhenWorkdirDoesNotMatchEmrPattern() {
32+
System.setProperty("user.dir", "/home/hadoop");
33+
assertNull(EmrUtils.getEmrStepId());
34+
}
35+
36+
@Test
37+
void returnsNullForApplicationIdWorkdir() {
38+
System.setProperty("user.dir", "/home/hadoop/application_1234567890_0001");
39+
assertNull(EmrUtils.getEmrStepId());
40+
}
41+
}

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public String[] helperClassNames() {
2626
packageName + ".AbstractSparkPlanSerializer",
2727
packageName + ".AbstractSparkPlanUtils",
2828
packageName + ".DatabricksParentContext",
29+
packageName + ".EmrUtils",
2930
packageName + ".OpenlineageParentContext",
3031
packageName + ".DatadogSpark212Listener",
3132
packageName + ".PredeterminedTraceIdContext",

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public String[] helperClassNames() {
2626
packageName + ".AbstractSparkPlanSerializer",
2727
packageName + ".AbstractSparkPlanUtils",
2828
packageName + ".DatabricksParentContext",
29+
packageName + ".EmrUtils",
2930
packageName + ".OpenlineageParentContext",
3031
packageName + ".DatadogSpark213Listener",
3132
packageName + ".PredeterminedTraceIdContext",

0 commit comments

Comments
 (0)