Commit 72ef84b

[spark] Version-conditional Resolution rule for Spark 4.1 row-level rewrites
Spark 4.1 moved its built-in RewriteUpdateTable / RewriteDeleteFromTable / RewriteMergeIntoTable into the Resolution batch, and the Resolution batch marks the plan analyzed=true before postHoc rules run. Diagnostic runs on PR 7648 confirmed that for Spark 4.1 + useV2Write=false + append-only tables, PaimonUpdateTable's postHoc rule matches, takes the V1 fallback branch, and returns UpdatePaimonTableCommand, yet the physical planner still sees the original UpdateTable node and fails with UNSUPPORTED_FEATURE.TABLE_OPERATION. The transformed plan is silently dropped in Spark 4.1's analyzed-plan flow.

Register PaimonUpdateTable / PaimonDeleteTable / PaimonMergeInto as Resolution rules ONLY on Spark 4.1+, so their transform output sticks. Spark 3.x keeps the original postHoc-only registration; it has no competing Resolution-batch rewrite rule and the postHoc path works as before.

Inside the rules, skip tables that expose SupportsRowLevelOperations when on Spark 4.1+. Those tables already go through Spark's built-in V2 row-level rewrite; hijacking them here would force V1 fallback and drop V2-only signals such as `_ROW_ID`, tripping CheckAnalysis with MISSING_ATTRIBUTES. The guard is version-gated so Spark 3.2/3.3 (no built-in V2 merge rewrite) keeps using Paimon's rule for useV2Write=true tables, avoiding the regression that appeared when the guard was unconditional (Round 6 on PR 7648).
1 parent 136ec45 commit 72ef84b

4 files changed

Lines changed: 39 additions & 4 deletions

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala

Lines changed: 5 additions & 1 deletion
@@ -18,6 +18,7 @@

 package org.apache.paimon.spark.catalyst.analysis

+import org.apache.paimon.spark.SparkTableWithRowLevelOps
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 import org.apache.paimon.table.FileStoreTable

@@ -36,10 +37,13 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
     // (which unconditionally visits every node) guarded by
     // `AnalysisHelper.allowInvokingTransformsInAnalyzer` so the in-analyzer assertion does not
     // trip.
+    val skipV2RowLevelTables = org.apache.spark.SPARK_VERSION >= "4.1"
     AnalysisHelper.allowInvokingTransformsInAnalyzer {
       plan.transformDown {
         case d @ DeleteFromTable(PaimonRelation(table), condition)
-            if d.resolved && shouldFallbackToV1Delete(table, condition) =>
+            if d.resolved && (!skipV2RowLevelTables ||
+              !table.isInstanceOf[SparkTableWithRowLevelOps]) &&
+              shouldFallbackToV1Delete(table, condition) =>
           checkPaimonTable(table.getTable)

           table.getTable match {
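
The gate `org.apache.spark.SPARK_VERSION >= "4.1"` used throughout this commit compares strings lexicographically. That gives the intended answer for 3.x and 4.x version strings, but it would misorder a two-digit major version. The following is a minimal sketch of a numeric comparison, assuming the version string starts with `major.minor`; `SparkVersionCheck`, `majorMinor`, and `isAtLeast` are hypothetical names and are not part of this commit.

```scala
object SparkVersionCheck {
  // Hypothetical helper, not part of the commit.
  // Assumes the string starts with "major.minor", e.g. "4.1.0" or "4.1.0-SNAPSHOT";
  // anything else throws, which a real implementation would need to handle.
  def majorMinor(version: String): (Int, Int) = {
    val parts = version.split("\\.")
    (parts(0).toInt, parts(1).takeWhile(_.isDigit).toInt)
  }

  // True when `version` is numerically at least `major.minor`.
  def isAtLeast(version: String, major: Int, minor: Int): Boolean = {
    val (maj, min) = majorMinor(version)
    maj > major || (maj == major && min >= minor)
  }
}
```

With this sketch, `SparkVersionCheck.isAtLeast("10.0.0", 4, 1)` is true, whereas the string comparison `"10.0.0" >= "4.1"` is false because `'1'` sorts before `'4'`.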

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala

Lines changed: 8 additions & 2 deletions
@@ -18,7 +18,7 @@

 package org.apache.paimon.spark.catalyst.analysis

-import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.{SparkTable, SparkTableWithRowLevelOps}
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable}

@@ -47,10 +47,16 @@ trait PaimonMergeIntoBase
     // `transformDown` (which unconditionally visits every node) guarded by
     // `AnalysisHelper.allowInvokingTransformsInAnalyzer` so the in-analyzer assertion does not
     // trip.
+    val skipV2RowLevelTables = org.apache.spark.SPARK_VERSION >= "4.1"
     AnalysisHelper.allowInvokingTransformsInAnalyzer {
       plan.transformDown {
         case merge: MergeIntoTable
-            if merge.resolved && PaimonRelation.isPaimonTable(merge.targetTable) =>
+            if merge.resolved && PaimonRelation.isPaimonTable(merge.targetTable) &&
+              (!skipV2RowLevelTables ||
+                !PaimonRelation
+                  .getPaimonRelation(merge.targetTable)
+                  .table
+                  .isInstanceOf[SparkTableWithRowLevelOps]) =>
           val relation = PaimonRelation.getPaimonRelation(merge.targetTable)
           val v2Table = relation.table.asInstanceOf[SparkTable]
           val dataEvolutionEnabled = v2Table.coreOptions.dataEvolutionEnabled()

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala

Lines changed: 11 additions & 1 deletion
@@ -18,6 +18,7 @@

 package org.apache.paimon.spark.catalyst.analysis

+import org.apache.paimon.spark.SparkTableWithRowLevelOps
 import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
 import org.apache.paimon.table.FileStoreTable

@@ -42,9 +43,18 @@ object PaimonUpdateTable
     // `transformDown` (which unconditionally visits every node) guarded by
     // `AnalysisHelper.allowInvokingTransformsInAnalyzer` so the in-analyzer assertion does not
     // trip. The pattern guard keeps the rewrite restricted to fully resolved plans.
+    // On Spark 4.1 this rule is also registered as a resolutionRule (postHoc output is silently
+    // discarded by Spark 4.1's analyzed-plan handling). When that happens, skip tables exposing
+    // SupportsRowLevelOperations because Spark's built-in RewriteUpdateTable will handle the V2
+    // row-level path for them and any hijack here would drop V2-only signals such as `_ROW_ID`
+    // and cause CheckAnalysis to reject the plan with MISSING_ATTRIBUTES. Older Spark versions
+    // have no competing RewriteUpdateTable, so the rule still fires for every Paimon relation.
+    val skipV2RowLevelTables = org.apache.spark.SPARK_VERSION >= "4.1"
     AnalysisHelper.allowInvokingTransformsInAnalyzer {
       plan.transformDown {
-        case u @ UpdateTable(PaimonRelation(table), assignments, condition) if u.resolved =>
+        case u @ UpdateTable(PaimonRelation(table), assignments, condition)
+            if u.resolved && (!skipV2RowLevelTables ||
+              !table.isInstanceOf[SparkTableWithRowLevelOps]) =>
           checkPaimonTable(table.getTable)

           table.getTable match {
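
The guard added to the `UpdateTable` pattern above reduces to a two-input decision: is this Spark 4.1 or later, and does the table expose row-level operations? The sketch below models only that decision with plain booleans. `Owner`, `PaimonRule`, `SparkBuiltInRewrite`, and `UpdateGuardSketch` are hypothetical stand-ins for illustration; the real rule pattern-matches on Spark's `UpdateTable` and Paimon's `SparkTableWithRowLevelOps`, and the `u.resolved` check is left out here.

```scala
// Hypothetical stand-ins for illustration only; not types from Paimon or Spark.
sealed trait Owner
case object PaimonRule extends Owner
case object SparkBuiltInRewrite extends Owner

object UpdateGuardSketch {
  // Mirrors `!skipV2RowLevelTables || !table.isInstanceOf[SparkTableWithRowLevelOps]`.
  def paimonRuleMatches(isSpark41OrLater: Boolean, hasRowLevelOps: Boolean): Boolean =
    !isSpark41OrLater || !hasRowLevelOps

  // Which rewrite ends up handling the UPDATE, per the commit message.
  def owner(isSpark41OrLater: Boolean, hasRowLevelOps: Boolean): Owner =
    if (paimonRuleMatches(isSpark41OrLater, hasRowLevelOps)) PaimonRule
    else SparkBuiltInRewrite
}
```

Only the combination "Spark 4.1 or later" plus "table exposes row-level operations" is left to Spark's built-in rewrite; every other combination still goes through the Paimon rule, which is how older Spark versions keep their previous behavior.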

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala

Lines changed: 15 additions & 0 deletions
@@ -51,6 +51,21 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     extensions.injectPostHocResolutionRule(_ => PaimonDeleteTable)
     extensions.injectPostHocResolutionRule(spark => PaimonMergeInto(spark))

+    // Spark 4.1 additionally registers the same rules in the Resolution batch. Spark 4.1 moved its
+    // own RewriteUpdateTable / RewriteDeleteFromTable / RewriteMergeIntoTable into the Resolution
+    // batch and marks the plan as analyzed=true before postHoc rules run. The postHoc rule's
+    // transformed output is then silently dropped, leaving the original UpdateTable / DeleteFromTable
+    // / MergeIntoTable in the plan for the physical planner to reject with
+    // UNSUPPORTED_FEATURE.TABLE_OPERATION. Running them in the Resolution batch lets Paimon's
+    // rewrite actually stick. The rules themselves skip tables advertising SupportsRowLevelOperations
+    // (gated on the same Spark 4.1 version check), so Spark's built-in rewrite still owns the V2
+    // row-level path for useV2Write=true tables.
+    if (org.apache.spark.SPARK_VERSION >= "4.1") {
+      extensions.injectResolutionRule(_ => PaimonUpdateTable)
+      extensions.injectResolutionRule(_ => PaimonDeleteTable)
+      extensions.injectResolutionRule(spark => PaimonMergeInto(spark))
+    }
+
     // table function extensions
     PaimonTableValuedFunctions.supportedFnNames.foreach {
       fnName =>
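
The registration change above can be pictured as: the three rules stay in the postHoc batch on every Spark version and are additionally placed in the Resolution batch on Spark 4.1 or later. The toy model below only records rule names per batch. `RecordingExtensions` is a hypothetical stand-in for `SparkSessionExtensions` (whose real inject methods take rule builders, not strings), and the assumption that all three rules are postHoc-registered on every version follows the commit message rather than the visible context lines.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in that only records rule names; not the Spark API.
final class RecordingExtensions {
  val resolution: ListBuffer[String] = ListBuffer.empty[String]
  val postHoc: ListBuffer[String] = ListBuffer.empty[String]
  def injectResolutionRule(name: String): Unit = resolution += name
  def injectPostHocResolutionRule(name: String): Unit = postHoc += name
}

object RegistrationSketch {
  private val rules = Seq("PaimonUpdateTable", "PaimonDeleteTable", "PaimonMergeInto")

  def register(ext: RecordingExtensions, isSpark41OrLater: Boolean): Unit = {
    // Assumed: postHoc registration is unchanged on every Spark version.
    rules.foreach(ext.injectPostHocResolutionRule)
    // Spark 4.1 or later: also register in the Resolution batch.
    if (isSpark41OrLater) rules.foreach(ext.injectResolutionRule)
  }
}
```

Calling `RegistrationSketch.register(ext, isSpark41OrLater = false)` leaves `ext.resolution` empty, matching the postHoc-only behavior the commit message describes for Spark 3.x.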
