Skip to content

Commit 634bcb6

Browse files
authored
[bug] Fix batch INSERT_OVERWRITE replacecommits dropping adds in HudiDataFileExtractor (#816)
1 parent 2ae9642 commit 634bcb6

File tree

3 files changed

+60
-3
lines changed

3 files changed

+60
-3
lines changed

xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,9 +326,7 @@ private AddedAndRemovedFiles getUpdatesToPartitionForReplaceCommit(
326326
partitionValuesExtractor.extractPartitionValues(partitioningFields, partitionPath);
327327
Stream<HoodieFileGroup> fileGroups =
328328
Stream.concat(
329-
fsView.getAllFileGroups(partitionPath),
330-
fsView.getReplacedFileGroupsBeforeOrOn(
331-
instantToConsider.getTimestamp(), partitionPath));
329+
fsView.getAllFileGroups(partitionPath), fsView.getAllReplacedFileGroups(partitionPath));
332330
fileGroups.forEach(
333331
fileGroup -> {
334332
List<HoodieBaseFile> baseFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList());

xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,17 @@ public void deletePartition(String partition, HoodieTableType tableType) {
220220
assertNoWriteErrors(result);
221221
}
222222

223+
public void insertOverwrite(
224+
List<HoodieRecord<HoodieAvroPayload>> records, HoodieTableType tableType) {
225+
String actionType =
226+
CommitUtils.getCommitActionType(WriteOperationType.INSERT_OVERWRITE, tableType);
227+
String instant = getStartCommitOfActionType(actionType);
228+
JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1);
229+
HoodieWriteResult writeResult = writeClient.insertOverwrite(writeRecords, instant);
230+
List<WriteStatus> result = writeResult.getWriteStatuses().collect();
231+
assertNoWriteErrors(result);
232+
}
233+
223234
public void cluster() {
224235
String instant = writeClient.scheduleClustering(Option.empty()).get();
225236
writeClient.cluster(instant, true);

xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,54 @@ public void testsForDropPartition(HoodieTableType tableType) {
419419
}
420420
}
421421

422+
@ParameterizedTest
423+
@MethodSource("testsForAllTableTypes")
424+
public void testMultipleInsertOverwriteOnSamePartitions(HoodieTableType tableType) {
425+
String tableName = "test_table_" + UUID.randomUUID();
426+
try (TestSparkHudiTable table =
427+
TestSparkHudiTable.forStandardSchema(tableName, tempDir, jsc, "level:SIMPLE", tableType)) {
428+
List<List<String>> allBaseFilePaths = new ArrayList<>();
429+
List<TableChange> allTableChanges = new ArrayList<>();
430+
431+
// Initial insert into partition "INFO"
432+
String commitInstant1 = table.startCommit();
433+
List<HoodieRecord<HoodieAvroPayload>> insertsForCommit1 = table.generateRecords(50, "INFO");
434+
table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, commitInstant1, true);
435+
allBaseFilePaths.add(table.getAllLatestBaseFilePaths());
436+
437+
// INSERT_OVERWRITE on "INFO" partition (replacecommit A — new file groups replace initial)
438+
List<HoodieRecord<HoodieAvroPayload>> overwriteRecords1 = table.generateRecords(30, "INFO");
439+
table.insertOverwrite(overwriteRecords1, tableType);
440+
allBaseFilePaths.add(table.getAllLatestBaseFilePaths());
441+
442+
// INSERT_OVERWRITE on "INFO" partition again (replacecommit B — new file groups replace A's)
443+
List<HoodieRecord<HoodieAvroPayload>> overwriteRecords2 = table.generateRecords(20, "INFO");
444+
table.insertOverwrite(overwriteRecords2, tableType);
445+
allBaseFilePaths.add(table.getAllLatestBaseFilePaths());
446+
447+
HudiConversionSource hudiClient =
448+
getHudiSourceClient(CONFIGURATION, table.getBasePath(), "level:VALUE");
449+
// Get the current snapshot
450+
InternalSnapshot internalSnapshot = hudiClient.getCurrentSnapshot();
451+
ValidationTestHelper.validateSnapshot(
452+
internalSnapshot, allBaseFilePaths.get(allBaseFilePaths.size() - 1));
453+
// Get changes in Incremental format since the initial insert
454+
InstantsForIncrementalSync instantsForIncrementalSync =
455+
InstantsForIncrementalSync.builder()
456+
.lastSyncInstant(HudiInstantUtils.parseFromInstantTime(commitInstant1))
457+
.build();
458+
CommitsBacklog<HoodieInstant> instantCommitsBacklog =
459+
hudiClient.getCommitsBacklog(instantsForIncrementalSync);
460+
for (HoodieInstant instant : instantCommitsBacklog.getCommitsToProcess()) {
461+
TableChange tableChange = hudiClient.getTableChangeForCommit(instant);
462+
allTableChanges.add(tableChange);
463+
}
464+
// Without the fix, replacecommit A would have 0 adds because the FileSystemView
465+
// built from the full timeline marks A's file groups as replaced by B.
466+
ValidationTestHelper.validateTableChanges(allBaseFilePaths, allTableChanges);
467+
}
468+
}
469+
422470
@ParameterizedTest
423471
@MethodSource("testsForAllTableTypes")
424472
public void testsForDeleteAllRecordsInPartition(HoodieTableType tableType) {

0 commit comments

Comments (0)