Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,19 @@ object CometConf extends ShimCometConf {
.longConf
.createWithDefault(3000L)

val COMET_METRICS_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.metrics.enabled")
.category(CATEGORY_EXEC)
.doc(
"Whether to enable Comet metrics reporting through Spark's external monitoring system. " +
"When enabled, Comet exposes metrics such as native operators, Spark operators, " +
"queries planned, transitions, and acceleration ratio. These metrics can be " +
"visualized through tools like Grafana when a metrics sink (e.g., Prometheus) is " +
"configured. Disabled by default because Spark plan traversal adds overhead and " +
"metrics require a sink to be useful.")
Comment thread
coderfender marked this conversation as resolved.
Outdated
.booleanConf
.createWithDefault(false)

val COMET_LIBHDFS_SCHEMES_KEY = "fs.comet.libhdfs.schemes"

val COMET_LIBHDFS_SCHEMES: OptionalConfigEntry[String] =
Expand Down
1 change: 1 addition & 0 deletions dev/ensure-jars-have-correct-contents.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$"
allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
allowed_expr+="|^org/apache/spark/CometPlugin.class$"
allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
allowed_expr+="|^org/apache/spark/CometSource.*$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$"
allowed_expr+="|^scala-collection-compat.properties$"
Expand Down
34 changes: 34 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometMetricsListener.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet

import org.apache.spark.CometSource
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class CometMetricsListener extends QueryExecutionListener {

override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
val stats = CometCoverageStats.forPlan(qe.executedPlan)
Comment thread
coderfender marked this conversation as resolved.
CometSource.recordStats(stats)
}

override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
Comment thread
coderfender marked this conversation as resolved.
Outdated
}
19 changes: 19 additions & 0 deletions spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,25 @@ class CometCoverageStats {
}
}

object CometCoverageStats {

/**
* Compute coverage stats for a plan without generating explain string.
*/
def forPlan(plan: SparkPlan): CometCoverageStats = {
val stats = new CometCoverageStats()
val explainInfo = new ExtendedExplainInfo()
explainInfo.generateTreeString(
CometExplainInfo.getActualPlan(plan),
0,
Seq(),
0,
new StringBuilder(),
stats)
stats
}
}

