Skip to content

Commit 5a62ccc

Browse files
charlesmyu and devflow.devflow-routing-intake authored
Store accumulator-stage lookups directly (#10645)
Store accumulator-stage lookups directly. Update variable name to accumulatorToStageID. Spotless. Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent 28a1a34 commit 5a62ccc

File tree

2 files changed

+14
-19
lines changed

2 files changed

+14
-19
lines changed

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
128128
protected final HashMap<Long, SparkPlanInfo> sqlPlans = new HashMap<>();
129129
private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();
130130

131-
// There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
132-
// an active SQL query) so capping the size of the collection storing them
133-
// TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently?
134-
// If we know we don't need the accumulator values, can we drop all associated data and just map
135-
// stage ID -> accumulator ID? Put this behind some FF
136-
private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
137-
new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
131+
private final Map<Long, Integer> accumulatorToStageID = new HashMap<>();
138132

139133
private volatile boolean isStreamingJob = false;
140134
private final boolean isRunningOnDatabricks;
@@ -650,7 +644,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
650644

651645
for (AccumulableInfo info :
652646
JavaConverters.asJavaCollection(stageInfo.accumulables().values())) {
653-
accumulators.put(info.id(), new SparkSQLUtils.AccumulatorWithStage(stageId, info));
647+
accumulatorToStageID.put(info.id(), stageId);
654648
}
655649

656650
Properties prop = stageProperties.remove(stageSpanKey);
@@ -682,7 +676,8 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
682676

683677
SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId);
684678
if (sqlPlan != null) {
685-
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId);
679+
SparkSQLUtils.addSQLPlanToStageSpan(
680+
span, sqlPlan, accumulatorToStageID, stageMetric, stageId);
686681
}
687682

688683
span.finish(completionTimeMs * 1000);

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ public class SparkSQLUtils {
2323
public static void addSQLPlanToStageSpan(
2424
AgentSpan span,
2525
SparkPlanInfo plan,
26-
Map<Long, AccumulatorWithStage> accumulators,
26+
Map<Long, Integer> accumulatorToStageID,
2727
SparkAggregatedTaskMetrics stageMetric,
2828
int stageId) {
2929
Set<Integer> parentStageIds = new HashSet<>();
3030
SparkPlanInfoForStage planForStage =
31-
computeStageInfoForStage(plan, accumulators, stageId, parentStageIds, false);
31+
computeStageInfoForStage(plan, accumulatorToStageID, stageId, parentStageIds, false);
3232

3333
span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString());
3434

@@ -40,11 +40,11 @@ public static void addSQLPlanToStageSpan(
4040

4141
public static SparkPlanInfoForStage computeStageInfoForStage(
4242
SparkPlanInfo plan,
43-
Map<Long, AccumulatorWithStage> accumulators,
43+
Map<Long, Integer> accumulatorToStageID,
4444
int stageId,
4545
Set<Integer> parentStageIds,
4646
boolean foundStage) {
47-
Set<Integer> stageIds = stageIdsForPlan(plan, accumulators);
47+
Set<Integer> stageIds = stageIdsForPlan(plan, accumulatorToStageID);
4848

4949
boolean hasStageInfo = !stageIds.isEmpty();
5050
boolean isForStage = stageIds.contains(stageId);
@@ -64,7 +64,7 @@ public static SparkPlanInfoForStage computeStageInfoForStage(
6464
List<SparkPlanInfoForStage> childrenForStage = new ArrayList<>();
6565
for (SparkPlanInfo child : children) {
6666
SparkPlanInfoForStage planForStage =
67-
computeStageInfoForStage(child, accumulators, stageId, parentStageIds, true);
67+
computeStageInfoForStage(child, accumulatorToStageID, stageId, parentStageIds, true);
6868

6969
if (planForStage != null) {
7070
childrenForStage.add(planForStage);
@@ -76,7 +76,7 @@ public static SparkPlanInfoForStage computeStageInfoForStage(
7676
// The expected stage was not found yet, searching in the children nodes
7777
for (SparkPlanInfo child : children) {
7878
SparkPlanInfoForStage planForStage =
79-
computeStageInfoForStage(child, accumulators, stageId, parentStageIds, false);
79+
computeStageInfoForStage(child, accumulatorToStageID, stageId, parentStageIds, false);
8080

8181
if (planForStage != null) {
8282
// Early stopping if the stage was found, no need to keep searching
@@ -89,17 +89,17 @@ public static SparkPlanInfoForStage computeStageInfoForStage(
8989
}
9090

9191
private static Set<Integer> stageIdsForPlan(
92-
SparkPlanInfo info, Map<Long, AccumulatorWithStage> accumulators) {
92+
SparkPlanInfo info, Map<Long, Integer> accumulatorToStageID) {
9393
Set<Integer> stageIds = new HashSet<>();
9494

9595
Collection<SQLMetricInfo> metrics =
9696
AbstractDatadogSparkListener.listener.getPlanInfoMetrics(info);
9797
for (SQLMetricInfo metric : metrics) {
9898
// Using the accumulators to associate a plan with its stage
99-
AccumulatorWithStage acc = accumulators.get(metric.accumulatorId());
99+
Integer stageId = accumulatorToStageID.get(metric.accumulatorId());
100100

101-
if (acc != null) {
102-
stageIds.add(acc.stageId);
101+
if (stageId != null) {
102+
stageIds.add(stageId);
103103
}
104104
}
105105

0 commit comments

Comments (0)