Skip to content

Commit 7ea1cd4

Browse files
andygroveclaude
andauthored
Add microbenchmark for IcebergScan operator serde roundtrip (#3296)
This benchmark measures the serialization/deserialization performance of Iceberg FileScanTask objects to protobuf, starting from actual Iceberg Java objects rather than pre-constructed protobuf messages. The benchmark: - Creates a real Iceberg table with configurable number of partitions - Extracts FileScanTask objects through query planning - Benchmarks conversion from FileScanTask to Protobuf - Benchmarks serialization to bytes and deserialization Usage: make benchmark-org.apache.spark.sql.benchmark.CometOperatorSerdeBenchmark -- 30000 Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 22fdec9 commit 7ea1cd4

1 file changed

Lines changed: 311 additions & 0 deletions

File tree

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import java.io.File
23+
import java.nio.file.Files
24+
25+
import org.apache.spark.benchmark.Benchmark
26+
import org.apache.spark.sql.comet.{CometBatchScanExec, CometIcebergNativeScanExec}
27+
import org.apache.spark.sql.execution.SparkPlan
28+
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
29+
30+
import org.apache.comet.CometConf
31+
import org.apache.comet.serde.OperatorOuterClass
32+
import org.apache.comet.serde.operator.CometIcebergNativeScan
33+
34+
/**
35+
* Benchmark for operator serialization/deserialization roundtrip performance.
36+
*
37+
* This benchmark measures the time to serialize Iceberg FileScanTask objects to protobuf,
38+
* starting from actual Iceberg Java objects rather than pre-constructed protobuf messages.
39+
*
40+
* To run this benchmark:
41+
* {{{
42+
* SPARK_GENERATE_BENCHMARK_FILES=1 make \
43+
* benchmark-org.apache.spark.sql.benchmark.CometOperatorSerdeBenchmark
44+
* }}}
45+
*
46+
* Results will be written to "spark/benchmarks/CometOperatorSerdeBenchmark-**results.txt".
47+
*/
48+
object CometOperatorSerdeBenchmark extends CometBenchmarkBase {
49+
50+
// Check if Iceberg is available in classpath
51+
private def icebergAvailable: Boolean = {
52+
try {
53+
Class.forName("org.apache.iceberg.catalog.Catalog")
54+
true
55+
} catch {
56+
case _: ClassNotFoundException => false
57+
}
58+
}
59+
60+
// Helper to create temp directory for Iceberg warehouse
61+
private def withTempIcebergDir(f: File => Unit): Unit = {
62+
val dir = Files.createTempDirectory("comet-serde-benchmark").toFile
63+
try {
64+
f(dir)
65+
} finally {
66+
def deleteRecursively(file: File): Unit = {
67+
if (file.isDirectory) {
68+
Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
69+
}
70+
file.delete()
71+
}
72+
deleteRecursively(dir)
73+
}
74+
}
75+
76+
/**
77+
* Extracts CometIcebergNativeScanExec from a query plan, unwrapping AQE if present.
78+
*/
79+
private def extractIcebergNativeScanExec(
80+
plan: SparkPlan): Option[CometIcebergNativeScanExec] = {
81+
val unwrapped = plan match {
82+
case aqe: AdaptiveSparkPlanExec => aqe.executedPlan
83+
case other => other
84+
}
85+
86+
def find(p: SparkPlan): Option[CometIcebergNativeScanExec] = {
87+
p match {
88+
case scan: CometIcebergNativeScanExec => Some(scan)
89+
case _ => p.children.flatMap(find).headOption
90+
}
91+
}
92+
find(unwrapped)
93+
}
94+
95+
/**
96+
* Reconstructs a CometBatchScanExec from CometIcebergNativeScanExec for benchmarking the
97+
* conversion process.
98+
*/
99+
private def reconstructBatchScanExec(
100+
nativeScan: CometIcebergNativeScanExec): CometBatchScanExec = {
101+
CometBatchScanExec(
102+
wrapped = nativeScan.originalPlan,
103+
runtimeFilters = Seq.empty,
104+
nativeIcebergScanMetadata = Some(nativeScan.nativeIcebergScanMetadata))
105+
}
106+
107+
/**
108+
* Creates an Iceberg table with the specified number of partitions. Each partition contains one
109+
* data file.
110+
*/
111+
private def createPartitionedIcebergTable(
112+
warehouseDir: File,
113+
numPartitions: Int,
114+
tableName: String = "serde_bench_table"): Unit = {
115+
// Configure Hadoop catalog
116+
spark.conf.set("spark.sql.catalog.bench_cat", "org.apache.iceberg.spark.SparkCatalog")
117+
spark.conf.set("spark.sql.catalog.bench_cat.type", "hadoop")
118+
spark.conf.set("spark.sql.catalog.bench_cat.warehouse", warehouseDir.getAbsolutePath)
119+
120+
val fullTableName = s"bench_cat.db.$tableName"
121+
122+
// Drop table if exists
123+
spark.sql(s"DROP TABLE IF EXISTS $fullTableName")
124+
spark.sql("CREATE NAMESPACE IF NOT EXISTS bench_cat.db")
125+
126+
// Create partitioned Iceberg table
127+
spark.sql(s"""
128+
CREATE TABLE $fullTableName (
129+
id BIGINT,
130+
name STRING,
131+
value DOUBLE,
132+
partition_col INT
133+
) USING iceberg
134+
PARTITIONED BY (partition_col)
135+
TBLPROPERTIES (
136+
'format-version'='2',
137+
'write.parquet.compression-codec' = 'snappy'
138+
)
139+
""")
140+
141+
// Insert data to create the specified number of partitions
142+
// Use a range to create unique partition values
143+
// scalastyle:off println
144+
println(s"Creating Iceberg table with $numPartitions partitions...")
145+
// scalastyle:on println
146+
147+
// Insert in batches to avoid memory issues
148+
val batchSize = 1000
149+
var partitionsCreated = 0
150+
151+
while (partitionsCreated < numPartitions) {
152+
val batchEnd = math.min(partitionsCreated + batchSize, numPartitions)
153+
val partitionRange = partitionsCreated until batchEnd
154+
155+
// Create DataFrame with partition data
156+
import spark.implicits._
157+
val df = partitionRange
158+
.map { p =>
159+
(p.toLong, s"name_$p", p * 1.5, p)
160+
}
161+
.toDF("id", "name", "value", "partition_col")
162+
163+
df.writeTo(fullTableName).append()
164+
partitionsCreated = batchEnd
165+
166+
if (partitionsCreated % 5000 == 0 || partitionsCreated == numPartitions) {
167+
// scalastyle:off println
168+
println(s" Created $partitionsCreated / $numPartitions partitions")
169+
// scalastyle:on println
170+
}
171+
}
172+
}
173+
174+
/**
175+
* Benchmarks the serialization of IcebergScan operator from FileScanTask objects.
176+
*/
177+
def icebergScanSerdeBenchmark(numPartitions: Int): Unit = {
178+
if (!icebergAvailable) {
179+
// scalastyle:off println
180+
println("Iceberg not available in classpath, skipping benchmark")
181+
// scalastyle:on println
182+
return
183+
}
184+
185+
withTempIcebergDir { warehouseDir =>
186+
withSQLConf(
187+
"spark.sql.catalog.bench_cat" -> "org.apache.iceberg.spark.SparkCatalog",
188+
"spark.sql.catalog.bench_cat.type" -> "hadoop",
189+
"spark.sql.catalog.bench_cat.warehouse" -> warehouseDir.getAbsolutePath,
190+
CometConf.COMET_ENABLED.key -> "true",
191+
CometConf.COMET_EXEC_ENABLED.key -> "true",
192+
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
193+
194+
// Create the partitioned table
195+
createPartitionedIcebergTable(warehouseDir, numPartitions)
196+
197+
val fullTableName = "bench_cat.db.serde_bench_table"
198+
199+
// Plan a query to get the CometIcebergNativeScanExec with FileScanTasks
200+
val df = spark.sql(s"SELECT * FROM $fullTableName")
201+
val plan = df.queryExecution.executedPlan
202+
203+
val nativeScanOpt = extractIcebergNativeScanExec(plan)
204+
205+
nativeScanOpt match {
206+
case Some(nativeScan) =>
207+
// Get metadata and tasks
208+
val metadata = nativeScan.nativeIcebergScanMetadata
209+
val tasks = metadata.tasks
210+
// scalastyle:off println
211+
println(s"Found ${tasks.size()} FileScanTasks")
212+
// scalastyle:on println
213+
214+
// Reconstruct CometBatchScanExec for conversion benchmarking
215+
val scanExec = reconstructBatchScanExec(nativeScan)
216+
217+
// Benchmark the serialization
218+
val iterations = 100
219+
val benchmark = new Benchmark(
220+
s"IcebergScan serde ($numPartitions partitions, ${tasks.size()} tasks)",
221+
iterations,
222+
output = output)
223+
224+
// Benchmark: Convert FileScanTasks to protobuf (the convert() method)
225+
benchmark.addCase("FileScanTask -> Protobuf (convert)") { _ =>
226+
var i = 0
227+
while (i < iterations) {
228+
val builder = OperatorOuterClass.Operator.newBuilder()
229+
CometIcebergNativeScan.convert(scanExec, builder)
230+
i += 1
231+
}
232+
}
233+
234+
// Benchmark: Full roundtrip - convert to protobuf and serialize to bytes
235+
benchmark.addCase("FileScanTask -> Protobuf -> bytes") { _ =>
236+
var i = 0
237+
while (i < iterations) {
238+
val builder = OperatorOuterClass.Operator.newBuilder()
239+
val operatorOpt = CometIcebergNativeScan.convert(scanExec, builder)
240+
operatorOpt.foreach(_.toByteArray)
241+
i += 1
242+
}
243+
}
244+
245+
// Get serialized bytes for deserialization benchmark
246+
val builder = OperatorOuterClass.Operator.newBuilder()
247+
val operatorOpt = CometIcebergNativeScan.convert(scanExec, builder)
248+
249+
operatorOpt match {
250+
case Some(operator) =>
251+
val serializedBytes = operator.toByteArray
252+
val sizeKB = serializedBytes.length / 1024.0
253+
val sizeMB = sizeKB / 1024.0
254+
255+
// scalastyle:off println
256+
println(
257+
s"Serialized IcebergScan size: ${f"$sizeKB%.1f"} KB (${f"$sizeMB%.2f"} MB)")
258+
// scalastyle:on println
259+
260+
// Benchmark: Deserialize from bytes
261+
benchmark.addCase("bytes -> Protobuf (parseFrom)") { _ =>
262+
var i = 0
263+
while (i < iterations) {
264+
OperatorOuterClass.Operator.parseFrom(serializedBytes)
265+
i += 1
266+
}
267+
}
268+
269+
// Benchmark: Full roundtrip including deserialization
270+
benchmark.addCase("Full roundtrip (convert + serialize + deserialize)") { _ =>
271+
var i = 0
272+
while (i < iterations) {
273+
val b = OperatorOuterClass.Operator.newBuilder()
274+
val op = CometIcebergNativeScan.convert(scanExec, b)
275+
op.foreach { o =>
276+
val bytes = o.toByteArray
277+
OperatorOuterClass.Operator.parseFrom(bytes)
278+
}
279+
i += 1
280+
}
281+
}
282+
283+
case None =>
284+
// scalastyle:off println
285+
println("WARNING: convert() returned None, cannot benchmark serialization")
286+
// scalastyle:on println
287+
}
288+
289+
benchmark.run()
290+
291+
case None =>
292+
// scalastyle:off println
293+
println("WARNING: Could not find CometIcebergNativeScanExec in query plan")
294+
println(s"Plan:\n$plan")
295+
// scalastyle:on println
296+
}
297+
298+
// Cleanup
299+
spark.sql(s"DROP TABLE IF EXISTS $fullTableName")
300+
}
301+
}
302+
}
303+
304+
override def runCometBenchmark(args: Array[String]): Unit = {
305+
val numPartitions = if (args.nonEmpty) args(0).toInt else 30000
306+
307+
runBenchmark("IcebergScan Operator Serde Benchmark") {
308+
icebergScanSerdeBenchmark(numPartitions)
309+
}
310+
}
311+
}

0 commit comments

Comments
 (0)