Skip to content

Commit 02e8ca4

Browse files
andygrove and claude committed
refactor: Split read benchmarks and add addParquetScanCases helper
Extract iceberg benchmarks into CometIcebergReadBenchmark and add addParquetScanCases helper to CometBenchmarkBase to eliminate the repeated 3-case pattern (Spark / native_datafusion / native_iceberg_compat) across all parquet benchmarks. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 7735744 commit 02e8ca4

4 files changed

Lines changed: 137 additions & 326 deletions

File tree

spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
3838
import org.apache.spark.sql.types.DecimalType
3939

4040
import org.apache.comet.CometConf
41+
import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
4142
import org.apache.comet.CometSparkSessionExtensions
4243

4344
trait CometBenchmarkBase
@@ -164,6 +165,32 @@ trait CometBenchmarkBase
164165
benchmark.run()
165166
}
166167

168+
/**
 * Registers the standard set of Parquet scan cases for `query` on `benchmark`: one baseline
 * case labelled "Spark", followed by one Comet case for each of `SCAN_NATIVE_DATAFUSION` and
 * `SCAN_NATIVE_ICEBERG_COMPAT`. Every case executes `spark.sql(query).noop()`.
 *
 * This method only adds cases; it never calls `benchmark.run()`, so the caller is expected to
 * do that once all cases have been registered.
 *
 * @param benchmark
 *   the benchmark that the cases are added to
 * @param query
 *   the SQL text executed by every case
 * @param caseSuffix
 *   optional label; when non-empty it is appended to each case name as " (caseSuffix)"
 * @param extraConf
 *   SQL configuration entries applied to every case, including the baseline
 */
protected def addParquetScanCases(
169+
benchmark: Benchmark,
170+
query: String,
171+
caseSuffix: String = "",
172+
extraConf: Map[String, String] = Map.empty): Unit = {
173+
val suffix = if (caseSuffix.nonEmpty) s" ($caseSuffix)" else ""
174+
175+
// Baseline case: only `extraConf` is applied here. No Comet setting is touched explicitly.
// NOTE(review): presumably the benchmark session has Comet disabled by default, which is what
// makes this a plain-Spark run - confirm against the session setup in this trait.
benchmark.addCase(s"SQL Parquet - Spark$suffix") { _ =>
176+
withSQLConf(extraConf.toSeq: _*) {
177+
spark.sql(query).noop()
178+
}
179+
}
180+
181+
// One Comet case per native scan implementation; the implementation identifier is embedded
// in the case name.
for (scanImpl <- Seq(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT)) {
182+
benchmark.addCase(s"SQL Parquet - Comet ($scanImpl)$suffix") { _ =>
183+
withSQLConf(
184+
// The Comet entries are the right-hand operand of `++`, so they win over any entry in
// `extraConf` that uses the same key.
(extraConf ++ Map(
185+
CometConf.COMET_ENABLED.key -> "true",
186+
CometConf.COMET_EXEC_ENABLED.key -> "true",
187+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl)).toSeq: _*) {
188+
spark.sql(query).noop()
189+
}
190+
}
191+
}
192+
}
193+
167194
protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
168195
val testDf = if (partition.isDefined) {
169196
df.write.partitionBy(partition.get)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.spark.benchmark.Benchmark
23+
import org.apache.spark.sql.types._
24+
25+
import org.apache.comet.CometConf
26+
27+
/**
28+
* Benchmark to measure Comet Iceberg read performance. To run this benchmark:
29+
* `SPARK_GENERATE_BENCHMARK_FILES=1 make
30+
* benchmark-org.apache.spark.sql.benchmark.CometIcebergReadBenchmark` Results will be written to
31+
* "spark/benchmarks/CometIcebergReadBenchmark-**results.txt".
32+
*/
33+
object CometIcebergReadBenchmark extends CometBenchmarkBase {
34+
35+
/**
 * Benchmarks a single-column aggregate over a temporary Iceberg table named "icebergTable".
 * The table has one column, `id`, produced by casting `value` from `tbl` to `dataType`. Two
 * cases are registered and then run: a "Spark" case and a "Comet Iceberg-Rust" case.
 *
 * @param values
 *   passed to `Benchmark` as its second argument (presumably the number of values processed
 *   per iteration, used for reporting - confirm)
 * @param dataType
 *   the column type under test; its SQL name is used in the benchmark title and in the CAST
 */
def icebergScanBenchmark(values: Int, dataType: DataType): Unit = {
36+
val sqlBenchmark =
37+
new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output)
38+
39+
withTempPath { dir =>
40+
withTempTable("icebergTable") {
41+
prepareIcebergTable(
42+
dir,
43+
spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"),
44+
"icebergTable")
45+
46+
// Boolean columns are cast to bigint before summing; every other type is summed directly.
// NOTE(review): presumably because sum() does not accept a boolean argument - confirm.
val query = dataType match {
47+
case BooleanType => "sum(cast(id as bigint))"
48+
case _ => "sum(id)"
49+
}
50+
51+
// Baseline case: sets only the two off-heap memory entries, with the same values the
// Comet case below uses.
sqlBenchmark.addCase("SQL Iceberg - Spark") { _ =>
52+
withSQLConf(
53+
"spark.memory.offHeap.enabled" -> "true",
54+
"spark.memory.offHeap.size" -> "10g") {
55+
spark.sql(s"select $query from icebergTable").noop()
56+
}
57+
}
58+
59+
// Comet case: the same query and off-heap entries, plus Comet, Comet exec and the native
// Iceberg flag all set to "true".
sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ =>
60+
withSQLConf(
61+
CometConf.COMET_ENABLED.key -> "true",
62+
CometConf.COMET_EXEC_ENABLED.key -> "true",
63+
"spark.memory.offHeap.enabled" -> "true",
64+
"spark.memory.offHeap.size" -> "10g",
65+
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
66+
spark.sql(s"select $query from icebergTable").noop()
67+
}
68+
}
69+
70+
sqlBenchmark.run()
71+
}
72+
}
73+
}
74+
75+
/**
 * Entry point invoked by the benchmark harness. Runs `icebergScanBenchmark` once for each of
 * the listed boolean and numeric types.
 *
 * @param mainArgs
 *   command-line arguments; not read by this implementation
 */
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
76+
// `v` is supplied by runBenchmarkWithTable (presumably the value count given here - confirm).
runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v =>
77+
Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
78+
.foreach(icebergScanBenchmark(v, _))
79+
}
80+
}
81+
}

