Commit b5aa839

author
Jordan Epstein
committed
feat: add split-operator plan for Iceberg V2 writes
1 parent e79183e commit b5aa839

17 files changed

Lines changed: 1418 additions & 0 deletions

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -287,6 +287,7 @@ jobs:
287287
org.apache.comet.exec.CometNativeReaderSuite
288288
org.apache.comet.CometIcebergNativeSuite
289289
org.apache.comet.CometIcebergRewriteActionSuite
290+
org.apache.comet.CometIcebergWriteActionSuite
290291
org.apache.comet.iceberg.IcebergReflectionSuite
291292
org.apache.comet.csv.CometCsvNativeReadSuite
292293
org.apache.comet.CometFuzzTestSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
@@ -129,6 +129,7 @@ jobs:
129129
org.apache.comet.exec.CometNativeReaderSuite
130130
org.apache.comet.CometIcebergNativeSuite
131131
org.apache.comet.CometIcebergRewriteActionSuite
132+
org.apache.comet.CometIcebergWriteActionSuite
132133
org.apache.comet.iceberg.IcebergReflectionSuite
133134
org.apache.comet.csv.CometCsvNativeReadSuite
134135
org.apache.comet.CometFuzzTestSuite
Lines changed: 183 additions & 0 deletions
@@ -0,0 +1,183 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Accelerating Apache Iceberg V2 Writes using Comet
21+
22+
## Overview
23+
24+
Comet's first layer for accelerating Iceberg V2 writes is a **split-operator plan**: Comet
rewrites the single `V2ExistingTableWriteExec` command that Spark plans for an Iceberg write
into a pair of operators, a **committer** and a **writer**, so that AQE and Comet's columnar
rules can see and re-optimise the data sub-query. The actual file-writing work still runs
through Iceberg's stock JVM writer; nothing in this layer changes Iceberg's commit semantics.
A further toggle for delegating the per-task Parquet write to iceberg-rust is planned but is
not part of this layer.
30+
31+
The split-operator plan is off by default; turn it on per session. Iceberg-Java writes the data
32+
unchanged when it's off.
33+
34+
## What changes about the Iceberg plan
35+
36+
### Iceberg's stock V2 plan
37+
38+
```
39+
V2ExistingTableWriteExec(write, query) ← single V2 command; commit + write together
40+
└── <query> ← data sub-query (scans, shuffles, ...)
41+
```
42+
43+
This is a single command. From AQE's perspective, Spark's `InsertAdaptiveSparkPlan` treats it
as a leaf: AQE never sees the data sub-query inside, so Comet's columnar rules can't convert
the scans / shuffles inside it either.
46+
47+
### Comet's split plan
48+
49+
```
50+
IcebergCommitExec(batchWrite, refreshCache) ← committer; runs BatchWrite.commit()
51+
└── AdaptiveSparkPlanExec ← AQE bubble (Spark inserts this)
52+
└── IcebergWriteExec(batchWrite) ← writer (UnaryExecNode); per-task write
53+
└── <query> ← data sub-query: now visible to AQE / Comet
54+
```
55+
56+
The committer keeps Iceberg's commit semantics intact. The writer is a normal
57+
`UnaryExecNode`, so AQE wraps it whenever the data sub-query has a shuffle, and Comet's
58+
standard `transformUp` rules can convert the scans / projects / sorts / exchanges inside the
59+
AQE bubble to their Comet counterparts (`CometScan`, `CometProject`, `CometColumnarExchange`,
60+
…).
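
As a toy model of the point above, the sketch below shows why a bottom-up rewrite can only convert nodes it is able to reach. All class names here (`Scan`, `NativeScan`, `Write`, `OpaqueCommand`) are illustrative stand-ins and not Spark or Comet classes; the `transformUp` helper is a simplified imitation of Catalyst's method of the same name.

```scala
object TransformUpSketch {
  // Toy plan tree; illustrative stand-ins only, not Spark or Comet classes.
  sealed trait Plan
  final case class Scan(name: String) extends Plan
  final case class NativeScan(name: String) extends Plan
  // A normal unary operator: rules can descend into its child.
  final case class Write(child: Plan) extends Plan
  // A command that keeps its query private: rules see no children to descend into.
  final case class OpaqueCommand(hiddenQuery: Plan) extends Plan

  // Bottom-up rewrite: rewrite the visible children first, then the node itself.
  def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val withNewChildren = plan match {
      case Write(child) => Write(transformUp(child)(rule))
      case other        => other // leaves and opaque commands expose no children
    }
    rule.applyOrElse(withNewChildren, (p: Plan) => p)
  }

  // Hypothetical conversion rule: replace a JVM scan with a native one.
  val toNative: PartialFunction[Plan, Plan] = { case Scan(n) => NativeScan(n) }
}
```

With these definitions, `transformUp(Write(Scan("t")))(toNative)` rewrites the scan, while `transformUp(OpaqueCommand(Scan("t")))(toNative)` returns the plan unchanged because the scan is never visited.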
61+
62+
### What flows between the two execs
63+
64+
The writer produces **one row per Spark task** with a single `BINARY` column:
65+
Java-serialised bytes of an Iceberg `SparkWrite$TaskCommit(DataFile[])`. The committer
66+
`executeCollect()`s that RDD, deserialises each row's bytes back into a `WriterCommitMessage`,
67+
and hands the resulting array to `BatchWrite.commit(messages)` — the same call Iceberg-Java
68+
would have made internally.
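
The hand-off described above amounts to a Java-serialisation round trip. The following is a minimal sketch of that round trip using only the JDK; `TaskCommitStandIn` is a hypothetical stand-in for Iceberg's task commit message (the real class carries `DataFile[]`), and the helper names are assumptions made for illustration, not names from this commit.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CommitMessageRoundTrip {
  // Hypothetical stand-in for the per-task commit message.
  final case class TaskCommitStandIn(filePaths: Vector[String])

  // Writer side: one serialised blob per task, i.e. the single BINARY column.
  def serialise(msg: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(msg) finally out.close()
    buffer.toByteArray
  }

  // Committer side: turn each collected row's bytes back into an object.
  def deserialise[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Two "tasks", each producing one row of bytes.
    val perTaskRows = Seq(
      TaskCommitStandIn(Vector("data/part-0.parquet")),
      TaskCommitStandIn(Vector("data/part-1.parquet", "data/part-2.parquet"))
    ).map(serialise)
    // The driver collects the rows and rebuilds the messages before committing.
    val messages = perTaskRows.map(deserialise[TaskCommitStandIn])
    println(messages.flatMap(_.filePaths).mkString(", "))
  }
}
```

One consequence of this design is that the commit message class must be `Serializable` and loadable on the driver, which Spark's V2 write path already requires of `WriterCommitMessage`.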
69+
70+
## Configuration
71+
72+
Standard Comet + Iceberg setup ([`iceberg.md`](iceberg.md)) plus the split-operator toggle:
73+
74+
```
75+
# Standard Comet / Iceberg wiring
76+
spark.plugins=org.apache.spark.CometPlugin
77+
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
78+
spark.sql.catalog.<name>=org.apache.iceberg.spark.SparkCatalog
79+
spark.sql.catalog.<name>.type=hadoop # or hive / glue / rest / ...
80+
spark.sql.catalog.<name>.warehouse=...
81+
82+
# Comet write toggle (off by default)
83+
spark.comet.write.iceberg.splitOperator.enabled=true
84+
```
85+
86+
`IcebergSparkSessionExtensions` is **mandatory on Spark 3.4** for `UPDATE` / `MERGE` on V2
87+
tables — Spark 3.4's stock planner rejects `UPDATE TABLE` on V2 sources, and Iceberg's analyzer
88+
extension (`RewriteUpdateTable` / `RewriteMergeIntoTable`) is what provides the rewrite. On
89+
Spark 3.5+ the extension is optional but recommended (Spark added native row-level operation
90+
support in 3.5). `INSERT INTO` / `INSERT OVERWRITE` / `DELETE FROM` work without the extension
91+
on every Spark version.
92+
93+
If `splitOperator.enabled=false`, Comet leaves Iceberg's stock plan alone — every write goes
94+
straight through Iceberg-Java.
95+
96+
## Spark version compatibility
97+
98+
Copy-on-write coverage is identical across versions (`UPDATE` / `MERGE` on 3.4 requires
99+
`IcebergSparkSessionExtensions`). Merge-on-read (`WriteDelta`) is intentionally left on Spark's
100+
default path: the per-task `DeltaWriter` is row-dispatched and no native acceleration is
101+
planned, so routing it through the split-operator plan would add planning complexity for no
102+
realisable benefit.
103+
104+
| Capability                                  | 3.4     | 3.5 | 4.0 |
|---------------------------------------------|---------|-----|-----|
| `INSERT INTO` (`AppendData`)                | Yes     | Yes | Yes |
| `INSERT OVERWRITE` (static + dynamic)       | Yes     | Yes | Yes |
| `DELETE FROM` copy-on-write (`ReplaceData`) | Yes     | Yes | Yes |
| `UPDATE` / `MERGE INTO` copy-on-write       | Yes [¹] | Yes | Yes |
110+
111+
- [¹] Requires `IcebergSparkSessionExtensions` (see Configuration above).
112+
113+
## Tests
114+
115+
- **`CometIcebergWriteActionSuite`** — end-to-end scenarios against a temporary Hadoop catalog
116+
covering the copy-on-write V2 logical-write variants on the split-operator path, plus parity
117+
vs Iceberg-Java's writer, the disabled-config fallback, and the commit-once invariant. Passes
118+
on Spark 3.4 / 3.5 / 4.0 with `IcebergSparkSessionExtensions` loaded.
119+
120+
## Abort behaviour
121+
122+
The writer calls `writer.abort()` per task on failure to release task-level resources.
123+
The committer (`IcebergCommitExec.run()`) calls `BatchWrite.abort(messages)` if `commit()`
124+
raises.
125+
126+
If task writers stage files locally but their commit messages never reach the driver (e.g.
driver crash mid-collect), the staged Parquet files become **unreferenced orphans**. Iceberg's
orphan-file cleanup (the `remove_orphan_files` procedure, or the `DeleteOrphanFiles` action)
reaps these on the next maintenance run. Schedule it regularly if you want to avoid storage
drift on failed writes. This diverges from Spark's per-task abort behaviour and is a deliberate
trade-off in favour of a simpler driver-side commit loop.
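
The driver-side rule described in this section (commit once, abort with the same messages if the commit throws) can be sketched as follows. `BatchWriteLike` is a hypothetical, simplified stand-in for the `commit` / `abort` pair on Spark's `BatchWrite`; this is an illustration under that assumption, not the body of `IcebergCommitExec.run()`.

```scala
import scala.util.control.NonFatal

object CommitOrAbort {
  // Hypothetical stand-in for the commit/abort pair on Spark's BatchWrite.
  trait BatchWriteLike[M] {
    def commit(messages: Seq[M]): Unit
    def abort(messages: Seq[M]): Unit
  }

  // Commit once; if commit throws, abort with the same messages and rethrow.
  def commitOrAbort[M](write: BatchWriteLike[M], messages: Seq[M]): Unit =
    try write.commit(messages)
    catch {
      case NonFatal(e) =>
        // Keep the original failure as the primary error even if abort fails too.
        try write.abort(messages)
        catch { case NonFatal(abortError) => e.addSuppressed(abortError) }
        throw e
    }
}
```

Attaching an abort failure as a suppressed exception is a design choice of this sketch: it keeps the commit error visible to the caller while still recording that cleanup did not finish.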
132+
133+
---
134+
135+
# Developer notes
136+
137+
The sections below document architectural decisions, interactions with Spark internals, and
138+
load-bearing implementation details. They're not required reading for *using* the writer — only
139+
for working on it.
140+
141+
## How the split survives AQE re-planning
142+
143+
AQE calls `reOptimize(inputPlan.logicalLink.get)` every time a query stage materialises and
re-runs the planner on the result. The trick is making sure the writer stays in place and the
commit happens exactly once: not zero times, not twice.
146+
147+
Two design choices keep this stable:
148+
149+
1. **File writer exec uses a stable logical anchor.** `IcebergWriteLogical` is a
150+
Catalyst `UnaryNode` that wraps the data sub-query plus the `BatchWrite` reference. The
151+
planner's `setLogicalLink` pins the writer to this anchor (not to the surrounding
152+
`ReplaceData` / `AppendData` / etc. logical node) so AQE's `reOptimize` re-plans only the
153+
data query. Without the anchor AQE would either re-fire the surrounding Iceberg write
154+
logical node (duplicating the commit) or strip the writer away (no write
155+
happens).
156+
2. **The `BatchWrite` instance is shared between committer and writer.** Iceberg's
`Write.toBatch()` returns a freshly-constructed `BatchWrite` on every call; for CoW DML the
instance holds the scan state and emitted-file tracking, so the writer exec's writer
factory and the committer **must** operate on the same instance for `OverwriteFiles`'
serialisable-isolation validation to walk the right snapshot range. The strategy calls
`toBatch` once and threads the result through both execs.
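
The second design choice above can be illustrated with a small sketch. The types below are hypothetical stand-ins for Spark's `Write` / `BatchWrite` and for the two execs; the only behaviour modelled is that `toBatch()` builds a new object on every call, as the text describes.

```scala
object SharedBatchWrite {
  // Hypothetical stand-ins; the real types are Spark's Write and BatchWrite.
  final class BatchWriteStandIn
  final class WriteStandIn {
    // Mirrors the behaviour described above: every call builds a new instance.
    def toBatch(): BatchWriteStandIn = new BatchWriteStandIn
  }

  // Simplified stand-ins for the writer and committer operators.
  final case class WriterExec(batchWrite: BatchWriteStandIn)
  final case class CommitExec(batchWrite: BatchWriteStandIn, child: WriterExec)

  // Call toBatch() exactly once and hand the same reference to both operators.
  def plan(write: WriteStandIn): CommitExec = {
    val shared = write.toBatch()
    CommitExec(shared, WriterExec(shared))
  }
}
```

Calling `toBatch()` separately for each operator would compile just as well, which is why the single call is worth stating explicitly: the two operators would then hold different instances with different tracking state.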
162+
163+
## Why a custom logical anchor (`IcebergWriteLogical`)
164+
165+
Without `IcebergWriteLogical`, AQE's `reOptimize` would either:
166+
167+
- **Drop** the writer, if its logical link points at the data sub-query (which
carries no write semantics). The re-planned tree would have no writer at all.
169+
- **Duplicate** the committer, if the writer's logical link points at the
170+
surrounding logical write (`ReplaceData` / `AppendData` / …). The strategy would re-fire the
171+
entire two-op tree on every AQE iteration, each iteration emitting a fresh
172+
`IcebergCommitExec` — and Iceberg's `OverwriteFiles` validation then sees the prior
173+
iteration's file as a newly-added conflicting file and fails.
174+
175+
`IcebergWriteLogical` sits between the two and lets the strategy re-emit a fresh
176+
`IcebergWriteExec` on every iteration without touching the committer.
177+
178+
## Iceberg 1.5.2 (Spark 3.4) logical-write skew
179+
180+
Iceberg 1.5.2 (paired with Spark 3.4) ships its own `ReplaceIcebergData` logical node because
181+
Spark 3.4 lacks native row-level operation support. Field shape is identical to Spark 3.5+'s
182+
stock `ReplaceData`, so `IcebergWriteStrategy` matches it by FQCN and the extracted tuple feeds
183+
the same dispatcher.
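
Matching by fully-qualified class name and then reading fields reflectively can be sketched with the JDK alone. `ReplaceDataStandIn` is a hypothetical stand-in for a logical node that is not available at compile time. Unlike a plain `getDeclaredField` lookup, this variant also searches superclasses, which is an assumption about what a caller might want and not a description of what this commit does.

```scala
object FqcnMatch {
  // Hypothetical stand-in for a logical node we cannot reference at compile time.
  final case class ReplaceDataStandIn(table: String, query: String)

  // Match on the runtime class name instead of a compile-time type.
  def matchesByName(plan: Any, fqcn: String): Boolean =
    plan != null && plan.getClass.getName == fqcn

  // Read a field by name, searching the runtime class and then its superclasses.
  def readField(plan: AnyRef, name: String): Option[AnyRef] = {
    val classes =
      Iterator.iterate[Class[_]](plan.getClass)(_.getSuperclass).takeWhile(_ != null)
    val field =
      classes.flatMap(_.getDeclaredFields.find(_.getName == name)).toSeq.headOption
    field.flatMap { f =>
      f.setAccessible(true)
      Option(f.get(plan)) // a null field value becomes None
    }
  }
}
```

For example, `FqcnMatch.readField(FqcnMatch.ReplaceDataStandIn("t", "q"), "query")` yields `Some("q")`, and a lookup of a field name that does not exist yields `None` rather than throwing.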

docs/source/user-guide/latest/index.rst

Lines changed: 1 addition & 0 deletions
@@ -79,6 +79,7 @@ to read more.
7979
:hidden:
8080

8181
Iceberg Guide <iceberg>
82+
Iceberg Writes <iceberg-writes>
8283
S3 Credential Providers <s3-credential-providers>
8384
Kubernetes Guide <kubernetes>
8485

spark/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 12 additions & 0 deletions
@@ -121,6 +121,18 @@ object CometConf extends ShimCometConf {
121121
.booleanConf
122122
.createWithDefault(true)
123123

124+
val COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED: ConfigEntry[Boolean] =
125+
conf("spark.comet.write.iceberg.splitOperator.enabled")
126+
.category(CATEGORY_TESTING)
127+
.doc(
128+
"Whether to rewrite Iceberg V2 writes from Spark's combined V2 write/commit operator " +
129+
"into Comet's two-operator shape: a file writer exec (inside AQE) and a committer " +
130+
"(outside AQE). This lets AQE and Comet's columnar rules see and re-optimise the " +
131+
"data sub-query without affecting Iceberg's commit semantics. Off by default; " +
132+
"Iceberg-Java writes the data unchanged when the gate is off. Highly experimental.")
133+
.booleanConf
134+
.createWithDefault(false)
135+
124136
val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] =
125137
conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit")
126138
.category(CATEGORY_SCAN)

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 4 additions & 0 deletions
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution._
3232
import org.apache.spark.sql.internal.SQLConf
3333

3434
import org.apache.comet.CometConf._
35+
import org.apache.comet.iceberg.IcebergWriteStrategy
3536
import org.apache.comet.rules.{CometExecRule, CometPlanAdaptiveDynamicPruningFilters, CometReuseSubquery, CometScanRule, CometSpark34AqeDppFallbackRule, EliminateRedundantTransitions}
3637
import org.apache.comet.shims.ShimCometSparkSessionExtensions
3738

@@ -97,6 +98,9 @@ class CometSparkSessionExtensions
9798
extensions.injectQueryStagePrepRule { session => CometExecRule(session) }
9899
injectQueryStageOptimizerRuleShim(extensions, CometPlanAdaptiveDynamicPruningFilters)
99100
injectQueryStageOptimizerRuleShim(extensions, CometReuseSubquery)
101+
// Runs before Spark's DataSourceV2Strategy because strategies injected through
// `injectPlannerStrategy` are placed ahead of the built-in ones in the planner's strategy list.
103+
extensions.injectPlannerStrategy { session => IcebergWriteStrategy(session) }
100104
}
101105

102106
case class CometScanColumnar(session: SparkSession) extends ColumnarRule {

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 69 additions & 0 deletions
@@ -51,6 +51,14 @@ object IcebergReflection extends Logging {
5151
val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate"
5252
val SPARK_BATCH_QUERY_SCAN = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
5353
val SPARK_STAGED_SCAN = "org.apache.iceberg.spark.source.SparkStagedScan"
54+
val SPARK_WRITE = "org.apache.iceberg.spark.source.SparkWrite"
55+
56+
// Iceberg 1.5.2 (Spark 3.4 profile) ships its own `ReplaceIcebergData` logical write node via
57+
// the iceberg-spark-extensions module instead of using Spark's stock `ReplaceData`. Iceberg
58+
// 1.8+ dropped it in favour of Spark 3.5's native row-level operation support, so this class
59+
// name will only resolve on the 3.4 + Iceberg 1.5.2 combo (where SQL UPDATE / MERGE on V2
60+
// tables require the Iceberg extension).
61+
val REPLACE_ICEBERG_DATA = "org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData"
5462
}
5563

5664
/**
@@ -94,6 +102,66 @@ object IcebergReflection extends Logging {
94102
val UNKNOWN = "unknown"
95103
}
96104

105+
/**
106+
* Loads a class by name via reflection; callers cache the outcome (see `sparkWriteClassOpt`).
* Returns `None` when Iceberg is not on the classpath so Comet stays usable in non-Iceberg
* deployments.
108+
*/
109+
private def tryLoadClass(name: String): Option[Class[_]] =
110+
try Some(loadClass(name))
111+
catch { case _: ClassNotFoundException => None }
112+
113+
private lazy val sparkWriteClassOpt: Option[Class[_]] = tryLoadClass(ClassNames.SPARK_WRITE)
114+
115+
/**
116+
* Whether `write` is an Iceberg `SparkWrite` (or subclass). Returns false if Iceberg is not on
117+
* the classpath. Used by the write strategy to decide whether to intercept a V2 logical write.
118+
*/
119+
def isIcebergSparkWrite(write: Any): Boolean =
120+
sparkWriteClassOpt.exists(_.isInstance(write))
121+
122+
/**
123+
* Whether `plan` is Iceberg's `ReplaceIcebergData` logical node (Iceberg 1.5.2's UPDATE / MERGE
124+
* rewrite target on Spark 3.4). Matched by FQCN so the main module doesn't compile-depend on
125+
* iceberg-spark-extensions.
126+
*/
127+
def isReplaceIcebergData(plan: Any): Boolean =
128+
plan != null && plan.getClass.getName == ClassNames.REPLACE_ICEBERG_DATA
129+
130+
/**
131+
* Read a field declared directly on `plan`'s runtime class reflectively (`getDeclaredField` does
* not search superclasses). Used for extracting fields off Iceberg 1.5.2's `ReplaceIcebergData`
* logical node when iceberg-spark-extensions isn't on the main classpath.
134+
*/
135+
private def reflectField(plan: Any, fieldName: String): Option[AnyRef] =
136+
try {
137+
val field = plan.getClass.getDeclaredField(fieldName)
138+
field.setAccessible(true)
139+
Option(field.get(plan))
140+
} catch {
141+
case e: Exception =>
142+
logError(
143+
s"Iceberg reflection failure: $fieldName on ${plan.getClass.getName}: ${e.getMessage}")
144+
None
145+
}
146+
147+
/**
148+
* Pull `(table, query, originalTable, write)` out of a `ReplaceIcebergData` instance. Iceberg's
149+
* 1.5.2 case class shape matches Spark 3.5+'s stock `ReplaceData` exactly, so the extracted
150+
* tuple can feed our `matchedSparkWrite` dispatcher unchanged.
151+
*/
152+
def extractReplaceIcebergDataFields(plan: Any): Option[(AnyRef, AnyRef, AnyRef, AnyRef)] = {
153+
if (!isReplaceIcebergData(plan)) return None
154+
for {
155+
table <- reflectField(plan, "table")
156+
query <- reflectField(plan, "query")
157+
originalTable <- reflectField(plan, "originalTable")
158+
write <- reflectField(
159+
plan,
160+
"write"
161+
) // Option[Write]; field can be Some(null) so kept AnyRef
162+
} yield (table, query, originalTable, write)
163+
}
164+
97165
/**
98166
* Loads a class using the thread context classloader first, then falls back to the system
99167
* classloader.
@@ -639,6 +707,7 @@ object IcebergReflection extends Logging {
639707

640708
unsupportedTypes.toList
641709
}
710+
642711
}
643712

644713
/**
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.iceberg
21+
22+
import org.apache.spark.sql.catalyst.expressions.Attribute
23+
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
24+
import org.apache.spark.sql.connector.write.BatchWrite
25+
26+
/** Logical anchor for the writer. See `IcebergWriteStrategy` for the rationale. */
27+
case class IcebergWriteLogical(
28+
child: LogicalPlan,
29+
// Logical plans stay driver-side: AQE re-planning is driver-local, and write commands sit
30+
// at the top of the plan so they aren't cached or checkpointed.
31+
@transient batchWrite: BatchWrite,
32+
replaceDataDispatch: Option[ReplaceDataDispatchInfo] = None)
33+
extends UnaryNode {
34+
35+
override def output: Seq[Attribute] = Nil
36+
37+
override protected def withNewChildInternal(newChild: LogicalPlan): IcebergWriteLogical =
38+
copy(child = newChild)
39+
}
