[core] improve insert overwrite conflict detection #7595
hbgstc123 wants to merge 1 commit into apache:master from
Conversation
readIncrementalChanges(
        snapshotManager.snapshot(baseSnapshotId),
        latestSnapshot,
        partitionFilter),
changedPartitions);
Collection<SimpleFileEntry> mergedIncremental =
[Critical] This turns readIncrementalChanges(...) into a call on the current class, but the method is actually defined on the scanner; in addition, Collection<SimpleFileEntry> is missing the java.util.Collection import. Both issues will prevent the new conflict detection logic from compiling.
Suggestion: change the call to scanner.readIncrementalChanges(...) and add the java.util.Collection import.
— gpt-5.4 via Qwen Code /review
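With both suggested fixes applied, the corrected shape would look roughly like the sketch below (ScannerSketch, CommitSketch, and the reduced SimpleFileEntry are hypothetical stand-ins so the snippet compiles on its own; the real types live in paimon-core):

```java
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in: the real SimpleFileEntry in paimon-core is richer.
class SimpleFileEntry {
    final String fileName;
    SimpleFileEntry(String fileName) { this.fileName = fileName; }
}

// Hypothetical stand-in for the snapshot scanner that owns readIncrementalChanges.
class ScannerSketch {
    Collection<SimpleFileEntry> readIncrementalChanges(long base, long latest) {
        return List.of(new SimpleFileEntry("file-" + base + "-" + latest));
    }
}

class CommitSketch {
    private final ScannerSketch scanner = new ScannerSketch();

    Collection<SimpleFileEntry> mergedIncremental(long base, long latest) {
        // Fixed: the method is invoked on the scanner instance (it is not
        // defined on this class), and java.util.Collection is imported above.
        Collection<SimpleFileEntry> mergedIncremental =
                scanner.readIncrementalChanges(base, latest);
        return mergedIncremental;
    }
}
```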
wenshao
left a comment
Request changes — one inline comment was posted successfully, and two findings could not be attached to diff lines so they are included below.
- [Critical]
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java:620,626
After tryOverwritePartition gained the new baseSnapshotId parameter, these two internal call sites still use the old signature; the argument counts no longer match, so paimon-core will fail to compile.
Suggestion: append null to the calls in dropPartitions and truncateTable to preserve the existing behavior of the non-sort-compact paths, e.g. tryOverwritePartition(..., new HashMap<>(), null);
- [Critical]
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java:343
SortCompactAction already calls withOverwriteBaseSnapshot(readSnapshotId), but the unaware-bucket path here does not forward overwriteBaseSnapshotId to the final sink/committer. As a result, the new conflict detection never takes effect in the bucket = -1 Flink sort compact scenario.
Suggestion: have the unaware-bucket path pass overwriteBaseSnapshotId through, the same way the dynamic-bucket path does, and add a Flink test covering this scenario.
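The first finding's fix can be sketched as follows (a toy reduction of FileStoreCommitImpl: method bodies, return values, and the partition type are hypothetical; only the call-site shape matters):

```java
import java.util.HashMap;
import java.util.Map;

class OverwriteCommitSketch {
    // New signature: the PR adds a trailing @Nullable base snapshot id.
    String tryOverwritePartition(Map<String, String> partition, Long baseSnapshotId) {
        // null => legacy behavior: build the DELETE list from latestSnapshot
        return baseSnapshotId == null ? "legacy" : "pinned@" + baseSnapshotId;
    }

    // Internal callers keep the old behavior by passing null explicitly,
    // matching the old signature's semantics.
    String dropPartitions() {
        return tryOverwritePartition(new HashMap<>(), null);
    }

    String truncateTable() {
        return tryOverwritePartition(new HashMap<>(), null);
    }
}
```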
Reviewed by gpt-5.4 via Qwen Code /review
Purpose
Problem
Sort Compact reads data from snapshot S_read, sorts it, then commits as OVERWRITE. However, tryOverwritePartition builds the DELETE list from latestSnapshot (which may be S_read+N). If concurrent writes add new files between S_read and latestSnapshot, those files are deleted without their data being included in the sorted output, causing silent data loss.
Root Cause
Timeline:
Solution
Part 1: Pin DELETE list to base snapshot
Build the DELETE list from the snapshot the sort compact actually read from (base snapshot) instead of latestSnapshot. This ensures we only delete files that were included in the
sorted output.
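The pinning described above can be sketched like this (snapshots reduced to a lookup from id to visible files; names are illustrative, not the Paimon API):

```java
import java.util.List;
import java.util.function.Function;

class DeleteListSketch {
    /**
     * Returns the files to put on the OVERWRITE commit's DELETE list.
     * filesAt maps a snapshot id to the files visible in the overwritten
     * partitions at that snapshot.
     */
    static List<String> deleteList(
            Function<Long, List<String>> filesAt,
            long latestSnapshotId,
            Long baseSnapshotId) {
        // Part 1: pin to the snapshot the sort compact actually read, so we
        // only delete files whose data made it into the sorted output.
        long pinned = (baseSnapshotId != null) ? baseSnapshotId : latestSnapshotId;
        return filesAt.apply(pinned);
    }
}
```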
Part 2: Concurrent write detection
When the base snapshot differs from the latest snapshot, use readIncrementalChanges(baseSnapshot, latestSnapshot) to detect new files added to the overwritten partitions. If new files exist, fail the commit with a clear conflict error instead of silently losing data.
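A minimal sketch of the detection step (the incremental changes are passed in as a plain list here; in the real code they come from readIncrementalChanges on the snapshot scanner):

```java
import java.util.List;

class ConflictDetectionSketch {
    /**
     * Part 2: if commits landed between the base and latest snapshots and any
     * of them added files to the overwritten partitions, fail fast instead of
     * silently dropping that data.
     */
    static void checkConcurrentWrites(
            long baseSnapshotId, long latestSnapshotId, List<String> newFiles) {
        if (baseSnapshotId == latestSnapshotId) {
            return; // nothing committed since the sort compact read
        }
        if (!newFiles.isEmpty()) {
            throw new IllegalStateException(
                    "Concurrent writes between snapshot " + baseSnapshotId
                            + " and " + latestSnapshotId + ": " + newFiles);
        }
    }
}
```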
Changes
Core
Core
- FileStoreCommit.java: Add @Nullable Long baseSnapshotId parameter to overwritePartition()
- FileStoreCommitImpl.java: tryOverwritePartition() builds DELETE list from baseSnapshot
- InnerTableCommit.java: Add withOverwriteBaseSnapshot(@Nullable Long snapshotId) method
- TableCommitImpl.java: Implement withOverwriteBaseSnapshot() and pass baseSnapshotId to commit
Flink
- SortCompactAction.java: Capture read snapshot ID before building source, pass to sink builder
- FlinkSinkBuilder.java: Add overwriteBaseSnapshotId field and setter
- FlinkWriteSink.java: Add overwriteBaseSnapshotId field and setter, pass to committer
Spark
- CompactProcedure.java: Capture read snapshot ID for sort compact operations
- PaimonSparkWriter.scala: Add withOverwriteBaseSnapshot() method
Backward Compatibility
When baseSnapshotId == null, tryOverwritePartition falls back to current behavior (latestSnapshot), so existing code paths (SQL INSERT OVERWRITE, DROP PARTITION, TRUNCATE, etc.) are unaffected.
The concurrent write detection only activates when:
- baseSnapshotId != null (explicitly set by sort compact)
Data Flow After Fix
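The intended flow can be sketched as a toy end-to-end simulation (all names hypothetical, not Paimon APIs; snapshot ids are simply list indexes):

```java
import java.util.ArrayList;
import java.util.List;

// Toy table: snapshot id i = snapshots.get(i), each snapshot a list of files.
class SortCompactFlowSketch {
    final List<List<String>> snapshots = new ArrayList<>();

    SortCompactFlowSketch() {
        snapshots.add(List.of()); // snapshot 0: empty table
    }

    long append(List<String> files) {
        List<String> next = new ArrayList<>(latest());
        next.addAll(files);
        snapshots.add(next);
        return snapshots.size() - 1; // id of the new snapshot
    }

    List<String> latest() {
        return snapshots.get(snapshots.size() - 1);
    }

    // OVERWRITE commit pinned to the snapshot the sort compact read from.
    long overwrite(long baseSnapshotId, List<String> sortedOutput) {
        List<String> addedSince = new ArrayList<>(latest());
        addedSince.removeAll(snapshots.get((int) baseSnapshotId));
        if (!addedSince.isEmpty()) {
            // Part 2: concurrent writes landed after the read; fail the commit.
            throw new IllegalStateException("conflict: " + addedSince);
        }
        snapshots.add(List.copyOf(sortedOutput)); // Part 1: only base files replaced
        return snapshots.size() - 1;
    }
}
```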
Tests