Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ private void initApplicationSpanIfNotInitialized() {
}

captureApplicationParameters(builder);
captureEmrStepId(builder);

Optional<OpenlineageParentContext> openlineageParentContext =
OpenlineageParentContext.from(sparkConf);
Expand Down Expand Up @@ -1210,6 +1211,13 @@ private void captureApplicationParameters(AgentTracer.SpanBuilder builder) {
builder.withTag("config.spark_version", sparkVersion);
}

/**
 * Adds the {@code emr_step_id} tag to the span under construction.
 *
 * <p>Nothing is added when {@link EmrUtils#getEmrStepId()} yields {@code null}.
 *
 * @param builder span builder that receives the tag
 */
private static void captureEmrStepId(AgentTracer.SpanBuilder builder) {
  final String emrStepId = EmrUtils.getEmrStepId();
  if (emrStepId == null) {
    // No step ID could be determined: leave the builder untouched.
    return;
  }
  builder.withTag("emr_step_id", emrStepId);
}

private void captureJobParameters(AgentTracer.SpanBuilder builder, Properties properties) {
for (Tuple2<String, String> conf : sparkConf.getAll()) {
if (SparkConfAllowList.canCaptureJobParameter(conf._1)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package datadog.trace.instrumentation.spark;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Derives the AWS EMR step ID from the JVM working directory.
 *
 * <p>The last path segment of the {@code user.dir} system property is inspected; when it looks
 * like a step ID (for example {@code s-07767992IY7VC5NVV854}) that segment is returned.
 */
class EmrUtils {

  private static final Logger log = LoggerFactory.getLogger(EmrUtils.class);

  /**
   * Accepts the literal prefix {@code s-} followed by exactly 20 digits or uppercase ASCII
   * letters, and nothing else.
   *
   * <p>NOTE(review): only 20-character suffixes are accepted; confirm that EMR never issues step
   * IDs of a different length.
   */
  private static final Pattern EMR_STEP_ID_PATTERN = Pattern.compile("^(s-[0-9A-Z]{20})$");

  /** Static helper holder; never instantiated. */
  private EmrUtils() {}

  /**
   * Looks up the EMR step ID from the name of the current working directory.
   *
   * @return the step ID when the final segment of {@code user.dir} matches the step ID pattern;
   *     {@code null} when the property is unset, the path has no final segment, the segment does
   *     not match, or any error is raised during the lookup
   */
  @Nullable
  static String getEmrStepId() {
    try {
      final String workingDirectory = System.getProperty("user.dir");
      // getFileName() yields null for paths without a final segment, such as a root.
      final Path lastSegment =
          workingDirectory == null ? null : Paths.get(workingDirectory).getFileName();
      if (lastSegment != null) {
        final Matcher candidate = EMR_STEP_ID_PATTERN.matcher(lastSegment.toString());
        if (candidate.matches()) {
          final String stepId = candidate.group(1);
          log.debug("EMR step ID extracted: {}", stepId);
          return stepId;
        }
      }
    } catch (Throwable t) {
      // Best effort only: a failed lookup is logged and reported as "no step ID".
      log.debug("Unable to extract EMR step ID from working directory", t);
    }
    return null;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public String instrumentedType() {
/**
 * Returns the names of the helper classes declared by this instrumentation.
 *
 * <p>Each name is built from {@code packageName} and appears exactly once in the result.
 *
 * @return the helper class names, one entry per class
 */
@Override
public String[] helperClassNames() {
  return new String[] {
    packageName + ".EmrUtils",
    packageName + ".SparkConfAllowList",
    packageName + ".SparkLauncherListener",
  };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public static synchronized void createLauncherSpan(Object launcher) {
.start();
span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
setLauncherConfigTags(span, launcher);
captureEmrStepId(span);
launcherSpan = span;

if (!shutdownHookRegistered) {
Expand Down Expand Up @@ -131,6 +132,13 @@ public void infoChanged(SparkAppHandle handle) {
}
}

/**
 * Sets the {@code emr_step_id} tag on the given span when {@link EmrUtils#getEmrStepId()}
 * returns a value.
 *
 * @param span span that receives the tag
 */
private static void captureEmrStepId(AgentSpan span) {
String stepId = EmrUtils.getEmrStepId();
// A null result means no step ID was found; the span is then left without the tag.
if (stepId != null) {
span.setTag("emr_step_id", stepId);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add a tag indicating that this is an EMR launcher? Does that make sense?

Copy link
Copy Markdown
Contributor Author

@aboitreaud aboitreaud Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark.application spans are also tagged with the step ID, above in the other listener.

Regarding marking these as EMR launcher spans: I think the span operation name spark.launcher.launch already tells us this is a launcher, and seeing step_id means we are necessarily on EMR. So I'd say we don't need more than that, but let me know how you feel.

}

/**
* Extract launcher configuration via reflection and set as span tags. Secret redaction uses the
* default pattern only (not spark.redaction.regex) because the SparkLauncher conf map is a plain
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package datadog.trace.instrumentation.spark;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link EmrUtils#getEmrStepId()}.
 *
 * <p>Each test overrides the {@code user.dir} system property; the value seen before the test is
 * put back afterwards.
 */
class EmrUtilsTest {

  /** Value of {@code user.dir} captured before each test; {@code null} if it was unset. */
  private String originalUserDir;

  @BeforeEach
  void saveUserDir() {
    originalUserDir = System.getProperty("user.dir");
  }

  @AfterEach
  void restoreUserDir() {
    // System.setProperty throws NullPointerException for a null value, so an originally
    // unset property has to be restored by clearing it.
    if (originalUserDir == null) {
      System.clearProperty("user.dir");
    } else {
      System.setProperty("user.dir", originalUserDir);
    }
  }

  /** A working directory whose last segment is a well-formed step ID yields that ID. */
  @Test
  void returnsStepIdWhenWorkdirMatchesEmrPattern() {
    System.setProperty("user.dir", "/mnt/var/lib/hadoop/steps/s-07767992IY7VC5NVV854");
    assertEquals("s-07767992IY7VC5NVV854", EmrUtils.getEmrStepId());
  }

  /** An ordinary directory name is not mistaken for a step ID. */
  @Test
  void returnsNullWhenWorkdirDoesNotMatchEmrPattern() {
    System.setProperty("user.dir", "/home/hadoop");
    assertNull(EmrUtils.getEmrStepId());
  }

  /** A directory named like an application ID is not reported as a step ID. */
  @Test
  void returnsNullForApplicationIdWorkdir() {
    System.setProperty("user.dir", "/home/hadoop/application_1234567890_0001");
    assertNull(EmrUtils.getEmrStepId());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public String[] helperClassNames() {
packageName + ".AbstractSparkPlanSerializer",
packageName + ".AbstractSparkPlanUtils",
packageName + ".DatabricksParentContext",
packageName + ".EmrUtils",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark212Listener",
packageName + ".PredeterminedTraceIdContext",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public String[] helperClassNames() {
packageName + ".AbstractSparkPlanSerializer",
packageName + ".AbstractSparkPlanUtils",
packageName + ".DatabricksParentContext",
packageName + ".EmrUtils",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark213Listener",
packageName + ".PredeterminedTraceIdContext",
Expand Down