Skip to content

Commit cc12228

Browse files
charlesmyu and devflow.devflow-routing-intake authored
Track external accumulators in tracer instead of using SparkInfo values (#10553)
Track external accumulators in tracer instead of using SparkInfo values

- Create and implement getSum
- Send summed SQL plan metric values
- Limit external accumulators to 5,000 per stage
- Use compensated sum to limit rounding errors
- Cast to Number type instead of Long
- Merge branch 'master' into charles.yu/djm-0000/fix-spark-plan-metrics

Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent 97429bd commit cc12228

File tree

10 files changed

+618
-92
lines changed

10 files changed

+618
-92
lines changed

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.spark.ExceptionFailure;
4141
import org.apache.spark.SparkConf;
4242
import org.apache.spark.TaskFailedReason;
43+
import org.apache.spark.executor.TaskMetrics;
4344
import org.apache.spark.scheduler.AccumulableInfo;
4445
import org.apache.spark.scheduler.JobFailed;
4546
import org.apache.spark.scheduler.SparkListener;
@@ -64,6 +65,7 @@
6465
import org.apache.spark.sql.streaming.StateOperatorProgress;
6566
import org.apache.spark.sql.streaming.StreamingQueryListener;
6667
import org.apache.spark.sql.streaming.StreamingQueryProgress;
68+
import org.apache.spark.util.AccumulatorV2;
6769
import org.slf4j.Logger;
6870
import org.slf4j.LoggerFactory;
6971
import scala.Tuple2;
@@ -127,8 +129,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
127129
private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();
128130

129131
// There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
130-
// an active SQL query)
131-
// so capping the size of the collection storing them
132+
// an active SQL query) so capping the size of the collection storing them
133+
// TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently?
134+
// If we know we don't need the accumulator values, can we drop all associated data and just map
135+
// stage ID -> accumulator ID? Put this behind some FF
132136
private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
133137
new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
134138

@@ -229,6 +233,12 @@ public void setupOpenLineage(DDTraceId traceId) {
229233
/** Parent Ids of a Stage. Provide an implementation based on a specific scala version */
230234
protected abstract int[] getStageParentIds(StageInfo info);
231235

236+
/**
237+
* All External Accumulators associated with a given task. Provide an implementation based on a
238+
* specific scala version
239+
*/
240+
protected abstract List<AccumulatorV2> getExternalAccumulators(TaskMetrics metrics);
241+
232242
@Override
233243
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
234244
this.applicationStart = applicationStart;
@@ -672,7 +682,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
672682

673683
SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId);
674684
if (sqlPlan != null) {
675-
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageId);
685+
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId);
676686
}
677687

678688
span.finish(completionTimeMs * 1000);
@@ -686,7 +696,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
686696

687697
SparkAggregatedTaskMetrics stageMetric = stageMetrics.get(stageSpanKey);
688698
if (stageMetric != null) {
689-
stageMetric.addTaskMetrics(taskEnd);
699+
// Not happy that we have to extract external accumulators here, but needed as we're dealing
700+
// with Seq which varies across Scala versions
701+
stageMetric.addTaskMetrics(taskEnd, getExternalAccumulators(taskEnd.taskMetrics()));
690702
}
691703

