Skip to content

Commit 67c52c7

Browse files
Gluten driver oom while spark ok with same driver memory
1 parent 5286b8c commit 67c52c7

11 files changed

Lines changed: 299 additions & 158 deletions

File tree

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
9999
}
100100

101101
override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
102+
ScanMetricsUtil.filterScanMetrics(
103+
genBatchScanTransformerMetricsFull(sparkContext),
104+
ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS)
105+
106+
private def genBatchScanTransformerMetricsFull(
107+
sparkContext: SparkContext): Map[String, SQLMetric] =
102108
Map(
103109
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
104110
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
@@ -148,6 +154,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
148154

149155
override def genHiveTableScanTransformerMetrics(
150156
sparkContext: SparkContext): Map[String, SQLMetric] =
157+
ScanMetricsUtil.filterScanMetrics(
158+
genHiveTableScanTransformerMetricsFull(sparkContext),
159+
ScanMetricsUtil.VELOX_HIVE_SCAN_MINIMAL_METRICS)
160+
161+
private def genHiveTableScanTransformerMetricsFull(
162+
sparkContext: SparkContext): Map[String, SQLMetric] =
151163
Map(
152164
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
153165
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
@@ -200,6 +212,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
200212

201213
override def genFileSourceScanTransformerMetrics(
202214
sparkContext: SparkContext): Map[String, SQLMetric] =
215+
ScanMetricsUtil.filterScanMetrics(
216+
genFileSourceScanTransformerMetricsFull(sparkContext),
217+
ScanMetricsUtil.VELOX_FILE_SCAN_MINIMAL_METRICS)
218+
219+
private def genFileSourceScanTransformerMetricsFull(
220+
sparkContext: SparkContext): Map[String, SQLMetric] =
203221
Map(
204222
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
205223
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),

backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,36 +29,38 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri
2929
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
3030
if (opMetrics != null) {
3131
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
32-
metrics("numInputRows") += operatorMetrics.inputRows
33-
metrics("inputVectors") += operatorMetrics.inputVectors
34-
metrics("inputBytes") += operatorMetrics.inputBytes
35-
metrics("rawInputRows") += operatorMetrics.rawInputRows
36-
metrics("rawInputBytes") += operatorMetrics.rawInputBytes
37-
metrics("numOutputRows") += operatorMetrics.outputRows
38-
metrics("outputVectors") += operatorMetrics.outputVectors
39-
metrics("outputBytes") += operatorMetrics.outputBytes
40-
metrics("cpuCount") += operatorMetrics.cpuCount
41-
metrics("scanTime") += operatorMetrics.scanTime
42-
metrics("wallNanos") += operatorMetrics.wallNanos
43-
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
44-
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
45-
// Number of dynamic filters received.
46-
metrics("numDynamicFiltersAccepted") += operatorMetrics.numDynamicFiltersAccepted
47-
metrics("skippedSplits") += operatorMetrics.skippedSplits
48-
metrics("processedSplits") += operatorMetrics.processedSplits
49-
metrics("skippedStrides") += operatorMetrics.skippedStrides
50-
metrics("processedStrides") += operatorMetrics.processedStrides
51-
metrics("remainingFilterTime") += operatorMetrics.remainingFilterTime
52-
metrics("ioWaitTime") += operatorMetrics.ioWaitTime
53-
metrics("storageReadBytes") += operatorMetrics.storageReadBytes
54-
metrics("storageReads") += operatorMetrics.storageReads
55-
metrics("localReadBytes") += operatorMetrics.localReadBytes
56-
metrics("ramReadBytes") += operatorMetrics.ramReadBytes
57-
metrics("preloadSplits") += operatorMetrics.preloadSplits
58-
metrics("pageLoadTime") += operatorMetrics.pageLoadTime
59-
metrics("dataSourceAddSplitTime") += operatorMetrics.dataSourceAddSplitTime
60-
metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime
61-
metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
32+
ScanMetricsUtil.inc(metrics, "numInputRows", operatorMetrics.inputRows)
33+
ScanMetricsUtil.inc(metrics, "inputVectors", operatorMetrics.inputVectors)
34+
ScanMetricsUtil.inc(metrics, "inputBytes", operatorMetrics.inputBytes)
35+
ScanMetricsUtil.inc(metrics, "rawInputRows", operatorMetrics.rawInputRows)
36+
ScanMetricsUtil.inc(metrics, "rawInputBytes", operatorMetrics.rawInputBytes)
37+
ScanMetricsUtil.inc(metrics, "numOutputRows", operatorMetrics.outputRows)
38+
ScanMetricsUtil.inc(metrics, "outputVectors", operatorMetrics.outputVectors)
39+
ScanMetricsUtil.inc(metrics, "outputBytes", operatorMetrics.outputBytes)
40+
ScanMetricsUtil.inc(metrics, "cpuCount", operatorMetrics.cpuCount)
41+
ScanMetricsUtil.inc(metrics, "scanTime", operatorMetrics.scanTime)
42+
ScanMetricsUtil.inc(metrics, "wallNanos", operatorMetrics.wallNanos)
43+
ScanMetricsUtil.inc(metrics, "peakMemoryBytes", operatorMetrics.peakMemoryBytes)
44+
ScanMetricsUtil.inc(metrics, "numMemoryAllocations", operatorMetrics.numMemoryAllocations)
45+
ScanMetricsUtil.inc(
46+
metrics,
47+
"numDynamicFiltersAccepted",
48+
operatorMetrics.numDynamicFiltersAccepted)
49+
ScanMetricsUtil.inc(metrics, "skippedSplits", operatorMetrics.skippedSplits)
50+
ScanMetricsUtil.inc(metrics, "processedSplits", operatorMetrics.processedSplits)
51+
ScanMetricsUtil.inc(metrics, "skippedStrides", operatorMetrics.skippedStrides)
52+
ScanMetricsUtil.inc(metrics, "processedStrides", operatorMetrics.processedStrides)
53+
ScanMetricsUtil.inc(metrics, "remainingFilterTime", operatorMetrics.remainingFilterTime)
54+
ScanMetricsUtil.inc(metrics, "ioWaitTime", operatorMetrics.ioWaitTime)
55+
ScanMetricsUtil.inc(metrics, "storageReadBytes", operatorMetrics.storageReadBytes)
56+
ScanMetricsUtil.inc(metrics, "storageReads", operatorMetrics.storageReads)
57+
ScanMetricsUtil.inc(metrics, "localReadBytes", operatorMetrics.localReadBytes)
58+
ScanMetricsUtil.inc(metrics, "ramReadBytes", operatorMetrics.ramReadBytes)
59+
ScanMetricsUtil.inc(metrics, "preloadSplits", operatorMetrics.preloadSplits)
60+
ScanMetricsUtil.inc(metrics, "pageLoadTime", operatorMetrics.pageLoadTime)
61+
ScanMetricsUtil.inc(metrics, "dataSourceAddSplitTime", operatorMetrics.dataSourceAddSplitTime)
62+
ScanMetricsUtil.inc(metrics, "dataSourceReadTime", operatorMetrics.dataSourceReadTime)
63+
ScanMetricsUtil.inc(metrics, "loadLazyVectorTime", operatorMetrics.loadLazyVectorTime)
6264
}
6365
}
6466
}

backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala

Lines changed: 35 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -20,76 +20,49 @@ import org.apache.spark.sql.execution.metric.SQLMetric
2020
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
2121

2222
/**
23-
* Note: "val metrics" is made transient to avoid sending driver-side metrics to tasks, e.g.
24-
* "pruning time" from scan.
23+
* Holds executor-side scan metrics only (no driver metrics such as numFiles / metadataTime). The
24+
* map must not be @transient: this updater is captured in WholeStage RDD closures and runs on
25+
* executors after deserialization.
2526
*/
26-
class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
27-
extends MetricsUpdater {
28-
29-
val rawInputRows: SQLMetric = metrics("rawInputRows")
30-
val rawInputBytes: SQLMetric = metrics("rawInputBytes")
31-
val outputRows: SQLMetric = metrics("numOutputRows")
32-
val outputVectors: SQLMetric = metrics("outputVectors")
33-
val outputBytes: SQLMetric = metrics("outputBytes")
34-
val wallNanos: SQLMetric = metrics("wallNanos")
35-
val cpuCount: SQLMetric = metrics("cpuCount")
36-
val scanTime: SQLMetric = metrics("scanTime")
37-
val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes")
38-
val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations")
39-
40-
// Number of dynamic filters received.
41-
val numDynamicFiltersAccepted: SQLMetric = metrics("numDynamicFiltersAccepted")
42-
val skippedSplits: SQLMetric = metrics("skippedSplits")
43-
val processedSplits: SQLMetric = metrics("processedSplits")
44-
val preloadSplits: SQLMetric = metrics("preloadSplits")
45-
val pageLoadTime: SQLMetric = metrics("pageLoadTime")
46-
val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
47-
val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
48-
val skippedStrides: SQLMetric = metrics("skippedStrides")
49-
val processedStrides: SQLMetric = metrics("processedStrides")
50-
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
51-
val ioWaitTime: SQLMetric = metrics("ioWaitTime")
52-
val storageReadBytes: SQLMetric = metrics("storageReadBytes")
53-
val storageReads: SQLMetric = metrics("storageReads")
54-
val localReadBytes: SQLMetric = metrics("localReadBytes")
55-
val ramReadBytes: SQLMetric = metrics("ramReadBytes")
56-
val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
27+
class FileSourceScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
5728

5829
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
59-
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
60-
inputMetrics.bridgeIncRecordsRead(rawInputRows.value)
30+
inputMetrics.bridgeIncBytesRead(metrics("rawInputBytes").value)
31+
inputMetrics.bridgeIncRecordsRead(metrics("rawInputRows").value)
6132
}
6233

