-
Notifications
You must be signed in to change notification settings - Fork 329
chore: Add checks to microbenchmarks for plan running natively in Comet #3045
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,14 +31,18 @@ import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS | |
| import org.apache.spark.SparkConf | ||
| import org.apache.spark.benchmark.Benchmark | ||
| import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} | ||
| import org.apache.spark.sql.comet._ | ||
| import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec | ||
| import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, SparkPlan, WholeStageCodegenExec} | ||
| import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper | ||
| import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types.DecimalType | ||
|
|
||
| import org.apache.comet.CometConf | ||
| import org.apache.comet.CometSparkSessionExtensions | ||
|
|
||
| trait CometBenchmarkBase extends SqlBasedBenchmark { | ||
| trait CometBenchmarkBase extends SqlBasedBenchmark with AdaptiveSparkPlanHelper { | ||
| override def getSparkSession: SparkSession = { | ||
| val conf = new SparkConf() | ||
| .setAppName("CometReadBenchmark") | ||
|
|
@@ -88,28 +92,6 @@ trait CometBenchmarkBase extends SqlBasedBenchmark { | |
| } | ||
| } | ||
|
|
||
| /** Runs function `f` with Comet on and off. */ | ||
| final def runWithComet(name: String, cardinality: Long)(f: => Unit): Unit = { | ||
| val benchmark = new Benchmark(name, cardinality, output = output) | ||
|
|
||
| benchmark.addCase(s"$name - Spark ") { _ => | ||
| withSQLConf(CometConf.COMET_ENABLED.key -> "false") { | ||
| f | ||
| } | ||
| } | ||
|
|
||
| benchmark.addCase(s"$name - Comet") { _ => | ||
| withSQLConf( | ||
| CometConf.COMET_ENABLED.key -> "true", | ||
| CometConf.COMET_EXEC_ENABLED.key -> "true", | ||
| SQLConf.ANSI_ENABLED.key -> "false") { | ||
| f | ||
| } | ||
| } | ||
|
|
||
| benchmark.run() | ||
| } | ||
|
|
||
| /** | ||
| * Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec). | ||
| * This provides a consistent benchmark structure for expression evaluation. | ||
|
|
@@ -149,6 +131,29 @@ trait CometBenchmarkBase extends SqlBasedBenchmark { | |
| CometConf.COMET_EXEC_ENABLED.key -> "true", | ||
| "spark.sql.optimizer.constantFolding.enabled" -> "false") ++ extraCometConfigs | ||
|
|
||
| // Check that the plan is fully Comet native before running the benchmark | ||
| withSQLConf(cometExecConfigs.toSeq: _*) { | ||
| val df = spark.sql(query) | ||
| df.noop() | ||
| val plan = stripAQEPlan(df.queryExecution.executedPlan) | ||
| findFirstNonCometOperator(plan) match { | ||
| case Some(op) => | ||
| // scalastyle:off println | ||
| println() | ||
| println("=" * 80) | ||
| println("WARNING: Benchmark plan is NOT fully Comet native!") | ||
| println(s"First non-Comet operator: ${op.nodeName}") | ||
| println("=" * 80) | ||
| println("Query plan:") | ||
| println(plan.treeString) | ||
| println("=" * 80) | ||
| println() | ||
| // scalastyle:on println | ||
| case None => | ||
| // All operators are Comet native, no warning needed | ||
| } | ||
| } | ||
|
|
||
| benchmark.addCase("Comet (Scan + Exec)") { _ => | ||
| withSQLConf(cometExecConfigs.toSeq: _*) { | ||
| spark.sql(query).noop() | ||
|
|
@@ -158,6 +163,28 @@ trait CometBenchmarkBase extends SqlBasedBenchmark { | |
| benchmark.run() | ||
| } | ||
|
|
||
| /** | ||
| * Finds the first non-Comet operator in the plan, if any. This is used to verify that | ||
| * benchmarks are running fully on Comet native when expected. | ||
| * | ||
| * Based on CometTestBase.findFirstNonCometOperator. | ||
| */ | ||
| protected def findFirstNonCometOperator(plan: SparkPlan): Option[SparkPlan] = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we keep one version for this methods to keep in sync the test behavior?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved this to a new We may want to rename this trait in the future if we move more code there. |
||
| plan.foreach { | ||
| case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec | | ||
| _: CometIcebergNativeScanExec => | ||
| case _: CometSinkPlaceHolder | _: CometScanWrapper => | ||
| case _: CometColumnarToRowExec => | ||
| case _: CometSparkToColumnarExec => | ||
| case _: CometExec | _: CometShuffleExchangeExec => | ||
| case _: CometBroadcastExchangeExec => | ||
| case _: WholeStageCodegenExec | _: ColumnarToRowExec | _: InputAdapter => | ||
| case op => | ||
| return Some(op) | ||
| } | ||
| None | ||
| } | ||
|
|
||
| protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = { | ||
| val testDf = if (partition.isDefined) { | ||
| df.write.partitionBy(partition.get) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is no longer used