Skip to content

Commit 38095c3

Browse files
committed
feat: add CometCompactionRule to intercept CALL rewrite_data_files
1 parent 23a8dd2 commit 38095c3

4 files changed

Lines changed: 182 additions & 4 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,13 @@ object CometConf extends ShimCometConf {
152152

153153
val COMET_ICEBERG_COMPACTION_ENABLED: ConfigEntry[Boolean] =
154154
conf("spark.comet.iceberg.compaction.enabled")
155-
.category(CATEGORY_TESTING)
155+
.category(CATEGORY_EXEC)
156156
.doc(
157157
"Whether to enable Comet-accelerated Iceberg compaction. When enabled, " +
158-
"CALL rewrite_data_files() uses Comet's native scan for the read path, " +
159-
"reducing JVM overhead during compaction. Experimental.")
158+
"CALL rewrite_data_files() is intercepted and executed via Comet's native " +
159+
"Rust/DataFusion engine for direct Parquet read/write, bypassing Spark's " +
160+
"DAG execution. Only bin-pack strategy is supported; sort and z-order " +
161+
"fall back to Spark's default. Requires Iceberg on the classpath. Experimental.")
160162
.booleanConf
161163
.createWithDefault(false)
162164

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution._
3232
import org.apache.spark.sql.internal.SQLConf
3333

3434
import org.apache.comet.CometConf._
35-
import org.apache.comet.rules.{CometExecRule, CometScanRule, EliminateRedundantTransitions}
35+
import org.apache.comet.rules.{CometCompactionRule, CometExecRule, CometScanRule, EliminateRedundantTransitions}
3636
import org.apache.comet.shims.ShimCometSparkSessionExtensions
3737

3838
/**
@@ -47,6 +47,7 @@ class CometSparkSessionExtensions
4747
override def apply(extensions: SparkSessionExtensions): Unit = {
  // Columnar rules: scan, exec, then compaction. Registration order is kept as-is.
  extensions.injectColumnar(sparkSession => CometScanColumnar(sparkSession))
  extensions.injectColumnar(sparkSession => CometExecColumnar(sparkSession))
  extensions.injectColumnar(sparkSession => CometCompactionColumnar(sparkSession))

  // Query-stage preparation rules: scan first, then exec.
  extensions.injectQueryStagePrepRule(sparkSession => CometScanRule(sparkSession))
  extensions.injectQueryStagePrepRule(sparkSession => CometExecRule(sparkSession))
}
@@ -61,6 +62,10 @@ class CometSparkSessionExtensions
6162
override def postColumnarTransitions: Rule[SparkPlan] =
6263
EliminateRedundantTransitions(session)
6364
}
65+
66+
case class CometCompactionColumnar(session: SparkSession) extends ColumnarRule {

  // Supplies a CometCompactionRule bound to this session as the rule that runs before
  // columnar transitions; a new rule instance is created on every call.
  override def preColumnarTransitions: Rule[SparkPlan] = {
    CometCompactionRule(session)
  }
}
6469
}
6570

6671
object CometSparkSessionExtensions extends Logging {
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.rules
21+
22+
import org.apache.spark.internal.Logging
23+
import org.apache.spark.sql.SparkSession
24+
import org.apache.spark.sql.catalyst.InternalRow
25+
import org.apache.spark.sql.catalyst.rules.Rule
26+
import org.apache.spark.sql.comet.{CometNativeCompaction, CometNativeCompactionExec}
27+
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
28+
import org.apache.spark.sql.execution.SparkPlan
29+
30+
import org.apache.comet.CometConf
31+
32+
/**
 * Replaces Iceberg's CallExec targeting RewriteDataFilesProcedure with native Comet compaction.
 *
 * Uses reflection to detect CallExec (from Iceberg extensions) to avoid hard compile-time
 * dependency. Only active when spark.comet.iceberg.compaction.enabled is true and native
 * compaction is available. Currently supports Spark 3.x only (Spark 4.0 uses InvokeProcedures at
 * the analysis phase, handled separately via shim).
 *
 * The replacement node is built from the target table alone, so a call is intercepted only when
 * that loses nothing: the strategy is absent or "binpack" and every argument after the strategy
 * is null. Any other call, and any non-fatal failure while inspecting the plan, leaves the
 * original node in place so Spark runs the procedure itself.
 */