6334
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
6435
if (opMetrics != null) {
6536
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
66-
rawInputRows += operatorMetrics.rawInputRows
67-
rawInputBytes += operatorMetrics.rawInputBytes
68-
outputRows += operatorMetrics.outputRows
69-
outputVectors += operatorMetrics.outputVectors
70-
outputBytes += operatorMetrics.outputBytes
71-
wallNanos += operatorMetrics.wallNanos
72-
cpuCount += operatorMetrics.cpuCount
73-
scanTime += operatorMetrics.scanTime
74-
peakMemoryBytes += operatorMetrics.peakMemoryBytes
75-
numMemoryAllocations += operatorMetrics.numMemoryAllocations
76-
// Number of dynamic filters received.
77-
numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted
78-
skippedSplits += operatorMetrics.skippedSplits
79-
processedSplits += operatorMetrics.processedSplits
80-
skippedStrides += operatorMetrics.skippedStrides
81-
processedStrides += operatorMetrics.processedStrides
82-
remainingFilterTime += operatorMetrics.remainingFilterTime
83-
ioWaitTime += operatorMetrics.ioWaitTime
84-
storageReadBytes += operatorMetrics.storageReadBytes
85-
storageReads += operatorMetrics.storageReads
86-
localReadBytes += operatorMetrics.localReadBytes
87-
ramReadBytes += operatorMetrics.ramReadBytes
88-
preloadSplits += operatorMetrics.preloadSplits
89-
pageLoadTime += operatorMetrics.pageLoadTime
90-
dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
91-
dataSourceReadTime += operatorMetrics.dataSourceReadTime
92-
loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
37+
ScanMetricsUtil.inc(metrics, "rawInputRows", operatorMetrics.rawInputRows)
38+
ScanMetricsUtil.inc(metrics, "rawInputBytes", operatorMetrics.rawInputBytes)
39+
ScanMetricsUtil.inc(metrics, "numOutputRows", operatorMetrics.outputRows)
40+
ScanMetricsUtil.inc(metrics, "outputVectors", operatorMetrics.outputVectors)
41+
ScanMetricsUtil.inc(metrics, "outputBytes", operatorMetrics.outputBytes)
42+
ScanMetricsUtil.inc(metrics, "wallNanos", operatorMetrics.wallNanos)
43+
ScanMetricsUtil.inc(metrics, "cpuCount", operatorMetrics.cpuCount)
44+
ScanMetricsUtil.inc(metrics, "scanTime", operatorMetrics.scanTime)
45+
ScanMetricsUtil.inc(metrics, "peakMemoryBytes", operatorMetrics.peakMemoryBytes)
46+
ScanMetricsUtil.inc(metrics, "numMemoryAllocations", operatorMetrics.numMemoryAllocations)
47+
ScanMetricsUtil.inc(
48+
metrics,
49+
"numDynamicFiltersAccepted",
50+
operatorMetrics.numDynamicFiltersAccepted)
51+
ScanMetricsUtil.inc(metrics, "skippedSplits", operatorMetrics.skippedSplits)
52+
ScanMetricsUtil.inc(metrics, "processedSplits", operatorMetrics.processedSplits)
53+
ScanMetricsUtil.inc(metrics, "skippedStrides", operatorMetrics.skippedStrides)
54+
ScanMetricsUtil.inc(metrics, "processedStrides", operatorMetrics.processedStrides)
55+
ScanMetricsUtil.inc(metrics, "remainingFilterTime", operatorMetrics.remainingFilterTime)
56+
ScanMetricsUtil.inc(metrics, "ioWaitTime", operatorMetrics.ioWaitTime)
57+
ScanMetricsUtil.inc(metrics, "storageReadBytes", operatorMetrics.storageReadBytes)
58+
ScanMetricsUtil.inc(metrics, "storageReads", operatorMetrics.storageReads)
59+
ScanMetricsUtil.inc(metrics, "localReadBytes", operatorMetrics.localReadBytes)
60+
ScanMetricsUtil.inc(metrics, "ramReadBytes", operatorMetrics.ramReadBytes)
61+
ScanMetricsUtil.inc(metrics, "preloadSplits", operatorMetrics.preloadSplits)
62+
ScanMetricsUtil.inc(metrics, "pageLoadTime", operatorMetrics.pageLoadTime)
63+
ScanMetricsUtil.inc(metrics, "dataSourceAddSplitTime", operatorMetrics.dataSourceAddSplitTime)
64+
ScanMetricsUtil.inc(metrics, "dataSourceReadTime", operatorMetrics.dataSourceReadTime)
65+
ScanMetricsUtil.inc(metrics, "loadLazyVectorTime", operatorMetrics.loadLazyVectorTime)
9366
}
9467
}
9568
}

backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala

Lines changed: 35 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -19,72 +19,48 @@ package org.apache.gluten.metrics
1919
import org.apache.spark.sql.execution.metric.SQLMetric
2020
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
2121

22-
class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
23-
extends MetricsUpdater {
24-
val rawInputRows: SQLMetric = metrics("rawInputRows")
25-
val rawInputBytes: SQLMetric = metrics("rawInputBytes")
26-
val outputRows: SQLMetric = metrics("numOutputRows")
27-
val outputVectors: SQLMetric = metrics("outputVectors")
28-
val outputBytes: SQLMetric = metrics("outputBytes")
29-
val wallNanos: SQLMetric = metrics("wallNanos")
30-
val cpuCount: SQLMetric = metrics("cpuCount")
31-
val scanTime: SQLMetric = metrics("scanTime")
32-
val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes")
33-
val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations")
34-
35-
// Number of dynamic filters received.
36-
val numDynamicFiltersAccepted: SQLMetric = metrics("numDynamicFiltersAccepted")
37-
val skippedSplits: SQLMetric = metrics("skippedSplits")
38-
val processedSplits: SQLMetric = metrics("processedSplits")
39-
val preloadSplits: SQLMetric = metrics("preloadSplits")
40-
val pageLoadTime: SQLMetric = metrics("pageLoadTime")
41-
val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
42-
val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
43-
val skippedStrides: SQLMetric = metrics("skippedStrides")
44-
val processedStrides: SQLMetric = metrics("processedStrides")
45-
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
46-
val ioWaitTime: SQLMetric = metrics("ioWaitTime")
47-
val storageReadBytes: SQLMetric = metrics("storageReadBytes")
48-
val storageReads: SQLMetric = metrics("storageReads")
49-
val localReadBytes: SQLMetric = metrics("localReadBytes")
50-
val ramReadBytes: SQLMetric = metrics("ramReadBytes")
51-
val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
22+
/**
23+
* Holds executor-side scan metrics only. Must not be @transient (see FileSourceScanMetricsUpdater).
24+
*/
25+
class HiveTableScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
5226

5327
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
54-
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
55-
inputMetrics.bridgeIncRecordsRead(rawInputRows.value)
28+
inputMetrics.bridgeIncBytesRead(metrics("rawInputBytes").value)
29+
inputMetrics.bridgeIncRecordsRead(metrics("rawInputRows").value)
5630
}
5731

