Skip to content

Commit 9dca0f3

Browse files
committed
test: add Iceberg compaction unit tests and TPC-H benchmark
1 parent 80051d0 commit 9dca0f3

3 files changed

Lines changed: 1080 additions & 0 deletions

File tree

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet
21+
22+
import java.io.File
23+
import java.nio.file.Files
24+
25+
import org.apache.iceberg.spark.Spark3Util
26+
import org.apache.iceberg.spark.actions.SparkActions
27+
import org.apache.spark.sql.CometTestBase
28+
import org.apache.spark.sql.comet.CometNativeCompaction
29+
30+
/**
 * Benchmark-style test that compares Iceberg's default Spark `rewriteDataFiles` bin-pack action
 * with Comet's native compaction on fragmented Iceberg tables built from local TPC-H Parquet
 * data.
 *
 * Both tests are skipped (via `assume`) unless Iceberg is on the classpath, the TPC-H data
 * exists under `dataLocation`, and `CometNativeCompaction.isAvailable` is true. Timings are
 * printed to stdout; no assertion is made about them.
 *
 * Run with: mvn test -pl spark -Dsuites=org.apache.comet.CometIcebergCompactionBenchmarkTest
 */
class CometIcebergCompactionBenchmarkTest extends CometTestBase {

  /** Root directory of the TPC-H Parquet data; each table is read from a sub-directory. */
  private val dataLocation = "/tmp/tpch/sf1_parquet"

  /** Subset of the TPC-H `lineitem` columns copied into the benchmark table. */
  private val lineitemSchema =
    """l_orderkey BIGINT, l_partkey BIGINT, l_suppkey BIGINT, l_linenumber INT,
       l_quantity DOUBLE, l_extendedprice DOUBLE, l_discount DOUBLE, l_tax DOUBLE,
       l_returnflag STRING, l_linestatus STRING"""

  /** TPC-H `orders` columns copied into the benchmark table. */
  private val ordersSchema =
    """o_orderkey BIGINT, o_custkey BIGINT, o_orderstatus STRING, o_totalprice DOUBLE,
       o_orderdate DATE, o_orderpriority STRING, o_clerk STRING, o_shippriority INT,
       o_comment STRING"""

  /** TPC-H `customer` columns copied into the benchmark table. */
  private val customerSchema =
    """c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey BIGINT,
       c_phone STRING, c_acctbal DOUBLE, c_mktsegment STRING, c_comment STRING"""

  /** True when the Iceberg `Catalog` class can be loaded from the test classpath. */
  private def icebergAvailable: Boolean =
    try {
      Class.forName("org.apache.iceberg.catalog.Catalog")
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  /** True when the `lineitem` directory exists under `dataLocation`. */
  private def tpcDataAvailable: Boolean =
    new File(s"$dataLocation/lineitem").exists()

  /**
   * Creates a temporary directory, passes it to `f`, and deletes it recursively afterwards, even
   * when `f` throws.
   *
   * @param f
   *   body that receives the temporary directory
   */
  private def withTempIcebergDir(f: File => Unit): Unit = {
    val dir = Files.createTempDirectory("comet-benchmark").toFile

    def deleteRecursively(file: File): Unit = {
      // listFiles() returns null for non-directories and on I/O errors; wrapping it in Option
      // keeps cleanup from throwing a NullPointerException that would mask the test's failure.
      Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
      file.delete()
    }

    try {
      f(dir)
    } finally {
      deleteRecursively(dir)
    }
  }

  /**
   * SQL configuration that registers a Hadoop-type Iceberg catalog named `bench_cat` and turns
   * on Comet, Comet execution and Comet Iceberg compaction.
   *
   * @param warehouseDir
   *   directory used as the catalog warehouse
   */
  private def icebergCatalogConf(warehouseDir: File): Map[String, String] = Map(
    "spark.sql.catalog.bench_cat" -> "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.bench_cat.type" -> "hadoop",
    "spark.sql.catalog.bench_cat.warehouse" -> warehouseDir.getAbsolutePath,
    CometConf.COMET_ENABLED.key -> "true",
    CometConf.COMET_EXEC_ENABLED.key -> "true",
    CometConf.COMET_ICEBERG_COMPACTION_ENABLED.key -> "true")

  /** Extracts the column names from a `name TYPE, name TYPE, ...` schema string. */
  private def columnNames(schema: String): String =
    schema.split(",").map(_.trim.split("\\s+")(0)).mkString(", ")

  /** Runs `body` and returns the elapsed wall-clock time, truncated to whole milliseconds. */
  private def timeMillis[T](body: => T): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000
  }

