Skip to content

Commit 7118d08

Browse files
author
Jordan Epstein
committed
feat: implement native Iceberg V2 writer via iceberg-rust
1 parent ddd80da commit 7118d08

26 files changed

Lines changed: 3551 additions & 157 deletions

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ jobs:
343343
org.apache.comet.CometIcebergWriteActionSuite
344344
org.apache.comet.CometIcebergWriteDetectionSuite
345345
org.apache.comet.iceberg.IcebergReflectionSuite
346+
org.apache.comet.serde.operator.IcebergWriteProtoTranslationSuite
346347
org.apache.comet.csv.CometCsvNativeReadSuite
347348
org.apache.comet.CometFuzzTestSuite
348349
org.apache.comet.CometFuzzIcebergSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ jobs:
183183
org.apache.comet.CometIcebergWriteActionSuite
184184
org.apache.comet.CometIcebergWriteDetectionSuite
185185
org.apache.comet.iceberg.IcebergReflectionSuite
186+
org.apache.comet.serde.operator.IcebergWriteProtoTranslationSuite
186187
org.apache.comet.csv.CometCsvNativeReadSuite
187188
org.apache.comet.CometFuzzTestSuite
188189
org.apache.comet.CometFuzzIcebergSuite

docs/source/user-guide/latest/iceberg-writes.md

