Acceleration: Iceberg table compaction [iceberg] #3519
// under the License.

//! Iceberg Parquet writer operator for writing RecordBatches to Parquet files
//! with Iceberg-compatible metadata (DataFile structures).
DataFusion execution operator that writes Arrow RecordBatches to Parquet files with Iceberg-compatible metadata.
It enables native Rust to produce files that Iceberg's Java API can directly commit.
Metadata is serialized as JSON and passed back to JVM via JNI for commit.

//! JNI bridge for Iceberg compaction operations.
//!
//! This module provides JNI functions for native Iceberg compaction (scan + write).
JNI bridge that exposes native Rust compaction to Scala/JVM.
executeIcebergCompaction(): JNI entry point; reads Parquet files via DataFusion and writes compacted output.

/// Configuration for Iceberg table metadata passed from JVM
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IcebergTableConfig {
Table metadata from JVM (identifier, warehouse, snapshot ID, file IO props)

logDebug(s"Executing native compaction with config: $configJson")

val resultJson = native.executeIcebergCompaction(configJson)
JNI entry point - reads Parquet files via DataFusion, writes compacted output

def isAvailable: Boolean = {
  try {
    val version = new Native().getIcebergCompactionVersion()
Returns native library version for compatibility checks

"Iceberg reflection failure: Failed to get filter expressions from SparkScan: " +
s"${e.getMessage}")
None
findMethodInHierarchy(scan.getClass, "filterExpressions").flatMap { filterExpressionsMethod =>
Previously we assumed a fixed Iceberg class hierarchy. findMethodInHierarchy instead walks up the class tree to locate the method, which is a better approach.
For compaction to work, we need to extract FileScanTask objects from the scan. Different Iceberg scan types expose tasks differently:
SparkBatchQueryScan -> tasks() method
SparkStagedScan -> taskGroups() method (returns groups, need to extract tasks from each)
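The normalization described above can be modeled in a few lines. This is only a language-neutral sketch written in Rust: the actual extraction in the PR happens on the JVM side via reflection, and the `ScanKind`, `FileScanTask`, `task`, and `extract_tasks` names below are hypothetical stand-ins, not types from Comet or Iceberg.

```rust
/// Hypothetical stand-in for an Iceberg FileScanTask (only the file path is kept).
#[derive(Debug, Clone, PartialEq)]
struct FileScanTask {
    path: String,
}

/// Hypothetical model of the two scan shapes mentioned in the comment.
enum ScanKind {
    /// Models SparkBatchQueryScan: tasks() returns a flat list.
    BatchQuery { tasks: Vec<FileScanTask> },
    /// Models SparkStagedScan: taskGroups() returns groups of tasks.
    Staged { task_groups: Vec<Vec<FileScanTask>> },
}

/// Convenience constructor used by the example.
fn task(path: &str) -> FileScanTask {
    FileScanTask { path: path.to_string() }
}

/// Returns one flat task list regardless of how the scan exposes its tasks.
fn extract_tasks(scan: ScanKind) -> Vec<FileScanTask> {
    match scan {
        ScanKind::BatchQuery { tasks } => tasks,
        // Each group is unpacked in order, preserving task order within a group.
        ScanKind::Staged { task_groups } => task_groups.into_iter().flatten().collect(),
    }
}

fn main() {
    let staged = ScanKind::Staged {
        task_groups: vec![vec![task("a.parquet"), task("b.parquet")], vec![task("c.parquet")]],
    };
    for t in extract_tasks(staged) {
        println!("{}", t.path);
    }
}
```

Flattening at the boundary means everything downstream of the extraction only has to handle a single list of tasks.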
Interesting PR, thanks @Shekharrajak! To help start the review process, could you:
Reference for the rewrite commit API: apache/iceberg-rust#2106. In this PR the commit happens in the JVM; future PRs can make it native as well.
Compaction reads small files and writes back larger ones, so it is I/O-intensive work. Doing the read and write in Rust improves performance: the entire I/O pipeline (Parquet read -> Arrow RecordBatch -> Parquet write) runs in Rust, reading and writing Parquet through the same Arrow memory layout. This eliminates the entire Spark orchestration layer rather than just replacing individual operators within it.
// Measure Spark compaction (single run - compaction is destructive)
val sparkStart = System.nanoTime()
val sparkTable = Spark3Util.loadIcebergTable(spark, icebergTableName)
SparkActions.get(spark).rewriteDataFiles(sparkTable).binPack().execute()
default Spark action API
file_io_properties = fileIOProperties)
}

/** Plan file groups using bin-pack strategy. */
Right now, we use the bin-pack runner strategy to plan small files into groups.
We can extend this to the other runners and planners.
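As a rough illustration of what grouping small files by bin packing means, here is a minimal sketch using the generic first-fit-decreasing heuristic. It is not the planner from this PR or Iceberg's own bin-pack implementation; `SmallFile`, `plan_file_groups`, and the 128-unit target are hypothetical names and values chosen for the example.

```rust
/// Hypothetical description of a data file that is a compaction candidate.
#[derive(Debug, Clone, PartialEq)]
struct SmallFile {
    path: String,
    size_bytes: u64,
}

/// First-fit-decreasing bin packing (an assumed heuristic, for illustration):
/// sort files largest first, put each file into the first group that still has
/// room under `target_bytes`, and open a new group when none fits.
fn plan_file_groups(mut files: Vec<SmallFile>, target_bytes: u64) -> Vec<Vec<SmallFile>> {
    files.sort_by(|a, b| b.size_bytes.cmp(&a.size_bytes));
    // Each entry tracks (bytes already assigned, files in the group).
    let mut groups: Vec<(u64, Vec<SmallFile>)> = Vec::new();
    for file in files {
        let slot = groups
            .iter()
            .position(|(used, _)| *used + file.size_bytes <= target_bytes);
        match slot {
            Some(i) => {
                groups[i].0 += file.size_bytes;
                groups[i].1.push(file);
            }
            // A file larger than the target still gets a group of its own.
            None => groups.push((file.size_bytes, vec![file])),
        }
    }
    groups.into_iter().map(|(_, files)| files).collect()
}

fn main() {
    let files: Vec<SmallFile> = [("a", 40), ("b", 100), ("c", 30), ("d", 60), ("e", 20)]
        .iter()
        .map(|(p, s)| SmallFile { path: p.to_string(), size_bytes: *s })
        .collect();
    for (i, group) in plan_file_groups(files, 128).iter().enumerate() {
        let total: u64 = group.iter().map(|f| f.size_bytes).sum();
        let paths: Vec<&str> = group.iter().map(|f| f.path.as_str()).collect();
        println!("group {i}: {paths:?} ({total} bytes)");
    }
}
```

Each resulting group would then be rewritten into a single larger output file; a different strategy would only need to replace the grouping function.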
 * Integration tests for CALL rewrite_data_files() procedure intercepted by CometCompactionRule.
 * Verifies that the SQL procedure path routes through native compaction when enabled.
 */
class CometIcebergCompactionProcedureSuite extends CometTestBase {
Validates calling the rewrite API through the SQL procedure for Iceberg table compaction.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

Which issue does this PR close?
Ref #3371
PR Description
Rationale for this change
Iceberg table compaction using Spark's default rewriteDataFiles() action is slow due to Spark shuffle and task scheduling overhead. This PR adds native Rust-based compaction using DataFusion for direct Parquet read/write, achieving 1.5-1.8x speedup over Spark's default compaction.
What changes are included in this PR?
CometNativeCompaction class that executes native compaction (native scan + write via JNI) and commits via the Iceberg Java API
spark.comet.iceberg.compaction.enabled config option
How are these changes tested?
CometIcebergCompactionSuite covering:
CometIcebergTPCCompactionBenchmark) measuring performance on lineitem, orders, customer tables