@@ -27,16 +27,14 @@ Comet accelerates Iceberg V2 copy-on-write table writes in two complementary lay
2727 ` V2ExistingTableWriteExec ` command into a pair of operators — a ** committer** and a
2828 ** writer** — so that AQE (and Comet's columnar rules) can see and re-optimise the
2929 data sub-query. The actual file-writing work still runs through Iceberg's stock JVM writer.
30- 2 . ** Native parquet write — detection only at this stage** (further toggle). Once the
31- split-operator plan is in place, a serde inspects each ` IcebergWriteExec ` to decide
32- whether the table's properties and write shape would allow delegating the per-task parquet
33- write to [ iceberg-rust] ( https://github.com/apache/iceberg-rust ) . The detection / fall-back
34- table is wired up and tested; the actual native exec lands in a follow-up commit. With the
35- gate on today, writes still go through the JVM two-op path — only the diagnostic surface is
36- live.
30+ 2 . ** Native parquet write** (further toggle). Once the split-operator plan is in place, the
31+ writer can be swapped for a native variant that delegates the per-task parquet write to
32+ [ iceberg-rust] ( https://github.com/apache/iceberg-rust ) . The committer / table-metadata
33+ update / cache-refresh stay on the JVM exactly as before.
3734
38- Both layers fall back to plain Iceberg-Java when they can't safely apply, and both layers are
39- configured per-session.
35+ Both layers fall back to plain Iceberg-Java when they can't safely apply — and both layers are
36+ configured per-session, so you can opt into the split plan but skip the native write while it
37+ burns in.
4038
4139## What changes about the Iceberg plan
4240
@@ -89,7 +87,7 @@ spark.sql.catalog.<name>.warehouse=...
8987
9088# Comet write toggles (both off by default; can be turned on independently)
9189spark.comet.write.iceberg.splitOperator.enabled=true # layer 1: split-operator plan
92- spark.comet.write.iceberg.nativeAcceleration=true # layer 2: native parquet write (detection only at this stage)
90+ spark.comet.write.iceberg.nativeAcceleration=true # layer 2: native parquet write
9391```
9492
9593` IcebergSparkSessionExtensions ` is ** mandatory on Spark 3.4** for ` UPDATE ` / ` MERGE ` on V2 tables —
@@ -119,6 +117,21 @@ realisable benefit.
119117
120118- [ ¹] Requires ` IcebergSparkSessionExtensions ` (see Configuration above).
121119
120+ Native parquet write coverage is more selective:
121+
122+ | Write | Spark 3.4 | Spark 3.5 | Spark 4.0 |
123+ | ----------------------------------------------| -------------| -----------| -----------|
124+ | ` AppendData ` (` INSERT INTO ` ) | native | native | native |
125+ | ` OverwriteByExpression ` (static) | native | native | native |
126+ | ` OverwritePartitionsDynamic ` | native | native | native |
127+ | ` ReplaceData ` (CoW ` DELETE ` / ` UPDATE ` ) | native | native | native |
128+ | ` ReplaceData ` (CoW ` MERGE ` ) | fallback[ ²] | fallback[ ²] | fallback[ ²] |
129+
130+ - [ ²] MERGE flows through Spark's (or Iceberg 1.5.2's) ` MergeRowsExec ` , which per-row dispatches
131+ the matched / not-matched / not-matched-by-source clauses. There's no native equivalent in
132+ Comet * yet* — this is planned (see Known limitations). The scan / join / sort / shuffle parts
133+ still run native; only the per-row merge dispatch and the file write stay JVM.
134+
122135## Fallback triggers
123136
124137Before accepting a write into the native path, the serde checks for things iceberg-rust either
@@ -153,16 +166,28 @@ Parquet Rust encoder that iceberg-rust drives).
153166
154167Every trigger has a pinning test in ` CometIcebergWriteDetectionSuite ` .
155168
169+ ## Known limitations and planned work
170+
171+ - ** CoW MERGE — native acceleration planned.** The split-operator plan already applies (scan,
172+ join, sort, shuffle run native); only the per-row merge dispatch (` MergeRowsExec ` ) and the
173+ file write itself stay JVM. The path forward is well-understood: add a ` CometMergeRowsExec `
174+ that mirrors Spark's instruction-driven dispatch model (` Keep ` / ` Discard ` / ` Split ` over
175+ matched / not-matched / not-matched-by-source clauses, plus a bitmap-backed cardinality
176+ check). Tracked.
177+
156178## Tests
157179
158180- ** ` CometIcebergWriteActionSuite ` ** — end-to-end scenarios against a temporary Hadoop catalog
159- covering the copy-on-write V2 logical-write variants on the split-operator path, plus parity
160- vs Iceberg-Java's writer, the disabled-config fallback, and the commit-once invariant. Passes
161- on Spark 3.4 / 3.5 / 4.0 with ` IcebergSparkSessionExtensions ` loaded.
181+ covering the copy-on-write V2 logical-write variants on both the split-operator-only and
182+ full-native paths, plus parity vs Iceberg-Java, the disabled-config fallback, the
183+ commit-once invariant, and the documented native-engagement-vs-fallback contract for MERGE.
184+ Passes on Spark 3.4 / 3.5 / 4.0 with ` IcebergSparkSessionExtensions ` loaded.
162185- ** ` CometIcebergWriteDetectionSuite ` ** — tests that build a planned-but-not-executed
163186 ` IcebergWriteExec ` per fall-back trigger and assert
164187 ` CometIcebergNativeWrite.getSupportLevel ` returns ` Unsupported ` with the expected reason
165188 fragment, plus a positive baseline (clean parquet V2 table → ` Compatible ` ).
189+ - ** ` IcebergWriteProtoTranslationSuite ` ** — unit tests pinning the JVM-side property → proto
190+ translation table-by-table.
166191
167192## Abort behaviour
168193
@@ -206,6 +231,66 @@ Two design choices keep this stable:
206231 serialisable-isolation validation to walk the right snapshot range. The strategy calls
207232 ` toBatch ` once and threads the result through both execs.
208233
234+ ## Other AQE / planning interactions
235+
236+ - ** AQE re-fire produces a fresh ` IcebergWriteExec ` over the existing ` CometIcebergWriteExec ` .**
237+ Without protection this would stack two writers. ` CometExecRule ` 's ` IcebergWriteExec ` case
238+ detects this (via ` unwrapToCometIcebergWrite ` ) and returns the existing writer instead —
239+ mirroring the ` DataWritingCommandExec(WriteFilesExec(CometNativeWriteExec)) ` unwrap V1
240+ parquet uses.
241+ - ** ` CometIcebergWriteExec ` is a block boundary in the serialisation loop.** The post-convert
242+ serialization loop in ` CometExecRule ` resets ` firstNativeOp = true ` after
243+ ` CometIcebergWriteExec ` (alongside ` CometNativeWriteExec ` ) so the upstream Comet chain's
244+ outermost node gets ` convertBlock() ` called on it and populates its ` serializedPlanOpt ` .
245+ Without this, an upstream ` CometProject ` reaches execution without a serialised plan and
246+ ` doExecuteColumnar ` throws.
247+ - ** ` CometMetricNode.fromCometPlan ` stops at non-Comet descendants.** AQE-stage nodes
248+ (` AQEShuffleReadExec ` ) can have a null ` session ` field when constructed off the planning
249+ thread; their lazy ` metrics ` val NPEs on ` sparkContext ` . They're not ours to update anyway,
250+ so the metric tree walk just stops at the boundary.
251+
252+ ## Native acceleration internals
253+
254+ The high-level shape: the JVM-side serde marshals everything iceberg-rust needs (table
255+ properties, write schema, partition spec, target file size, writer mode, per-task IDs, …) into
256+ a single proto, and on each task iceberg-rust returns one ` BINARY ` row containing an Iceberg V2
257+ data manifest (Avro container bytes). The JVM-side commit decodes those bytes back into
258+ ` DataFile ` s via ` ManifestFiles.read ` and feeds them to ` BatchWrite.commit(messages) ` . So
259+ everything iceberg-java would do post-write (snapshot id assignment, manifest list aggregation,
260+ commit retries, cache refresh) is untouched — only the per-task parquet emission moved.
261+
262+ Notable details:
263+
264+ - ** Per-task IDs are stamped onto the proto.** ` CometIcebergWriteExec.doExecuteColumnar ` reads
265+ ` TaskContext.getPartitionId() ` and ` taskAttemptId() ` and rebuilds the ` IcebergWrite ` proto's
266+ ` partition_id ` / ` task_attempt_id ` per task before serialising. Without this every task mints
267+ the same filename prefix (` 00000-00000-<op_id> ` ) and OpenDAL's ` CompleteWriter ` trips on the
268+ resulting concurrent-write byte-count drift. Mirrors ` CometNativeWriteExec ` 's parquet pattern.
269+ - ** Spark 4.x ` ReplaceData ` carries extra columns** that the native writer can't pass through
270+ as-is; ` CometIcebergNativeWrite.maybeProjectReplaceDataColumns ` splices a DataFusion
271+ ` Projection ` between the FFI scan and ` IcebergWrite ` to strip them. The method docstring
272+ has the full mechanics.
273+
274+ ## Iceberg 1.5.2 (Spark 3.4) reflection skew
275+
276+ Iceberg 1.5.2 differs in a handful of small ways from 1.8+. All of them are papered over with
277+ reflection in ` IcebergReflection ` rather than per-version source shims:
278+
279+ - ** ` useFanoutWriter ` field renamed.** 1.5.2 calls it ` partitionedFanoutEnabled ` . The
280+ ` getUseFanoutWriterFromSparkWrite ` helper tries the new name first and falls back to the old.
281+ - ** ` GenericManifestFile ` constructor.** Iceberg 1.6 added a 3-arg form
282+ ` (InputFile, int, long) ` that takes V3's ` firstRowId ` . 1.5.2 only has the 2-arg form
283+ ` (InputFile, int) ` . ` newDataManifestFile ` tries the 3-arg form first and falls back.
284+ - ** Null ` snapshotId ` rejected on read.** Iceberg 1.5.2's
285+ ` InheritableMetadataFactory.fromManifest ` throws "Cannot read from ManifestFile with null
286+ (unassigned) snapshot ID". 1.8+ relaxed that check. The helper reflectively writes a ` 0L `
287+ placeholder onto ` GenericManifestFile.snapshotId ` after construction; it's overwritten by
288+ ` BatchWrite.commit ` later and never reaches storage.
289+ - ** Separate logical write node.** Iceberg 1.5.2 ships its own ` ReplaceIcebergData ` logical
290+ node because Spark 3.4 lacks native row-level operation support. Field shape is identical to
291+ Spark 3.5+'s stock ` ReplaceData ` , so ` IcebergWriteStrategy ` matches it by FQCN and the
292+ extracted tuple reuses the existing dispatcher.
293+
209294## Why a custom logical anchor (` IcebergWriteLogical ` )
210295
211296Without ` IcebergWriteLogical ` , AQE's ` reOptimize ` would either:
@@ -220,10 +305,3 @@ Without `IcebergWriteLogical`, AQE's `reOptimize` would either:
220305
221306` IcebergWriteLogical ` sits between the two and lets the strategy re-emit a fresh
222307` IcebergWriteExec ` on every iteration without touching the committer.
223-
224- ## Iceberg 1.5.2 (Spark 3.4) logical-write skew
225-
226- Iceberg 1.5.2 (paired with Spark 3.4) ships its own ` ReplaceIcebergData ` logical node because
227- Spark 3.4 lacks native row-level operation support. Field shape is identical to Spark 3.5+'s
228- stock ` ReplaceData ` , so ` IcebergWriteStrategy ` matches it by FQCN and the extracted tuple feeds
229- the same dispatcher.
0 commit comments