Skip to content

Commit 85b414f

Browse files
committed
Update variable name to accumulatorToStageID
1 parent b1b3e4c commit 85b414f

2 files changed

Lines changed: 11 additions & 11 deletions

File tree

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
128128
protected final HashMap<Long, SparkPlanInfo> sqlPlans = new HashMap<>();
129129
private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();
130130

131-
private final Map<Long, Integer> acc2stage = new HashMap<>();
131+
private final Map<Long, Integer> accumulatorToStageID = new HashMap<>();
132132

133133
private volatile boolean isStreamingJob = false;
134134
private final boolean isRunningOnDatabricks;
@@ -644,7 +644,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
644644

645645
for (AccumulableInfo info :
646646
JavaConverters.asJavaCollection(stageInfo.accumulables().values())) {
647-
acc2stage.put(info.id(), stageId);
647+
accumulatorToStageID.put(info.id(), stageId);
648648
}
649649

650650
Properties prop = stageProperties.remove(stageSpanKey);
@@ -676,7 +676,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl
676676

677677
SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId);
678678
if (sqlPlan != null) {
679-
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, acc2stage, stageMetric, stageId);
679+
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulatorToStageID, stageMetric, stageId);
680680
}
681681

682682
span.finish(completionTimeMs * 1000);

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ public class SparkSQLUtils {
2323
public static void addSQLPlanToStageSpan(
2424
AgentSpan span,
2525
SparkPlanInfo plan,
26-
Map<Long, Integer> accumulators,
26+
Map<Long, Integer> accumulatorToStageID,
2727
SparkAggregatedTaskMetrics stageMetric,
2828
int stageId) {
2929
Set<Integer> parentStageIds = new HashSet<>();
3030
SparkPlanInfoForStage planForStage =
31-
computeStageInfoForStage(plan, accumulators, stageId, parentStageIds, false);
31+
computeStageInfoForStage(plan, accumulatorToStageID, stageId, parentStageIds, false);
3232

3333
span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString());
3434

@@ -40,11 +40,11 @@ public static void addSQLPlanToStageSpan(
4040

4141
public static SparkPlanInfoForStage computeStageInfoForStage(
4242
SparkPlanInfo plan,
43-
Map<Long, Integer> accumulators,
43+
Map<Long, Integer> accumulatorToStageID,
4444
int stageId,
4545
Set<Integer> parentStageIds,
4646
boolean foundStage) {
47-
Set<Integer> stageIds = stageIdsForPlan(plan, accumulators);
47+
Set<Integer> stageIds = stageIdsForPlan(plan, accumulatorToStageID);
4848

4949
boolean hasStageInfo = !stageIds.isEmpty();
5050
boolean isForStage = stageIds.contains(stageId);
@@ -64,7 +64,7 @@ public static SparkPlanInfoForStage computeStageInfoForStage(
6464
List<SparkPlanInfoForStage> childrenForStage = new ArrayList<>();
6565
for (SparkPlanInfo child : children) {
6666
SparkPlanInfoForStage planForStage =
67-
computeStageInfoForStage(child, accumulators, stageId, parentStageIds, true);
67+
computeStageInfoForStage(child, accumulatorToStageID, stageId, parentStageIds, true);
6868

6969
if (planForStage != null) {
7070
childrenForStage.add(planForStage);
@@ -76,7 +76,7 @@ public static SparkPlanInfoForStage computeStageInfoForStage(
7676
// The expected stage was not found yet, searching in the children nodes
7777
for (SparkPlanInfo child : children) {
7878
SparkPlanInfoForStage planForStage =
79-
computeStageInfoForStage(child, accumulators, stageId, parentStageIds, false);
79+
computeStageInfoForStage(child, accumulatorToStageID, stageId, parentStageIds, false);
8080

8181
if (planForStage != null) {
8282
// Early stopping if the stage was found, no need to keep searching
@@ -88,14 +88,14 @@ public static SparkPlanInfoForStage computeStageInfoForStage(
8888
return null;
8989
}
9090

91-
private static Set<Integer> stageIdsForPlan(SparkPlanInfo info, Map<Long, Integer> accumulators) {
91+
private static Set<Integer> stageIdsForPlan(SparkPlanInfo info, Map<Long, Integer> accumulatorToStageID) {
9292
Set<Integer> stageIds = new HashSet<>();
9393

9494
Collection<SQLMetricInfo> metrics =
9595
AbstractDatadogSparkListener.listener.getPlanInfoMetrics(info);
9696
for (SQLMetricInfo metric : metrics) {
9797
// Using the accumulators to associate a plan with its stage
98-
Integer stageId = accumulators.get(metric.accumulatorId());
98+
Integer stageId = accumulatorToStageID.get(metric.accumulatorId());
9999

100100
if (stageId != null) {
101101
stageIds.add(stageId);

0 commit comments

Comments
 (0)