Commit 92d58ea

feat: detect Iceberg V2 writes and emit fall-back reasons

Author: Jordan Epstein
1 parent b5aa839 commit 92d58ea

7 files changed

Lines changed: 1098 additions & 21 deletions

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
@@ -288,6 +288,7 @@ jobs:
 org.apache.comet.CometIcebergNativeSuite
 org.apache.comet.CometIcebergRewriteActionSuite
 org.apache.comet.CometIcebergWriteActionSuite
+org.apache.comet.CometIcebergWriteDetectionSuite
 org.apache.comet.iceberg.IcebergReflectionSuite
 org.apache.comet.csv.CometCsvNativeReadSuite
 org.apache.comet.CometFuzzTestSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
@@ -130,6 +130,7 @@ jobs:
 org.apache.comet.CometIcebergNativeSuite
 org.apache.comet.CometIcebergRewriteActionSuite
 org.apache.comet.CometIcebergWriteActionSuite
+org.apache.comet.CometIcebergWriteDetectionSuite
 org.apache.comet.iceberg.IcebergReflectionSuite
 org.apache.comet.csv.CometCsvNativeReadSuite
 org.apache.comet.CometFuzzTestSuite

docs/source/user-guide/latest/iceberg-writes.md

Lines changed: 67 additions & 21 deletions
@@ -21,15 +21,22 @@
 
 ## Overview
 
-Comet's first layer for accelerating Iceberg V2 writes is a **split-operator plan**: Comet
-rewrites Iceberg's single `V2ExistingTableWriteExec` command into a pair of operators — a
-**committer** and a **writer** — so that AQE and Comet's columnar rules can see
-and re-optimise the data sub-query. The actual file-writing work still runs through Iceberg's
-stock JVM writer; nothing in this layer changes Iceberg's commit semantics. A further toggle for
-delegating the per-task parquet write to iceberg-rust is planned but not part of this layer.
-
-The split-operator plan is off by default; turn it on per session. Iceberg-Java writes the data
-unchanged when it's off.
+Comet accelerates Iceberg V2 copy-on-write table writes in two complementary layers:
+
+1. **Split-operator plan** (on by toggle). Comet rewrites Iceberg's single
+`V2ExistingTableWriteExec` command into a pair of operators — a **committer** and a
+**writer** — so that AQE (and Comet's columnar rules) can see and re-optimise the
+data sub-query. The actual file-writing work still runs through Iceberg's stock JVM writer.
+2. **Native parquet write — detection only at this stage** (further toggle). Once the
+split-operator plan is in place, a serde inspects each `IcebergWriteExec` to decide
+whether the table's properties and write shape would allow delegating the per-task parquet
+write to [iceberg-rust](https://github.com/apache/iceberg-rust). The detection / fall-back
+table is wired up and tested; the actual native exec lands in a follow-up commit. With the
+gate on today, writes still go through the JVM two-op path — only the diagnostic surface is
+live.
+
+Both layers fall back to plain Iceberg-Java when they can't safely apply, and both layers are
+configured per-session.
 
 ## What changes about the Iceberg plan
 
@@ -69,7 +76,8 @@ would have made internally.
 
 ## Configuration
 
-Standard Comet + Iceberg setup ([`iceberg.md`](iceberg.md)) plus the split-operator toggle:
+Standard Comet + Iceberg setup ([`iceberg.md`](iceberg.md)) plus two toggles for the write-side
+behaviour:
 
 ```
 # Standard Comet / Iceberg wiring
@@ -79,16 +87,17 @@ spark.sql.catalog.<name>=org.apache.iceberg.spark.SparkCatalog
 spark.sql.catalog.<name>.type=hadoop # or hive / glue / rest / ...
 spark.sql.catalog.<name>.warehouse=...
 
-# Comet write toggle (off by default)
-spark.comet.write.iceberg.splitOperator.enabled=true
+# Comet write toggles (both off by default; can be turned on independently)
+spark.comet.write.iceberg.splitOperator.enabled=true # layer 1: split-operator plan
+spark.comet.write.iceberg.nativeAcceleration=true # layer 2: native parquet write (detection only at this stage)
 ```
 
-`IcebergSparkSessionExtensions` is **mandatory on Spark 3.4** for `UPDATE` / `MERGE` on V2
-tables — Spark 3.4's stock planner rejects `UPDATE TABLE` on V2 sources, and Iceberg's analyzer
-extension (`RewriteUpdateTable` / `RewriteMergeIntoTable`) is what provides the rewrite. On
-Spark 3.5+ the extension is optional but recommended (Spark added native row-level operation
-support in 3.5). `INSERT INTO` / `INSERT OVERWRITE` / `DELETE FROM` work without the extension
-on every Spark version.
+`IcebergSparkSessionExtensions` is **mandatory on Spark 3.4** for `UPDATE` / `MERGE` on V2 tables —
+Spark 3.4's stock planner rejects `UPDATE TABLE` on V2 sources, and Iceberg's analyzer extension
+(`RewriteUpdateTable` / `RewriteMergeIntoTable`) is what provides the rewrite. On Spark 3.5+ the
+extension is optional but recommended (Spark added native row-level operation support in 3.5).
+`INSERT INTO` / `INSERT OVERWRITE` / `DELETE FROM` work without the extension on every Spark
+version.
 
 If `splitOperator.enabled=false`, Comet leaves Iceberg's stock plan alone — every write goes
 straight through Iceberg-Java.
@@ -110,18 +119,55 @@ realisable benefit.
 
 - [¹] Requires `IcebergSparkSessionExtensions` (see Configuration above).
 
+## Fallback triggers
+
+Before accepting a write into the native path, the serde checks for things iceberg-rust either
+doesn't support or implements differently from iceberg-java. The first matching trigger short-
+circuits the conversion and the plan stays on the JVM two-op path.
+
+Each row attributes the gap to the layer that's the actual root cause: **iceberg-rust** (the
+high-level Iceberg writer / commit / metadata logic) or **parquet-rs** (the underlying Apache
+Parquet Rust encoder that iceberg-rust drives).
+
+| Property / state | Falls back when … | Why |
+|---|---|---|
+| resolved write format | `SparkWrite.format` (i.e. effective `write-format` option overlaid on `write.format.default`) is not `parquet` | **iceberg-rust**: writer stack is parquet-only today. The check reads the resolved format, so per-write `option("write-format", "orc")` is honoured even when the table default is parquet. |
+| `write.object-storage.enabled` | `true` | **iceberg-rust**: no hashed-prefix object-storage location generator |
+| `write.location-provider.impl` | set | **iceberg-rust**: can't load a Java `LocationProvider` class |
+| `format-version` | `>= 3` | **iceberg-rust**: V3 row lineage / DVs / variant types not implemented |
+| `encryption.*` (any key) | set | **iceberg-rust + parquet-rs**: no Parquet modular encryption support in either layer |
+| `write.metadata.metrics.default` | mentions `counts` | **iceberg-rust**: doesn't implement Iceberg's `counts`-without-bounds metrics mode. parquet-rs supports stats on or off but has no "value counts only" knob, so iceberg-rust would need to track and emit those counts itself; it doesn't. |
+| `write.metadata.metrics.default` | `none` | **iceberg-rust**: always populates `column_sizes` / `value_counts` / `null_value_counts` / bounds from the parquet footer regardless of the requested mode; iceberg-java's `none` emits an empty `DataFile.metrics`, so a native write would produce strictly richer manifests. |
+| `write.metadata.metrics.column.<col>` | any column set to `counts` | **iceberg-rust**: same as default `counts`, per-column |
+| `write.metadata.metrics.column.<col>` | any column set to `none` | **iceberg-rust**: same as default `none`, per-column |
+| `write.parquet.bloom-filter-max-bytes` | explicitly set | **iceberg-rust**: doesn't enforce iceberg-java's bloom-filter byte cap. parquet-rs has no global cap parameter on `WriterProperties`, so iceberg-rust would need to clamp NDV / FPP itself before passing them through. |
+| `write.parquet.bloom-filter-enabled.column.<col>` | any column set to `true` | **iceberg-rust**: bloom-filter sizing diverges from iceberg-java (same root cause as above — no cap clamping) |
+| `write.parquet.row-group-check-min-record-count` | set to a non-default value (default `100`) | **parquet-rs**: row-group close cadence is byte-driven and doesn't expose parquet-mr's per-row-count knobs, so iceberg-rust can't honour this property |
+| `write.parquet.row-group-check-max-record-count` | set to a non-default value (default `10000`) | **parquet-rs**: same as above |
+| `write.parquet.page-version` | set to a non-default value (default `v1`) | **parquet-rs**: the default writer does not implement DataPageV2 encoding; iceberg-java with `v2` produces a different on-disk format. |
+| `write.parquet.stats-enabled.column.<col>` | any column override set | **parquet-rs**: no per-column statistics-enabled override on `WriterProperties` matching parquet-mr's; iceberg-rust would silently ignore the request. |
+| `parquet.enable.dictionary` | set | **parquet-rs**: explicit dictionary override is not translated to the native writer settings; parquet-rs would fall back to its own default and diverge from parquet-mr. |
+| `write.metadata.metrics.max-inferred-column-defaults` | set to less than the number of fields in the schema (default `100`) | **iceberg-rust**: doesn't apply `MetricsModes.None` to columns past this limit; iceberg-java does, so the produced column statistics would diverge |
+| `io-impl` (catalog property) | set | **iceberg-rust**: native FileIO is selected by URI scheme (`OpenDalStorageFactory`), so an explicit Java `FileIO` class can't be honoured. |
+| Resolved data-location URI scheme | not in `{file, memory, s3, s3a, gs, oss}` | **iceberg-rust**: `iceberg_common::storage_factory_for` resolves only the listed schemes; `hdfs`, `abfs`/`abfss`, `wasb`/`wasbs`, and similar would crash at write time. |
+
+Every trigger has a pinning test in `CometIcebergWriteDetectionSuite`.
+
 ## Tests
 
 - **`CometIcebergWriteActionSuite`** — end-to-end scenarios against a temporary Hadoop catalog
 covering the copy-on-write V2 logical-write variants on the split-operator path, plus parity
 vs Iceberg-Java's writer, the disabled-config fallback, and the commit-once invariant. Passes
 on Spark 3.4 / 3.5 / 4.0 with `IcebergSparkSessionExtensions` loaded.
+- **`CometIcebergWriteDetectionSuite`** — tests that build a planned-but-not-executed
+`IcebergWriteExec` per fall-back trigger and assert
+`CometIcebergNativeWrite.getSupportLevel` returns `Unsupported` with the expected reason
+fragment, plus a positive baseline (clean parquet V2 table → `Compatible`).
 
 ## Abort behaviour
 
-The writer calls `writer.abort()` per task on failure to release task-level resources.
-The committer (`IcebergCommitExec.run()`) calls `BatchWrite.abort(messages)` if `commit()`
-raises.
+The writer calls `writer.abort()` per task on failure to release task-level resources. The
+committer (`IcebergCommitExec.run()`) calls `BatchWrite.abort(messages)` if `commit()` raises.
 
 If task writers stage files locally but their commit messages never reach the driver (e.g.
 driver crash mid-collect), the staged Parquet files become **unreferenced orphans**. Iceberg's
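
A minimal sketch of the first-match behaviour described under "Fallback triggers" in the docs diff above, assuming a plain `Map[String, String]` of table properties plus an already-resolved write format. `FallbackSketch`, `Trigger`, `supportLevel` and the reason strings are hypothetical names chosen for illustration; they are not Comet's `CometIcebergNativeWrite` API, and only five of the documented triggers are modelled.

```scala
import scala.util.Try

// Hypothetical stand-in for the serde's decision; not Comet's actual types.
object FallbackSketch {
  sealed trait SupportLevel
  case object Compatible extends SupportLevel
  final case class Unsupported(reason: String) extends SupportLevel

  // A trigger looks at the resolved format and the table properties and
  // returns a fall-back reason when it fires.
  type Trigger = (String, Map[String, String]) => Option[String]

  // Order matters: the first trigger that fires decides the reason.
  val triggers: Seq[Trigger] = Seq[Trigger](
    (format, _) =>
      if (format != "parquet") Some(s"write format '$format' is not parquet") else None,
    (_, props) =>
      if (props.get("write.object-storage.enabled").contains("true"))
        Some("write.object-storage.enabled is true")
      else None,
    (_, props) =>
      props.get("write.location-provider.impl").map(_ => "write.location-provider.impl is set"),
    (_, props) =>
      props.get("format-version")
        .flatMap(v => Try(v.trim.toInt).toOption)
        .filter(_ >= 3)
        .map(v => s"format-version $v is not supported"),
    (_, props) =>
      props.keys.find(_.startsWith("encryption.")).map(k => s"encryption property '$k' is set")
  )

  // Iterator.map is lazy, so evaluation stops at the first trigger that fires.
  def supportLevel(format: String, props: Map[String, String]): SupportLevel =
    triggers.iterator
      .map(trigger => trigger(format, props))
      .collectFirst { case Some(reason) => Unsupported(reason) }
      .getOrElse(Compatible)
}
```

Because the triggers are evaluated in order and lazily, an ORC write to a V3 table reports only the format reason in this sketch; the later checks never run.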

spark/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 16 additions & 0 deletions
@@ -133,6 +133,22 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_ICEBERG_NATIVE_WRITE_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.write.iceberg.nativeAcceleration")
+      .category(CATEGORY_TESTING)
+      .doc(
+        "Whether to delegate the executor-side Parquet write to Comet's native (iceberg-rust) " +
+          "writer when the table's properties allow it. Requires " +
+          "`spark.comet.write.iceberg.splitOperator.enabled = true`. Off by default. " +
+          "Falls back to the JVM two-op path automatically for unsupported scenarios " +
+          "(MoR DML, encrypted / object-storage / custom-location-provider tables, " +
+          "non-parquet format, format-version >= 3, counts-only metrics, bloom filters, " +
+          "non-default parquet row-group cadence, schemas wider than " +
+          "`write.metadata.metrics.max-inferred-column-defaults`). See " +
+          "`docs/source/user-guide/latest/iceberg-writes.md` for the full trigger table.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] =
     conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit")
       .category(CATEGORY_SCAN)
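
The doc string above says the native toggle requires the split-operator toggle. One possible reading of how the two flags combine is sketched below. `WriteTogglesSketch` and `describe` are hypothetical, the returned labels are illustrative only, and treating `nativeAcceleration` on its own as a no-op is an assumption that this diff does not confirm.

```scala
// Hypothetical helper; only the two config keys come from the commit.
object WriteTogglesSketch {
  val SplitOperatorKey = "spark.comet.write.iceberg.splitOperator.enabled"
  val NativeKey = "spark.comet.write.iceberg.nativeAcceleration"

  // Both toggles default to false when absent, matching createWithDefault(false).
  private def flag(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.equalsIgnoreCase("true"))

  def describe(conf: Map[String, String]): String = {
    val split = flag(conf, SplitOperatorKey)
    val native = flag(conf, NativeKey)
    if (!split) "iceberg-java stock plan" // assumption: native flag alone has no effect
    else if (native) "split-operator plan with native-write detection"
    else "split-operator plan"
  }
}
```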

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 117 additions & 0 deletions
@@ -52,6 +52,8 @@ object IcebergReflection extends Logging {
     val SPARK_BATCH_QUERY_SCAN = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
     val SPARK_STAGED_SCAN = "org.apache.iceberg.spark.source.SparkStagedScan"
     val SPARK_WRITE = "org.apache.iceberg.spark.source.SparkWrite"
+    val TABLE_PROPERTIES = "org.apache.iceberg.TableProperties"
+    val TYPE_UTIL = "org.apache.iceberg.types.TypeUtil"
 
     // Iceberg 1.5.2 (Spark 3.4 profile) ships its own `ReplaceIcebergData` logical write node via
     // the iceberg-spark-extensions module instead of using Spark's stock `ReplaceData`. Iceberg
@@ -119,6 +121,41 @@ object IcebergReflection extends Logging {
   def isIcebergSparkWrite(write: Any): Boolean =
     sparkWriteClassOpt.exists(_.isInstance(write))
 
+  /**
+   * Whether `batchWrite` is one of Iceberg's `SparkWrite` inner `BatchWrite` impls
+   * (`BatchAppend`, `OverwriteByFilter`, `DynamicOverwrite`, `CopyOnWriteOperation`,
+   * `RewriteFiles`). These are private inner classes of `SparkWrite`; we detect them by name
+   * prefix because they are not exposed as a public superclass.
+   */
+  def isIcebergBatchWrite(batchWrite: Any): Boolean = {
+    if (batchWrite == null) return false
+    batchWrite.getClass.getName.startsWith(ClassNames.SPARK_WRITE + "$")
+  }
+
+  /**
+   * Extract the outer `SparkWrite` instance from an Iceberg `BatchWrite` inner class via the
+   * synthetic `this$0` reference Java compilers emit for non-static inner classes. The `Table`
+   * and table-property accessors live on the outer class.
+   */
+  def getOuterSparkWrite(batchWrite: Any): Option[Any] = {
+    if (batchWrite == null) None
+    else {
+      try {
+        val field = batchWrite.getClass.getDeclaredField("this$0")
+        field.setAccessible(true)
+        Option(field.get(batchWrite))
+      } catch {
+        case _: NoSuchFieldException =>
+          // batchWrite may already be the outer class (static inner) -- fall back to itself.
+          Some(batchWrite)
+        case e: Exception =>
+          logError(
+            s"Iceberg reflection failure: outer SparkWrite from BatchWrite: ${e.getMessage}")
+          None
+      }
+    }
+  }
+
   /**
    * Whether `plan` is Iceberg's `ReplaceIcebergData` logical node (Iceberg 1.5.2's UPDATE / MERGE
    * rewrite target on Spark 3.4). Matched by FQCN so the main module doesn't compile-depend on
@@ -708,6 +745,86 @@ object IcebergReflection extends Logging {
     unsupportedTypes.toList
   }
 
+  private def getSparkWriteField(sparkWrite: Any, fieldName: String): Option[Any] =
+    sparkWriteClassOpt.flatMap { cls =>
+      try {
+        val field = cls.getDeclaredField(fieldName)
+        field.setAccessible(true)
+        Option(field.get(sparkWrite))
+      } catch {
+        case _: NoSuchFieldException => None
+        case e: Exception =>
+          logError(
+            s"Iceberg reflection failure: Failed to read SparkWrite.$fieldName: ${e.getMessage}")
+          None
+      }
+    }
+
+  /**
+   * Effective output file format resolved by Iceberg via `SparkWriteConf.dataFileFormat()`. Java
+   * consults the `write-format` write option BEFORE the `write.format.default` table property, so
+   * a per-write option override must win - gating only on table properties produces false-pass
+   * and false-fall-back outcomes when the two disagree.
+   *
+   * `SparkWrite.format` is a `FileFormat` enum (`PARQUET`/`ORC`/`AVRO`); returned lower-cased.
+   */
+  def getFormatFromSparkWrite(sparkWrite: Any): Option[String] =
+    getSparkWriteField(sparkWrite, "format")
+      .map(_.toString.toLowerCase(java.util.Locale.ROOT))
+
+  /**
+   * Reads the `private final` `table` field from a `SparkWrite`. Same setAccessible-required
+   * pattern as the other private-final accessors.
+   */
+  def getTableFromSparkWrite(sparkWrite: Any): Option[Any] =
+    getSparkWriteField(sparkWrite, "table")
+
+  private lazy val tablePropertiesClassOpt: Option[Class[_]] =
+    tryLoadClass(ClassNames.TABLE_PROPERTIES)
+
+  /**
+   * Reads a static `String` constant from Iceberg's `org.apache.iceberg.TableProperties` by field
+   * name (e.g. `"DEFAULT_FILE_FORMAT"` -> `"write.format.default"`). Throws when Iceberg is
+   * absent or the constant has been renamed/removed; both indicate a version we have not vetted.
+   */
+  def tablePropertyConstant(fieldName: String): String =
+    readTablePropertiesField(fieldName).asInstanceOf[String]
+
+  /**
+   * Reads a static `int` constant from `TableProperties` (e.g. one of the `*_DEFAULT` numerics).
+   */
+  def tablePropertyIntConstant(fieldName: String): Int =
+    readTablePropertiesField(fieldName).asInstanceOf[Integer].intValue()
+
+  private def readTablePropertiesField(fieldName: String): Any = {
+    val cls = tablePropertiesClassOpt.getOrElse(
+      throw new IllegalStateException(s"${ClassNames.TABLE_PROPERTIES} is not on the classpath"))
+    try cls.getField(fieldName).get(null)
+    catch {
+      case e: NoSuchFieldException =>
+        throw new IllegalStateException(
+          s"${ClassNames.TABLE_PROPERTIES}.$fieldName not found " +
+            "(unsupported Iceberg version?)",
+          e)
+    }
+  }
+
+  /**
+   * Returns the count of projected field IDs in `schema` -- mirrors Iceberg-Java's
+   * `TypeUtil.getProjectedIds(Schema).size()` used to gate
+   * `write.metadata.metrics.max-inferred-column-defaults`.
+   */
+  def getProjectedFieldIdCount(schema: Any): Option[Int] = {
+    for {
+      typeUtilClass <- tryLoadClass(ClassNames.TYPE_UTIL)
+      schemaClass <- tryLoadClass(ClassNames.SCHEMA)
+    } yield {
+      val method = typeUtilClass.getMethod("getProjectedIds", schemaClass)
+      val ids = method.invoke(null, schema.asInstanceOf[AnyRef]).asInstanceOf[java.util.Set[_]]
+      ids.size()
+    }
+  }
+
 }
 
 /**
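
The private-field accessors added above follow one pattern: look up a declared field, call `setAccessible(true)`, wrap the value in `Option`, and treat a missing field as `None`. A self-contained sketch of that pattern, assuming a stand-in class instead of Iceberg's `SparkWrite`: `PrivateFieldSketch`, `FakeWrite`, `readField` and `formatOf` are hypothetical names, and swallowing non-fatal errors silently (rather than logging them as the commit does) is a simplification.

```scala
import java.util.Locale
import scala.util.control.NonFatal

object PrivateFieldSketch {
  // Stand-in for a third-party class whose state is only reachable via reflection.
  final class FakeWrite(fmt: String) {
    private val format: String = fmt
    def describe: String = s"FakeWrite($format)"
  }

  // Reads a declared (possibly private) field; None when the field is
  // missing, null, or otherwise unreadable.
  def readField(target: AnyRef, name: String): Option[Any] =
    try {
      val field = target.getClass.getDeclaredField(name)
      field.setAccessible(true)
      Option(field.get(target))
    } catch {
      case _: NoSuchFieldException => None
      case NonFatal(_) => None // the commit logs here; omitted in this sketch
    }

  // Lower-cases with Locale.ROOT so the result does not depend on the JVM's
  // default locale.
  def formatOf(target: AnyRef): Option[String] =
    readField(target, "format").map(_.toString.toLowerCase(Locale.ROOT))
}
```

Returning `Option` keeps the caller's fall-back decision explicit: a renamed or absent field reads as "unknown" instead of raising in the planner.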
