@@ -4,6 +4,8 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding
44import com.datadoghq.sketch.ddsketch.proto.DDSketch
55import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore
66import datadog.trace.agent.test.InstrumentationSpecification
7+ import datadog.trace.api.DDTraceId
8+ import datadog.trace.api.ProcessTags
79import org.apache.spark.SparkConf
810import org.apache.spark.Success$
911import org.apache.spark.executor.TaskMetrics
@@ -12,6 +14,7 @@ import org.apache.spark.scheduler.SparkListenerApplicationEnd
1214import org.apache.spark.scheduler.SparkListenerApplicationStart
1315import org.apache.spark.scheduler.SparkListenerExecutorAdded
1416import org.apache.spark.scheduler.SparkListenerExecutorRemoved
17+ import org.apache.spark.scheduler.SparkListenerInterface
1518import org.apache.spark.scheduler.SparkListenerJobEnd
1619import org.apache.spark.scheduler.SparkListenerJobStart
1720import org.apache.spark.scheduler.SparkListenerStageCompleted
@@ -519,6 +522,52 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
519522 }
520523 }
521524
525+ def " test setupOpenLineage gets service name" (boolean serviceNameSetByUser, String serviceName, String sparkAppName) {
526+ setup :
527+ SparkConf sparkConf = new SparkConf ()
528+ injectSysConfig(" dd.service.name.set.by.user" , Boolean . toString(serviceNameSetByUser))
529+ if (serviceNameSetByUser) {
530+ injectSysConfig(" dd.service.name" , serviceName)
531+ }
532+ if (sparkAppName != null ) {
533+ sparkConf. set(" spark.app.name" , sparkAppName)
534+ }
535+
536+ def listener = getTestDatadogSparkListener(sparkConf)
537+ listener. openLineageSparkListener = Mock (SparkListenerInterface )
538+ listener. openLineageSparkConf = new SparkConf ()
539+ listener. setupOpenLineage(Mock (DDTraceId ))
540+
541+ expect :
542+ assert listener
543+ .openLineageSparkConf
544+ .get(" spark.openlineage.run.tags" )
545+ .split(" ;" )
546+ .contains(" _dd.ol_service:expected-service-name" )
547+
548+ where :
549+ serviceNameSetByUser | serviceName | sparkAppName
550+ true | " expected-service-name" | null
551+ false | null | " expected-service-name"
552+ true | " spark" | " expected-service-name"
553+ true | " hadoop" | " expected-service-name"
554+ }
555+
556+ def " test setupOpenLineage fills ProcessTags" () {
557+ setup :
558+ def listener = getTestDatadogSparkListener()
559+ listener. openLineageSparkListener = Mock (SparkListenerInterface )
560+ listener. openLineageSparkConf = new SparkConf ()
561+ listener. setupOpenLineage(Mock (DDTraceId ))
562+
563+ expect :
564+ assert listener
565+ .openLineageSparkConf
566+ .get(" spark.openlineage.run.tags" )
567+ .split(" ;" )
568+ .contains(" _dd.ol_intake.process_tags:" + ProcessTags . getTagsForSerialization())
569+ }
570+
522571 protected validateRelativeError (double value , double expected , double relativeAccuracy ) {
523572 double relativeError = Math . abs(value - expected) / expected
524573 assert relativeError < relativeAccuracy
0 commit comments