@@ -20,76 +20,76 @@ import org.apache.spark.sql.execution.metric.SQLMetric
2020import org .apache .spark .sql .utils .SparkInputMetricsUtil .InputMetricsWrapper
2121
2222/**
23- * Note: "val metrics" is made transient to avoid sending driver-side metrics to tasks, e.g.
24- * "pruning time" from scan.
23+ * Note: "metrics" is @transient to avoid sending the map to executors. SQLMetric references are
24+ * captured into fields at construction time on the driver (when metrics are registered) and
25+ * serialized to executors. Missing keys (minimal scan metrics mode) yield None and are skipped.
2526 */
2627class FileSourceScanMetricsUpdater (@ transient val metrics : Map [String , SQLMetric ])
2728 extends MetricsUpdater {
2829
29- val rawInputRows : SQLMetric = metrics(" rawInputRows" )
30- val rawInputBytes : SQLMetric = metrics(" rawInputBytes" )
31- val outputRows : SQLMetric = metrics(" numOutputRows" )
32- val outputVectors : SQLMetric = metrics(" outputVectors" )
33- val outputBytes : SQLMetric = metrics(" outputBytes" )
34- val wallNanos : SQLMetric = metrics(" wallNanos" )
35- val cpuCount : SQLMetric = metrics(" cpuCount" )
36- val scanTime : SQLMetric = metrics(" scanTime" )
37- val peakMemoryBytes : SQLMetric = metrics(" peakMemoryBytes" )
38- val numMemoryAllocations : SQLMetric = metrics(" numMemoryAllocations" )
30+ private def metric (key : String ): Option [SQLMetric ] = Option (metrics).flatMap(_.get(key))
3931
40- // Number of dynamic filters received.
41- val numDynamicFiltersAccepted : SQLMetric = metrics(" numDynamicFiltersAccepted" )
42- val skippedSplits : SQLMetric = metrics(" skippedSplits" )
43- val processedSplits : SQLMetric = metrics(" processedSplits" )
44- val preloadSplits : SQLMetric = metrics(" preloadSplits" )
45- val pageLoadTime : SQLMetric = metrics(" pageLoadTime" )
46- val dataSourceAddSplitTime : SQLMetric = metrics(" dataSourceAddSplitTime" )
47- val dataSourceReadTime : SQLMetric = metrics(" dataSourceReadTime" )
48- val skippedStrides : SQLMetric = metrics(" skippedStrides" )
49- val processedStrides : SQLMetric = metrics(" processedStrides" )
50- val remainingFilterTime : SQLMetric = metrics(" remainingFilterTime" )
51- val ioWaitTime : SQLMetric = metrics(" ioWaitTime" )
52- val storageReadBytes : SQLMetric = metrics(" storageReadBytes" )
53- val storageReads : SQLMetric = metrics(" storageReads" )
54- val localReadBytes : SQLMetric = metrics(" localReadBytes" )
55- val ramReadBytes : SQLMetric = metrics(" ramReadBytes" )
56- val loadLazyVectorTime : SQLMetric = metrics(" loadLazyVectorTime" )
32+ private val rawInputRows : Option [SQLMetric ] = metric(" rawInputRows" )
33+ private val rawInputBytes : Option [SQLMetric ] = metric(" rawInputBytes" )
34+ private val outputRows : Option [SQLMetric ] = metric(" numOutputRows" )
35+ private val outputVectors : Option [SQLMetric ] = metric(" outputVectors" )
36+ private val outputBytes : Option [SQLMetric ] = metric(" outputBytes" )
37+ private val wallNanos : Option [SQLMetric ] = metric(" wallNanos" )
38+ private val cpuCount : Option [SQLMetric ] = metric(" cpuCount" )
39+ private val scanTime : Option [SQLMetric ] = metric(" scanTime" )
40+ private val peakMemoryBytes : Option [SQLMetric ] = metric(" peakMemoryBytes" )
41+ private val numMemoryAllocations : Option [SQLMetric ] = metric(" numMemoryAllocations" )
42+ private val numDynamicFiltersAccepted : Option [SQLMetric ] = metric(" numDynamicFiltersAccepted" )
43+ private val skippedSplits : Option [SQLMetric ] = metric(" skippedSplits" )
44+ private val processedSplits : Option [SQLMetric ] = metric(" processedSplits" )
45+ private val skippedStrides : Option [SQLMetric ] = metric(" skippedStrides" )
46+ private val processedStrides : Option [SQLMetric ] = metric(" processedStrides" )
47+ private val remainingFilterTime : Option [SQLMetric ] = metric(" remainingFilterTime" )
48+ private val ioWaitTime : Option [SQLMetric ] = metric(" ioWaitTime" )
49+ private val storageReadBytes : Option [SQLMetric ] = metric(" storageReadBytes" )
50+ private val storageReads : Option [SQLMetric ] = metric(" storageReads" )
51+ private val localReadBytes : Option [SQLMetric ] = metric(" localReadBytes" )
52+ private val ramReadBytes : Option [SQLMetric ] = metric(" ramReadBytes" )
53+ private val preloadSplits : Option [SQLMetric ] = metric(" preloadSplits" )
54+ private val pageLoadTime : Option [SQLMetric ] = metric(" pageLoadTime" )
55+ private val dataSourceAddSplitTime : Option [SQLMetric ] = metric(" dataSourceAddSplitTime" )
56+ private val dataSourceReadTime : Option [SQLMetric ] = metric(" dataSourceReadTime" )
57+ private val loadLazyVectorTime : Option [SQLMetric ] = metric(" loadLazyVectorTime" )
5758
5859 override def updateInputMetrics (inputMetrics : InputMetricsWrapper ): Unit = {
59- inputMetrics.bridgeIncBytesRead(rawInputBytes .value)
60- inputMetrics.bridgeIncRecordsRead(rawInputRows .value)
60+ rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m .value) )
61+ rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m .value) )
6162 }
6263
6364 override def updateNativeMetrics (opMetrics : IOperatorMetrics ): Unit = {
6465 if (opMetrics != null ) {
6566 val operatorMetrics = opMetrics.asInstanceOf [OperatorMetrics ]
66- rawInputRows += operatorMetrics.rawInputRows
67- rawInputBytes += operatorMetrics.rawInputBytes
68- outputRows += operatorMetrics.outputRows
69- outputVectors += operatorMetrics.outputVectors
70- outputBytes += operatorMetrics.outputBytes
71- wallNanos += operatorMetrics.wallNanos
72- cpuCount += operatorMetrics.cpuCount
73- scanTime += operatorMetrics.scanTime
74- peakMemoryBytes += operatorMetrics.peakMemoryBytes
75- numMemoryAllocations += operatorMetrics.numMemoryAllocations
76- // Number of dynamic filters received.
77- numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted
78- skippedSplits += operatorMetrics.skippedSplits
79- processedSplits += operatorMetrics.processedSplits
80- skippedStrides += operatorMetrics.skippedStrides
81- processedStrides += operatorMetrics.processedStrides
82- remainingFilterTime += operatorMetrics.remainingFilterTime
83- ioWaitTime += operatorMetrics.ioWaitTime
84- storageReadBytes += operatorMetrics.storageReadBytes
85- storageReads += operatorMetrics.storageReads
86- localReadBytes += operatorMetrics.localReadBytes
87- ramReadBytes += operatorMetrics.ramReadBytes
88- preloadSplits += operatorMetrics.preloadSplits
89- pageLoadTime += operatorMetrics.pageLoadTime
90- dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
91- dataSourceReadTime += operatorMetrics.dataSourceReadTime
92- loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
67+ ScanMetricsUtil .inc(rawInputRows, operatorMetrics.rawInputRows)
68+ ScanMetricsUtil .inc(rawInputBytes, operatorMetrics.rawInputBytes)
69+ ScanMetricsUtil .inc(outputRows, operatorMetrics.outputRows)
70+ ScanMetricsUtil .inc(outputVectors, operatorMetrics.outputVectors)
71+ ScanMetricsUtil .inc(outputBytes, operatorMetrics.outputBytes)
72+ ScanMetricsUtil .inc(wallNanos, operatorMetrics.wallNanos)
73+ ScanMetricsUtil .inc(cpuCount, operatorMetrics.cpuCount)
74+ ScanMetricsUtil .inc(scanTime, operatorMetrics.scanTime)
75+ ScanMetricsUtil .inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes)
76+ ScanMetricsUtil .inc(numMemoryAllocations, operatorMetrics.numMemoryAllocations)
77+ ScanMetricsUtil .inc(numDynamicFiltersAccepted, operatorMetrics.numDynamicFiltersAccepted)
78+ ScanMetricsUtil .inc(skippedSplits, operatorMetrics.skippedSplits)
79+ ScanMetricsUtil .inc(processedSplits, operatorMetrics.processedSplits)
80+ ScanMetricsUtil .inc(skippedStrides, operatorMetrics.skippedStrides)
81+ ScanMetricsUtil .inc(processedStrides, operatorMetrics.processedStrides)
82+ ScanMetricsUtil .inc(remainingFilterTime, operatorMetrics.remainingFilterTime)
83+ ScanMetricsUtil .inc(ioWaitTime, operatorMetrics.ioWaitTime)
84+ ScanMetricsUtil .inc(storageReadBytes, operatorMetrics.storageReadBytes)
85+ ScanMetricsUtil .inc(storageReads, operatorMetrics.storageReads)
86+ ScanMetricsUtil .inc(localReadBytes, operatorMetrics.localReadBytes)
87+ ScanMetricsUtil .inc(ramReadBytes, operatorMetrics.ramReadBytes)
88+ ScanMetricsUtil .inc(preloadSplits, operatorMetrics.preloadSplits)
89+ ScanMetricsUtil .inc(pageLoadTime, operatorMetrics.pageLoadTime)
90+ ScanMetricsUtil .inc(dataSourceAddSplitTime, operatorMetrics.dataSourceAddSplitTime)
91+ ScanMetricsUtil .inc(dataSourceReadTime, operatorMetrics.dataSourceReadTime)
92+ ScanMetricsUtil .inc(loadLazyVectorTime, operatorMetrics.loadLazyVectorTime)
9393 }
9494 }
9595}
0 commit comments