Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.comet._
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DecimalType

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions

trait CometBenchmarkBase extends SqlBasedBenchmark {
trait CometBenchmarkBase extends SqlBasedBenchmark with AdaptiveSparkPlanHelper {
override def getSparkSession: SparkSession = {
val conf = new SparkConf()
.setAppName("CometReadBenchmark")
Expand Down Expand Up @@ -88,28 +92,6 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
}
}

/** Runs function `f` with Comet on and off. */
final def runWithComet(name: String, cardinality: Long)(f: => Unit): Unit = {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is no longer used

val benchmark = new Benchmark(name, cardinality, output = output)

benchmark.addCase(s"$name - Spark ") { _ =>
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
f
}
}

benchmark.addCase(s"$name - Comet") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
SQLConf.ANSI_ENABLED.key -> "false") {
f
}
}

benchmark.run()
}

/**
* Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec).
* This provides a consistent benchmark structure for expression evaluation.
Expand Down Expand Up @@ -149,6 +131,29 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
CometConf.COMET_EXEC_ENABLED.key -> "true",
"spark.sql.optimizer.constantFolding.enabled" -> "false") ++ extraCometConfigs

// Check that the plan is fully Comet native before running the benchmark
withSQLConf(cometExecConfigs.toSeq: _*) {
val df = spark.sql(query)
df.noop()
val plan = stripAQEPlan(df.queryExecution.executedPlan)
findFirstNonCometOperator(plan) match {
case Some(op) =>
// scalastyle:off println
println()
println("=" * 80)
println("WARNING: Benchmark plan is NOT fully Comet native!")
println(s"First non-Comet operator: ${op.nodeName}")
println("=" * 80)
println("Query plan:")
println(plan.treeString)
println("=" * 80)
println()
// scalastyle:on println
case None =>
// All operators are Comet native, no warning needed
}
}

benchmark.addCase("Comet (Scan + Exec)") { _ =>
withSQLConf(cometExecConfigs.toSeq: _*) {
spark.sql(query).noop()
Expand All @@ -158,6 +163,28 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
benchmark.run()
}

/**
* Finds the first non-Comet operator in the plan, if any. This is used to verify that
* benchmarks are running fully on Comet native when expected.
*
* Based on CometTestBase.findFirstNonCometOperator.
*/
protected def findFirstNonCometOperator(plan: SparkPlan): Option[SparkPlan] = {
Copy link
Copy Markdown
Contributor

@comphead comphead Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we keep one version for this methods to keep in sync the test behavior?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to a new CometPlanChecker trait to avoid duplication.

We may want to rename this trait in the future if we move more code there.

plan.foreach {
case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
_: CometIcebergNativeScanExec =>
case _: CometSinkPlaceHolder | _: CometScanWrapper =>
case _: CometColumnarToRowExec =>
case _: CometSparkToColumnarExec =>
case _: CometExec | _: CometShuffleExchangeExec =>
case _: CometBroadcastExchangeExec =>
case _: WholeStageCodegenExec | _: ColumnarToRowExec | _: InputAdapter =>
case op =>
return Some(op)
}
None
}

protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
val testDf = if (partition.isDefined) {
df.write.partitionBy(partition.get)
Expand Down