@@ -26,70 +26,43 @@ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
2626class FileSourceScanMetricsUpdater (@ transient val metrics : Map [String , SQLMetric ])
2727 extends MetricsUpdater {
2828
29- val rawInputRows : SQLMetric = metrics(" rawInputRows" )
30- val rawInputBytes : SQLMetric = metrics(" rawInputBytes" )
31- val outputRows : SQLMetric = metrics(" numOutputRows" )
32- val outputVectors : SQLMetric = metrics(" outputVectors" )
33- val outputBytes : SQLMetric = metrics(" outputBytes" )
34- val wallNanos : SQLMetric = metrics(" wallNanos" )
35- val cpuCount : SQLMetric = metrics(" cpuCount" )
36- val scanTime : SQLMetric = metrics(" scanTime" )
37- val peakMemoryBytes : SQLMetric = metrics(" peakMemoryBytes" )
38- val numMemoryAllocations : SQLMetric = metrics(" numMemoryAllocations" )
39-
40- // Number of dynamic filters received.
41- val numDynamicFiltersAccepted : SQLMetric = metrics(" numDynamicFiltersAccepted" )
42- val skippedSplits : SQLMetric = metrics(" skippedSplits" )
43- val processedSplits : SQLMetric = metrics(" processedSplits" )
44- val preloadSplits : SQLMetric = metrics(" preloadSplits" )
45- val pageLoadTime : SQLMetric = metrics(" pageLoadTime" )
46- val dataSourceAddSplitTime : SQLMetric = metrics(" dataSourceAddSplitTime" )
47- val dataSourceReadTime : SQLMetric = metrics(" dataSourceReadTime" )
48- val skippedStrides : SQLMetric = metrics(" skippedStrides" )
49- val processedStrides : SQLMetric = metrics(" processedStrides" )
50- val remainingFilterTime : SQLMetric = metrics(" remainingFilterTime" )
51- val ioWaitTime : SQLMetric = metrics(" ioWaitTime" )
52- val storageReadBytes : SQLMetric = metrics(" storageReadBytes" )
53- val storageReads : SQLMetric = metrics(" storageReads" )
54- val localReadBytes : SQLMetric = metrics(" localReadBytes" )
55- val ramReadBytes : SQLMetric = metrics(" ramReadBytes" )
56- val loadLazyVectorTime : SQLMetric = metrics(" loadLazyVectorTime" )
57-
5829 override def updateInputMetrics (inputMetrics : InputMetricsWrapper ): Unit = {
59- inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
60- inputMetrics.bridgeIncRecordsRead(rawInputRows.value)
30+ inputMetrics.bridgeIncBytesRead(metrics( " rawInputBytes" ) .value)
31+ inputMetrics.bridgeIncRecordsRead(metrics( " rawInputRows" ) .value)
6132 }
6233
6334 override def updateNativeMetrics (opMetrics : IOperatorMetrics ): Unit = {
6435 if (opMetrics != null ) {
6536 val operatorMetrics = opMetrics.asInstanceOf [OperatorMetrics ]
66- rawInputRows += operatorMetrics.rawInputRows
67- rawInputBytes += operatorMetrics.rawInputBytes
68- outputRows += operatorMetrics.outputRows
69- outputVectors += operatorMetrics.outputVectors
70- outputBytes += operatorMetrics.outputBytes
71- wallNanos += operatorMetrics.wallNanos
72- cpuCount += operatorMetrics.cpuCount
73- scanTime += operatorMetrics.scanTime
74- peakMemoryBytes += operatorMetrics.peakMemoryBytes
75- numMemoryAllocations += operatorMetrics.numMemoryAllocations
76- // Number of dynamic filters received.
77- numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted
78- skippedSplits += operatorMetrics.skippedSplits
79- processedSplits += operatorMetrics.processedSplits
80- skippedStrides += operatorMetrics.skippedStrides
81- processedStrides += operatorMetrics.processedStrides
82- remainingFilterTime += operatorMetrics.remainingFilterTime
83- ioWaitTime += operatorMetrics.ioWaitTime
84- storageReadBytes += operatorMetrics.storageReadBytes
85- storageReads += operatorMetrics.storageReads
86- localReadBytes += operatorMetrics.localReadBytes
87- ramReadBytes += operatorMetrics.ramReadBytes
88- preloadSplits += operatorMetrics.preloadSplits
89- pageLoadTime += operatorMetrics.pageLoadTime
90- dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
91- dataSourceReadTime += operatorMetrics.dataSourceReadTime
92- loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
37+ ScanMetricsUtil .inc(metrics, " rawInputRows" , operatorMetrics.rawInputRows)
38+ ScanMetricsUtil .inc(metrics, " rawInputBytes" , operatorMetrics.rawInputBytes)
39+ ScanMetricsUtil .inc(metrics, " numOutputRows" , operatorMetrics.outputRows)
40+ ScanMetricsUtil .inc(metrics, " outputVectors" , operatorMetrics.outputVectors)
41+ ScanMetricsUtil .inc(metrics, " outputBytes" , operatorMetrics.outputBytes)
42+ ScanMetricsUtil .inc(metrics, " wallNanos" , operatorMetrics.wallNanos)
43+ ScanMetricsUtil .inc(metrics, " cpuCount" , operatorMetrics.cpuCount)
44+ ScanMetricsUtil .inc(metrics, " scanTime" , operatorMetrics.scanTime)
45+ ScanMetricsUtil .inc(metrics, " peakMemoryBytes" , operatorMetrics.peakMemoryBytes)
46+ ScanMetricsUtil .inc(metrics, " numMemoryAllocations" , operatorMetrics.numMemoryAllocations)
47+ ScanMetricsUtil .inc(
48+ metrics,
49+ " numDynamicFiltersAccepted" ,
50+ operatorMetrics.numDynamicFiltersAccepted)
51+ ScanMetricsUtil .inc(metrics, " skippedSplits" , operatorMetrics.skippedSplits)
52+ ScanMetricsUtil .inc(metrics, " processedSplits" , operatorMetrics.processedSplits)
53+ ScanMetricsUtil .inc(metrics, " skippedStrides" , operatorMetrics.skippedStrides)
54+ ScanMetricsUtil .inc(metrics, " processedStrides" , operatorMetrics.processedStrides)
55+ ScanMetricsUtil .inc(metrics, " remainingFilterTime" , operatorMetrics.remainingFilterTime)
56+ ScanMetricsUtil .inc(metrics, " ioWaitTime" , operatorMetrics.ioWaitTime)
57+ ScanMetricsUtil .inc(metrics, " storageReadBytes" , operatorMetrics.storageReadBytes)
58+ ScanMetricsUtil .inc(metrics, " storageReads" , operatorMetrics.storageReads)
59+ ScanMetricsUtil .inc(metrics, " localReadBytes" , operatorMetrics.localReadBytes)
60+ ScanMetricsUtil .inc(metrics, " ramReadBytes" , operatorMetrics.ramReadBytes)
61+ ScanMetricsUtil .inc(metrics, " preloadSplits" , operatorMetrics.preloadSplits)
62+ ScanMetricsUtil .inc(metrics, " pageLoadTime" , operatorMetrics.pageLoadTime)
63+ ScanMetricsUtil .inc(metrics, " dataSourceAddSplitTime" , operatorMetrics.dataSourceAddSplitTime)
64+ ScanMetricsUtil .inc(metrics, " dataSourceReadTime" , operatorMetrics.dataSourceReadTime)
65+ ScanMetricsUtil .inc(metrics, " loadLazyVectorTime" , operatorMetrics.loadLazyVectorTime)
9366 }
9467 }
9568}
0 commit comments