Skip to content

Commit 1ca4fa6

Browse files
committed
Add output size and details to stage
1 parent 0e28bb9 commit 1ca4fa6

File tree

3 files changed

+14
-5
lines changed

3 files changed

+14
-5
lines changed

lib/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ publishing {
6666
maven(MavenPublication) {
6767
groupId = 'com.joom.spark'
6868
artifactId = "spark-platform_$scalaVersion"
69-
version = '0.4.9'
69+
version = '0.4.10'
7070

7171
from components.java
7272

lib/src/main/scala/com/joom/spark/monitoring/Parts.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ case class StageSummary(
3434
memorySpillGB: Double,
3535
diskSpillGB: Double,
3636
inputGB: Double,
37+
outputGB: Double,
3738
shuffleWriteGB: Double,
3839
peakExecutionMemoryGB: Double,
3940
properties: Map[String, String],
4041
endTs: Long,
42+
details: Option[String],
4143
)
4244

4345
case class ExecutorMetric(

lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
4747
var sent: Boolean = false,
4848
var startedTaskCount: Int = 0,
4949
var failureReason: Option[String] = None,
50-
var properties: Map[String, String] = Map())
50+
var properties: Map[String, String] = Map(),
51+
var details: Option[String] = None)
5152
private val tasksPerStage = mutable.Map[StageFullId, mutable.ArrayBuffer[(TaskMetrics, TaskEndReason)]]()
5253
private val stageState = mutable.Map[StageFullId, StageState]()
5354
private val appStart: Instant = Instant.now()
@@ -155,7 +156,9 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
155156
}.toMap
156157

157158
val startTime = stageSubmitted.stageInfo.submissionTime.map(Instant.ofEpochMilli).getOrElse(Instant.now())
158-
stageState.getOrElseUpdate(stageFullId, StageState(startTime)).properties = properties
159+
val state = stageState.getOrElseUpdate(stageFullId, StageState(startTime))
160+
state.properties = properties
161+
state.details = Some(stageSubmitted.stageInfo.details)
159162
}
160163

161164
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
@@ -236,7 +239,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
236239
val failureReason = state.failureReason
237240
val startTime = state.startTime
238241
val summary = summarizeStage(appId, stageFullId.stageId, stageFullId.attemptNumber, success, failureReason,
239-
startTime, tasks.toSeq, state.properties, endTs)
242+
startTime, tasks.toSeq, state.properties, endTs, state.details)
240243

241244
implicit val codec: JsonValueCodec[StageSummary] = JsonCodecMaker.make
242245
send("stages", summary.get)
@@ -342,7 +345,9 @@ object StatsReportingSparkListener {
342345
failureReason: Option[String], startTime: Instant,
343346
rawTaskMetrics: Seq[(TaskMetrics, TaskEndReason)],
344347
properties: Map[String, String],
345-
endTime: Option[Long]): Option[StageSummary] = {
348+
endTime: Option[Long],
349+
details: Option[String],
350+
): Option[StageSummary] = {
346351
val taskMetrics = rawTaskMetrics.map(_._1)
347352
.filter(_ != null) // For failed tasks, there will be 'null' TaskMetrics instances.
348353
val runTimes = taskMetrics.map(_.executorRunTime.toDouble / 1000.0)
@@ -375,10 +380,12 @@ object StatsReportingSparkListener {
375380
memorySpillGB = taskMetrics.map(_.memoryBytesSpilled.toDouble).sum / GiB,
376381
diskSpillGB = taskMetrics.map(_.diskBytesSpilled).sum / GiB,
377382
inputGB = taskMetrics.map(_.inputMetrics.bytesRead).sum / GiB,
383+
outputGB = taskMetrics.map(_.outputMetrics.bytesWritten).sum / GiB,
378384
shuffleWriteGB = taskMetrics.map(_.shuffleWriteMetrics.bytesWritten).sum / GiB,
379385
peakExecutionMemoryGB = taskMetrics.map(_.peakExecutionMemory).sum / GiB,
380386
properties = properties,
381387
endTs = endTs,
388+
details = details,
382389
))
383390
}
384391
}

0 commit comments

Comments
 (0)