Skip to content

Commit bfbfa5b

Browse files
authored
Add spark.openlineage.appName tag to spark.application spans (#11246)
Add spark.openlineage.appName tag to spark.application spans Test spark.openlineage.appName tag on spark.application spans Merge branch 'master' into adrien.boitreaud/ol-jobname Co-authored-by: adrien.boitreaud <adrien.boitreaud@datadoghq.com>
1 parent b0652b9 commit bfbfa5b

2 files changed

Lines changed: 50 additions & 0 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ private void initApplicationSpanIfNotInitialized() {
284284

285285
captureApplicationParameters(builder);
286286
captureEmrStepId(builder);
287+
captureOpenlineageJobInfo(builder);
287288

288289
Optional<OpenlineageParentContext> openlineageParentContext =
289290
OpenlineageParentContext.from(sparkConf);
@@ -299,6 +300,13 @@ private void initApplicationSpanIfNotInitialized() {
299300
applicationSpan.setMeasured(true);
300301
}
301302

303+
private void captureOpenlineageJobInfo(AgentTracer.SpanBuilder builder) {
304+
String olAppName = sparkConf.get("spark.openlineage.appName", null);
305+
if (olAppName != null) {
306+
builder.withTag("spark.openlineage.appName", olAppName);
307+
}
308+
}
309+
302310
private void captureOpenlineageContextIfPresent(
303311
AgentTracer.SpanBuilder builder, OpenlineageParentContext context) {
304312
builder.asChildOf(context);

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,48 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
721721
.getOption("spark.openlineage.circuitBreaker.timeoutInSeconds") == Option.apply("120")
722722
}
723723

724+
def "sets spark.openlineage.appName tag when configured"() {
725+
setup:
726+
def conf = new SparkConf()
727+
conf.set("spark.openlineage.appName", "my-ol-app-name")
728+
def listener = getTestDatadogSparkListener(conf)
729+
730+
when:
731+
listener.onApplicationStart(applicationStartEvent(1000L))
732+
listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L))
733+
734+
then:
735+
assertTraces(1) {
736+
trace(1) {
737+
span {
738+
operationName "spark.application"
739+
spanType "spark"
740+
assert span.tags["spark.openlineage.appName"] == "my-ol-app-name"
741+
}
742+
}
743+
}
744+
}
745+
746+
def "does not set spark.openlineage.appName tag when not configured"() {
747+
setup:
748+
def listener = getTestDatadogSparkListener()
749+
750+
when:
751+
listener.onApplicationStart(applicationStartEvent(1000L))
752+
listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L))
753+
754+
then:
755+
assertTraces(1) {
756+
trace(1) {
757+
span {
758+
operationName "spark.application"
759+
spanType "spark"
760+
assert !span.tags.containsKey("spark.openlineage.appName")
761+
}
762+
}
763+
}
764+
}
765+
724766
protected validateRelativeError(double value, double expected, double relativeAccuracy) {
725767
double relativeError = Math.abs(value - expected) / expected
726768
assert relativeError < relativeAccuracy

0 commit comments

Comments
 (0)