Commit 54591a9

[spark] Fire PaimonMergeInto for data-evolution tables on Spark 4.1 useV2Write=true
Round 8's MergeInto guard unconditionally hands Spark 4.1's RewriteMergeIntoTable ownership over any SparkTableWithRowLevelOps (useV2Write=true) target. That is correct for simple V2 row-level tables (row-tracking only), but Spark 4.1's rule fails the `rewritable` / `aligned` / `needSchemaEvolution` gate for the more complex data-evolution + blob-field Blob tables. When it silently skips those plans, the original MergeIntoTable falls through to the physical planner, which throws UNSUPPORTED_FEATURE.TABLE_OPERATION, breaking the 3 Blob merge tests:

- Blob: merge-into rejects updating raw-data BLOB column
- Blob: merge-into updates non-blob column on descriptor blob table
- Blob: merge-into updates descriptor blob column with external storage end-to-end

Narrow the skip to SparkTableWithRowLevelOps tables that DO NOT have data evolution enabled. For DE tables Paimon's rule continues to fire and converts MergeIntoTable to MergeIntoPaimonDataEvolutionTable exactly as Round 3 already did.
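The ownership decision described in the message can be modelled without Spark or Paimon on the classpath. The following is a minimal sketch, not the committed code: `TargetInfo` and `MergeGuardSketch` are hypothetical stand-ins for the properties the real guard reads from the resolved table, and the real rule additionally requires `merge.resolved` and a Paimon target.

```scala
// Hypothetical stand-in for what the guard inspects on the target table;
// this is not a Paimon or Spark type.
final case class TargetInfo(supportsRowLevelOps: Boolean, dataEvolutionEnabled: Boolean)

object MergeGuardSketch {
  // True when Spark's built-in RewriteMergeIntoTable is left to handle the plan:
  // Spark 4.1+, a row-level-ops table, and data evolution disabled.
  def sparkOwnsPlan(isSpark41OrLater: Boolean, target: TargetInfo): Boolean =
    isSpark41OrLater && target.supportsRowLevelOps && !target.dataEvolutionEnabled

  // Paimon's own MergeInto rewrite fires in every other case.
  def paimonRuleFires(isSpark41OrLater: Boolean, target: TargetInfo): Boolean =
    !sparkOwnsPlan(isSpark41OrLater, target)
}
```

Under this model, a data-evolution table on Spark 4.1 with useV2Write=true stays with Paimon's rule, which is the case the three Blob tests exercise, while a row-tracking-only V2 table is still left to Spark.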
1 parent b921c30 commit 54591a9

1 file changed

Lines changed: 16 additions & 5 deletions

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala

@@ -48,15 +48,26 @@ trait PaimonMergeIntoBase
     // `AnalysisHelper.allowInvokingTransformsInAnalyzer` so the in-analyzer assertion does not
     // trip.
     val skipV2RowLevelTables = org.apache.spark.SPARK_VERSION >= "4.1"
+
+    // Whether to let Spark's built-in RewriteMergeIntoTable take the plan. On Spark 4.1 we skip our
+    // rewrite when the table already exposes SupportsRowLevelOperations AND data-evolution is
+    // disabled. Data-evolution tables (e.g. Blob tables with 'blob-field' + 'data-evolution.enabled')
+    // cause Spark 4.1's RewriteMergeIntoTable to silently skip (its `rewritable` / `aligned` /
+    // `needSchemaEvolution` gate fails on such tables), leaving the plan as a MergeIntoTable that
+    // the physical planner rejects with UNSUPPORTED_FEATURE.TABLE_OPERATION. Paimon's rule knows
+    // how to route those tables through MergeIntoPaimonDataEvolutionTable, so it must still fire.
+    def spark41V2OwnsPlan(target: LogicalPlan): Boolean = {
+      if (!skipV2RowLevelTables) return false
+      val v2Table = PaimonRelation.getPaimonRelation(target).table
+      v2Table.isInstanceOf[SparkTableWithRowLevelOps] &&
+      !v2Table.asInstanceOf[SparkTable].coreOptions.dataEvolutionEnabled()
+    }
+
     AnalysisHelper.allowInvokingTransformsInAnalyzer {
       plan.transformDown {
         case merge: MergeIntoTable
             if merge.resolved && PaimonRelation.isPaimonTable(merge.targetTable) &&
-              (!skipV2RowLevelTables ||
-                !PaimonRelation
-                  .getPaimonRelation(merge.targetTable)
-                  .table
-                  .isInstanceOf[SparkTableWithRowLevelOps]) =>
+              !spark41V2OwnsPlan(merge.targetTable) =>
           val relation = PaimonRelation.getPaimonRelation(merge.targetTable)
           val v2Table = relation.table.asInstanceOf[SparkTable]
           val dataEvolutionEnabled = v2Table.coreOptions.dataEvolutionEnabled()
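
One detail in the unchanged context: `skipV2RowLevelTables` compares `SPARK_VERSION` to `"4.1"` as strings, which is a lexicographic comparison. That orders the 3.x and 4.x versions correctly, but it would misorder a two-digit major version. The following is a sketch of a numeric alternative, assuming version strings begin with `major.minor`; `VersionSketch` and its methods are hypothetical helpers and are not part of this commit.

```scala
object VersionSketch {
  // Matches a leading "major.minor" followed by anything, e.g. "4.1.0" or "4.1.0-SNAPSHOT".
  private val MajorMinor = """^(\d+)\.(\d+).*""".r

  // Extracts (major, minor) as integers, or None if the string has another shape.
  def majorMinor(version: String): Option[(Int, Int)] = version match {
    case MajorMinor(major, minor) => Some((major.toInt, minor.toInt))
    case _ => None
  }

  // Numeric "version >= major.minor" check; unparseable versions yield false.
  def atLeast(version: String, major: Int, minor: Int): Boolean =
    majorMinor(version).exists { case (ma, mi) =>
      ma > major || (ma == major && mi >= minor)
    }
}
```

For example, `VersionSketch.atLeast("4.1.0", 4, 1)` is true and `VersionSketch.atLeast("4.0.2", 4, 1)` is false, matching the string comparison, whereas a hypothetical `"10.0.0"` passes the numeric check but fails `"10.0.0" >= "4.1"`.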
