Skip to content

Commit ee7c12e

Browse files
committed
Gate emr_step_id attribute to emr-detected runtimes
1 parent 3aea275 commit ee7c12e

4 files changed

Lines changed: 44 additions & 4 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ dependencies {
2828
testFixturesCompileOnly(libs.bundles.spock)
2929

3030
testImplementation project(':dd-java-agent:instrumentation-testing')
31+
testImplementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
3132
testImplementation group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
3233
}
3334

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
134134

135135
private volatile boolean isStreamingJob = false;
136136
private final boolean isRunningOnDatabricks;
137+
private final boolean isRunningOnEmr;
137138
private final String databricksClusterName;
138139
private final String databricksServiceName;
139140
private final String sparkServiceName;
@@ -156,6 +157,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
156157
this.sparkVersion = sparkVersion;
157158

158159
isRunningOnDatabricks = sparkConf.contains("spark.databricks.sparkContextId");
160+
isRunningOnEmr = EmrUtils.isRunningOnEmr(sparkConf);
159161
databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
160162
databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
161163
sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
@@ -276,7 +278,7 @@ private void initApplicationSpanIfNotInitialized() {
276278
}
277279

278280
captureApplicationParameters(builder);
279-
captureEmrStepId(builder);
281+
captureEmrStepId(builder, isRunningOnEmr);
280282

281283
Optional<OpenlineageParentContext> openlineageParentContext =
282284
OpenlineageParentContext.from(sparkConf);
@@ -1211,7 +1213,10 @@ private void captureApplicationParameters(AgentTracer.SpanBuilder builder) {
12111213
builder.withTag("config.spark_version", sparkVersion);
12121214
}
12131215

1214-
private static void captureEmrStepId(AgentTracer.SpanBuilder builder) {
1216+
private static void captureEmrStepId(AgentTracer.SpanBuilder builder, boolean isRunningOnEmr) {
1217+
if (!isRunningOnEmr) {
1218+
return;
1219+
}
12151220
String stepId = EmrUtils.getEmrStepId();
12161221
if (stepId != null) {
12171222
builder.withTag("emr_step_id", stepId);

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/EmrUtils.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,31 @@
55
import java.util.regex.Matcher;
66
import java.util.regex.Pattern;
77
import javax.annotation.Nullable;
8+
import org.apache.spark.SparkConf;
89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011

11-
/** Extracts the AWS EMR Step ID from the working directory name (e.g. s-07767992IY7VC5NVV854). */
12+
/** Utilities for detecting AWS EMR and extracting EMR-specific metadata. */
1213
class EmrUtils {
1314

1415
private static final Logger log = LoggerFactory.getLogger(EmrUtils.class);
15-
private static final Pattern EMR_STEP_ID_PATTERN = Pattern.compile("^(s-[0-9A-Za-z]+)$");
16+
/** EMR step ID is a 20 character string with numbers and uppercase letters only */
17+
private static final Pattern EMR_STEP_ID_PATTERN = Pattern.compile("^(s-[0-9A-Z]{20})$");
18+
19+
/**
20+
* Returns true if the job runs on AWS EMR, i.e. the conf has either of two EMR-exclusive keys:
21+
*
22+
* <ul>
23+
* <li>{@code spark.sql.emr.internal.extensions}: registers EMR's Spark session extensions,
24+
* present across all known EMR releases (observed in community EMR configs)
25+
* <li>{@code spark.emr.default.executor.cores}: newer EMR default, added as a fallback for
26+
* future releases where the extensions key might change
27+
* </ul>
28+
*/
29+
static boolean isRunningOnEmr(SparkConf conf) {
30+
return conf.contains("spark.sql.emr.internal.extensions")
31+
|| conf.contains("spark.emr.default.executor.cores");
32+
}
1633

1734
@Nullable
1835
static String getEmrStepId() {

dd-java-agent/instrumentation/spark/spark-common/src/test/java/datadog/trace/instrumentation/spark/EmrUtilsTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertNull;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
56

7+
import org.apache.spark.SparkConf;
68
import org.junit.jupiter.api.AfterEach;
79
import org.junit.jupiter.api.BeforeEach;
810
import org.junit.jupiter.api.Test;
@@ -38,4 +40,19 @@ void returnsNullForApplicationIdWorkdir() {
3840
System.setProperty("user.dir", "/home/hadoop/application_1234567890_0001");
3941
assertNull(EmrUtils.getEmrStepId());
4042
}
43+
44+
@Test
45+
void isRunningOnEmrReturnsTrueWhenEmrInternalExtensionsPresent() {
46+
SparkConf conf = new SparkConf();
47+
conf.set("spark.sql.emr.internal.extensions", "com.amazonaws.emr.spark.EmrSparkSessionExtensions");
48+
assertTrue(EmrUtils.isRunningOnEmr(conf));
49+
}
50+
51+
@Test
52+
void isRunningOnEmrReturnsTrueWhenEmrDefaultExecutorCoresPresent() {
53+
SparkConf conf = new SparkConf();
54+
conf.set("spark.emr.default.executor.cores", "1");
55+
assertTrue(EmrUtils.isRunningOnEmr(conf));
56+
}
57+
4158
}

0 commit comments

Comments
 (0)