You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
-[²] MERGE flows through Spark's (or Iceberg 1.5.2's) `MergeRowsExec`, which per-row dispatches
131
+
the matched / not-matched / not-matched-by-source clauses. There's no native equivalent in
132
+
Comet *yet* — this is planned (see Known limitations). The scan / join / sort / shuffle parts
133
+
still run native; only the per-row merge dispatch and the file write stay JVM.
134
+
122
135
## Fallback triggers
123
136
124
137
Before accepting a write into the native path, the serde checks for things iceberg-rust either
@@ -144,25 +157,61 @@ Parquet Rust encoder that iceberg-rust drives).
144
157
|`write.parquet.bloom-filter-enabled.column.<col>`| any column set to `true`|**iceberg-rust**: bloom-filter sizing diverges from iceberg-java (same root cause as above — no cap clamping) |
145
158
|`write.parquet.row-group-check-min-record-count`| set to a non-default value (default `100`) |**parquet-rs**: row-group close cadence is byte-driven and doesn't expose parquet-mr's per-row-count knobs, so iceberg-rust can't honour this property |
146
159
|`write.parquet.row-group-check-max-record-count`| set to a non-default value (default `10000`) |**parquet-rs**: same as above |
147
-
|`write.parquet.page-version`| set to a non-default value (default `v1`) |**parquet-rs**: the default writer does not implement DataPageV2 encoding; iceberg-java with `v2` produces a different on-disk format. |
148
-
|`write.parquet.stats-enabled.column.<col>`| any column override set |**parquet-rs**: no per-column statistics-enabled override on `WriterProperties` matching parquet-mr's; iceberg-rust would silently ignore the request. |
160
+
|`write.parquet.stats-enabled.column.<col>`| any column override set **and** the running Iceberg version defines `TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX` (1.10.0+) |**parquet-rs**: no per-column statistics-enabled override matching parquet-mr's. The property was only added in Iceberg 1.10.0; on 1.5.2 / 1.8.1 Iceberg-Java ignores it too, so there's nothing to diverge from and the write is **not** gated on those versions. |
149
161
|`parquet.enable.dictionary`| set |**parquet-rs**: explicit dictionary override is not translated to the native writer settings; parquet-rs would fall back to its own default and diverge from parquet-mr. |
150
-
|`write.metadata.metrics.max-inferred-column-defaults`| set to less than the number of fields in the schema (default `100`) |**iceberg-rust**: doesn't apply `MetricsModes.None` to columns past this limit; iceberg-java does, so the produced column statistics would diverge |
162
+
|`write.metadata.metrics.max-inferred-column-defaults`| the number of projected field IDs exceeds this limit (default `100`) **and** no explicit `write.metadata.metrics.default` is set |**iceberg-rust**: doesn't apply `MetricsModes.None` to columns past this limit. Iceberg only applies that inferred-column truncation when no default mode is configured (a user-set default applies to all columns regardless of count), so the gate is skipped once a default is present. |
163
+
164
+
> `write.parquet.page-version` is intentionally **not** a fallback trigger: no Iceberg version Comet
165
+
> targets (1.5.2 / 1.8.1 / 1.10.0) maps a table property to the Parquet writer version — it is
166
+
> hardwired to `PARQUET_1_0`. Setting it is a no-op in Iceberg-Java exactly as in the native path.
151
167
|`io-impl` (catalog property) | set |**iceberg-rust**: native FileIO is selected by URI scheme (`OpenDalStorageFactory`), so an explicit Java `FileIO` class can't be honoured. |
152
168
| Resolved data-location URI scheme | not in `{file, memory, s3, s3a, gs, oss}`|**iceberg-rust**: `iceberg_common::storage_factory_for` resolves only the listed schemes; `hdfs`, `abfs`/`abfss`, `wasb`/`wasbs`, and similar would crash at write time. |
153
169
154
170
Every trigger has a pinning test in `CometIcebergWriteDetectionSuite`.
155
171
172
+
## Known limitations and planned work
173
+
174
+
-**CoW MERGE — native acceleration planned.** The split-operator plan already applies (scan,
175
+
join, sort, shuffle run native); only the per-row merge dispatch (`MergeRowsExec`) and the
176
+
file write itself stay JVM. The path forward is well-understood: add a `CometMergeRowsExec`
177
+
that mirrors Spark's instruction-driven dispatch model (`Keep` / `Discard` / `Split` over
178
+
matched / not-matched / not-matched-by-source clauses, plus a bitmap-backed cardinality
179
+
check). Tracked.
180
+
181
+
-**`sort_order_id` is always stamped `0` (unsorted).** This matches Iceberg-Java for every
182
+
version Comet targets: the Spark `SparkWrite$WriterFactory` in Iceberg 1.5.2 / 1.8.1 / 1.10.0
183
+
builds `SparkFileWriterFactory`**without**`.dataSortOrder(...)`, so a stock batch append stamps
184
+
`sort_order_id = 0` on committed data files even for a table that has a non-default sort order.
185
+
The native writer does the same. (Iceberg 1.11+ adds
186
+
`SparkWriteConf.outputSortOrderId(writeRequirements)` and wires the resolved id into the writer
187
+
factory; Comet reflects that resolver when present, so behaviour stays correct if the pinned
188
+
runtime is bumped.)
189
+
190
+
-**Nested (list/map) column bounds differ from Iceberg-Java.** iceberg-java's
191
+
`ParquetUtil.shouldStoreBounds` suppresses `lower_bounds` / `upper_bounds` for any field reached
192
+
through a repeated (list or map) field, so collection-element primitives get no manifest bounds.
193
+
iceberg-rust's `MinMaxColAggregator` computes bounds for every leaf primitive — including
194
+
list-element / map-key / map-value fields — so a native write to a schema containing list or map
195
+
columns produces **extra**`lower_bounds` / `upper_bounds` entries for those nested field IDs that
196
+
Iceberg-Java would omit. `column_sizes` / `value_counts` / `null_value_counts` match. The extra
197
+
bounds are not used by Iceberg's metrics evaluators (predicates can't address list/map elements),
198
+
so the functional risk is low, but the manifest is not byte-identical to the JVM path for nested
199
+
schemas. Strict parity would require stripping those nested-field bounds before manifest encoding
200
+
(or in iceberg-rust); not yet done.
201
+
156
202
## Tests
157
203
158
204
-**`CometIcebergWriteActionSuite`** — end-to-end scenarios against a temporary Hadoop catalog
159
-
covering the copy-on-write V2 logical-write variants on the split-operator path, plus parity
160
-
vs Iceberg-Java's writer, the disabled-config fallback, and the commit-once invariant. Passes
161
-
on Spark 3.4 / 3.5 / 4.0 with `IcebergSparkSessionExtensions` loaded.
205
+
covering the copy-on-write V2 logical-write variants on both the split-operator-only and
206
+
full-native paths, plus parity vs Iceberg-Java, the disabled-config fallback, the
207
+
commit-once invariant, and the documented native-engagement-vs-fallback contract for MERGE.
208
+
Passes on Spark 3.4 / 3.5 / 4.0 with `IcebergSparkSessionExtensions` loaded.
162
209
-**`CometIcebergWriteDetectionSuite`** — tests that build a planned-but-not-executed
163
210
`IcebergWriteExec` per fall-back trigger and assert
164
211
`CometIcebergNativeWrite.getSupportLevel` returns `Unsupported` with the expected reason
165
212
fragment, plus a positive baseline (clean parquet V2 table → `Compatible`).
213
+
-**`IcebergWriteProtoTranslationSuite`** — unit tests pinning the JVM-side property → proto
214
+
translation table-by-table.
166
215
167
216
## Abort behaviour
168
217
@@ -206,6 +255,66 @@ Two design choices keep this stable:
206
255
serialisable-isolation validation to walk the right snapshot range. The strategy calls
207
256
`toBatch` once and threads the result through both execs.
208
257
258
+
## Other AQE / planning interactions
259
+
260
+
-**AQE re-fire produces a fresh `IcebergWriteExec` over the existing `CometIcebergWriteExec`.**
261
+
Without protection this would stack two writers. `CometExecRule`'s `IcebergWriteExec` case
262
+
detects this (via `unwrapToCometIcebergWrite`) and returns the existing writer instead —
263
+
mirroring the `DataWritingCommandExec(WriteFilesExec(CometNativeWriteExec))` unwrap V1
264
+
parquet uses.
265
+
-**`CometIcebergWriteExec` is a block boundary in the serialisation loop.** The post-convert
266
+
serialization loop in `CometExecRule` resets `firstNativeOp = true` after
267
+
`CometIcebergWriteExec` (alongside `CometNativeWriteExec`) so the upstream Comet chain's
268
+
outermost node gets `convertBlock()` called on it and populates its `serializedPlanOpt`.
269
+
Without this, an upstream `CometProject` reaches execution without a serialised plan and
270
+
`doExecuteColumnar` throws.
271
+
-**`CometMetricNode.fromCometPlan` stops at non-Comet descendants.** AQE-stage nodes
272
+
(`AQEShuffleReadExec`) can have a null `session` field when constructed off the planning
273
+
thread; their lazy `metrics` val NPEs on `sparkContext`. They're not ours to update anyway,
274
+
so the metric tree walk just stops at the boundary.
275
+
276
+
## Native acceleration internals
277
+
278
+
The high-level shape: the JVM-side serde marshals everything iceberg-rust needs (table
0 commit comments