Lines changed: 125 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,14 @@ Comet accelerates Iceberg V2 copy-on-write table writes in two complementary lay
2727
`V2ExistingTableWriteExec` command into a pair of operators — a **committer** and a
2828
**writer** — so that AQE (and Comet's columnar rules) can see and re-optimise the
2929
data sub-query. The actual file-writing work still runs through Iceberg's stock JVM writer.
30-
2. **Native parquet write — detection only at this stage** (further toggle). Once the
31-
split-operator plan is in place, a serde inspects each `IcebergWriteExec` to decide
32-
whether the table's properties and write shape would allow delegating the per-task parquet
33-
write to [iceberg-rust](https://github.com/apache/iceberg-rust). The detection / fall-back
34-
table is wired up and tested; the actual native exec lands in a follow-up commit. With the
35-
gate on today, writes still go through the JVM two-op path — only the diagnostic surface is
36-
live.
30+
2. **Native parquet write** (further toggle). Once the split-operator plan is in place, the
31+
writer can be swapped for a native variant that delegates the per-task parquet write to
32+
[iceberg-rust](https://github.com/apache/iceberg-rust). The committer / table-metadata
33+
update / cache-refresh stay on the JVM exactly as before.
3734

38-
Both layers fall back to plain Iceberg-Java when they can't safely apply, and both layers are
39-
configured per-session.
35+
Both layers fall back to plain Iceberg-Java when they can't safely apply — and both layers are
36+
configured per-session, so you can opt into the split plan but skip the native write while it
37+
burns in.
4038

4139
## What changes about the Iceberg plan
4240

@@ -89,7 +87,7 @@ spark.sql.catalog.<name>.warehouse=...
8987
9088
# Comet write toggles (both off by default; can be turned on independently)
9189
spark.comet.write.iceberg.splitOperator.enabled=true # layer 1: split-operator plan
92-
spark.comet.write.iceberg.nativeAcceleration=true # layer 2: native parquet write (detection only at this stage)
90+
spark.comet.write.iceberg.nativeAcceleration=true # layer 2: native parquet write
9391
```
9492

9593
`IcebergSparkSessionExtensions` is **mandatory on Spark 3.4** for `UPDATE` / `MERGE` on V2 tables —
@@ -119,6 +117,21 @@ realisable benefit.
119117

120118
- [¹] Requires `IcebergSparkSessionExtensions` (see Configuration above).
121119

120+
Native parquet write coverage is more selective:
121+
122+
| Write | Spark 3.4 | Spark 3.5 | Spark 4.0 |
123+
|----------------------------------------------|-------------|-----------|-----------|
124+
| `AppendData` (`INSERT INTO`) | native | native | native |
125+
| `OverwriteByExpression` (static) | native | native | native |
126+
| `OverwritePartitionsDynamic` | native | native | native |
127+
| `ReplaceData` (CoW `DELETE` / `UPDATE`) | native | native | native |
128+
| `ReplaceData` (CoW `MERGE`) | fallback[²] | fallback[²] | fallback[²] |
129+
130+
- [²] MERGE flows through Spark's (or Iceberg 1.5.2's) `MergeRowsExec`, which per-row dispatches
131+
the matched / not-matched / not-matched-by-source clauses. There's no native equivalent in
132+
Comet *yet* — this is planned (see Known limitations). The scan / join / sort / shuffle parts
133+
still run native; only the per-row merge dispatch and the file write stay JVM.
134+
122135
## Fallback triggers
123136

124137
Before accepting a write into the native path, the serde checks for things iceberg-rust either
@@ -144,25 +157,61 @@ Parquet Rust encoder that iceberg-rust drives).
144157
| `write.parquet.bloom-filter-enabled.column.<col>` | any column set to `true` | **iceberg-rust**: bloom-filter sizing diverges from iceberg-java (same root cause as above — no cap clamping) |
145158
| `write.parquet.row-group-check-min-record-count` | set to a non-default value (default `100`) | **parquet-rs**: row-group close cadence is byte-driven and doesn't expose parquet-mr's per-row-count knobs, so iceberg-rust can't honour this property |
146159
| `write.parquet.row-group-check-max-record-count` | set to a non-default value (default `10000`) | **parquet-rs**: same as above |
147-
| `write.parquet.page-version` | set to a non-default value (default `v1`) | **parquet-rs**: the default writer does not implement DataPageV2 encoding; iceberg-java with `v2` produces a different on-disk format. |
148-
| `write.parquet.stats-enabled.column.<col>` | any column override set | **parquet-rs**: no per-column statistics-enabled override on `WriterProperties` matching parquet-mr's; iceberg-rust would silently ignore the request. |
160+
| `write.parquet.stats-enabled.column.<col>` | any column override set **and** the running Iceberg version defines `TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX` (1.10.0+) | **parquet-rs**: no per-column statistics-enabled override matching parquet-mr's. The property was only added in Iceberg 1.10.0; on 1.5.2 / 1.8.1 Iceberg-Java ignores it too, so there's nothing to diverge from and the write is **not** gated on those versions. |
149161
| `parquet.enable.dictionary` | set | **parquet-rs**: explicit dictionary override is not translated to the native writer settings; parquet-rs would fall back to its own default and diverge from parquet-mr. |
150-
| `write.metadata.metrics.max-inferred-column-defaults` | set to less than the number of fields in the schema (default `100`) | **iceberg-rust**: doesn't apply `MetricsModes.None` to columns past this limit; iceberg-java does, so the produced column statistics would diverge |
162+
| `write.metadata.metrics.max-inferred-column-defaults` | the number of projected field IDs exceeds this limit (default `100`) **and** no explicit `write.metadata.metrics.default` is set | **iceberg-rust**: doesn't apply `MetricsModes.None` to columns past this limit. Iceberg only applies that inferred-column truncation when no default mode is configured (a user-set default applies to all columns regardless of count), so the gate is skipped once a default is present. |
163+
164+
> `write.parquet.page-version` is intentionally **not** a fallback trigger: no Iceberg version Comet
165+
> targets (1.5.2 / 1.8.1 / 1.10.0) maps a table property to the Parquet writer version — it is
166+
> hardwired to `PARQUET_1_0`. Setting it is a no-op in Iceberg-Java exactly as in the native path.
151167
| `io-impl` (catalog property) | set | **iceberg-rust**: native FileIO is selected by URI scheme (`OpenDalStorageFactory`), so an explicit Java `FileIO` class can't be honoured. |
152168
| Resolved data-location URI scheme | not in `{file, memory, s3, s3a, gs, oss}` | **iceberg-rust**: `iceberg_common::storage_factory_for` resolves only the listed schemes; `hdfs`, `abfs`/`abfss`, `wasb`/`wasbs`, and similar would crash at write time. |
153169

154170
Every trigger has a pinning test in `CometIcebergWriteDetectionSuite`.
155171

172+
## Known limitations and planned work
173+
174+
- **CoW MERGE — native acceleration planned.** The split-operator plan already applies (scan,
175+
join, sort, shuffle run native); only the per-row merge dispatch (`MergeRowsExec`) and the
176+
file write itself stay JVM. The path forward is well-understood: add a `CometMergeRowsExec`
177+
that mirrors Spark's instruction-driven dispatch model (`Keep` / `Discard` / `Split` over
178+
matched / not-matched / not-matched-by-source clauses, plus a bitmap-backed cardinality
179+
check). Tracked.
180+
181+
- **`sort_order_id` is always stamped `0` (unsorted).** This matches Iceberg-Java for every
182+
version Comet targets: the Spark `SparkWrite$WriterFactory` in Iceberg 1.5.2 / 1.8.1 / 1.10.0
183+
builds `SparkFileWriterFactory` **without** `.dataSortOrder(...)`, so a stock batch append stamps
184+
`sort_order_id = 0` on committed data files even for a table that has a non-default sort order.
185+
The native writer does the same. (Iceberg 1.11+ adds
186+
`SparkWriteConf.outputSortOrderId(writeRequirements)` and wires the resolved id into the writer
187+
factory; Comet reflects that resolver when present, so behaviour stays correct if the pinned
188+
runtime is bumped.)
189+
190+
- **Nested (list/map) column bounds differ from Iceberg-Java.** iceberg-java's
191+
`ParquetUtil.shouldStoreBounds` suppresses `lower_bounds` / `upper_bounds` for any field reached
192+
through a repeated (list or map) field, so collection-element primitives get no manifest bounds.
193+
iceberg-rust's `MinMaxColAggregator` computes bounds for every leaf primitive — including
194+
list-element / map-key / map-value fields — so a native write to a schema containing list or map
195+
columns produces **extra** `lower_bounds` / `upper_bounds` entries for those nested field IDs that
196+
Iceberg-Java would omit. `column_sizes` / `value_counts` / `null_value_counts` match. The extra
197+
bounds are not used by Iceberg's metrics evaluators (predicates can't address list/map elements),
198+
so the functional risk is low, but the manifest is not byte-identical to the JVM path for nested
199+
schemas. Strict parity would require stripping those nested-field bounds before manifest encoding
200+
(or in iceberg-rust); not yet done.
201+
156202
## Tests
157203

158204
- **`CometIcebergWriteActionSuite`** — end-to-end scenarios against a temporary Hadoop catalog
159-
covering the copy-on-write V2 logical-write variants on the split-operator path, plus parity
160-
vs Iceberg-Java's writer, the disabled-config fallback, and the commit-once invariant. Passes
161-
on Spark 3.4 / 3.5 / 4.0 with `IcebergSparkSessionExtensions` loaded.
205+
covering the copy-on-write V2 logical-write variants on both the split-operator-only and
206+
full-native paths, plus parity vs Iceberg-Java, the disabled-config fallback, the
207+
commit-once invariant, and the documented native-engagement-vs-fallback contract for MERGE.
208+
Passes on Spark 3.4 / 3.5 / 4.0 with `IcebergSparkSessionExtensions` loaded.
162209
- **`CometIcebergWriteDetectionSuite`** — tests that build a planned-but-not-executed
163210
`IcebergWriteExec` per fall-back trigger and assert
164211
`CometIcebergNativeWrite.getSupportLevel` returns `Unsupported` with the expected reason
165212
fragment, plus a positive baseline (clean parquet V2 table → `Compatible`).
213+
- **`IcebergWriteProtoTranslationSuite`** — unit tests pinning the JVM-side property → proto
214+
translation table-by-table.
166215

167216
## Abort behaviour
168217

@@ -206,6 +255,66 @@ Two design choices keep this stable:
206255
serialisable-isolation validation to walk the right snapshot range. The strategy calls
207256
`toBatch` once and threads the result through both execs.
208257

258+
## Other AQE / planning interactions
259+
260+
- **AQE re-fire produces a fresh `IcebergWriteExec` over the existing `CometIcebergWriteExec`.**
261+
Without protection this would stack two writers. `CometExecRule`'s `IcebergWriteExec` case
262+
detects this (via `unwrapToCometIcebergWrite`) and returns the existing writer instead —
263+
mirroring the `DataWritingCommandExec(WriteFilesExec(CometNativeWriteExec))` unwrap V1
264+
parquet uses.
265+
- **`CometIcebergWriteExec` is a block boundary in the serialisation loop.** The post-convert
266+
serialization loop in `CometExecRule` resets `firstNativeOp = true` after
267+
`CometIcebergWriteExec` (alongside `CometNativeWriteExec`) so the upstream Comet chain's
268+
outermost node gets `convertBlock()` called on it and populates its `serializedPlanOpt`.
269+
Without this, an upstream `CometProject` reaches execution without a serialised plan and
270+
`doExecuteColumnar` throws.
271+
- **`CometMetricNode.fromCometPlan` stops at non-Comet descendants.** AQE-stage nodes
272+
(`AQEShuffleReadExec`) can have a null `session` field when constructed off the planning
273+
thread; their lazy `metrics` val NPEs on `sparkContext`. They're not ours to update anyway,
274+
so the metric tree walk just stops at the boundary.
275+
276+
## Native acceleration internals
277+
278+
The high-level shape: the JVM-side serde marshals everything iceberg-rust needs (table
279+
properties, write schema, partition spec, target file size, writer mode, per-task IDs, …) into
280+
a single proto, and on each task iceberg-rust returns one `BINARY` row containing an Iceberg V2
281+
data manifest (Avro container bytes). The JVM-side commit decodes those bytes back into
282+
`DataFile`s via `ManifestFiles.read` and feeds them to `BatchWrite.commit(messages)`. So
283+
everything iceberg-java would do post-write (snapshot id assignment, manifest list aggregation,
284+
commit retries, cache refresh) is untouched — only the per-task parquet emission moved.
285+
286+
Notable details:
287+
288+
- **Per-task IDs are stamped onto the proto.** `CometIcebergWriteExec.doExecuteColumnar` reads
289+
`TaskContext.getPartitionId()` and `taskAttemptId()` and rebuilds the `IcebergWrite` proto's
290+
`partition_id` / `task_attempt_id` per task before serialising. Without this every task mints
291+
the same filename prefix (`00000-00000-<op_id>`) and OpenDAL's `CompleteWriter` trips on the
292+
resulting concurrent-write byte-count drift. Mirrors `CometNativeWriteExec`'s parquet pattern.
293+
- **Spark 4.x `ReplaceData` carries extra columns** that the native writer can't pass through
294+
as-is; `CometIcebergNativeWrite.maybeProjectReplaceDataColumns` splices a DataFusion
295+
`Projection` between the FFI scan and `IcebergWrite` to strip them. The method docstring
296+
has the full mechanics.
297+
298+
## Iceberg 1.5.2 (Spark 3.4) reflection skew
299+
300+
Iceberg 1.5.2 differs in a handful of small ways from 1.8+. All of them are papered over with
301+
reflection in `IcebergReflection` rather than per-version source shims:
302+
303+
- **`useFanoutWriter` field renamed.** 1.5.2 calls it `partitionedFanoutEnabled`. The
304+
`getUseFanoutWriterFromSparkWrite` helper tries the new name first and falls back to the old.
305+
- **`GenericManifestFile` constructor.** Iceberg 1.6 added a 3-arg form
306+
`(InputFile, int, long)` that takes V3's `firstRowId`. 1.5.2 only has the 2-arg form
307+
`(InputFile, int)`. `newDataManifestFile` tries the 3-arg form first and falls back.
308+
- **Null `snapshotId` rejected on read.** Iceberg 1.5.2's
309+
`InheritableMetadataFactory.fromManifest` throws "Cannot read from ManifestFile with null
310+
(unassigned) snapshot ID". 1.8+ relaxed that check. The helper reflectively writes a `0L`
311+
placeholder onto `GenericManifestFile.snapshotId` after construction; it's overwritten by
312+
`BatchWrite.commit` later and never reaches storage.
313+
- **Separate logical write node.** Iceberg 1.5.2 ships its own `ReplaceIcebergData` logical
314+
node because Spark 3.4 lacks native row-level operation support. Field shape is identical to
315+
Spark 3.5+'s stock `ReplaceData`, so `IcebergWriteStrategy` matches it by FQCN and the
316+
extracted tuple reuses the existing dispatcher.
317+
209318
## Why a custom logical anchor (`IcebergWriteLogical`)
210319

211320
Without `IcebergWriteLogical`, AQE's `reOptimize` would either:
@@ -220,10 +329,3 @@ Without `IcebergWriteLogical`, AQE's `reOptimize` would either:
220329

221330
`IcebergWriteLogical` sits between the two and lets the strategy re-emit a fresh
222331
`IcebergWriteExec` on every iteration without touching the committer.
223-
224-
## Iceberg 1.5.2 (Spark 3.4) logical-write skew
225-
226-
Iceberg 1.5.2 (paired with Spark 3.4) ships its own `ReplaceIcebergData` logical node because
227-
Spark 3.4 lacks native row-level operation support. Field shape is identical to Spark 3.5+'s
228-
stock `ReplaceData`, so `IcebergWriteStrategy` matches it by FQCN and the extracted tuple feeds
229-
the same dispatcher.

native/core/src/execution/jni_api.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ fn op_name(op: &OpStruct) -> &'static str {
236236
OpStruct::Window(_) => "Window",
237237
OpStruct::NativeScan(_) => "NativeScan",
238238
OpStruct::IcebergScan(_) => "IcebergScan",
239+
OpStruct::IcebergWrite(_) => "IcebergWrite",
239240
OpStruct::ParquetWriter(_) => "ParquetWriter",
240241
OpStruct::Explode(_) => "Explode",
241242
OpStruct::CsvScan(_) => "CsvScan",
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Helpers shared between the Iceberg scan and Iceberg write operators.
19+
20+
use std::collections::HashMap;
21+
use std::sync::Arc;
22+
23+
use datafusion::common::DataFusionError;
24+
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
25+
use iceberg_storage_opendal::OpenDalStorageFactory;
26+
27+
/// Pick an OpenDAL storage backend from a URI's scheme. `file` (or no scheme) falls through to
28+
/// the local file system. `memory` is used by the write path to assemble manifest bytes that
29+
/// stay entirely in-process.
30+
pub(crate) fn storage_factory_for(path: &str) -> Result<Arc<dyn StorageFactory>, DataFusionError> {
31+
let scheme = if path.contains("://") {
32+
path.split("://").next().unwrap_or("file")
33+
} else {
34+
"file"
35+
};
36+
match scheme {
37+
"file" => Ok(Arc::new(OpenDalStorageFactory::Fs)),
38+
"memory" => Ok(Arc::new(OpenDalStorageFactory::Memory)),
39+
"s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 {
40+
customized_credential_load: None,
41+
})),
42+
"gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)),
43+
"oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)),
44+
_ => Err(DataFusionError::Execution(format!(
45+
"Unsupported storage scheme: {scheme}"
46+
))),
47+
}
48+
}
49+
50+
/// Build a `FileIO` whose storage scheme is inferred from `reference_path` and whose properties
51+
/// come from the catalog. The reference path is the metadata location for reads or the data
52+
/// location for writes — anything that carries the right URI scheme.
53+
pub(crate) fn load_file_io(
54+
catalog_properties: &HashMap<String, String>,
55+
reference_path: &str,
56+
) -> Result<FileIO, DataFusionError> {
57+
let factory = storage_factory_for(reference_path)?;
58+
let mut file_io_builder = FileIOBuilder::new(factory);
59+
for (key, value) in catalog_properties {
60+
file_io_builder = file_io_builder.with_prop(key, value);
61+
}
62+
Ok(file_io_builder.build())
63+
}

0 commit comments

Comments
 (0)