Skip to content

Commit 7735744

Browse files
committed
savE
1 parent 48ebd28 commit 7735744

1 file changed

Lines changed: 122 additions & 0 deletions

File tree

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.spark.benchmark.Benchmark
23+
24+
import org.apache.comet.CometConf
25+
import org.apache.comet.CometConf.{SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
26+
27+
/**
 * Benchmark to measure partition column scan performance. This exercises the CometConstantVector
 * path where constant columns are exported as 1-element Arrow arrays and expanded on the native
 * side.
 *
 * To run this benchmark:
 * {{{
 *   SPARK_GENERATE_BENCHMARK_FILES=1 make \
 *     benchmark-org.apache.spark.sql.benchmark.CometPartitionColumnBenchmark
 * }}}
 *
 * Results will be written to "spark/benchmarks/CometPartitionColumnBenchmark-**results.txt".
 */
object CometPartitionColumnBenchmark extends CometBenchmarkBase {

  /**
   * Writes a Parquet dataset partitioned by `numPartitionCols` columns and times two queries
   * against it, each under the session's default settings and under the two Comet native scan
   * implementations:
   *   - `sum(id)`, which only reads the data column, and
   *   - `sum(length(p1)), ..., sum(length(pN))`, which reads every partition column.
   *
   * Every row is given the same constant string (`'part<i>'`) for partition column `p<i>`.
   *
   * @param values
   *   number of values per iteration, passed to [[Benchmark]] for rate reporting
   * @param numPartitionCols
   *   number of partition columns to generate; must be at least 1
   * @throws IllegalArgumentException
   *   if `numPartitionCols` is less than 1
   */
  def partitionColumnScanBenchmark(values: Int, numPartitionCols: Int): Unit = {
    // With zero partition columns the generated SQL would be malformed
    // ("SELECT value as id,  FROM ..." / "select  from ..."), so fail fast with a clear message
    // instead of surfacing a parser error from deep inside Spark.
    require(numPartitionCols >= 1, s"numPartitionCols must be at least 1, got $numPartitionCols")

    val sqlBenchmark = new Benchmark(
      s"Partitioned Scan with $numPartitionCols partition column(s)",
      values,
      output = output)

    withTempPath { dir =>
      withTempTable("parquetV1Table") {
        val partitionIndices = 1 to numPartitionCols
        val partCols = partitionIndices.map(i => s"'part$i' as p$i").mkString(", ")
        val partNames = partitionIndices.map(i => s"p$i")
        val parquetDir = dir.getCanonicalPath + "/parquetV1"

        spark
          .sql(s"SELECT value as id, $partCols FROM $tbl")
          .write
          .partitionBy(partNames: _*)
          .mode("overwrite")
          .option("compression", "snappy")
          .parquet(parquetDir)
        spark.read.parquet(parquetDir).createOrReplaceTempView("parquetV1Table")

        // Query touching only the data column.
        val dataColumnQuery = "select sum(id) from parquetV1Table"

        // Query touching every partition column.
        val partSumExpr = partitionIndices.map(i => s"sum(length(p$i))").mkString(", ")
        val partitionColumnQuery = s"select $partSumExpr from parquetV1Table"

        // Registers a case that runs `query` without overriding any SQL conf.
        def addSparkCase(name: String, query: String): Unit =
          sqlBenchmark.addCase(name) { _ =>
            spark.sql(query).noop()
          }

        // Registers a case that runs `query` with Comet and Comet exec enabled, using the given
        // native scan implementation.
        def addCometCase(name: String, scanImpl: String, query: String): Unit =
          sqlBenchmark.addCase(name) { _ =>
            withSQLConf(
              CometConf.COMET_ENABLED.key -> "true",
              CometConf.COMET_EXEC_ENABLED.key -> "true",
              CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl) {
              spark.sql(query).noop()
            }
          }

        // Case names and registration order are kept exactly as before so that generated result
        // files remain comparable with earlier runs.
        addSparkCase("SQL Parquet - Spark", dataColumnQuery)
        addCometCase(
          "SQL Parquet - Comet Native DataFusion",
          SCAN_NATIVE_DATAFUSION,
          dataColumnQuery)
        addCometCase(
          "SQL Parquet - Comet Native Iceberg Compat",
          SCAN_NATIVE_ICEBERG_COMPAT,
          dataColumnQuery)

        // Also benchmark reading partition columns themselves
        addSparkCase("SQL Parquet - Spark (read partition cols)", partitionColumnQuery)
        addCometCase(
          "SQL Parquet - Comet Native DataFusion (partition cols)",
          SCAN_NATIVE_DATAFUSION,
          partitionColumnQuery)
        addCometCase(
          "SQL Parquet - Comet Native Iceberg Compat (partition cols)",
          SCAN_NATIVE_ICEBERG_COMPAT,
          partitionColumnQuery)

        sqlBenchmark.run()
      }
    }
  }

  /**
   * Benchmark entry point: prepares the shared source table with 15 * 1024 * 1024 rows and runs
   * the partitioned scan benchmark with 1 and then 5 partition columns.
   *
   * @param mainArgs
   *   command-line arguments; not used by this benchmark
   */
  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
    runBenchmarkWithTable("Partitioned Column Scan", 1024 * 1024 * 15) { v =>
      List(1, 5).foreach { numPartCols =>
        partitionColumnScanBenchmark(v, numPartCols)
      }
    }
  }
}

0 commit comments

Comments
 (0)