Skip to content

Commit 636c627

Browse files
Gluten driver OOM while Spark is OK with the same driver memory
1 parent d2b48f1 commit 636c627

11 files changed

Lines changed: 378 additions & 154 deletions

File tree

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
9999
}
100100

101101
override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
102+
ScanMetricsUtil.filterScanMetrics(
103+
genBatchScanTransformerMetricsFull(sparkContext),
104+
ScanMetricsUtil.VELOX_BATCH_SCAN_MINIMAL_METRICS)
105+
106+
private def genBatchScanTransformerMetricsFull(
107+
sparkContext: SparkContext): Map[String, SQLMetric] =
102108
Map(
103109
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
104110
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
@@ -148,6 +154,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
148154

149155
override def genHiveTableScanTransformerMetrics(
150156
sparkContext: SparkContext): Map[String, SQLMetric] =
157+
ScanMetricsUtil.filterScanMetrics(
158+
genHiveTableScanTransformerMetricsFull(sparkContext),
159+
ScanMetricsUtil.VELOX_HIVE_SCAN_MINIMAL_METRICS)
160+
161+
private def genHiveTableScanTransformerMetricsFull(
162+
sparkContext: SparkContext): Map[String, SQLMetric] =
151163
Map(
152164
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
153165
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
@@ -200,6 +212,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
200212

201213
override def genFileSourceScanTransformerMetrics(
202214
sparkContext: SparkContext): Map[String, SQLMetric] =
215+
ScanMetricsUtil.filterScanMetrics(
216+
genFileSourceScanTransformerMetricsFull(sparkContext),
217+
ScanMetricsUtil.VELOX_FILE_SCAN_MINIMAL_METRICS)
218+
219+
private def genFileSourceScanTransformerMetricsFull(
220+
sparkContext: SparkContext): Map[String, SQLMetric] =
203221
Map(
204222
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
205223
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),

backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala

Lines changed: 69 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,46 +19,82 @@ package org.apache.gluten.metrics
1919
import org.apache.spark.sql.execution.metric.SQLMetric
2020
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
2121

22-
class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
22+
/**
23+
* See [[FileSourceScanMetricsUpdater]]: the metrics map is @transient, and each SQLMetric is
24+
* captured into a field on the driver so that it is serialized to executors.
25+
*/
26+
class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
27+
extends MetricsUpdater {
28+
29+
private def metric(key: String): Option[SQLMetric] = Option(metrics).flatMap(_.get(key))
30+
31+
private val numInputRows: Option[SQLMetric] = metric("numInputRows")
32+
private val inputVectors: Option[SQLMetric] = metric("inputVectors")
33+
private val inputBytes: Option[SQLMetric] = metric("inputBytes")
34+
private val rawInputRows: Option[SQLMetric] = metric("rawInputRows")
35+
private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes")
36+
private val outputRows: Option[SQLMetric] = metric("numOutputRows")
37+
private val outputVectors: Option[SQLMetric] = metric("outputVectors")
38+
private val outputBytes: Option[SQLMetric] = metric("outputBytes")
39+
private val cpuCount: Option[SQLMetric] = metric("cpuCount")
40+
private val scanTime: Option[SQLMetric] = metric("scanTime")
41+
private val wallNanos: Option[SQLMetric] = metric("wallNanos")
42+
private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes")
43+
private val numMemoryAllocations: Option[SQLMetric] = metric("numMemoryAllocations")
44+
private val numDynamicFiltersAccepted: Option[SQLMetric] = metric("numDynamicFiltersAccepted")
45+
private val skippedSplits: Option[SQLMetric] = metric("skippedSplits")
46+
private val processedSplits: Option[SQLMetric] = metric("processedSplits")
47+
private val skippedStrides: Option[SQLMetric] = metric("skippedStrides")
48+
private val processedStrides: Option[SQLMetric] = metric("processedStrides")
49+
private val remainingFilterTime: Option[SQLMetric] = metric("remainingFilterTime")
50+
private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime")
51+
private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes")
52+
private val storageReads: Option[SQLMetric] = metric("storageReads")
53+
private val localReadBytes: Option[SQLMetric] = metric("localReadBytes")
54+
private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes")
55+
private val preloadSplits: Option[SQLMetric] = metric("preloadSplits")
56+
private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime")
57+
private val dataSourceAddSplitTime: Option[SQLMetric] = metric("dataSourceAddSplitTime")
58+
private val dataSourceReadTime: Option[SQLMetric] = metric("dataSourceReadTime")
59+
private val loadLazyVectorTime: Option[SQLMetric] = metric("loadLazyVectorTime")
2360

2461
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
25-
inputMetrics.bridgeIncBytesRead(metrics("rawInputBytes").value)
26-
inputMetrics.bridgeIncRecordsRead(metrics("rawInputRows").value)
62+
rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value))
63+
rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value))
2764
}
2865