  /** Ratio of Spark time to native time; 0.0 when the native time truncated to zero. */
  private def speedupOf(sparkMillis: Long, nativeMillis: Long): Double =
    if (nativeMillis > 0) sparkMillis.toDouble / nativeMillis else 0.0

  /** Number of rows in the table's `files` metadata table. */
  private def dataFileCount(tableName: String): Long =
    spark.sql(s"SELECT * FROM $tableName.files").count()

  /** Times loading the table plus Iceberg's Spark `rewriteDataFiles` bin-pack action. */
  private def sparkCompactionMillis(tableName: String): Long = timeMillis {
    val table = Spark3Util.loadIcebergTable(spark, tableName)
    SparkActions.get(spark).rewriteDataFiles(table).binPack().execute()
  }

  /** Times loading the table plus `CometNativeCompaction.rewriteDataFiles`. */
  private def nativeCompactionMillis(tableName: String): Long = timeMillis {
    val table = Spark3Util.loadIcebergTable(spark, tableName)
    CometNativeCompaction(spark).rewriteDataFiles(table)
  }

  /**
   * Creates an Iceberg table, fills it with `numFragments` separate inserts from the TPC-H
   * source, runs `body`, and drops the table afterwards, even when `body` throws.
   *
   * @param tableName
   *   fully qualified name of the table to create
   * @param sourceTable
   *   TPC-H table directory under `dataLocation` to read from
   * @param schema
   *   comma-separated `name TYPE` column list; the names are also used as the SELECT list
   * @param numFragments
   *   number of INSERT statements to issue
   * @param rowsPerFragment
   *   LIMIT applied to each INSERT
   * @param body
   *   work to run while the fragmented table exists
   * @return
   *   the value produced by `body`
   */
  private def withFragmentedTable[T](
      tableName: String,
      sourceTable: String,
      schema: String,
      numFragments: Int,
      rowsPerFragment: Int)(body: => T): T = {
    val cols = columnNames(schema)
    spark.sql(s"CREATE TABLE $tableName ($schema) USING iceberg")
    try {
      // NOTE(review): LIMIT/OFFSET is used without ORDER BY, so which source rows end up in
      // each fragment is not guaranteed to be the same across runs.
      (0 until numFragments).foreach { i =>
        spark.sql(s"""
          INSERT INTO $tableName
          SELECT $cols FROM parquet.`$dataLocation/$sourceTable`
          LIMIT $rowsPerFragment OFFSET ${i * rowsPerFragment}
        """)
      }
      body
    } finally {
      spark.sql(s"DROP TABLE $tableName")
    }
  }

  /**
   * Builds a fragmented copy of one TPC-H table twice and times Spark compaction on the first
   * copy and native compaction on the second.
   *
   * @param warehouseDir
   *   not read here; the active catalog configuration already points at the warehouse. Kept so
   *   the signature stays unchanged.
   * @param sourceTable
   *   TPC-H table name; the benchmark table is `bench_cat.db.<sourceTable>_bench`
   * @param schema
   *   comma-separated `name TYPE` column list for the benchmark table
   * @param numFragments
   *   number of INSERT statements used to fragment the table
   * @param rowsPerFragment
   *   rows per INSERT
   * @return
   *   (Spark milliseconds, native milliseconds, Spark/native speedup)
   */
  private def runTableBenchmark(
      warehouseDir: File,
      sourceTable: String,
      schema: String,
      numFragments: Int,
      rowsPerFragment: Int): (Long, Long, Double) = {
    val tableName = s"bench_cat.db.${sourceTable}_bench"

    // Benchmark 1: Spark default compaction
    val sparkDuration =
      withFragmentedTable(tableName, sourceTable, schema, numFragments, rowsPerFragment) {
        sparkCompactionMillis(tableName)
      }

    // Benchmark 2: native compaction on a freshly re-created fragmented table
    val nativeDuration =
      withFragmentedTable(tableName, sourceTable, schema, numFragments, rowsPerFragment) {
        nativeCompactionMillis(tableName)
      }

    (sparkDuration, nativeDuration, speedupOf(sparkDuration, nativeDuration))
  }