case class CometCompactionRule(session: SparkSession) extends Rule[SparkPlan] with Logging {

  private val CALL_EXEC_CLASS = "org.apache.spark.sql.execution.datasources.v2.CallExec"
  private val REWRITE_PROCEDURE_NAME = "RewriteDataFilesProcedure"

  // Positions of the procedure arguments inside CallExec's input row.
  private val TABLE_ARG_INDEX = 0
  private val STRATEGY_ARG_INDEX = 1

  /** Rewrites every matching CallExec in `plan`; returns `plan` untouched when disabled. */
  override def apply(plan: SparkPlan): SparkPlan = {
    if (isEnabled) {
      plan.transformUp {
        case exec if isRewriteCallExec(exec) =>
          replaceWithNative(exec).getOrElse(exec)
      }
    } else {
      plan
    }
  }

  /** True when the feature flag is set for this session and the native library is usable. */
  private def isEnabled: Boolean =
    CometConf.COMET_ICEBERG_COMPACTION_ENABLED.get(session.sessionState.conf) &&
      CometNativeCompaction.isAvailable

  /**
   * True when `plan` is a CallExec whose procedure class is named RewriteDataFilesProcedure. Any
   * non-fatal reflection failure is treated as "not a match".
   */
  private def isRewriteCallExec(plan: SparkPlan): Boolean = {
    plan.getClass.getName == CALL_EXEC_CLASS && {
      try {
        val proc = plan.getClass.getMethod("procedure").invoke(plan)
        proc != null && proc.getClass.getSimpleName == REWRITE_PROCEDURE_NAME
      } catch { case scala.util.control.NonFatal(_) => false }
    }
  }

  /**
   * Builds the native replacement for `exec`.
   *
   * @return
   *   the native node, or None when the call uses arguments the native path cannot honor or when
   *   the plan cannot be inspected (the caller then keeps the original node)
   */
  private def replaceWithNative(exec: SparkPlan): Option[SparkPlan] = {
    try {
      val proc = exec.getClass.getMethod("procedure").invoke(exec)
      val input = exec.getClass.getMethod("input").invoke(exec).asInstanceOf[InternalRow]

      unsupportedReason(input) match {
        case Some(reason) =>
          logInfo(s"Native compaction skipped: $reason")
          None
        case None =>
          val tableCatalog = extractTableCatalog(proc)
          val tableIdent =
            parseIdentifier(input.getUTF8String(TABLE_ARG_INDEX).toString, tableCatalog)

          logInfo(s"Replacing CallExec with CometNativeCompactionExec for $tableIdent")
          Some(CometNativeCompactionExec(exec.output, tableCatalog, tableIdent))
      }
    } catch {
      // NonFatal lets interrupts and VM errors propagate; the throwable is logged with its
      // stack trace so a broken reflection path can be diagnosed.
      case scala.util.control.NonFatal(e) =>
        logWarning(s"Cannot replace with native compaction: ${e.getMessage}", e)
        None
    }
  }

  /**
   * Explains why the call described by `input` cannot run natively, or returns None when it can.
   *
   * The native node only receives the table, so every argument after the strategy must be unset;
   * otherwise it would be silently ignored.
   * NOTE(review): in current Iceberg releases those trailing arguments are presumably
   * sort_order, options and where -- confirm against the procedure's parameter list.
   */
  private def unsupportedReason(input: InternalRow): Option[String] = {
    def isSet(index: Int): Boolean = index < input.numFields && !input.isNullAt(index)

    val strategy =
      if (isSet(STRATEGY_ARG_INDEX)) Some(input.getUTF8String(STRATEGY_ARG_INDEX).toString)
      else None

    if (!isSet(TABLE_ARG_INDEX)) {
      Some("table argument is missing")
    } else {
      strategy.filterNot(_.equalsIgnoreCase("binpack")) match {
        case Some(other) =>
          Some(s"unsupported strategy '$other'")
        case None =>
          (STRATEGY_ARG_INDEX + 1 until input.numFields)
            .find(isSet)
            .map(index => s"procedure argument at position $index is not supported natively")
      }
    }
  }

  /**
   * Extract TableCatalog from the procedure via reflection (field is private). The class
   * hierarchy is searched upwards so the lookup does not depend on how many levels separate the
   * concrete procedure from the class that declares the field.
   */
  private def extractTableCatalog(procedure: Any): TableCatalog = {
    @scala.annotation.tailrec
    def findField(cls: Class[_]): java.lang.reflect.Field = {
      if (cls == null) {
        throw new NoSuchFieldException("tableCatalog")
      } else {
        cls.getDeclaredFields.find(_.getName == "tableCatalog") match {
          case Some(field) => field
          case None => findField(cls.getSuperclass)
        }
      }
    }

    val field = findField(procedure.getClass)
    field.setAccessible(true)
    field.get(procedure) match {
      case catalog: TableCatalog => catalog
      case other =>
        throw new IllegalStateException(s"Unexpected value in tableCatalog field: $other")
    }
  }

  /**
   * Turns the procedure's table argument into an Identifier.
   *
   * The string is parsed with the session's SQL parser so back-quoted parts (which may contain
   * dots) are handled, and a leading part equal to the procedure catalog's own name is dropped
   * because the identifier is later resolved inside that catalog.
   * NOTE(review): a single-part name still yields an empty namespace -- confirm whether the
   * session's current namespace should be applied instead.
   */
  private def parseIdentifier(identStr: String, catalog: TableCatalog): Identifier = {
    val parsed = session.sessionState.sqlParser.parseMultipartIdentifier(identStr)
    val parts = if (parsed.length > 1 && parsed.head == catalog.name()) parsed.tail else parsed
    Identifier.of(parts.init.toArray, parts.last)
  }
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet
21+
22+
import org.apache.iceberg.spark.source.SparkTable
23+
import org.apache.spark.internal.Logging
24+
import org.apache.spark.sql.SparkSession
25+
import org.apache.spark.sql.catalyst.InternalRow
26+
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow}
27+
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
28+
import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
29+
30+
/**
31+
* Executes Iceberg compaction via Comet's native Rust/DataFusion engine. Replaces CallExec for
32+
* RewriteDataFilesProcedure when native compaction is enabled.
33+
*
34+
* Output row is built dynamically to match the procedure's output schema, which varies across
35+
* Iceberg versions (e.g. removed_delete_files_count added in later versions).
36+
*/
37+
case class CometNativeCompactionExec(
38+
output: Seq[Attribute],
39+
@transient tableCatalog: TableCatalog,
40+
tableIdent: Identifier)
41+
extends LeafV2CommandExec
42+
with Logging {
43+
44+
override protected def run(): Seq[InternalRow] = {
45+
val spark = SparkSession.active
46+
val icebergTable = tableCatalog
47+
.loadTable(tableIdent)
48+
.asInstanceOf[SparkTable]
49+
.table()
50+
51+
logInfo(s"Executing native compaction for $tableIdent")
52+
val summary = CometNativeCompaction(spark).rewriteDataFiles(icebergTable)
53+
54+
val fieldValues: Map[String, Any] = Map(
55+
"rewritten_data_files_count" -> summary.filesDeleted,
56+
"added_data_files_count" -> summary.filesAdded,
57+
"rewritten_bytes_count" -> summary.bytesDeleted,
58+
"failed_data_files_count" -> 0,
59+
"removed_delete_files_count" -> 0)
60+
61+
val values = output.map(attr => fieldValues.getOrElse(attr.name, 0))
62+
Seq(new GenericInternalRow(values.toArray))
63+
}
64+
65+
override def simpleString(maxFields: Int): String =
66+
s"CometNativeCompactionExec[$tableIdent]"
67+
}

0 commit comments

Comments
 (0)