Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
}

/**
 * Builds the metric map for the batch scan transformer.
 *
 * The full metric map is created first and then handed to `ScanMetricsUtil.filterScanMetrics`
 * together with `ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS`.
 * NOTE(review): presumably the filter keeps only the minimal key set when a minimal-metrics mode
 * is enabled; confirm against ScanMetricsUtil.
 *
 * @param sparkContext context used to create the SQL metrics
 * @return the metric map returned by the filter
 */
override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
  val allBatchScanMetrics = genBatchScanTransformerMetricsFull(sparkContext)
  ScanMetricsUtil.filterScanMetrics(
    allBatchScanMetrics,
    ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS)
}

private def genBatchScanTransformerMetricsFull(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
Expand Down Expand Up @@ -148,6 +154,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {

/**
 * Builds the metric map for the Hive table scan transformer.
 *
 * The full metric map is created first and then handed to `ScanMetricsUtil.filterScanMetrics`
 * together with `ScanMetricsUtil.VELOX_HIVE_SCAN_MINIMAL_METRICS`.
 * NOTE(review): presumably the filter keeps only the minimal key set when a minimal-metrics mode
 * is enabled; confirm against ScanMetricsUtil.
 *
 * @param sparkContext context used to create the SQL metrics
 * @return the metric map returned by the filter
 */
override def genHiveTableScanTransformerMetrics(
    sparkContext: SparkContext): Map[String, SQLMetric] = {
  val allHiveScanMetrics = genHiveTableScanTransformerMetricsFull(sparkContext)
  ScanMetricsUtil.filterScanMetrics(
    allHiveScanMetrics,
    ScanMetricsUtil.VELOX_HIVE_SCAN_MINIMAL_METRICS)
}

private def genHiveTableScanTransformerMetricsFull(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
Expand Down Expand Up @@ -200,6 +212,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {

/**
 * Builds the metric map for the file source scan transformer.
 *
 * The full metric map is created first and then handed to `ScanMetricsUtil.filterScanMetrics`
 * together with `ScanMetricsUtil.VELOX_FILE_SCAN_MINIMAL_METRICS`.
 * NOTE(review): presumably the filter keeps only the minimal key set when a minimal-metrics mode
 * is enabled; confirm against ScanMetricsUtil.
 *
 * @param sparkContext context used to create the SQL metrics
 * @return the metric map returned by the filter
 */
override def genFileSourceScanTransformerMetrics(
    sparkContext: SparkContext): Map[String, SQLMetric] = {
  val allFileScanMetrics = genFileSourceScanTransformerMetricsFull(sparkContext)
  ScanMetricsUtil.filterScanMetrics(
    allFileScanMetrics,
    ScanMetricsUtil.VELOX_FILE_SCAN_MINIMAL_METRICS)
}

private def genFileSourceScanTransformerMetricsFull(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,82 @@ package org.apache.gluten.metrics
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper

class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
/**
* See [[FileSourceScanMetricsUpdater]]: @transient metrics map with per-metric fields captured on
* the driver for executor serialization.
*/
class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
extends MetricsUpdater {

// Looks up a metric by key. Yields None when the `metrics` map is null or has no entry for
// `key`, so callers can skip metrics that were not registered.
private def metric(key: String): Option[SQLMetric] =
  for {
    registered <- Option(metrics)
    found <- registered.get(key)
  } yield found

private val numInputRows: Option[SQLMetric] = metric("numInputRows")
private val inputVectors: Option[SQLMetric] = metric("inputVectors")
private val inputBytes: Option[SQLMetric] = metric("inputBytes")
private val rawInputRows: Option[SQLMetric] = metric("rawInputRows")
private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes")
private val outputRows: Option[SQLMetric] = metric("numOutputRows")
private val outputVectors: Option[SQLMetric] = metric("outputVectors")
private val outputBytes: Option[SQLMetric] = metric("outputBytes")
private val cpuCount: Option[SQLMetric] = metric("cpuCount")
private val scanTime: Option[SQLMetric] = metric("scanTime")
private val wallNanos: Option[SQLMetric] = metric("wallNanos")
private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes")
private val numMemoryAllocations: Option[SQLMetric] = metric("numMemoryAllocations")
private val numDynamicFiltersAccepted: Option[SQLMetric] = metric("numDynamicFiltersAccepted")
private val skippedSplits: Option[SQLMetric] = metric("skippedSplits")
private val processedSplits: Option[SQLMetric] = metric("processedSplits")
private val skippedStrides: Option[SQLMetric] = metric("skippedStrides")
private val processedStrides: Option[SQLMetric] = metric("processedStrides")
private val remainingFilterTime: Option[SQLMetric] = metric("remainingFilterTime")
private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime")
private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes")
private val storageReads: Option[SQLMetric] = metric("storageReads")
private val localReadBytes: Option[SQLMetric] = metric("localReadBytes")
private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes")
private val preloadSplits: Option[SQLMetric] = metric("preloadSplits")
private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime")
private val dataSourceAddSplitTime: Option[SQLMetric] = metric("dataSourceAddSplitTime")
private val dataSourceReadTime: Option[SQLMetric] = metric("dataSourceReadTime")
private val loadLazyVectorTime: Option[SQLMetric] = metric("loadLazyVectorTime")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
inputMetrics.bridgeIncBytesRead(metrics("rawInputBytes").value)
inputMetrics.bridgeIncRecordsRead(metrics("rawInputRows").value)
rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value))
rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value))
}

override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
metrics("numInputRows") += operatorMetrics.inputRows
metrics("inputVectors") += operatorMetrics.inputVectors
metrics("inputBytes") += operatorMetrics.inputBytes
metrics("rawInputRows") += operatorMetrics.rawInputRows
metrics("rawInputBytes") += operatorMetrics.rawInputBytes
metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
metrics("outputBytes") += operatorMetrics.outputBytes
metrics("cpuCount") += operatorMetrics.cpuCount
metrics("scanTime") += operatorMetrics.scanTime
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
// Number of dynamic filters received.
metrics("numDynamicFiltersAccepted") += operatorMetrics.numDynamicFiltersAccepted
metrics("skippedSplits") += operatorMetrics.skippedSplits
metrics("processedSplits") += operatorMetrics.processedSplits
metrics("skippedStrides") += operatorMetrics.skippedStrides
metrics("processedStrides") += operatorMetrics.processedStrides
metrics("remainingFilterTime") += operatorMetrics.remainingFilterTime
metrics("ioWaitTime") += operatorMetrics.ioWaitTime
metrics("storageReadBytes") += operatorMetrics.storageReadBytes
metrics("storageReads") += operatorMetrics.storageReads
metrics("localReadBytes") += operatorMetrics.localReadBytes
metrics("ramReadBytes") += operatorMetrics.ramReadBytes
metrics("preloadSplits") += operatorMetrics.preloadSplits
metrics("pageLoadTime") += operatorMetrics.pageLoadTime
metrics("dataSourceAddSplitTime") += operatorMetrics.dataSourceAddSplitTime
metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime
metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
ScanMetricsUtil.inc(numInputRows, operatorMetrics.inputRows)
ScanMetricsUtil.inc(inputVectors, operatorMetrics.inputVectors)
ScanMetricsUtil.inc(inputBytes, operatorMetrics.inputBytes)
ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows)
ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes)
ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows)
ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors)
ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes)
ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount)
ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime)
ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos)
ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes)
ScanMetricsUtil.inc(numMemoryAllocations, operatorMetrics.numMemoryAllocations)
ScanMetricsUtil.inc(numDynamicFiltersAccepted, operatorMetrics.numDynamicFiltersAccepted)
ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits)
ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits)
ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides)
ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides)
ScanMetricsUtil.inc(remainingFilterTime, operatorMetrics.remainingFilterTime)
ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime)
ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes)
ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads)
ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes)
ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes)
ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits)
ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime)
ScanMetricsUtil.inc(dataSourceAddSplitTime, operatorMetrics.dataSourceAddSplitTime)
ScanMetricsUtil.inc(dataSourceReadTime, operatorMetrics.dataSourceReadTime)
ScanMetricsUtil.inc(loadLazyVectorTime, operatorMetrics.loadLazyVectorTime)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,76 +20,76 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper

/**
* Note: "val metrics" is made transient to avoid sending driver-side metrics to tasks, e.g.
* "pruning time" from scan.
* Note: "metrics" is @transient to avoid sending the map to executors. SQLMetric references are
* captured into fields at construction time on the driver (when metrics are registered) and
* serialized to executors. Missing keys (minimal scan metrics mode) yield None and are skipped.
*/
class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
extends MetricsUpdater {

val rawInputRows: SQLMetric = metrics("rawInputRows")
val rawInputBytes: SQLMetric = metrics("rawInputBytes")
val outputRows: SQLMetric = metrics("numOutputRows")
val outputVectors: SQLMetric = metrics("outputVectors")
val outputBytes: SQLMetric = metrics("outputBytes")
val wallNanos: SQLMetric = metrics("wallNanos")
val cpuCount: SQLMetric = metrics("cpuCount")
val scanTime: SQLMetric = metrics("scanTime")
val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes")
val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations")
// Looks up a metric by key. Yields None when the `metrics` map is null or has no entry for
// `key`, so callers can skip metrics that were not registered.
private def metric(key: String): Option[SQLMetric] =
  for {
    registered <- Option(metrics)
    found <- registered.get(key)
  } yield found

// Number of dynamic filters received.
val numDynamicFiltersAccepted: SQLMetric = metrics("numDynamicFiltersAccepted")
val skippedSplits: SQLMetric = metrics("skippedSplits")
val processedSplits: SQLMetric = metrics("processedSplits")
val preloadSplits: SQLMetric = metrics("preloadSplits")
val pageLoadTime: SQLMetric = metrics("pageLoadTime")
val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
val skippedStrides: SQLMetric = metrics("skippedStrides")
val processedStrides: SQLMetric = metrics("processedStrides")
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
val ioWaitTime: SQLMetric = metrics("ioWaitTime")
val storageReadBytes: SQLMetric = metrics("storageReadBytes")
val storageReads: SQLMetric = metrics("storageReads")
val localReadBytes: SQLMetric = metrics("localReadBytes")
val ramReadBytes: SQLMetric = metrics("ramReadBytes")
val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
private val rawInputRows: Option[SQLMetric] = metric("rawInputRows")
private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes")
private val outputRows: Option[SQLMetric] = metric("numOutputRows")
private val outputVectors: Option[SQLMetric] = metric("outputVectors")
private val outputBytes: Option[SQLMetric] = metric("outputBytes")
private val wallNanos: Option[SQLMetric] = metric("wallNanos")
private val cpuCount: Option[SQLMetric] = metric("cpuCount")
private val scanTime: Option[SQLMetric] = metric("scanTime")
private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes")
private val numMemoryAllocations: Option[SQLMetric] = metric("numMemoryAllocations")
private val numDynamicFiltersAccepted: Option[SQLMetric] = metric("numDynamicFiltersAccepted")
private val skippedSplits: Option[SQLMetric] = metric("skippedSplits")
private val processedSplits: Option[SQLMetric] = metric("processedSplits")
private val skippedStrides: Option[SQLMetric] = metric("skippedStrides")
private val processedStrides: Option[SQLMetric] = metric("processedStrides")
private val remainingFilterTime: Option[SQLMetric] = metric("remainingFilterTime")
private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime")
private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes")
private val storageReads: Option[SQLMetric] = metric("storageReads")
private val localReadBytes: Option[SQLMetric] = metric("localReadBytes")
private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes")
private val preloadSplits: Option[SQLMetric] = metric("preloadSplits")
private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime")
private val dataSourceAddSplitTime: Option[SQLMetric] = metric("dataSourceAddSplitTime")
private val dataSourceReadTime: Option[SQLMetric] = metric("dataSourceReadTime")
private val loadLazyVectorTime: Option[SQLMetric] = metric("loadLazyVectorTime")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
inputMetrics.bridgeIncRecordsRead(rawInputRows.value)
rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value))
rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value))
}