2966
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
3067
if (opMetrics != null) {
3168
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
32-
metrics("numInputRows") += operatorMetrics.inputRows
33-
metrics("inputVectors") += operatorMetrics.inputVectors
34-
metrics("inputBytes") += operatorMetrics.inputBytes
35-
metrics("rawInputRows") += operatorMetrics.rawInputRows
36-
metrics("rawInputBytes") += operatorMetrics.rawInputBytes
37-
metrics("numOutputRows") += operatorMetrics.outputRows
38-
metrics("outputVectors") += operatorMetrics.outputVectors
39-
metrics("outputBytes") += operatorMetrics.outputBytes
40-
metrics("cpuCount") += operatorMetrics.cpuCount
41-
metrics("scanTime") += operatorMetrics.scanTime
42-
metrics("wallNanos") += operatorMetrics.wallNanos
43-
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
44-
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
45-
// Number of dynamic filters received.
46-
metrics("numDynamicFiltersAccepted") += operatorMetrics.numDynamicFiltersAccepted
47-
metrics("skippedSplits") += operatorMetrics.skippedSplits
48-
metrics("processedSplits") += operatorMetrics.processedSplits
49-
metrics("skippedStrides") += operatorMetrics.skippedStrides
50-
metrics("processedStrides") += operatorMetrics.processedStrides
51-
metrics("remainingFilterTime") += operatorMetrics.remainingFilterTime
52-
metrics("ioWaitTime") += operatorMetrics.ioWaitTime
53-
metrics("storageReadBytes") += operatorMetrics.storageReadBytes
54-
metrics("storageReads") += operatorMetrics.storageReads
55-
metrics("localReadBytes") += operatorMetrics.localReadBytes
56-
metrics("ramReadBytes") += operatorMetrics.ramReadBytes
57-
metrics("preloadSplits") += operatorMetrics.preloadSplits
58-
metrics("pageLoadTime") += operatorMetrics.pageLoadTime
59-
metrics("dataSourceAddSplitTime") += operatorMetrics.dataSourceAddSplitTime
60-
metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime
61-
metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
69+
ScanMetricsUtil.inc(numInputRows, operatorMetrics.inputRows)
70+
ScanMetricsUtil.inc(inputVectors, operatorMetrics.inputVectors)
71+
ScanMetricsUtil.inc(inputBytes, operatorMetrics.inputBytes)
72+
ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows)
73+
ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes)
74+
ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows)
75+
ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors)
76+
ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes)
77+
ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount)
78+
ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime)
79+
ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos)
80+
ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes)
81+
ScanMetricsUtil.inc(numMemoryAllocations, operatorMetrics.numMemoryAllocations)
82+
ScanMetricsUtil.inc(numDynamicFiltersAccepted, operatorMetrics.numDynamicFiltersAccepted)
83+
ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits)
84+
ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits)
85+
ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides)
86+
ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides)
87+
ScanMetricsUtil.inc(remainingFilterTime, operatorMetrics.remainingFilterTime)
88+
ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime)
89+
ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes)
90+
ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads)
91+
ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes)
92+
ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes)
93+
ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits)
94+
ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime)
95+
ScanMetricsUtil.inc(dataSourceAddSplitTime, operatorMetrics.dataSourceAddSplitTime)
96+
ScanMetricsUtil.inc(dataSourceReadTime, operatorMetrics.dataSourceReadTime)
97+
ScanMetricsUtil.inc(loadLazyVectorTime, operatorMetrics.loadLazyVectorTime)
6298
}
6399
}
64100
}

backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala

Lines changed: 58 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,76 +20,76 @@ import org.apache.spark.sql.execution.metric.SQLMetric
2020
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
2121

