Skip to content

Commit b921c30

Browse files
committed
Revert "[spark] Revert MergeInto changes from Round 8; keep Update/Delete Resolution rule fix"
This reverts commit 432a263.
1 parent 432a263 commit b921c30

2 files changed

Lines changed: 18 additions & 13 deletions

File tree

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.paimon.spark.catalyst.analysis
2020

21-
import org.apache.paimon.spark.SparkTable
21+
import org.apache.paimon.spark.{SparkTable, SparkTableWithRowLevelOps}
2222
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
2323
import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable}
2424

@@ -47,10 +47,16 @@ trait PaimonMergeIntoBase
4747
// `transformDown` (which unconditionally visits every node) guarded by
4848
// `AnalysisHelper.allowInvokingTransformsInAnalyzer` so the in-analyzer assertion does not
4949
// trip.
50+
val skipV2RowLevelTables = org.apache.spark.SPARK_VERSION >= "4.1"
5051
AnalysisHelper.allowInvokingTransformsInAnalyzer {
5152
plan.transformDown {
5253
case merge: MergeIntoTable
53-
if merge.resolved && PaimonRelation.isPaimonTable(merge.targetTable) =>
54+
if merge.resolved && PaimonRelation.isPaimonTable(merge.targetTable) &&
55+
(!skipV2RowLevelTables ||
56+
!PaimonRelation
57+
.getPaimonRelation(merge.targetTable)
58+
.table
59+
.isInstanceOf[SparkTableWithRowLevelOps]) =>
5460
val relation = PaimonRelation.getPaimonRelation(merge.targetTable)
5561
val v2Table = relation.table.asInstanceOf[SparkTable]
5662
val dataEvolutionEnabled = v2Table.coreOptions.dataEvolutionEnabled()

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,19 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
5151
extensions.injectPostHocResolutionRule(_ => PaimonDeleteTable)
5252
extensions.injectPostHocResolutionRule(spark => PaimonMergeInto(spark))
5353

54-
// Spark 4.1 additionally registers Update/Delete rules in the Resolution batch. Spark 4.1 moved
55-
// its own RewriteUpdateTable / RewriteDeleteFromTable into the Resolution batch and marks the
56-
// plan as analyzed=true before postHoc rules run. The postHoc rule's transformed output is then
57-
// silently dropped for UpdateTable / DeleteFromTable nodes, leaving them in the plan for the
58-
// physical planner to reject with UNSUPPORTED_FEATURE.TABLE_OPERATION. Running these rules in
59-
// the Resolution batch lets Paimon's rewrite actually stick. Each rule skips tables advertising
60-
// SupportsRowLevelOperations (gated on the same Spark 4.1 version check) so Spark's built-in
61-
// rewrite owns the V2 row-level path for useV2Write=true tables. `PaimonMergeInto` is
62-
// intentionally left out here — the postHoc path already works for merge plans (verified by
63-
// Blob merge tests passing pre-Round-8 on PR 7648) and re-registering it in the Resolution
64-
// batch introduced regressions on Blob tests.
54+
// Spark 4.1 additionally registers the same rules in the Resolution batch. Spark 4.1 moved its
55+
// own RewriteUpdateTable / RewriteDeleteFromTable / RewriteMergeIntoTable into the Resolution
56+
// batch and marks the plan as analyzed=true before postHoc rules run. The postHoc rule's
57+
// transformed output is then silently dropped, leaving the original UpdateTable / DeleteFromTable
58+
// / MergeIntoTable in the plan for the physical planner to reject with
59+
// UNSUPPORTED_FEATURE.TABLE_OPERATION. Running them in the Resolution batch lets Paimon's
60+
// rewrite actually stick. The rules themselves skip tables advertising SupportsRowLevelOperations
61+
// (gated on the same Spark 4.1 version check), so Spark's built-in rewrite still owns the V2
62+
// row-level path for useV2Write=true tables.
6563
if (org.apache.spark.SPARK_VERSION >= "4.1") {
6664
extensions.injectResolutionRule(_ => PaimonUpdateTable)
6765
extensions.injectResolutionRule(_ => PaimonDeleteTable)
66+
extensions.injectResolutionRule(spark => PaimonMergeInto(spark))
6867
}
6968

7069
// table function extensions

0 commit comments

Comments
 (0)