override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
rawInputRows += operatorMetrics.rawInputRows
rawInputBytes += operatorMetrics.rawInputBytes
outputRows += operatorMetrics.outputRows
outputVectors += operatorMetrics.outputVectors
outputBytes += operatorMetrics.outputBytes
wallNanos += operatorMetrics.wallNanos
cpuCount += operatorMetrics.cpuCount
scanTime += operatorMetrics.scanTime
peakMemoryBytes += operatorMetrics.peakMemoryBytes
numMemoryAllocations += operatorMetrics.numMemoryAllocations
// Number of dynamic filters received.
numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted
skippedSplits += operatorMetrics.skippedSplits
processedSplits += operatorMetrics.processedSplits
skippedStrides += operatorMetrics.skippedStrides
processedStrides += operatorMetrics.processedStrides
remainingFilterTime += operatorMetrics.remainingFilterTime
ioWaitTime += operatorMetrics.ioWaitTime
storageReadBytes += operatorMetrics.storageReadBytes
storageReads += operatorMetrics.storageReads
localReadBytes += operatorMetrics.localReadBytes
ramReadBytes += operatorMetrics.ramReadBytes
preloadSplits += operatorMetrics.preloadSplits
pageLoadTime += operatorMetrics.pageLoadTime
dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
dataSourceReadTime += operatorMetrics.dataSourceReadTime
loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows)
ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes)
ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows)
ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors)
ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes)
ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos)
ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount)
ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime)
ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes)
ScanMetricsUtil.inc(numMemoryAllocations, operatorMetrics.numMemoryAllocations)
ScanMetricsUtil.inc(numDynamicFiltersAccepted, operatorMetrics.numDynamicFiltersAccepted)
ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits)
ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits)
ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides)
ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides)
ScanMetricsUtil.inc(remainingFilterTime, operatorMetrics.remainingFilterTime)
ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime)
ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes)
ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads)
ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes)
ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes)
ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits)
ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime)
ScanMetricsUtil.inc(dataSourceAddSplitTime, operatorMetrics.dataSourceAddSplitTime)
ScanMetricsUtil.inc(dataSourceReadTime, operatorMetrics.dataSourceReadTime)
ScanMetricsUtil.inc(loadLazyVectorTime, operatorMetrics.loadLazyVectorTime)
}
}
}
Loading
Loading