692704
if (taskEnd.taskMetrics() != null) {

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
11
package datadog.trace.instrumentation.spark;
22

3+
import com.fasterxml.jackson.core.JsonGenerator;
34
import datadog.metrics.api.Histogram;
45
import datadog.trace.api.Config;
56
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
7+
import java.io.IOException;
68
import java.nio.ByteBuffer;
79
import java.util.Base64;
10+
import java.util.List;
11+
import java.util.Map;
812
import org.apache.spark.TaskFailedReason;
913
import org.apache.spark.executor.TaskMetrics;
1014
import org.apache.spark.scheduler.SparkListenerTaskEnd;
15+
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
16+
import org.apache.spark.util.AccumulatorV2;
1117

1218
class SparkAggregatedTaskMetrics {
1319
private static final double HISTOGRAM_RELATIVE_ACCURACY = 1 / 32.0;
1420
private static final int HISTOGRAM_MAX_NUM_BINS = 512;
21+
private static final int MAX_ACCUMULATOR_SIZE = 5000;
1522
private final boolean isSparkTaskHistogramEnabled = Config.get().isSparkTaskHistogramEnabled();
1623

1724
private long executorDeserializeTime = 0L;
@@ -59,13 +66,17 @@ class SparkAggregatedTaskMetrics {
5966
private Histogram shuffleWriteBytesHistogram;
6067
private Histogram diskBytesSpilledHistogram;
6168

69+
// Used for Spark SQL Plan metrics ONLY, don't put in regular span for now
70+
private Map<Long, Histogram> externalAccumulableHistograms;
71+
6272
public SparkAggregatedTaskMetrics() {}
6373

6474
public SparkAggregatedTaskMetrics(long availableExecutorTime) {
6575
this.previousAvailableExecutorTime = availableExecutorTime;
6676
}
6777

68-
public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
78+
public void addTaskMetrics(
79+
SparkListenerTaskEnd taskEnd, List<AccumulatorV2> externalAccumulators) {
6980
taskCompletedCount += 1;
7081

7182
if (taskEnd.taskInfo().attemptNumber() > 0) {
@@ -127,6 +138,31 @@ public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
127138
shuffleWriteBytesHistogram, taskMetrics.shuffleWriteMetrics().bytesWritten());
128139
diskBytesSpilledHistogram =
129140
lazyHistogramAccept(diskBytesSpilledHistogram, taskMetrics.diskBytesSpilled());
141+
142+
// TODO (CY): Should we also look at TaskInfo accumulable update values as a backup? Is that
143+
// only needed for SHS?
144+
if (externalAccumulators != null && !externalAccumulators.isEmpty()) {
145+
if (externalAccumulableHistograms == null) {
146+
externalAccumulableHistograms = new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
147+
}
148+
149+
externalAccumulators.forEach(
150+
acc -> {
151+
Histogram hist = externalAccumulableHistograms.get(acc.id());
152+
if (hist == null) {
153+
hist =
154+
Histogram.newHistogram(HISTOGRAM_RELATIVE_ACCURACY, HISTOGRAM_MAX_NUM_BINS);
155+
}
156+
157+
try {
158+
// As of spark 3.5, all SQL metrics are Long, safeguard if it changes in new
159+
// versions
160+
hist.accept(((Number) acc.value()).doubleValue());
161+
externalAccumulableHistograms.put(acc.id(), hist);
162+
} catch (ClassCastException ignored) {
163+
}
164+
});
165+
}
130166
}
131167
}
132168
}
@@ -276,6 +312,22 @@ private Histogram lazyHistogramAccept(Histogram hist, double value) {
276312
return hist;
277313
}
278314

315+
// Used to put external accum metrics to JSON for Spark SQL plans
316+
public void externalAccumToJson(JsonGenerator generator, SQLMetricInfo info) throws IOException {
317+
if (externalAccumulableHistograms != null) {
318+
Histogram hist = externalAccumulableHistograms.get(info.accumulatorId());
319+
String name = info.name();
320+
321+
if (name != null && hist != null) {
322+
generator.writeStartObject();
323+
generator.writeStringField(name, histogramToBase64(hist));
324+
generator.writeNumberField("sum", hist.getSum());
325+
generator.writeStringField("type", info.metricType());
326+
generator.writeEndObject();
327+
}
328+
}
329+
}
330+
279331
public static long computeTaskRunTime(TaskMetrics metrics) {
280332
return metrics.executorDeserializeTime()
281333
+ metrics.executorRunTime()

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public static void addSQLPlanToStageSpan(
2424
AgentSpan span,
2525
SparkPlanInfo plan,
2626
Map<Long, AccumulatorWithStage> accumulators,
27+
SparkAggregatedTaskMetrics stageMetric,
2728
int stageId) {
2829
Set<Integer> parentStageIds = new HashSet<>();
2930
SparkPlanInfoForStage planForStage =
@@ -32,7 +33,7 @@ public static void addSQLPlanToStageSpan(
3233
span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString());
3334

3435
if (planForStage != null) {
35-
String json = planForStage.toJson(accumulators);
36+
String json = planForStage.toJson(stageMetric);
3637
span.setTag("_dd.spark.sql_plan", json);
3738
}
3839
}
@@ -143,15 +144,15 @@ public SparkPlanInfoForStage(SparkPlanInfo plan, List<SparkPlanInfoForStage> chi
143144
this.children = children;
144145
}
145146

146-
public String toJson(Map<Long, AccumulatorWithStage> accumulators) {
147+
public String toJson(SparkAggregatedTaskMetrics stageMetric) {
147148
// Using the jackson JSON lib used by spark
148149
// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
149150
ObjectMapper mapper =
150151
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
151152

152153
ByteArrayOutputStream baos = new ByteArrayOutputStream();
153154
try (JsonGenerator generator = mapper.getFactory().createGenerator(baos)) {
154-
this.toJson(generator, accumulators, mapper);
155+
this.toJson(generator, mapper, stageMetric);
155156
} catch (IOException e) {
156157
return null;
157158
}
@@ -160,7 +161,7 @@ public String toJson(Map<Long, AccumulatorWithStage> accumulators) {
160161
}
161162

162163
private void toJson(
163-
JsonGenerator generator, Map<Long, AccumulatorWithStage> accumulators, ObjectMapper mapper)
164+
JsonGenerator generator, ObjectMapper mapper, SparkAggregatedTaskMetrics stageMetric)
164165
throws IOException {
165166
generator.writeStartObject();
166167
generator.writeStringField("node", plan.nodeName());
@@ -199,11 +200,7 @@ private void toJson(
199200
generator.writeFieldName("metrics");
200201
generator.writeStartArray();
201202
for (SQLMetricInfo metric : metrics) {
202-
long accumulatorId = metric.accumulatorId();
203-
AccumulatorWithStage acc = accumulators.get(accumulatorId);
204-
if (acc != null) {
205-
acc.toJson(generator, metric);
206-
}
203+
stageMetric.externalAccumToJson(generator, metric);
207204
}
208205
generator.writeEndArray();
209206
}
@@ -213,7 +210,7 @@ private void toJson(
213210
generator.writeFieldName("children");
214211
generator.writeStartArray();
215212
for (SparkPlanInfoForStage child : children) {
216-
child.toJson(generator, accumulators, mapper);
213+
child.toJson(generator, mapper, stageMetric);
217214
}
218215
generator.writeEndArray();
219216
}

0 commit comments

Comments (0)