Skip to content

Commit 08ea1bf

Browse files
committed
fix: Scala 2.13 compilation errors and unused parameter warnings
- Fix type mismatch by adding .toSeq to ArrayBuffer in CometNativeCompaction - Replace asScala on CloseableIterator with explicit while loop for Scala 2.13 compatibility - Remove unused warehouseDir parameters and filesBefore/nativeFilesAfter variables - Add @transient annotation to unused spark parameter
1 parent 9573823 commit 08ea1bf

3 files changed

Lines changed: 13 additions & 19 deletions

File tree

spark/src/test/scala/org/apache/comet/CometIcebergCompactionBenchmarkTest.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ class CometIcebergCompactionBenchmarkTest extends CometTestBase {
6969
CometConf.COMET_EXEC_ENABLED.key -> "true",
7070
CometConf.COMET_ICEBERG_COMPACTION_ENABLED.key -> "true")
7171

72+
// scalastyle:off parameter.number
7273
private def runTableBenchmark(
73-
warehouseDir: File,
7474
sourceTable: String,
7575
schema: String,
7676
numFragments: Int,
@@ -91,8 +91,6 @@ class CometIcebergCompactionBenchmarkTest extends CometTestBase {
9191
""")
9292
}
9393

94-
val filesBefore = spark.sql(s"SELECT * FROM $tableName.files").count()
95-
9694
// Benchmark 1: Spark default compaction
9795
val sparkStart = System.nanoTime()
9896
val sparkTable = Spark3Util.loadIcebergTable(spark, tableName)
@@ -147,7 +145,6 @@ class CometIcebergCompactionBenchmarkTest extends CometTestBase {
147145
l_returnflag STRING, l_linestatus STRING"""
148146
val (lSpark, lNative, lSpeedup) =
149147
runTableBenchmark(
150-
warehouseDir,
151148
"lineitem",
152149
lineitemSchema,
153150
numFragments,
@@ -160,7 +157,7 @@ class CometIcebergCompactionBenchmarkTest extends CometTestBase {
160157
o_orderdate DATE, o_orderpriority STRING, o_clerk STRING, o_shippriority INT,
161158
o_comment STRING"""
162159
val (oSpark, oNative, oSpeedup) =
163-
runTableBenchmark(warehouseDir, "orders", ordersSchema, numFragments, rowsPerFragment)
160+
runTableBenchmark("orders", ordersSchema, numFragments, rowsPerFragment)
164161
println(f"${"orders"}%-15s $oSpark%12d $oNative%12d ${oSpeedup}%9.2fx")
165162

166163
// Customer benchmark
@@ -169,7 +166,6 @@ class CometIcebergCompactionBenchmarkTest extends CometTestBase {
169166
c_phone STRING, c_acctbal DOUBLE, c_mktsegment STRING, c_comment STRING"""
170167
val (cSpark, cNative, cSpeedup) =
171168
runTableBenchmark(
172-
warehouseDir,
173169
"customer",
174170
customerSchema,
175171
numFragments,

spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergTPCCompactionBenchmark.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ object CometIcebergTPCCompactionBenchmark extends CometBenchmarkBase {
127127
private def runPartitionedTableBenchmark(dataLocation: String, numFragments: Int): Unit = {
128128
val tableFilePath = resolveTablePath(dataLocation, "lineitem")
129129

130-
withIcebergWarehouse { (warehouseDir, catalog) =>
130+
withIcebergWarehouse { (_, catalog) =>
131131
val icebergTableName = s"$catalog.db.lineitem_partitioned"
132132

133133
// Create fragmented partitioned table
@@ -193,7 +193,7 @@ object CometIcebergTPCCompactionBenchmark extends CometBenchmarkBase {
193193

194194
val tableFilePath = resolveTablePath(dataLocation, tableName)
195195

196-
withIcebergWarehouse { (warehouseDir, catalog) =>
196+
withIcebergWarehouse { (_, catalog) =>
197197
val icebergTableName = s"$catalog.db.${tableName}_iceberg"
198198

199199
// Create fragmented table once to measure metadata
@@ -217,7 +217,6 @@ object CometIcebergTPCCompactionBenchmark extends CometBenchmarkBase {
217217
val nativeTable = Spark3Util.loadIcebergTable(spark, icebergTableName)
218218
new CometNativeCompaction(spark).rewriteDataFiles(nativeTable)
219219
val nativeTimeMs = (System.nanoTime() - nativeStart) / 1000000
220-
val nativeFilesAfter = spark.sql(s"SELECT * FROM $icebergTableName.files").count()
221220

222221
// Calculate speedup
223222
val speedup = if (nativeTimeMs > 0) sparkTimeMs.toDouble / nativeTimeMs.toDouble else 0.0

spark/src/test/scala/org/apache/spark/sql/comet/CometNativeCompaction.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ case class NativeCompactionResult(
8585
/**
8686
* Native Iceberg compaction using Rust/DataFusion for scan+write, Java API for commit.
8787
*/
88-
class CometNativeCompaction(spark: SparkSession) extends Logging {
88+
class CometNativeCompaction(@transient spark: SparkSession) extends Logging {
8989

9090
private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
9191
private val native = new Native()
@@ -136,8 +136,7 @@ class CometNativeCompaction(spark: SparkSession) extends Logging {
136136
tableConfig,
137137
group,
138138
targetFileSizeBytes,
139-
compression,
140-
table.location())
139+
compression)
141140
val result = executeNativeCompaction(compactionConfig)
142141

143142
result match {
@@ -172,7 +171,8 @@ class CometNativeCompaction(spark: SparkSession) extends Logging {
172171
s"Committing compaction: ${allFilesToDelete.size} files to delete, " +
173172
s"${allFilesToAdd.size} files to add")
174173

175-
val commitSuccess = commitCompaction(table, allFilesToDelete, allFilesToAdd)
174+
val commitSuccess =
175+
commitCompaction(table, allFilesToDelete.toSeq, allFilesToAdd.toSeq)
176176

177177
if (!commitSuccess) {
178178
throw new RuntimeException("Failed to commit compaction results")
@@ -264,8 +264,7 @@ class CometNativeCompaction(spark: SparkSession) extends Logging {
264264
tableConfig: IcebergTableConfig,
265265
tasks: Seq[FileScanTask],
266266
targetFileSizeBytes: Long,
267-
compression: String,
268-
tableLocation: String): CompactionTaskConfig = {
267+
compression: String): CompactionTaskConfig = {
269268

270269
val fileScanTaskConfigs = tasks.map { task =>
271270
val partitionPath = task.spec().partitionToPath(task.file().partition())
@@ -312,12 +311,12 @@ class CometNativeCompaction(spark: SparkSession) extends Logging {
312311
try {
313312
val specs = table.specs()
314313
val deleteFiles: java.util.Set[DataFile] = new java.util.HashSet[DataFile]()
315-
val deletePathSet = filesToDelete.toSet
316314
val snapshot = table.currentSnapshot()
317315
if (snapshot != null) {
318-
import scala.jdk.CollectionConverters._
319-
val fileScanTasks = table.newScan().planFiles().iterator().asScala
320-
fileScanTasks.foreach { task =>
316+
val deletePathSet = filesToDelete.toSet
317+
val fileScanTasks = table.newScan().planFiles().iterator()
318+
while (fileScanTasks.hasNext) {
319+
val task = fileScanTasks.next()
321320
val dataFile = task.file()
322321
if (deletePathSet.contains(dataFile.path().toString)) {
323322
deleteFiles.add(dataFile)

0 commit comments

Comments
 (0)