2222
/**
23-
* Note: "val metrics" is made transient to avoid sending driver-side metrics to tasks, e.g.
24-
* "pruning time" from scan.
23+
* Note: "metrics" is @transient to avoid sending the map to executors. SQLMetric references are
24+
* captured into fields at construction time on the driver (when metrics are registered) and
25+
* serialized to executors. Missing keys (minimal scan metrics mode) yield None and are skipped.
2526
*/
2627
class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
2728
extends MetricsUpdater {
2829

29-
val rawInputRows: SQLMetric = metrics("rawInputRows")
30-
val rawInputBytes: SQLMetric = metrics("rawInputBytes")
31-
val outputRows: SQLMetric = metrics("numOutputRows")
32-
val outputVectors: SQLMetric = metrics("outputVectors")
33-
val outputBytes: SQLMetric = metrics("outputBytes")
34-
val wallNanos: SQLMetric = metrics("wallNanos")
35-
val cpuCount: SQLMetric = metrics("cpuCount")
36-
val scanTime: SQLMetric = metrics("scanTime")
37-
val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes")
38-
val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations")
30+
private def metric(key: String): Option[SQLMetric] = Option(metrics).flatMap(_.get(key))
3931

40-
// Number of dynamic filters received.
41-
val numDynamicFiltersAccepted: SQLMetric = metrics("numDynamicFiltersAccepted")
42-
val skippedSplits: SQLMetric = metrics("skippedSplits")
43-
val processedSplits: SQLMetric = metrics("processedSplits")
44-
val preloadSplits: SQLMetric = metrics("preloadSplits")
45-
val pageLoadTime: SQLMetric = metrics("pageLoadTime")
46-
val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
47-
val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
48-
val skippedStrides: SQLMetric = metrics("skippedStrides")
49-
val processedStrides: SQLMetric = metrics("processedStrides")
50-
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
51-
val ioWaitTime: SQLMetric = metrics("ioWaitTime")
52-
val storageReadBytes: SQLMetric = metrics("storageReadBytes")
53-
val storageReads: SQLMetric = metrics("storageReads")
54-
val localReadBytes: SQLMetric = metrics("localReadBytes")
55-
val ramReadBytes: SQLMetric = metrics("ramReadBytes")
56-
val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
32+
private val rawInputRows: Option[SQLMetric] = metric("rawInputRows")
33+
private val rawInputBytes: Option[SQLMetric] = metric("rawInputBytes")
34+
private val outputRows: Option[SQLMetric] = metric("numOutputRows")
35+
private val outputVectors: Option[SQLMetric] = metric("outputVectors")
36+
private val outputBytes: Option[SQLMetric] = metric("outputBytes")
37+
private val wallNanos: Option[SQLMetric] = metric("wallNanos")
38+
private val cpuCount: Option[SQLMetric] = metric("cpuCount")
39+
private val scanTime: Option[SQLMetric] = metric("scanTime")
40+
private val peakMemoryBytes: Option[SQLMetric] = metric("peakMemoryBytes")
41+
private val numMemoryAllocations: Option[SQLMetric] = metric("numMemoryAllocations")
42+
private val numDynamicFiltersAccepted: Option[SQLMetric] = metric("numDynamicFiltersAccepted")
43+
private val skippedSplits: Option[SQLMetric] = metric("skippedSplits")
44+
private val processedSplits: Option[SQLMetric] = metric("processedSplits")
45+
private val skippedStrides: Option[SQLMetric] = metric("skippedStrides")
46+
private val processedStrides: Option[SQLMetric] = metric("processedStrides")
47+
private val remainingFilterTime: Option[SQLMetric] = metric("remainingFilterTime")
48+
private val ioWaitTime: Option[SQLMetric] = metric("ioWaitTime")
49+
private val storageReadBytes: Option[SQLMetric] = metric("storageReadBytes")
50+
private val storageReads: Option[SQLMetric] = metric("storageReads")
51+
private val localReadBytes: Option[SQLMetric] = metric("localReadBytes")
52+
private val ramReadBytes: Option[SQLMetric] = metric("ramReadBytes")
53+
private val preloadSplits: Option[SQLMetric] = metric("preloadSplits")
54+
private val pageLoadTime: Option[SQLMetric] = metric("pageLoadTime")
55+
private val dataSourceAddSplitTime: Option[SQLMetric] = metric("dataSourceAddSplitTime")
56+
private val dataSourceReadTime: Option[SQLMetric] = metric("dataSourceReadTime")
57+
private val loadLazyVectorTime: Option[SQLMetric] = metric("loadLazyVectorTime")
5758

5859
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
59-
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
60-
inputMetrics.bridgeIncRecordsRead(rawInputRows.value)
60+
rawInputBytes.foreach(m => inputMetrics.bridgeIncBytesRead(m.value))
61+
rawInputRows.foreach(m => inputMetrics.bridgeIncRecordsRead(m.value))
6162
}
6263

