Skip to content

Commit 80051d0

Browse files
committed
feat: add COMET_ICEBERG_COMPACTION_ENABLED config option
1 parent 9c01c57 commit 80051d0

2 files changed

Lines changed: 16 additions & 2 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -150,6 +150,16 @@ object CometConf extends ShimCometConf {
150150
.booleanConf
151151
.createWithDefault(false)
152152

153+
val COMET_ICEBERG_COMPACTION_ENABLED: ConfigEntry[Boolean] =
154+
conf("spark.comet.iceberg.compaction.enabled")
155+
.category(CATEGORY_TESTING)
156+
.doc(
157+
"Whether to enable Comet-accelerated Iceberg compaction. When enabled, " +
158+
"CALL rewrite_data_files() uses Comet's native scan for the read path, " +
159+
"reducing JVM overhead during compaction. Experimental.")
160+
.booleanConf
161+
.createWithDefault(false)
162+
153163
val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
154164
conf("spark.comet.scan.csv.v2.enabled")
155165
.category(CATEGORY_TESTING)

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -315,9 +315,13 @@ case class CometScanRule(session: SparkSession)
315315
}
316316

317317
// Iceberg scan - detected by class name (works with unpatched Iceberg)
318+
// SparkBatchQueryScan: normal query scans
319+
// SparkStagedScan: compaction scans via ScanTaskSetManager
318320
case _
319-
if scanExec.scan.getClass.getName ==
320-
"org.apache.iceberg.spark.source.SparkBatchQueryScan" =>
321+
if Set(
322+
"org.apache.iceberg.spark.source.SparkBatchQueryScan",
323+
"org.apache.iceberg.spark.source.SparkStagedScan").contains(
324+
scanExec.scan.getClass.getName) =>
321325
val fallbackReasons = new ListBuffer[String]()
322326

323327
// Native Iceberg scan requires both configs to be enabled

0 commit comments

Comments
 (0)