@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.SparkPlan
2828import org .apache .spark .sql .execution .adaptive .AdaptiveSparkPlanExec
2929
3030import org .apache .comet .CometConf
31+ import org .apache .comet .rules .CometScanRule
3132import org .apache .comet .serde .OperatorOuterClass
3233import org .apache .comet .serde .operator .CometIcebergNativeScan
3334
@@ -301,9 +302,72 @@ object CometOperatorSerdeBenchmark extends CometBenchmarkBase {
301302 }
302303 }
303304
305+ /**
306+ * Benchmarks CometScanRule.apply() on Iceberg BatchScanExec plans.
307+ *
308+ * This measures the validation overhead when converting Spark Iceberg scans to Comet scans.
309+ */
310+ def icebergScanRuleBenchmark (numPartitions : Int ): Unit = {
311+ if (! icebergAvailable) {
312+ // scalastyle:off println
313+ println(" Iceberg not available in classpath, skipping benchmark" )
314+ // scalastyle:on println
315+ return
316+ }
317+
318+ withTempIcebergDir { warehouseDir =>
319+ withSQLConf(
320+ " spark.sql.catalog.bench_cat" -> " org.apache.iceberg.spark.SparkCatalog" ,
321+ " spark.sql.catalog.bench_cat.type" -> " hadoop" ,
322+ " spark.sql.catalog.bench_cat.warehouse" -> warehouseDir.getAbsolutePath,
323+ CometConf .COMET_ENABLED .key -> " true" ,
324+ CometConf .COMET_EXEC_ENABLED .key -> " true" ,
325+ CometConf .COMET_ICEBERG_NATIVE_ENABLED .key -> " true" ) {
326+
327+ // Create the partitioned table
328+ createPartitionedIcebergTable(warehouseDir, numPartitions)
329+
330+ val fullTableName = " bench_cat.db.serde_bench_table"
331+
332+ // Get the sparkPlan (before post-hoc rules like CometScanRule)
333+ val df = spark.sql(s " SELECT * FROM $fullTableName" )
334+ val sparkPlan = df.queryExecution.sparkPlan
335+
336+ // scalastyle:off println
337+ println(s " SparkPlan class: ${sparkPlan.getClass.getSimpleName}" )
338+ // scalastyle:on println
339+
340+ val rule = CometScanRule (spark)
341+ val iterations = 100
342+
343+ val benchmark = new Benchmark (
344+ s " CometScanRule apply ( $numPartitions partitions) " ,
345+ iterations,
346+ output = output)
347+
348+ benchmark.addCase(" CometScanRule.apply(sparkPlan)" ) { _ =>
349+ var i = 0
350+ while (i < iterations) {
351+ rule.apply(sparkPlan)
352+ i += 1
353+ }
354+ }
355+
356+ benchmark.run()
357+
358+ // Cleanup
359+ spark.sql(s " DROP TABLE IF EXISTS $fullTableName" )
360+ }
361+ }
362+ }
363+
304364 override def runCometBenchmark (args : Array [String ]): Unit = {
305365 val numPartitions = if (args.nonEmpty) args(0 ).toInt else 30000
306366
367+ runBenchmark(" CometScanRule Benchmark" ) {
368+ icebergScanRuleBenchmark(numPartitions)
369+ }
370+
307371 runBenchmark(" IcebergScan Operator Serde Benchmark" ) {
308372 icebergScanSerdeBenchmark(numPartitions)
309373 }
0 commit comments