  test("TPC-H compaction benchmark: lineitem, orders, customer") {
    assume(icebergAvailable, "Iceberg not available")
    assume(tpcDataAvailable, s"TPC-H data not found at $dataLocation")
    assume(CometNativeCompaction.isAvailable, "Native compaction not available")

    withTempIcebergDir { warehouseDir =>
      withSQLConf(icebergCatalogConf(warehouseDir).toSeq: _*) {
        val numFragments = 10
        val rowsPerFragment = 5000
        val tables = Seq(
          "lineitem" -> lineitemSchema,
          "orders" -> ordersSchema,
          "customer" -> customerSchema)

        // scalastyle:off println
        println("\n" + "=" * 60)
        println("  TPC-H ICEBERG COMPACTION BENCHMARK")
        println("  Spark Default vs Native (Comet) Compaction")
        println("=" * 60)
        println(f"${"Table"}%-15s ${"Spark(ms)"}%12s ${"Native(ms)"}%12s ${"Speedup"}%10s")
        println("-" * 60)

        // One result row per table, printed as soon as that table's benchmark finishes.
        val results = tables.map { case (table, schema) =>
          val (sparkMs, nativeMs, speedup) =
            runTableBenchmark(warehouseDir, table, schema, numFragments, rowsPerFragment)
          println(f"$table%-15s $sparkMs%12d $nativeMs%12d $speedup%9.2fx")
          (sparkMs, nativeMs, speedup)
        }

        println("-" * 60)
        // The summary row shows total times alongside the mean of the per-table speedups.
        val totalSpark = results.map(_._1).sum
        val totalNative = results.map(_._2).sum
        val avgSpeedup = results.map(_._3).sum / results.size
        println(f"${"AVERAGE"}%-15s $totalSpark%12d $totalNative%12d $avgSpeedup%9.2fx")
        println("=" * 60 + "\n")
        // scalastyle:on println
      }
    }
  }

  test("benchmark: Spark vs Native compaction on lineitem (SF0.01 subset)") {
    assume(icebergAvailable, "Iceberg not available")
    assume(tpcDataAvailable, s"TPC-H data not found at $dataLocation")
    assume(CometNativeCompaction.isAvailable, "Native compaction not available")

    withTempIcebergDir { warehouseDir =>
      withSQLConf(icebergCatalogConf(warehouseDir).toSeq: _*) {
        val tableName = "bench_cat.db.lineitem_bench"
        val numFragments = 10
        val rowsPerFragment = 1000

        // Builds the fragmented lineitem table, runs `body`, then drops the table.
        def withLineitemFragments[T](body: => T): T =
          withFragmentedTable(
            tableName,
            "lineitem",
            lineitemSchema,
            numFragments,
            rowsPerFragment)(body)

        // scalastyle:off println
        // Benchmark 1: Spark default compaction
        val sparkDuration = withLineitemFragments {
          val filesBefore = dataFileCount(tableName)
          val rowCount = spark.sql(s"SELECT COUNT(*) FROM $tableName").first().getLong(0)

          println("\n========== COMPACTION BENCHMARK ==========")
          println(s"Table: $tableName")
          println(s"Files before: $filesBefore, Rows: $rowCount")
          println("=" * 45)

          val elapsed = sparkCompactionMillis(tableName)

          spark.sql(s"REFRESH TABLE $tableName")
          val filesAfterSpark = dataFileCount(tableName)
          println(s"Spark compaction: ${elapsed}ms ($filesBefore -> $filesAfterSpark files)")
          elapsed
        }

        // Benchmark 2: native compaction on a freshly re-created fragmented table
        val nativeDuration = withLineitemFragments {
          // Measured on this table so the report does not reuse the Spark run's count.
          val filesBefore = dataFileCount(tableName)

          val elapsed = nativeCompactionMillis(tableName)

          spark.sql(s"REFRESH TABLE $tableName")
          val filesAfterNative = dataFileCount(tableName)
          println(s"Native compaction: ${elapsed}ms ($filesBefore -> $filesAfterNative files)")
          elapsed
        }

        val speedup = speedupOf(sparkDuration, nativeDuration)
        println(s"Speedup: ${f"$speedup%.2f"}x")
        println("=" * 45 + "\n")
        // scalastyle:on println
      }
    }
  }
}

0 commit comments

Comments
 (0)