Skip to content

Commit 6d5ebe0

Browse files
authored
[fix](iceberg)fix iceberg rewrite_data_file fail when table had been updated. (#61112)
### What problem does this PR solve? Related PR: #56413 Problem Summary: fix iceberg `rewrite_data_file` fail when table had been updated.
1 parent 802f0a9 commit 6d5ebe0

4 files changed

Lines changed: 127 additions & 0 deletions

File tree

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use demo.test_db;
2+
3+
4+
create table if not exists test_rewrite_data_with_update (
5+
id INT,
6+
name STRING
7+
)
8+
USING iceberg
9+
TBLPROPERTIES (
10+
'format-version' = '2',
11+
'write.delete.mode' = 'merge-on-read',
12+
'write.update.mode' = 'merge-on-read',
13+
'write.merge.mode' = 'merge-on-read'
14+
);
15+
16+
17+
INSERT INTO test_rewrite_data_with_update VALUES
18+
(1, 'a'),(2, 'b'),(3, 'c');
19+
20+
update test_rewrite_data_with_update set name = "bb" where id = 1;
21+
22+
23+
24+
create table if not exists test_rewrite_data_with_delete (
25+
id INT,
26+
name STRING
27+
)
28+
USING iceberg
29+
TBLPROPERTIES (
30+
'format-version' = '2',
31+
'write.delete.mode' = 'merge-on-read',
32+
'write.update.mode' = 'merge-on-read',
33+
'write.merge.mode' = 'merge-on-read'
34+
);
35+
36+
37+
INSERT INTO test_rewrite_data_with_delete VALUES
38+
(1, 'a'),(2, 'b'),(3, 'c');
39+
40+
delete from test_rewrite_data_with_delete where id = 1;

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class IcebergTransaction implements Transaction {
7373
private String branchName;
7474

7575
// Rewrite operation support
76+
long startingSnapshotId = -1L; // Track the starting snapshot ID for rewrite operations
7677
private final List<DataFile> filesToDelete = Lists.newArrayList();
7778
private final List<DataFile> filesToAdd = Lists.newArrayList();
7879
private boolean isRewriteMode = false;
@@ -133,6 +134,9 @@ public void beginRewrite(ExternalTable dorisTable) throws UserException {
133134
// create and start the iceberg transaction
134135
this.table = IcebergUtils.getIcebergTable(dorisTable);
135136

137+
// Capture the starting snapshot ID for validation during rewrite commit
138+
this.startingSnapshotId = table.currentSnapshot().snapshotId();
139+
136140
// For rewrite operations, we work directly on the main table
137141
// No branch information needed
138142
this.transaction = table.newTransaction();
@@ -198,6 +202,8 @@ private void updateManifestAfterRewrite() {
198202

199203
RewriteFiles rewriteFiles = transaction.newRewrite();
200204

205+
rewriteFiles = rewriteFiles.validateFromSnapshot(startingSnapshotId);
206+
201207
// For rewrite operations, we work directly on the main table
202208
rewriteFiles = rewriteFiles.scanManifestsWith(ops.getThreadPoolWithPreAuth());
203209

regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.out

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,21 @@
6868
8 EAST 280.00 2024-01-02
6969
9 EAST 380.00 2024-01-02
7070

71+
-- !before_rewrite_update --
72+
1 bb
73+
2 b
74+
3 c
75+
76+
-- !after_rewrite_update --
77+
1 bb
78+
2 b
79+
3 c
80+
81+
-- !before_rewrite_delete --
82+
2 b
83+
3 c
84+
85+
-- !after_rewrite_delete --
86+
2 b
87+
3 c
88+

regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_data_files.groovy

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,4 +493,67 @@ suite("test_iceberg_rewrite_data_files", "p0,external") {
493493

494494
logger.info("Specific partition rewrite test completed successfully")
495495

496+
// =====================================================================================
497+
// Test Case 4: Rewrite data files for merge-on-read update table
498+
//
499+
// Tables `test_rewrite_data_with_update` and `test_rewrite_data_with_delete`
500+
// are pre-created in Docker initialization (run23.sql) using Spark SQL with
501+
// format-version = 2 and merge-on-read enabled for delete/update/merge.
502+
//
503+
// This case verifies that executing rewrite_data_files on a table that has
504+
// already performed UPDATE operations (implemented via delete + insert) does
505+
// not change the logical query results.
506+
// =====================================================================================
507+
logger.info("Starting rewrite_data_files test for merge-on-read UPDATE table")
508+
509+
def table_name_update = "test_rewrite_data_with_update"
510+
511+
// Verify data before rewrite: id = 1 should have been updated to 'bb'
512+
qt_before_rewrite_update """SELECT id, name FROM ${table_name_update} ORDER BY id"""
513+
514+
def rewriteResultUpdate = sql """
515+
ALTER TABLE ${catalog_name}.${db_name}.${table_name_update}
516+
EXECUTE rewrite_data_files(
517+
"target-file-size-bytes" = "10485760",
518+
"min-input-files" = "1"
519+
)
520+
"""
521+
logger.info("Rewrite data files result for update table: ${rewriteResultUpdate}")
522+
523+
// Verify data after rewrite (logical rows should remain the same)
524+
qt_after_rewrite_update """SELECT id, name FROM ${table_name_update} ORDER BY id"""
525+
526+
def totalUpdateRecords = sql """SELECT COUNT(*) FROM ${table_name_update}"""
527+
assertTrue(totalUpdateRecords[0][0] == 3, "Update table should still have 3 logical records after rewrite")
528+
529+
// =====================================================================================
530+
// Test Case 5: Rewrite data files for merge-on-read delete table
531+
//
532+
// This case verifies that executing rewrite_data_files on a table that has
533+
// already performed DELETE operations does not resurrect deleted rows and
534+
// keeps the logical result set unchanged.
535+
// =====================================================================================
536+
logger.info("Starting rewrite_data_files test for merge-on-read DELETE table")
537+
538+
def table_name_delete = "test_rewrite_data_with_delete"
539+
540+
// Verify data before rewrite: row with id = 1 should have been deleted
541+
qt_before_rewrite_delete """SELECT id, name FROM ${table_name_delete} ORDER BY id"""
542+
543+
def rewriteResultDelete = sql """
544+
ALTER TABLE ${catalog_name}.${db_name}.${table_name_delete}
545+
EXECUTE rewrite_data_files(
546+
"target-file-size-bytes" = "10485760",
547+
"min-input-files" = "1"
548+
)
549+
"""
550+
logger.info("Rewrite data files result for delete table: ${rewriteResultDelete}")
551+
552+
// Verify data after rewrite (deleted rows should not reappear)
553+
qt_after_rewrite_delete """SELECT id, name FROM ${table_name_delete} ORDER BY id"""
554+
555+
def totalDeleteRecords = sql """SELECT COUNT(*) FROM ${table_name_delete}"""
556+
assertTrue(totalDeleteRecords[0][0] == 2, "Delete table should still have 2 logical records after rewrite")
557+
558+
logger.info("Merge-on-read update/delete rewrite_data_files tests completed successfully")
496559
}

0 commit comments

Comments
 (0)