spark/src/test/scala/org/apache/spark/sql/benchmark/CometPartitionColumnBenchmark.scala

Lines changed: 5 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ package org.apache.spark.sql.benchmark
2121

2222
import org.apache.spark.benchmark.Benchmark
2323

24-
import org.apache.comet.CometConf
25-
import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
26-
2724
/**
2825
* Benchmark to measure partition column scan performance. This exercises the CometConstantVector
2926
* path where constant columns are exported as 1-element Arrow arrays and expanded on the native
@@ -59,53 +56,16 @@ object CometPartitionColumnBenchmark extends CometBenchmarkBase {
5956
.parquet(parquetDir)
6057
spark.read.parquet(parquetDir).createOrReplaceTempView("parquetV1Table")
6158

62-
sqlBenchmark.addCase("SQL Parquet - Spark") { _ =>
63-
spark.sql("select sum(id) from parquetV1Table").noop()
64-
}
65-
66-
sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
67-
withSQLConf(
68-
CometConf.COMET_ENABLED.key -> "true",
69-
CometConf.COMET_EXEC_ENABLED.key -> "true",
70-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
71-
spark.sql("select sum(id) from parquetV1Table").noop()
72-
}
73-
}
74-
75-
sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
76-
withSQLConf(
77-
CometConf.COMET_ENABLED.key -> "true",
78-
CometConf.COMET_EXEC_ENABLED.key -> "true",
79-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
80-
spark.sql("select sum(id) from parquetV1Table").noop()
81-
}
82-
}
59+
addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table")
8360

8461
// Also benchmark reading partition columns themselves
8562
val partSumExpr =
8663
(1 to numPartitionCols).map(i => s"sum(length(p$i))").mkString(", ")
8764

88-
sqlBenchmark.addCase("SQL Parquet - Spark (read partition cols)") { _ =>
89-
spark.sql(s"select $partSumExpr from parquetV1Table").noop()
90-
}
91-
92-
sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion (partition cols)") { _ =>
93-
withSQLConf(
94-
CometConf.COMET_ENABLED.key -> "true",
95-
CometConf.COMET_EXEC_ENABLED.key -> "true",
96-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
97-
spark.sql(s"select $partSumExpr from parquetV1Table").noop()
98-
}
99-
}
100-
101-
sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat (partition cols)") { _ =>
102-
withSQLConf(
103-
CometConf.COMET_ENABLED.key -> "true",
104-
CometConf.COMET_EXEC_ENABLED.key -> "true",
105-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
106-
spark.sql(s"select $partSumExpr from parquetV1Table").noop()
107-
}
108-
}
65+
addParquetScanCases(
66+
sqlBenchmark,
67+
s"select $partSumExpr from parquetV1Table",
68+
caseSuffix = "partition cols")
10969

11070
sqlBenchmark.run()
11171
}

0 commit comments

Comments
 (0)