Skip to content

Commit 19b12c1

Browse files
andygrove and claude committed
Add partition column scan benchmark to CometReadBenchmark
Adds a benchmark that writes a partitioned parquet table and measures scan performance with 1 and 5 partition columns. Tests both reading data columns alongside partitions and reading partition columns themselves. This exercises the CometConstantVector → native scalar expansion path. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 1921e6c commit 19b12c1

1 file changed

Lines changed: 63 additions & 0 deletions

File tree

spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,63 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
620620
}
621621
}
622622

623+
/**
 * Benchmarks scans over a Parquet table that is partitioned by `numPartitionCols`
 * constant string columns (`p1` .. `pN`, each holding the literal `'part<i>'`).
 *
 * Two query shapes are measured: aggregating the data column `id`, and
 * aggregating over the partition columns themselves. Each shape is run with
 * plain Spark and with Comet enabled.
 *
 * @param values           cardinality handed to the `Benchmark` instance
 * @param numPartitionCols number of partition columns to generate and read
 */
def partitionColumnScanBenchmark(values: Int, numPartitionCols: Int): Unit = {
  val benchmark = new Benchmark(
    s"Partitioned Scan with $numPartitionCols partition column(s)",
    values,
    output = output)

  withTempPath { dir =>
    withTempTable("parquetV1Table") {
      val partitionIndices = 1 to numPartitionCols

      // One data column (`id`) plus N constant-valued partition columns.
      val partitionProjection = partitionIndices.map(i => s"'part$i' as p$i").mkString(", ")
      val partitionNames = partitionIndices.map(i => s"p$i").mkString(", ")
      // NOTE(review): the partition names are passed as ONE comma-separated string.
      // prepareTable is defined outside this view; confirm it splits the string
      // before calling DataFrameWriter.partitionBy, otherwise N > 1 would be
      // treated as a single (non-existent) column name.
      prepareTable(
        dir,
        spark.sql(s"SELECT value as id, $partitionProjection FROM $tbl"),
        Some(partitionNames))

      // Query that touches only the data column.
      val dataQuery = "select sum(id) from parquetV1Table"

      // Query that touches only the partition columns.
      val partitionQuery = {
        val aggregates = partitionIndices.map(i => s"sum(length(p$i))").mkString(", ")
        s"select $aggregates from parquetV1Table"
      }

      val scanOnlyConf: Seq[(String, String)] = Seq(
        CometConf.COMET_ENABLED.key -> "true",
        CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_COMET)

      val scanAndExecConf: Seq[(String, String)] = Seq(
        CometConf.COMET_ENABLED.key -> "true",
        CometConf.COMET_EXEC_ENABLED.key -> "true",
        CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT)

      // Registers a case that runs `query` under the given SQL configuration.
      def addCometCase(name: String, query: String, conf: Seq[(String, String)]): Unit =
        benchmark.addCase(name) { _ =>
          withSQLConf(conf: _*) {
            spark.sql(query).noop()
          }
        }

      // Cases are registered in the same order as they appear in the report.
      benchmark.addCase("SQL Parquet - Spark") { _ =>
        spark.sql(dataQuery).noop()
      }
      addCometCase("SQL Parquet - Comet (Scan Only)", dataQuery, scanOnlyConf)
      addCometCase("SQL Parquet - Comet (Scan + Exec)", dataQuery, scanAndExecConf)

      benchmark.addCase("SQL Parquet - Spark (read partition cols)") { _ =>
        spark.sql(partitionQuery).noop()
      }
      addCometCase("SQL Parquet - Comet (read partition cols)", partitionQuery, scanAndExecConf)

      benchmark.run()
    }
  }
}
679+
623680
def sortedLgStrFilterScanBenchmark(values: Int, fractionOfZeros: Double): Unit = {
624681
val percentageOfZeros = fractionOfZeros * 100
625682
val benchmark =
@@ -751,6 +808,12 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
751808
sortedLgStrFilterScanBenchmark(v, fractionOfZeros)
752809
}
753810
}
811+
812+
runBenchmarkWithTable("Partitioned Column Scan", 1024 * 1024 * 15) { v =>
813+
for (numPartCols <- List(1, 5)) {
814+
partitionColumnScanBenchmark(v, numPartCols)
815+
}
816+
}
754817
}
755818
}
756819

0 commit comments

Comments
 (0)