5832
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
5933
if (opMetrics != null) {
6034
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
61-
rawInputRows += operatorMetrics.rawInputRows
62-
rawInputBytes += operatorMetrics.rawInputBytes
63-
outputRows += operatorMetrics.outputRows
64-
outputVectors += operatorMetrics.outputVectors
65-
outputBytes += operatorMetrics.outputBytes
66-
wallNanos += operatorMetrics.wallNanos
67-
cpuCount += operatorMetrics.cpuCount
68-
scanTime += operatorMetrics.scanTime
69-
peakMemoryBytes += operatorMetrics.peakMemoryBytes
70-
numMemoryAllocations += operatorMetrics.numMemoryAllocations
71-
// Number of dynamic filters received.
72-
numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted
73-
skippedSplits += operatorMetrics.skippedSplits
74-
processedSplits += operatorMetrics.processedSplits
75-
skippedStrides += operatorMetrics.skippedStrides
76-
processedStrides += operatorMetrics.processedStrides
77-
remainingFilterTime += operatorMetrics.remainingFilterTime
78-
ioWaitTime += operatorMetrics.ioWaitTime
79-
storageReadBytes += operatorMetrics.storageReadBytes
80-
storageReads += operatorMetrics.storageReads
81-
localReadBytes += operatorMetrics.localReadBytes
82-
ramReadBytes += operatorMetrics.ramReadBytes
83-
preloadSplits += operatorMetrics.preloadSplits
84-
pageLoadTime += operatorMetrics.pageLoadTime
85-
dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
86-
dataSourceReadTime += operatorMetrics.dataSourceReadTime
87-
loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
35+
ScanMetricsUtil.inc(metrics, "rawInputRows", operatorMetrics.rawInputRows)
36+
ScanMetricsUtil.inc(metrics, "rawInputBytes", operatorMetrics.rawInputBytes)
37+
ScanMetricsUtil.inc(metrics, "numOutputRows", operatorMetrics.outputRows)
38+
ScanMetricsUtil.inc(metrics, "outputVectors", operatorMetrics.outputVectors)
39+
ScanMetricsUtil.inc(metrics, "outputBytes", operatorMetrics.outputBytes)
40+
ScanMetricsUtil.inc(metrics, "wallNanos", operatorMetrics.wallNanos)
41+
ScanMetricsUtil.inc(metrics, "cpuCount", operatorMetrics.cpuCount)
42+
ScanMetricsUtil.inc(metrics, "scanTime", operatorMetrics.scanTime)
43+
ScanMetricsUtil.inc(metrics, "peakMemoryBytes", operatorMetrics.peakMemoryBytes)
44+
ScanMetricsUtil.inc(metrics, "numMemoryAllocations", operatorMetrics.numMemoryAllocations)
45+
ScanMetricsUtil.inc(
46+
metrics,
47+
"numDynamicFiltersAccepted",
48+
operatorMetrics.numDynamicFiltersAccepted)
49+
ScanMetricsUtil.inc(metrics, "skippedSplits", operatorMetrics.skippedSplits)
50+
ScanMetricsUtil.inc(metrics, "processedSplits", operatorMetrics.processedSplits)
51+
ScanMetricsUtil.inc(metrics, "skippedStrides", operatorMetrics.skippedStrides)
52+
ScanMetricsUtil.inc(metrics, "processedStrides", operatorMetrics.processedStrides)
53+
ScanMetricsUtil.inc(metrics, "remainingFilterTime", operatorMetrics.remainingFilterTime)
54+
ScanMetricsUtil.inc(metrics, "ioWaitTime", operatorMetrics.ioWaitTime)
55+
ScanMetricsUtil.inc(metrics, "storageReadBytes", operatorMetrics.storageReadBytes)
56+
ScanMetricsUtil.inc(metrics, "storageReads", operatorMetrics.storageReads)
57+
ScanMetricsUtil.inc(metrics, "localReadBytes", operatorMetrics.localReadBytes)
58+
ScanMetricsUtil.inc(metrics, "ramReadBytes", operatorMetrics.ramReadBytes)
59+
ScanMetricsUtil.inc(metrics, "preloadSplits", operatorMetrics.preloadSplits)
60+
ScanMetricsUtil.inc(metrics, "pageLoadTime", operatorMetrics.pageLoadTime)
61+
ScanMetricsUtil.inc(metrics, "dataSourceAddSplitTime", operatorMetrics.dataSourceAddSplitTime)
62+
ScanMetricsUtil.inc(metrics, "dataSourceReadTime", operatorMetrics.dataSourceReadTime)
63+
ScanMetricsUtil.inc(metrics, "loadLazyVectorTime", operatorMetrics.loadLazyVectorTime)
8864
}
8965
}
9066
}

backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
199199
assert(scan.isDefined)
200200
val metrics = scan.get.metrics
201201
assert(metrics("rawInputRows").value == 100)
202-
assert(metrics("outputVectors").value == 1)
202+
assert(metrics.contains("numOutputRows"))
203203
}
204204
}
205205

0 commit comments

Comments
 (0)