Skip to content

Commit e2d98c0

Browse files
authored
fall back to jobGroup.id if jobRunId equals taskRunId (#11199)
# What Does This Do

On Databricks 18.2, `spark.databricks.job.runId` returns the job run ID instead of the task run ID. This causes `DatabricksParentContext` to compute identical trace and span IDs, orphaning Spark spans from their Databricks parent. Fix: skip `spark.databricks.job.runId` when its value equals the job run ID and fall through to the `spark.jobGroup.id` extraction.

# Motivation

Broken parent-child relationship between Databricks job/task spans and Spark spans on Databricks 18.2. The `spark.databricks.job.runId` property semantics changed in Databricks 18.2 — it now returns the job-level run ID rather than the task-level run ID. Prior versions are unaffected.

# QA Examples

Trace before: https://ddstaging.datadoghq.com/apm/traces?query=%40_trace_root%3A1%20service%3Aall-purpose&agg_m=count&agg_m_source=base&agg_t=count&cols=service%2Cresource_name%2C%40duration%2C%40http.method%2C%40http.status_code%2C%40_span.count%2C%40_duration.by_service&fromUser=false&graphType=flamegraph&historicalData=true&messageDisplay=inline&query_translation_version=v0&shouldShowLegend=true&sort=desc&spanID=2159132286262269065&spanType=trace-root&storage=hot&timeHint=1777044603326&trace=AwAAAZ3AHMG-ib6LIAAAABhBWjNBSGtRNUFBQ19GLUxjWVlIV2t2WWQAAAAkMTE5ZGMwMzgtMmFmOS00NTFlLTk2YWYtZWFjNTQxOWY3YjhhAAAAfg&traceID=2159132286262269063&traceQuery=&view=spans&start=1776931657013&end=1777277257013&paused=false

Trace after:
https://ddstaging.datadoghq.com/apm/traces?query=%40_trace_root%3A1%20service%3Aall-purpose&agg_m=count&agg_m_source=base&agg_t=count&cols=service%2Cresource_name%2C%40duration%2C%40http.method%2C%40http.status_code%2C%40_span.count%2C%40_duration.by_service&fromUser=false&graphType=flamegraph&historicalData=true&messageDisplay=inline&query_translation_version=v0&shouldShowLegend=true&sort=desc&spanID=10469624543607262827&spanType=trace-root&storage=hot&timeHint=1777276836597&trace=AwAAAZ3N9Fr1O6CLVgAAABhBWjNOOXhfLUFBQ3NPZURNalpLQkZrdVIAAAAkZjE5ZGNkZjctMmI4ZC00YWJkLWJlZmYtYThlNGJjMmE3Zjk2AAAAAQ&traceID=10469624543607262825&traceQuery=&view=spans&start=1777190817655&end=1777277217655&paused=false

# Contributor Checklist

- Format the title according to [the contribution guidelines](https://github.com/DataDog/dd-trace-java/blob/master/CONTRIBUTING.md#title-format)
- Assign the `type:` and (`comp:` or `inst:`) labels in addition to [any other useful labels](https://github.com/DataDog/dd-trace-java/blob/master/CONTRIBUTING.md#labels)
- Avoid using `close`, `fix`, or [any linking keywords](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword) when referencing an issue. Use `solves` instead, and assign the PR [milestone](https://github.com/DataDog/dd-trace-java/milestones) to the issue
- Update the [CODEOWNERS](https://github.com/DataDog/dd-trace-java/blob/master/.github/CODEOWNERS) file on source file addition, migration, or deletion
- Update [public documentation](https://docs.datadoghq.com/tracing/trace_collection/library_config/java/) with any new configuration flags or behaviors

<!-- # Opening vs Drafting a PR: When opening a pull request, please open it as a draft to not auto assign reviewers before you feel the pull request is in a reviewable state.
# Linking a JIRA ticket: Please link your JIRA ticket by adding its identifier between brackets (ex [PROJ-IDENT]) in the PR description, not the title. This requirement only applies to Datadog employees. --> Co-authored-by: adrien.boitreaud <adrien.boitreaud@datadoghq.com>
1 parent 515d17b commit e2d98c0

3 files changed

Lines changed: 54 additions & 6 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ private void addDatabricksSpecificTags(
426426
if (properties != null) {
427427
String databricksJobId = getDatabricksJobId(properties);
428428
String databricksJobRunId = getDatabricksJobRunId(properties, databricksClusterName);
429-
String databricksTaskRunId = getDatabricksTaskRunId(properties);
429+
String databricksTaskRunId = getDatabricksTaskRunId(properties, databricksJobRunId);
430430

431431
// ids to link those spans to databricks job/task traces
432432
builder.withTag("databricks_job_id", databricksJobId);
@@ -1177,10 +1177,14 @@ private static String getDatabricksJobRunId(
11771177
}
11781178

11791179
@SuppressForbidden // split with one-char String use a fast-path without regex usage
1180-
private static String getDatabricksTaskRunId(Properties properties) {
1181-
// spark.databricks.job.runId is the runId of the task, not of the Job
1180+
private static String getDatabricksTaskRunId(Properties properties, String jobRunId) {
1181+
// spark.databricks.job.runId is the runId of the task, not of the Job, until Databricks 18.2
11821182
String taskRunId = properties.getProperty("spark.databricks.job.runId");
1183-
if (taskRunId != null) {
1183+
// On Databricks 18.2+, spark.databricks.job.runId now returns the job run ID
1184+
// There is no easy config key to extract the task run ID, so we use the fallback extraction
1185+
// methods
1186+
// Task run ID is crucial for the spans parent-child relationship inside the trace
1187+
if (taskRunId != null && !taskRunId.equals(jobRunId)) {
11841188
return taskRunId;
11851189
}
11861190

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class AbstractSparkStructuredStreamingTest extends InstrumentationSpecification
4444
.config("spark.databricks.sparkContextId", "3291395623902517763")
4545
.config("spark.databricks.job.id", "3822225623902514353")
4646
.config("spark.databricks.job.parentRunId", "3851395623902519743")
47-
.config("spark.databricks.job.runId", "3851395623902519743")
47+
.config("spark.databricks.job.runId", "4851395623902519743")
4848
.getOrCreate()
4949
}
5050

@@ -303,7 +303,7 @@ class AbstractSparkStructuredStreamingTest extends InstrumentationSpecification
303303
spanType "spark"
304304
parent()
305305
links({
306-
link(DDTraceId.from((long)12052652441736835200), (long)-6394091631972716416)
306+
link(DDTraceId.from((long)12052652441736835200), (long)1375416004467624525)
307307
})
308308
}
309309
span {

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,50 @@ abstract class AbstractSparkTest extends InstrumentationSpecification {
422422
sparkSession.stop()
423423
}
424424

425+
def "fallback to jobGroup.id when spark.databricks.job.runId equals parentRunId on Databricks 18.2+"() {
426+
setup:
427+
def sparkSession = SparkSession.builder()
428+
.config("spark.master", "local")
429+
.config("spark.default.parallelism", "2")
430+
.config("spark.sql.shuffle.partitions", "2")
431+
.config("spark.databricks.sparkContextId", "some_id")
432+
.getOrCreate()
433+
434+
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.id", "1234")
435+
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.runId", "5678") // Same as parentRunId
436+
sparkSession.sparkContext().setLocalProperty("spark.jobGroup.id", "0000_job-1234-run-7890-action-0000")
437+
sparkSession.sparkContext().setLocalProperty("spark.databricks.job.parentRunId", "5678")
438+
TestSparkComputation.generateTestSparkComputation(sparkSession)
439+
440+
expect:
441+
assertTraces(1) {
442+
trace(3) {
443+
span {
444+
operationName "spark.job"
445+
spanType "spark"
446+
traceId 8944764253919609482G
447+
parentSpanId 3503717452567411167G
448+
assert span.tags["databricks_job_id"] == "1234"
449+
assert span.tags["databricks_job_run_id"] == "5678"
450+
assert span.tags["databricks_task_run_id"] == "7890"
451+
}
452+
span {
453+
operationName "spark.stage"
454+
spanType "spark"
455+
childOf(span(0))
456+
}
457+
span {
458+
operationName "spark.stage"
459+
spanType "spark"
460+
childOf(span(0))
461+
}
462+
}
463+
}
464+
465+
cleanup:
466+
sparkSession.stop()
467+
}
468+
425469
def "compute the databricks parent context"() {
426470
setup:
427471
def contextWithJobRunId = new DatabricksParentContext("1234", "5678", "9012")

0 commit comments

Comments
 (0)