Skip to content

Commit 0a21380

Browse files
schenksjclaude
andcommitted
bench: Delta benchmarks and TPC runner infrastructure
- CometDeltaReadBenchmark: per-type read benchmarks mirroring Iceberg - CometDeltaBenchmarkTest: end-to-end benchmark harness - CometBenchmarkBase: add prepareDeltaTable alongside prepareIcebergTable - create-delta-tables.py: TPC-H/TPC-DS Parquet-to-Delta converter - comet-delta.toml / comet-delta-hashjoin.toml: TPC engine configs Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6980fa1 commit 0a21380

6 files changed

Lines changed: 486 additions & 0 deletions

File tree

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
Convert TPC-H or TPC-DS Parquet data to Delta Lake tables.
20+
21+
Usage:
22+
spark-submit \
23+
--packages io.delta:delta-spark_2.12:3.3.2 \
24+
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
25+
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
26+
create-delta-tables.py \
27+
--benchmark tpch \
28+
--parquet-path /path/to/tpch/parquet \
29+
--warehouse /path/to/delta-warehouse
30+
31+
spark-submit \
32+
--packages io.delta:delta-spark_2.12:3.3.2 \
33+
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
34+
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
35+
create-delta-tables.py \
36+
--benchmark tpcds \
37+
--parquet-path /path/to/tpcds/parquet \
38+
--warehouse /path/to/delta-warehouse
39+
"""
40+
41+
import argparse
42+
import os
43+
import sys
44+
45+
from pyspark.sql import SparkSession
46+
47+
48+
TPCH_TABLES = [
49+
"customer", "lineitem", "nation", "orders",
50+
"part", "partsupp", "region", "supplier"
51+
]
52+
53+
TPCDS_TABLES = [
54+
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
55+
"customer", "customer_address", "customer_demographics", "date_dim",
56+
"household_demographics", "income_band", "inventory", "item",
57+
"promotion", "reason", "ship_mode", "store", "store_returns",
58+
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
59+
"web_sales", "web_site"
60+
]
61+
62+
63+
def create_delta_tables(spark, benchmark, parquet_path, warehouse):
64+
tables = TPCH_TABLES if benchmark == "tpch" else TPCDS_TABLES
65+
66+
for table_name in tables:
67+
input_path = os.path.join(parquet_path, table_name)
68+
output_path = os.path.join(warehouse, table_name)
69+
70+
if not os.path.exists(input_path) and not input_path.startswith("s3"):
71+
print(f" Skipping {table_name}: {input_path} does not exist")
72+
continue
73+
74+
print(f" Converting {table_name}: {input_path} -> {output_path}")
75+
df = spark.read.parquet(input_path)
76+
df.write.format("delta").mode("overwrite").save(output_path)
77+
print(f" {table_name}: {df.count()} rows written")
78+
79+
80+
def main():
81+
parser = argparse.ArgumentParser(
82+
description="Convert TPC Parquet data to Delta Lake tables"
83+
)
84+
parser.add_argument(
85+
"--benchmark", required=True, choices=["tpch", "tpcds"],
86+
help="Which TPC benchmark to convert"
87+
)
88+
parser.add_argument(
89+
"--parquet-path", required=True,
90+
help="Path to the TPC Parquet data directory"
91+
)
92+
parser.add_argument(
93+
"--warehouse", required=True,
94+
help="Path to the Delta warehouse directory"
95+
)
96+
args = parser.parse_args()
97+
98+
spark = SparkSession.builder \
99+
.appName(f"Create Delta {args.benchmark.upper()} Tables") \
100+
.getOrCreate()
101+
102+
print(f"Converting {args.benchmark.upper()} tables from Parquet to Delta...")
103+
create_delta_tables(spark, args.benchmark, args.parquet_path, args.warehouse)
104+
print("Done.")
105+
106+
spark.stop()
107+
108+
109+
if __name__ == "__main__":
110+
main()
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[engine]
19+
name = "comet-delta-hashjoin"
20+
21+
[env]
22+
required = ["COMET_JAR", "DELTA_JAR", "DELTA_WAREHOUSE"]
23+
24+
[spark_submit]
25+
jars = ["$COMET_JAR", "$DELTA_JAR"]
26+
driver_class_path = ["$COMET_JAR", "$DELTA_JAR"]
27+
28+
[spark_conf]
29+
"spark.driver.extraClassPath" = "$COMET_JAR:$DELTA_JAR"
30+
"spark.executor.extraClassPath" = "$COMET_JAR:$DELTA_JAR"
31+
"spark.plugins" = "org.apache.spark.CometPlugin"
32+
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
33+
"spark.comet.expression.Cast.allowIncompatible" = "true"
34+
"spark.comet.enabled" = "true"
35+
"spark.comet.exec.enabled" = "true"
36+
"spark.comet.exec.shuffle.enabled" = "true"
37+
"spark.comet.scan.deltaNative.enabled" = "true"
38+
"spark.comet.explainFallback.enabled" = "true"
39+
"spark.sql.extensions" = "io.delta.sql.DeltaSparkSessionExtension"
40+
"spark.sql.catalog.spark_catalog" = "org.apache.spark.sql.delta.catalog.DeltaCatalog"
41+
"spark.sql.join.preferSortMergeJoin" = "false"
42+
"spark.sql.autoBroadcastJoinThreshold" = "-1"
43+
44+
[tpcbench_args]
45+
data_path = "$DELTA_WAREHOUSE"
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[engine]
19+
name = "comet-delta"
20+
21+
[env]
22+
required = ["COMET_JAR", "DELTA_JAR", "DELTA_WAREHOUSE"]
23+
24+
[spark_submit]
25+
jars = ["$COMET_JAR", "$DELTA_JAR"]
26+
driver_class_path = ["$COMET_JAR", "$DELTA_JAR"]
27+
28+
[spark_conf]
29+
"spark.driver.extraClassPath" = "$COMET_JAR:$DELTA_JAR"
30+
"spark.executor.extraClassPath" = "$COMET_JAR:$DELTA_JAR"
31+
"spark.plugins" = "org.apache.spark.CometPlugin"
32+
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
33+
"spark.comet.expression.Cast.allowIncompatible" = "true"
34+
"spark.comet.enabled" = "true"
35+
"spark.comet.exec.enabled" = "true"
36+
"spark.comet.scan.deltaNative.enabled" = "true"
37+
"spark.comet.explainFallback.enabled" = "true"
38+
"spark.sql.extensions" = "io.delta.sql.DeltaSparkSessionExtension"
39+
"spark.sql.catalog.spark_catalog" = "org.apache.spark.sql.delta.catalog.DeltaCatalog"
40+
41+
[tpcbench_args]
42+
data_path = "$DELTA_WAREHOUSE"
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet
21+
22+
import java.nio.file.Files
23+
24+
import org.apache.spark.SparkConf
25+
import org.apache.spark.sql.CometTestBase
26+
27+
/**
28+
* Quick benchmark comparing vanilla Spark+Delta vs Comet+Delta-kernel.
29+
*
30+
* Run with: export SPARK_LOCAL_IP=127.0.0.1 && ./mvnw -Pspark-3.5 -pl spark -am test \
31+
* -Dsuites=org.apache.comet.CometDeltaBenchmarkTest -Dmaven.gitcommitid.skip
32+
*/
33+
class CometDeltaBenchmarkTest extends CometTestBase {
34+
35+
private def deltaSparkAvailable: Boolean =
36+
try {
37+
Class.forName("org.apache.spark.sql.delta.DeltaParquetFileFormat")
38+
true
39+
} catch {
40+
case _: ClassNotFoundException => false
41+
}
42+
43+
override protected def sparkConf: SparkConf = {
44+
val conf = super.sparkConf
45+
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
46+
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
47+
conf.set("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
48+
conf.set("spark.databricks.delta.testOnly.dataFileNamePrefix", "")
49+
conf.set("spark.databricks.delta.testOnly.dvFileNamePrefix", "")
50+
conf
51+
}
52+
53+
test("benchmark: SUM aggregation - vanilla vs Comet native Delta") {
54+
assume(deltaSparkAvailable, "delta-spark not on the test classpath")
55+
56+
val tempDir = Files.createTempDirectory("comet-delta-bench").toFile
57+
try {
58+
val tablePath = new java.io.File(tempDir, "bench").getAbsolutePath
59+
val numRows = 5 * 1000 * 1000 // 5M rows
60+
val numFiles = 4
61+
62+
// scalastyle:off println
63+
println(s"\n=== Comet Delta Benchmark: $numRows rows, $numFiles files ===\n")
64+
// scalastyle:on println
65+
66+
// Generate data
67+
val ss = spark
68+
import ss.implicits._
69+
val df =
70+
(0 until numRows).map(i => (i.toLong, i * 1.5, s"name_$i")).toDF("id", "score", "name")
71+
df.repartition(numFiles).write.format("delta").save(tablePath)
72+
73+
val warmupIters = 2
74+
val benchIters = 5
75+
76+
// Vanilla Spark+Delta
77+
val vanillaTimes = new scala.collection.mutable.ArrayBuffer[Long]()
78+
withSQLConf(
79+
CometConf.COMET_ENABLED.key -> "false",
80+
CometConf.COMET_EXEC_ENABLED.key -> "false") {
81+
for (i <- 0 until (warmupIters + benchIters)) {
82+
val start = System.nanoTime()
83+
spark.sql(s"SELECT SUM(id), SUM(score) FROM delta.`$tablePath`").collect()
84+
val elapsed = (System.nanoTime() - start) / 1000000
85+
if (i >= warmupIters) vanillaTimes += elapsed
86+
}
87+
}
88+
89+
// Comet native Delta
90+
val cometTimes = new scala.collection.mutable.ArrayBuffer[Long]()
91+
withSQLConf(
92+
CometConf.COMET_ENABLED.key -> "true",
93+
CometConf.COMET_EXEC_ENABLED.key -> "true",
94+
CometConf.COMET_DELTA_NATIVE_ENABLED.key -> "true") {
95+
for (i <- 0 until (warmupIters + benchIters)) {
96+
val start = System.nanoTime()
97+
spark.sql(s"SELECT SUM(id), SUM(score) FROM delta.`$tablePath`").collect()
98+
val elapsed = (System.nanoTime() - start) / 1000000
99+
if (i >= warmupIters) cometTimes += elapsed
100+
}
101+
}
102+
103+
val vanillaAvg = vanillaTimes.sum.toDouble / vanillaTimes.size
104+
val cometAvg = cometTimes.sum.toDouble / cometTimes.size
105+
val speedup = vanillaAvg / cometAvg
106+
107+
// scalastyle:off println
108+
println(f"\n=== Results (${benchIters} iterations, ${warmupIters} warmup) ===")
109+
println(
110+
f" Vanilla Spark+Delta: ${vanillaAvg}%.0f ms avg (${vanillaTimes.mkString(", ")} ms)")
111+
println(f" Comet Native Delta: ${cometAvg}%.0f ms avg (${cometTimes.mkString(", ")} ms)")
112+
println(f" Speedup: ${speedup}%.2fx")
113+
println()
114+
// scalastyle:on println
115+
116+
// Don't assert on speedup - just report numbers.
117+
// On debug builds the native path may actually be slower due to no LTO.
118+
} finally {
119+
def deleteRecursively(file: java.io.File): Unit = {
120+
if (file.isDirectory) { Option(file.listFiles()).foreach(_.foreach(deleteRecursively)) }
121+
file.delete()
122+
}
123+
deleteRecursively(tempDir)
124+
}
125+
}
126+
127+
test("benchmark: filter scan - vanilla vs Comet native Delta") {
128+
assume(deltaSparkAvailable, "delta-spark not on the test classpath")
129+
130+
val tempDir = Files.createTempDirectory("comet-delta-bench-filter").toFile
131+
try {
132+
val tablePath = new java.io.File(tempDir, "bench").getAbsolutePath
133+
val numRows = 2 * 1000 * 1000
134+
val numFiles = 4
135+
136+
// scalastyle:off println
137+
println(s"\n=== Comet Delta Filter Benchmark: $numRows rows, $numFiles files ===\n")
138+
// scalastyle:on println
139+
140+
val ss = spark
141+
import ss.implicits._
142+
val df =
143+
(0 until numRows).map(i => (i.toLong, i * 1.5, s"name_$i")).toDF("id", "score", "name")
144+
df.repartition(numFiles).write.format("delta").save(tablePath)
145+
146+
val warmupIters = 2
147+
val benchIters = 5
148+
val query = s"SELECT COUNT(*), SUM(score) FROM delta.`$tablePath` WHERE id > ${numRows / 2}"
149+
150+
val vanillaTimes = new scala.collection.mutable.ArrayBuffer[Long]()
151+
withSQLConf(
152+
CometConf.COMET_ENABLED.key -> "false",
153+
CometConf.COMET_EXEC_ENABLED.key -> "false") {
154+
for (i <- 0 until (warmupIters + benchIters)) {
155+
val start = System.nanoTime()
156+
spark.sql(query).collect()
157+
val elapsed = (System.nanoTime() - start) / 1000000
158+
if (i >= warmupIters) vanillaTimes += elapsed
159+
}
160+
}
161+
162+
val cometTimes = new scala.collection.mutable.ArrayBuffer[Long]()
163+
withSQLConf(
164+
CometConf.COMET_ENABLED.key -> "true",
165+
CometConf.COMET_EXEC_ENABLED.key -> "true",
166+
CometConf.COMET_DELTA_NATIVE_ENABLED.key -> "true") {
167+
for (i <- 0 until (warmupIters + benchIters)) {
168+
val start = System.nanoTime()
169+
spark.sql(query).collect()
170+
val elapsed = (System.nanoTime() - start) / 1000000
171+
if (i >= warmupIters) cometTimes += elapsed
172+
}
173+
}
174+
175+
val vanillaAvg = vanillaTimes.sum.toDouble / vanillaTimes.size
176+
val cometAvg = cometTimes.sum.toDouble / cometTimes.size
177+
val speedup = vanillaAvg / cometAvg
178+
179+
// scalastyle:off println
180+
println(f"\n=== Filter Results (${benchIters} iterations, ${warmupIters} warmup) ===")
181+
println(
182+
f" Vanilla Spark+Delta: ${vanillaAvg}%.0f ms avg (${vanillaTimes.mkString(", ")} ms)")
183+
println(f" Comet Native Delta: ${cometAvg}%.0f ms avg (${cometTimes.mkString(", ")} ms)")
184+
println(f" Speedup: ${speedup}%.2fx")
185+
println()
186+
// scalastyle:on println
187+
} finally {
188+
def deleteRecursively(file: java.io.File): Unit = {
189+
if (file.isDirectory) { Option(file.listFiles()).foreach(_.foreach(deleteRecursively)) }
190+
file.delete()
191+
}
192+
deleteRecursively(tempDir)
193+
}
194+
}
195+
}

0 commit comments

Comments
 (0)