object CometExplainInfo {
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")

Expand Down
62 changes: 62 additions & 0 deletions spark/src/main/scala/org/apache/spark/CometSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark

import org.apache.spark.metrics.source.Source

import com.codahale.metrics.{Counter, Gauge, MetricRegistry}

import org.apache.comet.CometCoverageStats

/**
* Exposes following metrics (hooked from CometCoverageStats)
* - operators.native: Total operators executed natively
* - operators.spark: Total operators that fell back to Spark
* - queries.planned: Total queries processed
* - transitions: Total Spark-to-Comet transitions
* - acceleration.ratio: native / (native + spark)
*/
object CometSource extends Source {
override val sourceName = "comet"
override val metricRegistry = new MetricRegistry()

val NATIVE_OPERATORS: Counter =
metricRegistry.counter(MetricRegistry.name("operators", "native"))
val SPARK_OPERATORS: Counter = metricRegistry.counter(MetricRegistry.name("operators", "spark"))
val QUERIES_PLANNED: Counter = metricRegistry.counter(MetricRegistry.name("queries", "planned"))
val TRANSITIONS: Counter = metricRegistry.counter(MetricRegistry.name("transitions"))

metricRegistry.register(
MetricRegistry.name("acceleration", "ratio"),
new Gauge[Double] {
override def getValue: Double = {
val native = NATIVE_OPERATORS.getCount
val total = native + SPARK_OPERATORS.getCount
if (total > 0) native.toDouble / total else 0.0
}
})

def recordStats(stats: CometCoverageStats): Unit = {
Comment thread
coderfender marked this conversation as resolved.
NATIVE_OPERATORS.inc(stats.cometOperators)
SPARK_OPERATORS.inc(stats.sparkOperators)
TRANSITIONS.inc(stats.transitions)
QUERIES_PLANNED.inc()
}
}
31 changes: 30 additions & 1 deletion spark/src/main/scala/org/apache/spark/Plugins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
import org.apache.spark.sql.internal.StaticSQLConf

import org.apache.comet.CometConf.COMET_ONHEAP_ENABLED
import org.apache.comet.CometConf.{COMET_METRICS_ENABLED, COMET_ONHEAP_ENABLED}
import org.apache.comet.CometSparkSessionExtensions

/**
Expand Down Expand Up @@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
// register CometSparkSessionExtensions if it isn't already registered
CometDriverPlugin.registerCometSessionExtension(sc.conf)

// Register Comet metrics
CometDriverPlugin.registerCometMetrics(sc)

if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) {
val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) {
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
Expand Down Expand Up @@ -101,6 +104,32 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
}

object CometDriverPlugin extends Logging {
def registerCometMetrics(sc: SparkContext): Unit = {
Comment thread
coderfender marked this conversation as resolved.
if (sc.getConf.getBoolean(
COMET_METRICS_ENABLED.key,
COMET_METRICS_ENABLED.defaultValue.get)) {
sc.env.metricsSystem.registerSource(CometSource)

val listenerKey = "spark.sql.queryExecutionListeners"
val listenerClass = "org.apache.comet.CometMetricsListener"
val listeners = sc.conf.get(listenerKey, "")
if (listeners.isEmpty) {
logInfo(s"Setting $listenerKey=$listenerClass")
sc.conf.set(listenerKey, listenerClass)
} else {
val currentListeners = listeners.split(",").map(_.trim)
if (!currentListeners.contains(listenerClass)) {
val newValue = s"$listeners,$listenerClass"
logInfo(s"Setting $listenerKey=$newValue")
sc.conf.set(listenerKey, newValue)
}
}
} else {
logInfo(
"Comet metrics reporting is disabled. Set spark.comet.metrics.enabled=true to enable.")
}
}

def registerCometSessionExtension(conf: SparkConf): Unit = {
val extensionKey = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
val extensionClass = classOf[CometSparkSessionExtensions].getName
Expand Down
42 changes: 41 additions & 1 deletion spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.apache.spark

import org.apache.spark.sql.CometTestBase
import java.io.File

import org.apache.spark.sql.{CometTestBase, SaveMode}
import org.apache.spark.sql.internal.StaticSQLConf

class CometPluginsSuite extends CometTestBase {
Expand All @@ -32,6 +34,7 @@ class CometPluginsSuite extends CometTestBase {
conf.set("spark.comet.enabled", "true")
conf.set("spark.comet.exec.enabled", "true")
conf.set("spark.comet.exec.onHeap.enabled", "true")
conf.set("spark.comet.metrics.enabled", "true")
conf
}

Expand Down Expand Up @@ -77,6 +80,43 @@ class CometPluginsSuite extends CometTestBase {
}
}

test("CometSource metrics are recorded") {
val nativeBefore = CometSource.NATIVE_OPERATORS.getCount
val queriesBefore = CometSource.QUERIES_PLANNED.getCount

withTempPath { dir =>
val path = new File(dir, "test.parquet").toString
spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path)
spark.read.parquet(path).filter("id > 500").collect()
}
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(
CometSource.QUERIES_PLANNED.getCount > queriesBefore,
"queries.planned should increment after query")
assert(
CometSource.NATIVE_OPERATORS.getCount > nativeBefore,
"operators.native should increment for native execution")
}

test("metrics not double counted with AQE") {
withSQLConf("spark.sql.adaptive.enabled" -> "true") {
withTempPath { dir =>
val path = new File(dir, "test.parquet").toString
spark.range(10000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path)

spark.sparkContext.listenerBus.waitUntilEmpty()
val queriesBefore = CometSource.QUERIES_PLANNED.getCount
spark.read.parquet(path).filter("id > 100").collect()
spark.read.parquet(path).filter("id > 200").collect()
spark.sparkContext.listenerBus.waitUntilEmpty()
val queriesAfter = CometSource.QUERIES_PLANNED.getCount
Comment thread
coderfender marked this conversation as resolved.
assert(
queriesAfter == queriesBefore + 2,
s"Expected 2 queries, got ${queriesAfter - queriesBefore}")
}
}
}

test("Default Comet memory overhead") {
val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")
Expand Down
Loading