
Commit 520835a

ZiyaZa authored and aokolnychyi committed
[SPARK-56598][SQL] Custom metrics support for TruncatableTable
### What changes were proposed in this pull request?

Added support for connectors to provide custom metrics in Truncate and Delete operations. Refactored the existing custom metrics logic to be reusable across different plan nodes.

### Why are the changes needed?

For connectors to provide metric values.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

Closes #55511 from ZiyaZa/custom-metrics.

Authored-by: Ziya Mukhtarov <ziya5muxtarov@gmail.com>
Signed-off-by: Anton Okolnychyi <aokolnychyi@apache.org>
1 parent 586ac79 commit 520835a

11 files changed

Lines changed: 187 additions & 92 deletions


sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java

Lines changed: 23 additions & 0 deletions
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.connector.catalog;

 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;

 /**
  * Represents a table which can be atomically truncated.
@@ -34,4 +36,25 @@ public interface TruncatableTable extends Table {
    * @since 3.2.0
    */
   boolean truncateTable();
+
+  /**
+   * Returns an array of supported custom metrics with name and description.
+   * By default it returns empty array.
+   *
+   * @since 4.2.0
+   */
+  default CustomMetric[] supportedCustomMetrics() {
+    return new CustomMetric[]{};
+  }
+
+  /**
+   * Returns an array of custom metrics which are collected with values at the driver side only.
+   * Note that these metrics must be included in the supported custom metrics reported by
+   * `supportedCustomMetrics`.
+   *
+   * @since 4.2.0
+   */
+  default CustomTaskMetric[] reportDriverMetrics() {
+    return new CustomTaskMetric[]{};
+  }
 }
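To make the new hooks concrete, here is a minimal sketch of a connector table that reports one driver-side metric. Only `TruncatableTable`, `CustomMetric`, `CustomSumMetric`, and `CustomTaskMetric` come from the Spark API; the class names, the `filesRemoved` metric, and the hard-coded count are hypothetical stand-ins for real connector logic.

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{TableCapability, TruncatableTable}
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, CustomTaskMetric}
import org.apache.spark.sql.types.StructType

// Hypothetical metric: how many data files a truncate removed. CustomSumMetric
// supplies a sum-based aggregateTaskMetrics implementation.
class FilesRemovedMetric extends CustomSumMetric {
  override def name(): String = "filesRemoved"
  override def description(): String = "number of data files removed"
}

// Hypothetical connector table that opts into the new API.
class FileCountingTable extends TruncatableTable {
  private var lastFilesRemoved = 0L

  override def name(): String = "file_counting_table"
  override def schema(): StructType = new StructType()
  override def capabilities(): util.Set[TableCapability] = util.Collections.emptySet()

  override def truncateTable(): Boolean = {
    // A real connector would delete its data here and record how much it dropped.
    lastFilesRemoved = 42L
    true
  }

  // Declared metrics are what Spark registers and shows in the SQL UI.
  override def supportedCustomMetrics(): Array[CustomMetric] =
    Array(new FilesRemovedMetric)

  // Reported names must match a metric declared above, or the value is dropped.
  override def reportDriverMetrics(): Array[CustomTaskMetric] =
    Array(new CustomTaskMetric {
      override def name(): String = "filesRemoved"
      override def value(): Long = lastFilesRemoved
    })
}
```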

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala

Lines changed: 1 addition & 1 deletion
@@ -129,7 +129,7 @@ case class BatchScanExec(
       new DataSourceRDD(
         sparkContext, filteredPartitions, readerFactory, supportsColumnar, customMetrics)
     }
-    postDriverMetrics()
+    postDriverMetrics(scan.reportDriverMetrics())
     rdd
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ case class ContinuousScanExec(
       schema,
       readerFactory,
       customMetrics)
-    postDriverMetrics()
+    postDriverMetrics(scan.reportDriverMetrics())
     inputRDD
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala

Lines changed: 10 additions & 23 deletions
@@ -24,23 +24,22 @@ import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.KeyedPartitioning
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan}
-import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SafeForKWayMerge, SQLExecution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SafeForKWayMerge}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.connector.SupportsMetadata
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils

-trait DataSourceV2ScanExecBase extends LeafExecNode with SafeForKWayMerge {
+trait DataSourceV2ScanExecBase
+  extends LeafExecNode
+  with SafeForKWayMerge
+  with SupportsCustomDriverMetrics {

-  lazy val customMetrics = scan.supportedCustomMetrics().map { customMetric =>
-    customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
-  }.toMap
+  override lazy val customMetrics: Map[String, SQLMetric] =
+    createCustomMetrics(scan.supportedCustomMetrics())

-  override lazy val metrics = {
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) ++
-      customMetrics
-  }
+  override protected lazy val sparkMetrics: Map[String, SQLMetric] =
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

   def scan: Scan

@@ -145,18 +144,6 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with SafeForKWayMerge {
     }
   }

-  protected def postDriverMetrics(): Unit = {
-    val driveSQLMetrics = scan.reportDriverMetrics().map(customTaskMetric => {
-      val metric = metrics(customTaskMetric.name())
-      metric.set(customTaskMetric.value())
-      metric
-    })
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      driveSQLMetrics.toImmutableArraySeq)
-  }
-
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numOutputRows = longMetric("numOutputRows")
     inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b =>
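With this refactor, the scan-specific `postDriverMetrics()` helper moves into the shared `SupportsCustomDriverMetrics` trait (added below) and takes the connector-reported values as an explicit parameter, since the shared trait cannot assume a `scan` member. That is why every scan node in this commit now calls `postDriverMetrics(scan.reportDriverMetrics())`.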

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala

Lines changed: 13 additions & 2 deletions
@@ -23,18 +23,29 @@ import org.apache.spark.sql.catalyst.transactions.TransactionUtils
 import org.apache.spark.sql.connector.catalog.SupportsDeleteV2
 import org.apache.spark.sql.connector.catalog.transactions.Transaction
 import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.execution.metric.SQLMetric

 case class DeleteFromTableExec(
     table: SupportsDeleteV2,
     condition: Array[Predicate],
     refreshCache: () => Unit,
-    transaction: Option[Transaction] = None) extends LeafV2CommandExec with TransactionalExec {
+    transaction: Option[Transaction] = None)
+  extends LeafV2CommandExec
+  with TransactionalExec
+  with SupportsCustomDriverMetrics {
+
+  override lazy val customMetrics: Map[String, SQLMetric] =
+    createCustomMetrics(table.supportedCustomMetrics())

   override def withTransaction(txn: Option[Transaction]): DeleteFromTableExec =
     copy(transaction = txn)

   override protected def run(): Seq[InternalRow] = {
-    table.deleteWhere(condition)
+    try {
+      table.deleteWhere(condition)
+    } finally {
+      postDriverMetrics(table.reportDriverMetrics())
+    }
     transaction.foreach(TransactionUtils.commit)
     refreshCache()
     Seq.empty
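Note the shape of the new `run()`: the `deleteWhere` call is wrapped in try/finally so metrics are posted even when the delete throws, matching the pattern `V1FallbackWriters` (below) already used, while the transaction commit and cache refresh stay outside the block and run only on success.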

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ case class MicroBatchScanExec(
   override lazy val inputRDD: RDD[InternalRow] = {
     val inputRDD = new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar,
       customMetrics)
-    postDriverMetrics()
+    postDriverMetrics(scan.reportDriverMetrics())
     inputRDD
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala

Lines changed: 1 addition & 1 deletion
@@ -166,7 +166,7 @@ case class RealTimeStreamScanExec(
       supportsColumnar,
       customMetrics
     )
-    postDriverMetrics()
+    postDriverMetrics(scan.reportDriverMetrics())
     inputRDD
   }
 }
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SupportsCustomDriverMetrics.scala

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * A mixin for Spark plan nodes that expose driver-side custom metrics reported by a connector.
+ * Implementations declare the connector-owned metrics via [[customMetrics]]; after the underlying
+ * operation has executed they call [[postDriverMetrics]] with the connector's reported values so
+ * they are visible in the SQL UI.
+ *
+ * Nodes that also expose Spark-owned metrics supply them via [[sparkMetrics]]. Names in
+ * [[sparkMetrics]] are reserved: if the connector happens to report a value under the same name,
+ * Spark's value wins and the connector's is dropped.
+ */
+trait SupportsCustomDriverMetrics { self: SparkPlan =>
+
+  /**
+   * The custom metrics the connector supports for this operation, keyed by name.
+   */
+  def customMetrics: Map[String, SQLMetric]
+
+  /**
+   * Spark-owned metrics that should appear alongside the connector-declared ones. Values under
+   * these names are owned by Spark and take precedence on a name collision.
+   */
+  protected def sparkMetrics: Map[String, SQLMetric] = Map.empty
+
+  override lazy val metrics: Map[String, SQLMetric] = customMetrics ++ sparkMetrics
+
+  /**
+   * Converts an array of connector-declared metrics into the map shape [[customMetrics]] uses.
+   */
+  protected def createCustomMetrics(metrics: Array[CustomMetric]): Map[String, SQLMetric] = {
+    metrics.map { m =>
+      m.name -> SQLMetrics.createV2CustomMetric(sparkContext, m)
+    }.toMap
+  }
+
+  /**
+   * Applies the values reported by the connector to the declared metrics and posts them so the
+   * SQL UI reflects the final values. Metrics not declared via [[customMetrics]] are ignored.
+   * Metrics whose name collides with [[sparkMetrics]] are also ignored so Spark-owned values
+   * are preserved.
+   */
+  protected def postDriverMetrics(taskMetrics: Array[CustomTaskMetric]): Unit = {
+    val updated = taskMetrics.flatMap { t =>
+      if (sparkMetrics.contains(t.name())) {
+        // Spark metrics take precedence on collisions.
+        None
+      } else {
+        metrics.get(t.name()).map { metric =>
+          metric.set(t.value())
+          metric
+        }
+      }
+    }
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, updated.toImmutableArraySeq)
+  }
+}
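One subtlety in this trait: `metrics` is built as `customMetrics ++ sparkMetrics`, and in Scala the right-hand operand of `++` wins on key collisions, which is what makes the documented "Spark's value wins" rule hold at registration time; `postDriverMetrics` then enforces the same rule again at reporting time. A minimal illustration with plain `Long` values (the real maps hold `SQLMetric`s; the names are made up):

```scala
val connectorMetrics = Map("numOutputRows" -> 1L, "filesRemoved" -> 2L)
val sparkOwned = Map("numOutputRows" -> 10L)

// The right-hand operand of ++ wins on key collisions, so Spark's entry survives.
val merged = connectorMetrics ++ sparkOwned
assert(merged("numOutputRows") == 10L)
assert(merged("filesRemoved") == 2L)
```

Note also that `SQLMetrics.postDriverMetricUpdates` only posts when an execution ID is set in the local properties, so calling `postDriverMetrics` outside a tracked SQL execution is harmless.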

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala

Lines changed: 13 additions & 3 deletions
@@ -20,18 +20,28 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.TruncatableTable
+import org.apache.spark.sql.execution.metric.SQLMetric

 /**
  * Physical plan node for table truncation.
  */
 case class TruncateTableExec(
     table: TruncatableTable,
-    refreshCache: () => Unit) extends LeafV2CommandExec {
+    refreshCache: () => Unit)
+  extends LeafV2CommandExec
+  with SupportsCustomDriverMetrics {
+
+  override lazy val customMetrics: Map[String, SQLMetric] =
+    createCustomMetrics(table.supportedCustomMetrics())

   override def output: Seq[Attribute] = Seq.empty

   override protected def run(): Seq[InternalRow] = {
-    if (table.truncateTable()) refreshCache()
-    Seq.empty
+    try {
+      if (table.truncateTable()) refreshCache()
+      Seq.empty
+    } finally {
+      postDriverMetrics(table.reportDriverMetrics())
+    }
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala

Lines changed: 10 additions & 13 deletions
@@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.classic.Dataset
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.V1Write
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.sources.InsertableRelation

 /**
@@ -56,13 +56,15 @@ case class OverwriteByExpressionExecV1(
     write: V1Write) extends V1FallbackWriters

 /** Some helper interfaces that use V2 write semantics through the V1 writer interface. */
-sealed trait V1FallbackWriters extends LeafV2CommandExec with SupportsV1Write {
+sealed trait V1FallbackWriters
+  extends LeafV2CommandExec
+  with SupportsV1Write
+  with SupportsCustomDriverMetrics {
+
   override def output: Seq[Attribute] = Nil

-  override val metrics: Map[String, SQLMetric] =
-    write.supportedCustomMetrics().map { customMetric =>
-      customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
-    }.toMap
+  override lazy val customMetrics: Map[String, SQLMetric] =
+    createCustomMetrics(write.supportedCustomMetrics())

   def table: SupportsWrite
   def refreshCache: () => Unit
@@ -75,12 +77,7 @@ sealed trait V1FallbackWriters extends LeafV2CommandExec with SupportsV1Write {

       Nil
     } finally {
-      write.reportDriverMetrics().foreach { customTaskMetric =>
-        metrics.get(customTaskMetric.name()).foreach(_.set(customTaskMetric.value()))
-      }
-
-      val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-      SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
+      postDriverMetrics(write.reportDriverMetrics())
     }
   }
 }
