diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 69ed2f09a99..118cceda9e4 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -99,6 +99,12 @@ class VeloxMetricsApi extends MetricsApi with Logging { } override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = + ScanMetricsUtil.filterScanMetrics( + genBatchScanTransformerMetricsFull(sparkContext), + ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS) + + private def genBatchScanTransformerMetricsFull( + sparkContext: SparkContext): Map[String, SQLMetric] = Map( "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"), @@ -148,6 +154,12 @@ class VeloxMetricsApi extends MetricsApi with Logging { override def genHiveTableScanTransformerMetrics( sparkContext: SparkContext): Map[String, SQLMetric] = + ScanMetricsUtil.filterScanMetrics( + genHiveTableScanTransformerMetricsFull(sparkContext), + ScanMetricsUtil.VELOX_HIVE_SCAN_MINIMAL_METRICS) + + private def genHiveTableScanTransformerMetricsFull( + sparkContext: SparkContext): Map[String, SQLMetric] = Map( "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"), "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"), @@ -200,6 +212,12 @@ class VeloxMetricsApi extends MetricsApi with Logging { override def genFileSourceScanTransformerMetrics( sparkContext: SparkContext): Map[String, SQLMetric] = + ScanMetricsUtil.filterScanMetrics( + genFileSourceScanTransformerMetricsFull(sparkContext), + ScanMetricsUtil.VELOX_FILE_SCAN_MINIMAL_METRICS) + + private def genFileSourceScanTransformerMetricsFull( + sparkContext: SparkContext): Map[String, SQLMetric] = Map( "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"), "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"), diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala index 9e2b1d7c920..45767a2c535 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala @@ -19,46 +19,82 @@ package org.apache.gluten.metrics import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper -class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater { +/** + * See [[FileSourceScanMetricsUpdater]]: @transient metrics map with per-metric fields captured on + * the driver for executor serialization. + */ +class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]) + extends MetricsUpdater { + + private def metric(key: String): Option[SQLMetric] = Option(metrics).flatMap(_.get(key)) + + private val numInputRows: Option[SQLMetric] = metric("numInputRows") + private val inputVectors: Option[SQLMetric] = metric("inputVectors") + private val inputBytes: Option[SQLMetric] = metric("inputBytes") + private val rawInputRows: Option[SQLMetric] = metric("rawInputRows") + private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes") + private val outputRows: Option[SQLMetric] = metric("numOutputRows") + private val outputVectors: Option[SQLMetric] = metric("outputVectors") + private val outputBytes: Option[SQLMetric] = metric("outputBytes") + private val cpuCount: Option[SQLMetric] = metric("cpuCount") + private val scanTime: Option[SQLMetric] = metric("scanTime") + private val wallNanos: Option[SQLMetric] = metric("wallNanos") + private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes") + private val numMemoryAllocations: Option[SQLMetric] = metric("numMemoryAllocations") + private val numDynamicFiltersAccepted: Option[SQLMetric] = metric("numDynamicFiltersAccepted") + private val skippedSplits: Option[SQLMetric] = metric("skippedSplits") + private val processedSplits: Option[SQLMetric] = metric("processedSplits") + private val skippedStrides: Option[SQLMetric] = metric("skippedStrides") + private val processedStrides: Option[SQLMetric] = metric("processedStrides") + private val remainingFilterTime: Option[SQLMetric] = metric("remainingFilterTime") + private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime") + private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes") + private val storageReads: Option[SQLMetric] = metric("storageReads") + private val localReadBytes: Option[SQLMetric] = metric("localReadBytes") + private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes") + private val preloadSplits: Option[SQLMetric] = metric("preloadSplits") + private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime") + private val dataSourceAddSplitTime: Option[SQLMetric] = metric("dataSourceAddSplitTime") + private val dataSourceReadTime: Option[SQLMetric] = metric("dataSourceReadTime") + private val loadLazyVectorTime: Option[SQLMetric] = metric("loadLazyVectorTime") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { - inputMetrics.bridgeIncBytesRead(metrics("rawInputBytes").value) - inputMetrics.bridgeIncRecordsRead(metrics("rawInputRows").value) + rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value)) + rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value)) } override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - metrics("numInputRows") += operatorMetrics.inputRows - metrics("inputVectors") += operatorMetrics.inputVectors - metrics("inputBytes") += operatorMetrics.inputBytes - metrics("rawInputRows") += operatorMetrics.rawInputRows - metrics("rawInputBytes") += operatorMetrics.rawInputBytes - metrics("numOutputRows") += operatorMetrics.outputRows - metrics("outputVectors") += operatorMetrics.outputVectors - metrics("outputBytes") += operatorMetrics.outputBytes - metrics("cpuCount") += operatorMetrics.cpuCount - metrics("scanTime") += operatorMetrics.scanTime - metrics("wallNanos") += operatorMetrics.wallNanos - metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes - metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations - // Number of dynamic filters received. - metrics("numDynamicFiltersAccepted") += operatorMetrics.numDynamicFiltersAccepted - metrics("skippedSplits") += operatorMetrics.skippedSplits - metrics("processedSplits") += operatorMetrics.processedSplits - metrics("skippedStrides") += operatorMetrics.skippedStrides - metrics("processedStrides") += operatorMetrics.processedStrides - metrics("remainingFilterTime") += operatorMetrics.remainingFilterTime - metrics("ioWaitTime") += operatorMetrics.ioWaitTime - metrics("storageReadBytes") += operatorMetrics.storageReadBytes - metrics("storageReads") += operatorMetrics.storageReads - metrics("localReadBytes") += operatorMetrics.localReadBytes - metrics("ramReadBytes") += operatorMetrics.ramReadBytes - metrics("preloadSplits") += operatorMetrics.preloadSplits - metrics("pageLoadTime") += operatorMetrics.pageLoadTime - metrics("dataSourceAddSplitTime") += operatorMetrics.dataSourceAddSplitTime - metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime - metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime + ScanMetricsUtil.inc(numInputRows, operatorMetrics.inputRows) + ScanMetricsUtil.inc(inputVectors, operatorMetrics.inputVectors) + ScanMetricsUtil.inc(inputBytes, operatorMetrics.inputBytes) + ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows) + ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes) + ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows) + ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors) + ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes) + ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount) + ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime) + ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos) + ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes) + ScanMetricsUtil.inc(numMemoryAllocations, operatorMetrics.numMemoryAllocations) + ScanMetricsUtil.inc(numDynamicFiltersAccepted, operatorMetrics.numDynamicFiltersAccepted) + ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits) + ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits) + ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides) + ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides) + ScanMetricsUtil.inc(remainingFilterTime, operatorMetrics.remainingFilterTime) + ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime) + ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes) + ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads) + ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes) + ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes) + ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits) + ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime) + ScanMetricsUtil.inc(dataSourceAddSplitTime, operatorMetrics.dataSourceAddSplitTime) + ScanMetricsUtil.inc(dataSourceReadTime, operatorMetrics.dataSourceReadTime) + ScanMetricsUtil.inc(loadLazyVectorTime, operatorMetrics.loadLazyVectorTime) } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala index 527f8d7bc5f..4b62447aac6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala @@ -20,76 +20,76 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper /** - * Note: "val metrics" is made transient to avoid sending driver-side metrics to tasks, e.g. - * "pruning time" from scan. + * Note: "metrics" is @transient to avoid sending the map to executors. SQLMetric references are + * captured into fields at construction time on the driver (when metrics are registered) and + * serialized to executors. Missing keys (minimal scan metrics mode) yield None and are skipped. */ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]) extends MetricsUpdater { - val rawInputRows: SQLMetric = metrics("rawInputRows") - val rawInputBytes: SQLMetric = metrics("rawInputBytes") - val outputRows: SQLMetric = metrics("numOutputRows") - val outputVectors: SQLMetric = metrics("outputVectors") - val outputBytes: SQLMetric = metrics("outputBytes") - val wallNanos: SQLMetric = metrics("wallNanos") - val cpuCount: SQLMetric = metrics("cpuCount") - val scanTime: SQLMetric = metrics("scanTime") - val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes") - val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations") + private def metric(key: String): Option[SQLMetric] = Option(metrics).flatMap(_.get(key)) - // Number of dynamic filters received. - val numDynamicFiltersAccepted: SQLMetric = metrics("numDynamicFiltersAccepted") - val skippedSplits: SQLMetric = metrics("skippedSplits") - val processedSplits: SQLMetric = metrics("processedSplits") - val preloadSplits: SQLMetric = metrics("preloadSplits") - val pageLoadTime: SQLMetric = metrics("pageLoadTime") - val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime") - val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime") - val skippedStrides: SQLMetric = metrics("skippedStrides") - val processedStrides: SQLMetric = metrics("processedStrides") - val remainingFilterTime: SQLMetric = metrics("remainingFilterTime") - val ioWaitTime: SQLMetric = metrics("ioWaitTime") - val storageReadBytes: SQLMetric = metrics("storageReadBytes") - val storageReads: SQLMetric = metrics("storageReads") - val localReadBytes: SQLMetric = metrics("localReadBytes") - val ramReadBytes: SQLMetric = metrics("ramReadBytes") - val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime") + private val rawInputRows: Option[SQLMetric] = metric("rawInputRows") + private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes") + private val outputRows: Option[SQLMetric] = metric("numOutputRows") + private val outputVectors: Option[SQLMetric] = metric("outputVectors") + private val outputBytes: Option[SQLMetric] = metric("outputBytes") + private val wallNanos: Option[SQLMetric] = metric("wallNanos") + private val cpuCount: Option[SQLMetric] = metric("cpuCount") + private val scanTime: Option[SQLMetric] = metric("scanTime") + private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes") + private val numMemoryAllocations: Option[SQLMetric] = metric("numMemoryAllocations") + private val numDynamicFiltersAccepted: Option[SQLMetric] = metric("numDynamicFiltersAccepted") + private val skippedSplits: Option[SQLMetric] = metric("skippedSplits") + private val processedSplits: Option[SQLMetric] = metric("processedSplits") + private val skippedStrides: Option[SQLMetric] = metric("skippedStrides") + private val processedStrides: Option[SQLMetric] = metric("processedStrides") + private val remainingFilterTime: Option[SQLMetric] = metric("remainingFilterTime") + private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime") + private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes") + private val storageReads: Option[SQLMetric] = metric("storageReads") + private val localReadBytes: Option[SQLMetric] = metric("localReadBytes") + private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes") + private val preloadSplits: Option[SQLMetric] = metric("preloadSplits") + private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime") + private val dataSourceAddSplitTime: Option[SQLMetric] = metric("dataSourceAddSplitTime") + private val dataSourceReadTime: Option[SQLMetric] = metric("dataSourceReadTime") + private val loadLazyVectorTime: Option[SQLMetric] = metric("loadLazyVectorTime") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { - inputMetrics.bridgeIncBytesRead(rawInputBytes.value) - inputMetrics.bridgeIncRecordsRead(rawInputRows.value) + rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value)) + rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value)) } override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - rawInputRows += operatorMetrics.rawInputRows - rawInputBytes += operatorMetrics.rawInputBytes - outputRows += operatorMetrics.outputRows - outputVectors += operatorMetrics.outputVectors - outputBytes += operatorMetrics.outputBytes - wallNanos += operatorMetrics.wallNanos - cpuCount += operatorMetrics.cpuCount - scanTime += operatorMetrics.scanTime - peakMemoryBytes += operatorMetrics.peakMemoryBytes - numMemoryAllocations += operatorMetrics.numMemoryAllocations - // Number of dynamic filters received. - numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted - skippedSplits += operatorMetrics.skippedSplits - processedSplits += operatorMetrics.processedSplits - skippedStrides += operatorMetrics.skippedStrides - processedStrides += operatorMetrics.processedStrides - remainingFilterTime += operatorMetrics.remainingFilterTime - ioWaitTime += operatorMetrics.ioWaitTime - storageReadBytes += operatorMetrics.storageReadBytes - storageReads += operatorMetrics.storageReads - localReadBytes += operatorMetrics.localReadBytes - ramReadBytes += operatorMetrics.ramReadBytes - preloadSplits += operatorMetrics.preloadSplits - pageLoadTime += operatorMetrics.pageLoadTime - dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime - dataSourceReadTime += operatorMetrics.dataSourceReadTime - loadLazyVectorTime += operatorMetrics.loadLazyVectorTime + ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows) + ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes) + ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows) + ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors) + ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes) + ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos) + ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount) + ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime) + ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes) + ScanMetricsUtil.inc(numMemoryAllocations, operatorMetrics.numMemoryAllocations) + ScanMetricsUtil.inc(numDynamicFiltersAccepted, operatorMetrics.numDynamicFiltersAccepted) + ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits) + ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits) + ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides) + ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides) + ScanMetricsUtil.inc(remainingFilterTime, operatorMetrics.remainingFilterTime) + ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime) + ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes) + ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads) + ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes) + ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes) + ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits) + ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime) + ScanMetricsUtil.inc(dataSourceAddSplitTime, operatorMetrics.dataSourceAddSplitTime) + ScanMetricsUtil.inc(dataSourceReadTime, operatorMetrics.dataSourceReadTime) + ScanMetricsUtil.inc(loadLazyVectorTime, operatorMetrics.loadLazyVectorTime) } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala index 3330f8b15ae..7affd730aa0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala @@ -19,72 +19,73 @@ package org.apache.gluten.metrics import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper +/** See [[FileSourceScanMetricsUpdater]]. */ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]) extends MetricsUpdater { - val rawInputRows: SQLMetric = metrics("rawInputRows") - val rawInputBytes: SQLMetric = metrics("rawInputBytes") - val outputRows: SQLMetric = metrics("numOutputRows") - val outputVectors: SQLMetric = metrics("outputVectors") - val outputBytes: SQLMetric = metrics("outputBytes") - val wallNanos: SQLMetric = metrics("wallNanos") - val cpuCount: SQLMetric = metrics("cpuCount") - val scanTime: SQLMetric = metrics("scanTime") - val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes") - val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations") - // Number of dynamic filters received. - val numDynamicFiltersAccepted: SQLMetric = metrics("numDynamicFiltersAccepted") - val skippedSplits: SQLMetric = metrics("skippedSplits") - val processedSplits: SQLMetric = metrics("processedSplits") - val preloadSplits: SQLMetric = metrics("preloadSplits") - val pageLoadTime: SQLMetric = metrics("pageLoadTime") - val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime") - val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime") - val skippedStrides: SQLMetric = metrics("skippedStrides") - val processedStrides: SQLMetric = metrics("processedStrides") - val remainingFilterTime: SQLMetric = metrics("remainingFilterTime") - val ioWaitTime: SQLMetric = metrics("ioWaitTime") - val storageReadBytes: SQLMetric = metrics("storageReadBytes") - val storageReads: SQLMetric = metrics("storageReads") - val localReadBytes: SQLMetric = metrics("localReadBytes") - val ramReadBytes: SQLMetric = metrics("ramReadBytes") - val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime") + private def metric(key: String): Option[SQLMetric] = Option(metrics).flatMap(_.get(key)) + + private val rawInputRows: Option[SQLMetric] = metric("rawInputRows") + private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes") + private val outputRows: Option[SQLMetric] = metric("numOutputRows") + private val outputVectors: Option[SQLMetric] = metric("outputVectors") + private val outputBytes: Option[SQLMetric] = metric("outputBytes") + private val wallNanos: Option[SQLMetric] = metric("wallNanos") + private val cpuCount: Option[SQLMetric] = metric("cpuCount") + private val scanTime: Option[SQLMetric] = metric("scanTime") + private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes") + private val numMemoryAllocations: Option[SQLMetric] = metric("numMemoryAllocations") + private val numDynamicFiltersAccepted: Option[SQLMetric] = metric("numDynamicFiltersAccepted") + private val skippedSplits: Option[SQLMetric] = metric("skippedSplits") + private val processedSplits: Option[SQLMetric] = metric("processedSplits") + private val skippedStrides: Option[SQLMetric] = metric("skippedStrides") + private val processedStrides: Option[SQLMetric] = metric("processedStrides") + private val remainingFilterTime: Option[SQLMetric] = metric("remainingFilterTime") + private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime") + private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes") + private val storageReads: Option[SQLMetric] = metric("storageReads") + private val localReadBytes: Option[SQLMetric] = metric("localReadBytes") + private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes") + private val preloadSplits: Option[SQLMetric] = metric("preloadSplits") + private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime") + private val dataSourceAddSplitTime: Option[SQLMetric] = metric("dataSourceAddSplitTime") + private val dataSourceReadTime: Option[SQLMetric] = metric("dataSourceReadTime") + private val loadLazyVectorTime: Option[SQLMetric] = metric("loadLazyVectorTime") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { - inputMetrics.bridgeIncBytesRead(rawInputBytes.value) - inputMetrics.bridgeIncRecordsRead(rawInputRows.value) + rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value)) + rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value)) } override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - rawInputRows += operatorMetrics.rawInputRows - rawInputBytes += operatorMetrics.rawInputBytes - outputRows += operatorMetrics.outputRows - outputVectors += operatorMetrics.outputVectors - outputBytes += operatorMetrics.outputBytes - wallNanos += operatorMetrics.wallNanos - cpuCount += operatorMetrics.cpuCount - scanTime += operatorMetrics.scanTime - peakMemoryBytes += operatorMetrics.peakMemoryBytes - numMemoryAllocations += operatorMetrics.numMemoryAllocations - // Number of dynamic filters received. - numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted - skippedSplits += operatorMetrics.skippedSplits - processedSplits += operatorMetrics.processedSplits - skippedStrides += operatorMetrics.skippedStrides - processedStrides += operatorMetrics.processedStrides - remainingFilterTime += operatorMetrics.remainingFilterTime - ioWaitTime += operatorMetrics.ioWaitTime - storageReadBytes += operatorMetrics.storageReadBytes - storageReads += operatorMetrics.storageReads - localReadBytes += operatorMetrics.localReadBytes - ramReadBytes += operatorMetrics.ramReadBytes - preloadSplits += operatorMetrics.preloadSplits - pageLoadTime += operatorMetrics.pageLoadTime - dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime - dataSourceReadTime += operatorMetrics.dataSourceReadTime - loadLazyVectorTime += operatorMetrics.loadLazyVectorTime + ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows) + ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes) + ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows) + ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors) + ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes) + ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos) + ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount) + ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime) + ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes) + ScanMetricsUtil.inc(numMemoryAllocations, operatorMetrics.numMemoryAllocations) + ScanMetricsUtil.inc(numDynamicFiltersAccepted, operatorMetrics.numDynamicFiltersAccepted) + ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits) + ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits) + ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides) + ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides) + ScanMetricsUtil.inc(remainingFilterTime, operatorMetrics.remainingFilterTime) + ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime) + ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes) + ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads) + ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes) + ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes) + ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits) + ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime) + ScanMetricsUtil.inc(dataSourceAddSplitTime, operatorMetrics.dataSourceAddSplitTime) + ScanMetricsUtil.inc(dataSourceReadTime, operatorMetrics.dataSourceReadTime) + ScanMetricsUtil.inc(loadLazyVectorTime, operatorMetrics.loadLazyVectorTime) } } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala index 35edc4fa6e2..bddd047d459 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala @@ -199,7 +199,7 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa assert(scan.isDefined) val metrics = scan.get.metrics assert(metrics("rawInputRows").value == 100) - assert(metrics("outputVectors").value == 1) + assert(metrics.contains("numOutputRows")) } } @@ -286,27 +286,33 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } test("Velox cache metrics") { - val df = spark.sql(s"SELECT * FROM metrics_t1") - val scans = collect(df.queryExecution.executedPlan) { - case scan: FileSourceScanExecTransformer => scan + // ramReadBytes is not in the default minimal scan metric set. + withSQLConf(GlutenConfig.SCAN_DETAILED_METRICS_ENABLED.key -> "true") { + val df = spark.sql(s"SELECT * FROM metrics_t1") + val scans = collect(df.queryExecution.executedPlan) { + case scan: FileSourceScanExecTransformer => scan + } + df.collect() + assert(scans.length === 1) + val metrics = scans.head.metrics + assert(metrics("storageReadBytes").value > 0) + assert(metrics("ramReadBytes").value == 0) } - df.collect() - assert(scans.length === 1) - val metrics = scans.head.metrics - assert(metrics("storageReadBytes").value > 0) - assert(metrics("ramReadBytes").value == 0) } test("Velox datasource metrics") { - val df = spark.sql(s"SELECT * FROM metrics_t1") - val scans = collect(df.queryExecution.executedPlan) { - case scan: FileSourceScanExecTransformer => scan + // dataSource* metrics are not in the default minimal scan metric set. + withSQLConf(GlutenConfig.SCAN_DETAILED_METRICS_ENABLED.key -> "true") { + val df = spark.sql(s"SELECT * FROM metrics_t1") + val scans = collect(df.queryExecution.executedPlan) { + case scan: FileSourceScanExecTransformer => scan + } + df.collect() + assert(scans.length === 1) + val metrics = scans.head.metrics + assert(metrics("dataSourceReadTime").value > 0) + assert(metrics("dataSourceAddSplitTime").value > 0) } - df.collect() - assert(scans.length === 1) - val metrics = scans.head.metrics - assert(metrics("dataSourceReadTime").value > 0) - assert(metrics("dataSourceAddSplitTime").value > 0) } test("test nested loop join metrics") { diff --git a/docs/Configuration.md b/docs/Configuration.md index 4ed5cd4b27b..7e3aa02efeb 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -138,6 +138,7 @@ nav_order: 15 | spark.gluten.sql.pushAggregateThroughJoin.maxDepth | 🔄 Dynamic | 2147483647 | Maximum join traversal depth when applying the push-aggregate-through-join optimization. A value of 1 allows pushing an aggregate through a single join; larger values allow the rule to traverse and push through multiple consecutive joins. | | spark.gluten.sql.removeNativeWriteFilesSortAndProject | 🔄 Dynamic | true | When true, Gluten will remove the vanilla Spark V1Writes added sort and project for velox backend. | | spark.gluten.sql.rewrite.dateTimestampComparison | 🔄 Dynamic | true | Rewrite the comparision between date and timestamp to timestamp comparison.For example `from_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)` | +| spark.gluten.sql.scan.detailedMetrics.enabled | 🔄 Dynamic | false | When true, Velox backend scan operators register all detailed SQL metrics. When false (default), only essential scan metrics are registered to reduce driver memory usage. Also enabled automatically when spark.gluten.sql.debug is true. Does not affect the ClickHouse backend. | | spark.gluten.sql.scan.fileSchemeValidation.enabled | 🔄 Dynamic | true | When true, enable file path scheme validation for scan. Validation will fail if file scheme is not supported by registered file systems, which will cause scan operator fall back. | | spark.gluten.sql.supported.flattenNestedFunctions | 🔄 Dynamic | and,or | Flatten nested functions as one for optimization. | | spark.gluten.sql.text.input.empty.as.default | 🔄 Dynamic | false | treat empty fields in CSV input as default values. | diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index acfb84cf960..666c1b8f288 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -327,6 +327,10 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def debug: Boolean = getConf(DEBUG_ENABLED) + /** Full scan SQL metrics; also enabled when [[debug]] is true. */ + def detailedScanMetricsEnabled: Boolean = + getConf(SCAN_DETAILED_METRICS_ENABLED) || debug + def collectUtStats: Boolean = getConf(UT_STATISTIC) def benchmarkStageId: Int = getConf(BENCHMARK_TASK_STAGEID) @@ -1290,6 +1294,16 @@ object GlutenConfig extends ConfigRegistry { .booleanConf .createWithDefault(false) + val SCAN_DETAILED_METRICS_ENABLED = + buildConf("spark.gluten.sql.scan.detailedMetrics.enabled") + .doc( + "When true, Velox backend scan operators register all detailed SQL metrics. When false " + + "(default), only essential scan metrics are registered to reduce driver memory usage. " + + "Also enabled automatically when spark.gluten.sql.debug is true. " + + "Does not affect the ClickHouse backend.") + .booleanConf + .createWithDefault(false) + val DEBUG_KEEP_JNI_WORKSPACE = buildStaticConf("spark.gluten.sql.debug.keepJniWorkspace") .internal() diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala index 2278bd3a62b..8fea175d7ee 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala @@ -109,11 +109,15 @@ abstract class FileSourceScanExecTransformerBase( disableBucketedScan) with DatasourceScanTransformer { - // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. - @transient override lazy val metrics: Map[String, SQLMetric] = + // Executor-side metrics only (excludes driverMetricsAlias). + @transient private lazy val executorSideScanMetrics: Map[String, SQLMetric] = BackendsApiManager.getMetricsApiInstance .genFileSourceScanTransformerMetrics(sparkContext) - .filter(m => !driverMetricsAlias.contains(m._1)) ++ driverMetricsAlias + .filter(m => !driverMetricsAlias.contains(m._1)) + + // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. + @transient override lazy val metrics: Map[String, SQLMetric] = + executorSideScanMetrics ++ driverMetricsAlias override def scanFilters: Seq[Expression] = dataFilters @@ -177,7 +181,8 @@ abstract class FileSourceScanExecTransformerBase( } override def metricsUpdater(): MetricsUpdater = - BackendsApiManager.getMetricsApiInstance.genFileSourceScanTransformerMetricsUpdater(metrics) + BackendsApiManager.getMetricsApiInstance + .genFileSourceScanTransformerMetricsUpdater(executorSideScanMetrics) override val nodeName: String = { s"${getClass.getSimpleName} $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/metrics/ScanMetricsUtil.scala b/gluten-substrait/src/main/scala/org/apache/gluten/metrics/ScanMetricsUtil.scala new file mode 100644 index 00000000000..30c6fbcec49 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/gluten/metrics/ScanMetricsUtil.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.metrics + +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.sql.execution.metric.SQLMetric + +/** + * Velox-only utilities to reduce scan operator SQL metrics under driver memory pressure. + * + * By default only essential metrics are registered. Enable full collection via + * `spark.gluten.sql.scan.detailedMetrics.enabled` or `spark.gluten.sql.debug`. + */ +object ScanMetricsUtil { + + def detailedScanMetricsEnabled: Boolean = GlutenConfig.get.detailedScanMetricsEnabled + + /** Velox BatchScanExecTransformer - executor-side metrics (default subset). */ + val VELOX_BATCH_SCAN_MINIMAL_METRICS: Set[String] = Set( + "rawInputRows", + "rawInputBytes", + "numOutputRows", + "outputBytes", + "scanTime", + "wallNanos", + "peakMemoryBytes", + "ioWaitTime", + "storageReadBytes" + ) + + /** Velox FileSourceScan / HiveTableScan - executor + Spark-aligned driver metrics. */ + val VELOX_FILE_SCAN_MINIMAL_METRICS: Set[String] = VELOX_BATCH_SCAN_MINIMAL_METRICS ++ Set( + "numFiles", + "metadataTime", + "filesSize", + "numPartitions", + "pruningTime" + ) + + val VELOX_HIVE_SCAN_MINIMAL_METRICS: Set[String] = VELOX_FILE_SCAN_MINIMAL_METRICS + + /** Driver-only FileSource/Hive scan metrics (not passed to executor MetricsUpdater). */ + val VELOX_FILE_SCAN_DRIVER_METRICS: Set[String] = Set( + "numFiles", + "metadataTime", + "filesSize", + "numPartitions", + "pruningTime" + ) + + def filterExecutorMetrics(metrics: Map[String, SQLMetric]): Map[String, SQLMetric] = + metrics.filterKeys(!VELOX_FILE_SCAN_DRIVER_METRICS.contains(_)).toMap + + def filterScanMetrics( + metrics: Map[String, SQLMetric], + minimalKeys: Set[String]): Map[String, SQLMetric] = { + if (detailedScanMetricsEnabled) { + metrics + } else { + metrics.filterKeys(minimalKeys.contains).toMap + } + } + + def inc(metric: Option[SQLMetric], value: Long): Unit = metric.foreach(_ += value) +} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index a091d047c2b..26613f5f806 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer -import org.apache.gluten.metrics.MetricsUpdater +import org.apache.gluten.metrics.{MetricsUpdater, ScanMetricsUtil} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.Partition @@ -116,7 +116,8 @@ case class HiveTableScanExecTransformer( override def getRootPathsInternal: Seq[String] = Seq.empty override def metricsUpdater(): MetricsUpdater = - BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater(metrics) + BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater( + ScanMetricsUtil.filterExecutorMetrics(metrics)) @transient private lazy val hivePartitionConverter = new HivePartitionConverter(session.sessionState.newHadoopConf(), session) diff --git a/gluten-substrait/src/test/scala/org/apache/gluten/metrics/ScanMetricsUtilSuite.scala b/gluten-substrait/src/test/scala/org/apache/gluten/metrics/ScanMetricsUtilSuite.scala new file mode 100644 index 00000000000..e00f4e385c7 --- /dev/null +++ b/gluten-substrait/src/test/scala/org/apache/gluten/metrics/ScanMetricsUtilSuite.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.metrics + +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.test.SharedSparkSession + +class ScanMetricsUtilSuite extends SparkFunSuite with SharedSparkSession with SQLHelper { + + test("filterScanMetrics keeps minimal Velox batch scan keys by default") { + withSQLConf( + GlutenConfig.SCAN_DETAILED_METRICS_ENABLED.key -> "false", + GlutenConfig.DEBUG_ENABLED.key -> "false") { + val full = Map( + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "in"), + "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "raw in rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "out"), + "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu") + ) + val filtered = + ScanMetricsUtil.filterScanMetrics(full, ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS) + assert(!filtered.contains("numInputRows")) + assert(!filtered.contains("cpuCount")) + assert(filtered.contains("rawInputRows")) + assert(filtered.contains("numOutputRows")) + } + } + + test("filterScanMetrics returns all keys when detailed metrics enabled") { + withSQLConf(GlutenConfig.SCAN_DETAILED_METRICS_ENABLED.key -> "true") { + val full = Map( + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "in"), + "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "raw in rows")) + val filtered = + ScanMetricsUtil.filterScanMetrics(full, ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS) + assert(filtered.size == full.size) + } + } + + test("filterScanMetrics returns all keys when debug is enabled") { + withSQLConf( + GlutenConfig.SCAN_DETAILED_METRICS_ENABLED.key -> "false", + GlutenConfig.DEBUG_ENABLED.key -> "true") { + val full = Map("cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu")) + val filtered = + ScanMetricsUtil.filterScanMetrics(full, ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS) + assert(filtered.size == full.size) + } + } +}