Skip to content

Commit 1c523ed

Browse files
authored
Add outputSize and name to the Stage metadata
Add output size and details to stage
2 parents 0e28bb9 + a321bf0 commit 1c523ed

File tree

4 files changed

+15
-6
lines changed

4 files changed

+15
-6
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
         uses: actions/checkout@v3

       - name: Validate Gradle Wrapper
-        uses: gradle/wrapper-validation-action@v1
+        uses: gradle/actions/wrapper-validation@v3

       - name: Configure JDK
         uses: actions/setup-java@v3

lib/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ publishing {
         maven(MavenPublication) {
             groupId = 'com.joom.spark'
             artifactId = "spark-platform_$scalaVersion"
-            version = '0.4.9'
+            version = '0.4.10'

             from components.java

lib/src/main/scala/com/joom/spark/monitoring/Parts.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ case class StageSummary(
   memorySpillGB: Double,
   diskSpillGB: Double,
   inputGB: Double,
+  outputGB: Double,
   shuffleWriteGB: Double,
   peakExecutionMemoryGB: Double,
   properties: Map[String, String],
   endTs: Long,
+  name: Option[String],
 )

 case class ExecutorMetric(
case class ExecutorMetric(

lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
     var sent: Boolean = false,
     var startedTaskCount: Int = 0,
     var failureReason: Option[String] = None,
-    var properties: Map[String, String] = Map())
+    var properties: Map[String, String] = Map(),
+    var name: Option[String] = None)
   private val tasksPerStage = mutable.Map[StageFullId, mutable.ArrayBuffer[(TaskMetrics, TaskEndReason)]]()
   private val stageState = mutable.Map[StageFullId, StageState]()
   private val appStart: Instant = Instant.now()
@@ -155,7 +156,9 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
     }.toMap

     val startTime = stageSubmitted.stageInfo.submissionTime.map(Instant.ofEpochMilli).getOrElse(Instant.now())
-    stageState.getOrElseUpdate(stageFullId, StageState(startTime)).properties = properties
+    val state = stageState.getOrElseUpdate(stageFullId, StageState(startTime))
+    state.properties = properties
+    state.name = Some(stageSubmitted.stageInfo.name)
   }

   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
@@ -236,7 +239,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
     val failureReason = state.failureReason
     val startTime = state.startTime
     val summary = summarizeStage(appId, stageFullId.stageId, stageFullId.attemptNumber, success, failureReason,
-      startTime, tasks.toSeq, state.properties, endTs)
+      startTime, tasks.toSeq, state.properties, endTs, state.name)

     implicit val codec: JsonValueCodec[StageSummary] = JsonCodecMaker.make
     send("stages", summary.get)
@@ -342,7 +345,9 @@ object StatsReportingSparkListener {
       failureReason: Option[String], startTime: Instant,
       rawTaskMetrics: Seq[(TaskMetrics, TaskEndReason)],
       properties: Map[String, String],
-      endTime: Option[Long]): Option[StageSummary] = {
+      endTime: Option[Long],
+      name: Option[String],
+    ): Option[StageSummary] = {
     val taskMetrics = rawTaskMetrics.map(_._1)
       .filter(_ != null) // For failed tasks, there will be 'null' TaskMetrics instances.
     val runTimes = taskMetrics.map(_.executorRunTime.toDouble / 1000.0)
@@ -375,10 +380,12 @@ object StatsReportingSparkListener {
       memorySpillGB = taskMetrics.map(_.memoryBytesSpilled.toDouble).sum / GiB,
       diskSpillGB = taskMetrics.map(_.diskBytesSpilled).sum / GiB,
       inputGB = taskMetrics.map(_.inputMetrics.bytesRead).sum / GiB,
+      outputGB = taskMetrics.map(_.outputMetrics.bytesWritten).sum / GiB,
       shuffleWriteGB = taskMetrics.map(_.shuffleWriteMetrics.bytesWritten).sum / GiB,
       peakExecutionMemoryGB = taskMetrics.map(_.peakExecutionMemory).sum / GiB,
       properties = properties,
       endTs = endTs,
+      name = name,
     ))
   }
 }

0 commit comments

Comments (0)