Skip to content

Commit 454ca68

Browse files
authored
refactor: Split read benchmarks and add addParquetScanCases helper (#3407)
1 parent d89e50a commit 454ca68

4 files changed

Lines changed: 214 additions & 281 deletions

File tree

spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
3838
import org.apache.spark.sql.types.DecimalType
3939

4040
import org.apache.comet.CometConf
41+
import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
4142
import org.apache.comet.CometSparkSessionExtensions
4243

4344
trait CometBenchmarkBase
@@ -164,6 +165,32 @@ trait CometBenchmarkBase
164165
benchmark.run()
165166
}
166167

168+
protected def addParquetScanCases(
169+
benchmark: Benchmark,
170+
query: String,
171+
caseSuffix: String = "",
172+
extraConf: Map[String, String] = Map.empty): Unit = {
173+
val suffix = if (caseSuffix.nonEmpty) s" ($caseSuffix)" else ""
174+
175+
benchmark.addCase(s"SQL Parquet - Spark$suffix") { _ =>
176+
withSQLConf(extraConf.toSeq: _*) {
177+
spark.sql(query).noop()
178+
}
179+
}
180+
181+
for (scanImpl <- Seq(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT)) {
182+
benchmark.addCase(s"SQL Parquet - Comet ($scanImpl)$suffix") { _ =>
183+
withSQLConf(
184+
(extraConf ++ Map(
185+
CometConf.COMET_ENABLED.key -> "true",
186+
CometConf.COMET_EXEC_ENABLED.key -> "true",
187+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl)).toSeq: _*) {
188+
spark.sql(query).noop()
189+
}
190+
}
191+
}
192+
}
193+
167194
protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
168195
val testDf = if (partition.isDefined) {
169196
df.write.partitionBy(partition.get)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.spark.benchmark.Benchmark
23+
import org.apache.spark.sql.types._
24+
25+
import org.apache.comet.CometConf
26+
27+
/**
28+
* Benchmark to measure Comet Iceberg read performance. To run this benchmark:
29+
* `SPARK_GENERATE_BENCHMARK_FILES=1 make
30+
* benchmark-org.apache.spark.sql.benchmark.CometIcebergReadBenchmark` Results will be written to
31+
* "spark/benchmarks/CometIcebergReadBenchmark-**results.txt".
32+
*/
33+
object CometIcebergReadBenchmark extends CometBenchmarkBase {
34+
35+
def icebergScanBenchmark(values: Int, dataType: DataType): Unit = {
36+
val sqlBenchmark =
37+
new Benchmark(s"SQL Single ${dataType.sql} Iceberg Column Scan", values, output = output)
38+
39+
withTempPath { dir =>
40+
withTempTable("icebergTable") {
41+
prepareIcebergTable(
42+
dir,
43+
spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM $tbl"),
44+
"icebergTable")
45+
46+
val query = dataType match {
47+
case BooleanType => "sum(cast(id as bigint))"
48+
case _ => "sum(id)"
49+
}
50+
51+
sqlBenchmark.addCase("SQL Iceberg - Spark") { _ =>
52+
withSQLConf(
53+
"spark.memory.offHeap.enabled" -> "true",
54+
"spark.memory.offHeap.size" -> "10g") {
55+
spark.sql(s"select $query from icebergTable").noop()
56+
}
57+
}
58+
59+
sqlBenchmark.addCase("SQL Iceberg - Comet Iceberg-Rust") { _ =>
60+
withSQLConf(
61+
CometConf.COMET_ENABLED.key -> "true",
62+
CometConf.COMET_EXEC_ENABLED.key -> "true",
63+
"spark.memory.offHeap.enabled" -> "true",
64+
"spark.memory.offHeap.size" -> "10g",
65+
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
66+
spark.sql(s"select $query from icebergTable").noop()
67+
}
68+
}
69+
70+
sqlBenchmark.run()
71+
}
72+
}
73+
}
74+
75+
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
76+
runBenchmarkWithTable("SQL Single Numeric Iceberg Column Scan", 1024 * 1024 * 128) { v =>
77+
Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType)
78+
.foreach(icebergScanBenchmark(v, _))
79+
}
80+
}
81+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.spark.benchmark.Benchmark
23+
24+
/**
25+
* Benchmark to measure partition column scan performance. This exercises the CometConstantVector
26+
* path where constant columns are exported as 1-element Arrow arrays and expanded on the native
27+
* side.
28+
*
29+
* To run this benchmark:
30+
* {{{
31+
* SPARK_GENERATE_BENCHMARK_FILES=1 make \
32+
* benchmark-org.apache.spark.sql.benchmark.CometPartitionColumnBenchmark
33+
* }}}
34+
*
35+
* Results will be written to "spark/benchmarks/CometPartitionColumnBenchmark-**results.txt".
36+
*/
37+
object CometPartitionColumnBenchmark extends CometBenchmarkBase {
38+
39+
def partitionColumnScanBenchmark(values: Int, numPartitionCols: Int): Unit = {
40+
val sqlBenchmark = new Benchmark(
41+
s"Partitioned Scan with $numPartitionCols partition column(s)",
42+
values,
43+
output = output)
44+
45+
withTempPath { dir =>
46+
withTempTable("parquetV1Table") {
47+
val partCols =
48+
(1 to numPartitionCols).map(i => s"'part$i' as p$i").mkString(", ")
49+
val partNames = (1 to numPartitionCols).map(i => s"p$i")
50+
val df = spark.sql(s"SELECT value as id, $partCols FROM $tbl")
51+
val parquetDir = dir.getCanonicalPath + "/parquetV1"
52+
df.write
53+
.partitionBy(partNames: _*)
54+
.mode("overwrite")
55+
.option("compression", "snappy")
56+
.parquet(parquetDir)
57+
spark.read.parquet(parquetDir).createOrReplaceTempView("parquetV1Table")
58+
59+
addParquetScanCases(sqlBenchmark, "select sum(id) from parquetV1Table")
60+
61+
// Also benchmark reading partition columns themselves
62+
val partSumExpr =
63+
(1 to numPartitionCols).map(i => s"sum(length(p$i))").mkString(", ")
64+
65+
addParquetScanCases(
66+
sqlBenchmark,
67+
s"select $partSumExpr from parquetV1Table",
68+
caseSuffix = "partition cols")
69+
70+
sqlBenchmark.run()
71+
}
72+
}
73+
}
74+
75+
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
76+
runBenchmarkWithTable("Partitioned Column Scan", 1024 * 1024 * 15) { v =>
77+
for (numPartCols <- List(1, 5)) {
78+
partitionColumnScanBenchmark(v, numPartCols)
79+
}
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)