-
Notifications
You must be signed in to change notification settings - Fork 335
Spark instrumentation: capture emr step id when spark runs as EMR step #10670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
2d6992e
1a62eb6
369fce0
594e9cf
bbf7b20
95b82cc
3aea275
ee7c12e
cfa491c
a8fdf19
d3c7ebb
34af8f9
e0bec00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| package datadog.trace.instrumentation.spark; | ||
|
|
||
| import java.nio.file.Path; | ||
| import java.nio.file.Paths; | ||
| import java.util.regex.Matcher; | ||
| import java.util.regex.Pattern; | ||
| import javax.annotation.Nullable; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** Extracts the AWS EMR Step ID from the working directory name (e.g. s-07767992IY7VC5NVV854). */ | ||
| class EmrUtils { | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(EmrUtils.class); | ||
| private static final Pattern EMR_STEP_ID_PATTERN = Pattern.compile("^(s-[0-9A-Za-z]+)$"); | ||
|
|
||
| @Nullable | ||
| static String getEmrStepId() { | ||
| try { | ||
| String userDir = System.getProperty("user.dir"); | ||
| if (userDir != null) { | ||
| Path workDir = Paths.get(userDir).getFileName(); | ||
| if (workDir != null) { | ||
| Matcher matcher = EMR_STEP_ID_PATTERN.matcher(workDir.toString()); | ||
| if (matcher.matches()) { | ||
| log.debug("EMR step ID extracted: {}", matcher.group(1)); | ||
| return matcher.group(1); | ||
| } | ||
| } | ||
| } | ||
| } catch (Throwable t) { | ||
| log.debug("Unable to extract EMR step ID from working directory", t); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| private EmrUtils() {} | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,7 @@ public static synchronized void createLauncherSpan(Object launcher) { | |
| .start(); | ||
| span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS); | ||
| setLauncherConfigTags(span, launcher); | ||
| captureEmrStepId(span); | ||
| launcherSpan = span; | ||
|
|
||
| if (!shutdownHookRegistered) { | ||
|
|
@@ -131,6 +132,13 @@ public void infoChanged(SparkAppHandle handle) { | |
| } | ||
| } | ||
|
|
||
| private static void captureEmrStepId(AgentSpan span) { | ||
| String stepId = EmrUtils.getEmrStepId(); | ||
| if (stepId != null) { | ||
| span.setTag("emr_step_id", stepId); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we also add a tag this is EMR launcher? does it make sense?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. spark.application are tagged them with step id, above in the other Listener. regarding marking these as emr launcher, I think the span operation name |
||
| } | ||
|
|
||
| /** | ||
| * Extract launcher configuration via reflection and set as span tags. Secret redaction uses the | ||
| * default pattern only (not spark.redaction.regex) because the SparkLauncher conf map is a plain | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| package datadog.trace.instrumentation.spark; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertNull; | ||
|
|
||
| import org.junit.jupiter.api.AfterEach; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| class EmrUtilsTest { | ||
|
|
||
| private String originalUserDir; | ||
|
|
||
| @BeforeEach | ||
| void saveUserDir() { | ||
| originalUserDir = System.getProperty("user.dir"); | ||
| } | ||
|
|
||
| @AfterEach | ||
| void restoreUserDir() { | ||
| System.setProperty("user.dir", originalUserDir); | ||
| } | ||
|
|
||
| @Test | ||
| void returnsStepIdWhenWorkdirMatchesEmrPattern() { | ||
| System.setProperty("user.dir", "/mnt/var/lib/hadoop/steps/s-07767992IY7VC5NVV854"); | ||
| assertEquals("s-07767992IY7VC5NVV854", EmrUtils.getEmrStepId()); | ||
| } | ||
|
|
||
| @Test | ||
| void returnsNullWhenWorkdirDoesNotMatchEmrPattern() { | ||
| System.setProperty("user.dir", "/home/hadoop"); | ||
| assertNull(EmrUtils.getEmrStepId()); | ||
| } | ||
|
|
||
| @Test | ||
| void returnsNullForApplicationIdWorkdir() { | ||
| System.setProperty("user.dir", "/home/hadoop/application_1234567890_0001"); | ||
| assertNull(EmrUtils.getEmrStepId()); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider creating unit test for this method. This would be really helpful in understanding what paths match the regex.
Also, is there any way to figure out if the launcher is run on EMR? Otherwise, it's hard to say if this aint adding emr tags to non-emr jobs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added unit tests in EmrUtilsTest.java !
The regex is specific enough to only match the EMR step ids, so I think we are guaranteed to only add this tag on Spark on EMR spans.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The regex matches all work dirs starting with
s-. Are you saying this is good enough to tell Spark is running on EMR?