6364
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
6465
if (opMetrics != null) {
6566
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
66-
rawInputRows += operatorMetrics.rawInputRows
67-
rawInputBytes += operatorMetrics.rawInputBytes
68-
outputRows += operatorMetrics.outputRows
69-
outputVectors += operatorMetrics.outputVectors
70-
outputBytes += operatorMetrics.outputBytes
71-
wallNanos += operatorMetrics.wallNanos
72-
cpuCount += operatorMetrics.cpuCount
73-
scanTime += operatorMetrics.scanTime
74-
peakMemoryBytes += operatorMetrics.peakMemoryBytes
75-
numMemoryAllocations += operatorMetrics.numMemoryAllocations
76-
// Number of dynamic filters received.
77-
numDynamicFiltersAccepted += operatorMetrics.numDynamicFiltersAccepted
78-
skippedSplits += operatorMetrics.skippedSplits
79-
processedSplits += operatorMetrics.processedSplits
80-
skippedStrides += operatorMetrics.skippedStrides
81-
processedStrides += operatorMetrics.processedStrides
82-
remainingFilterTime += operatorMetrics.remainingFilterTime
83-
ioWaitTime += operatorMetrics.ioWaitTime
84-
storageReadBytes += operatorMetrics.storageReadBytes
85-
storageReads += operatorMetrics.storageReads
86-
localReadBytes += operatorMetrics.localReadBytes
87-
ramReadBytes += operatorMetrics.ramReadBytes
88-
preloadSplits += operatorMetrics.preloadSplits
89-
pageLoadTime += operatorMetrics.pageLoadTime
90-
dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
91-
dataSourceReadTime += operatorMetrics.dataSourceReadTime
92-
loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
67+
ScanMetricsUtil.inc(rawInputRows, operatorMetrics.rawInputRows)
68+
ScanMetricsUtil.inc(rawInputBytes, operatorMetrics.rawInputBytes)
69+
ScanMetricsUtil.inc(outputRows, operatorMetrics.outputRows)
70+
ScanMetricsUtil.inc(outputVectors, operatorMetrics.outputVectors)
71+
ScanMetricsUtil.inc(outputBytes, operatorMetrics.outputBytes)
72+
ScanMetricsUtil.inc(wallNanos, operatorMetrics.wallNanos)
73+
ScanMetricsUtil.inc(cpuCount, operatorMetrics.cpuCount)
74+
ScanMetricsUtil.inc(scanTime, operatorMetrics.scanTime)
75+
ScanMetricsUtil.inc(peakMemoryBytes, operatorMetrics.peakMemoryBytes)
76+
ScanMetricsUtil.inc(numMemoryAllocations, operatorMetrics.numMemoryAllocations)
77+
ScanMetricsUtil.inc(numDynamicFiltersAccepted, operatorMetrics.numDynamicFiltersAccepted)
78+
ScanMetricsUtil.inc(skippedSplits, operatorMetrics.skippedSplits)
79+
ScanMetricsUtil.inc(processedSplits, operatorMetrics.processedSplits)
80+
ScanMetricsUtil.inc(skippedStrides, operatorMetrics.skippedStrides)
81+
ScanMetricsUtil.inc(processedStrides, operatorMetrics.processedStrides)
82+
ScanMetricsUtil.inc(remainingFilterTime, operatorMetrics.remainingFilterTime)
83+
ScanMetricsUtil.inc(ioWaitTime, operatorMetrics.ioWaitTime)
84+
ScanMetricsUtil.inc(storageReadBytes, operatorMetrics.storageReadBytes)
85+
ScanMetricsUtil.inc(storageReads, operatorMetrics.storageReads)
86+
ScanMetricsUtil.inc(localReadBytes, operatorMetrics.localReadBytes)
87+
ScanMetricsUtil.inc(ramReadBytes, operatorMetrics.ramReadBytes)
88+
ScanMetricsUtil.inc(preloadSplits, operatorMetrics.preloadSplits)
89+
ScanMetricsUtil.inc(pageLoadTime, operatorMetrics.pageLoadTime)
90+
ScanMetricsUtil.inc(dataSourceAddSplitTime, operatorMetrics.dataSourceAddSplitTime)
91+
ScanMetricsUtil.inc(dataSourceReadTime, operatorMetrics.dataSourceReadTime)
92+
ScanMetricsUtil.inc(loadLazyVectorTime, operatorMetrics.loadLazyVectorTime)
9393
}
9494
}
9595
}

0 commit comments

Comments
 (0)