From 95008b17fb7f9a40552f45b74b03048355a72628 Mon Sep 17 00:00:00 2001
From: Vinish Reddy
Date: Wed, 20 May 2026 14:15:29 -0700
Subject: [PATCH] fix(hudi): emit REMOVE when CLEAN deletes previous file
 version before incremental sync

When Hudi CLEAN physically removes an old base file from storage before
XTable processes the corresponding UPSERT commit incrementally,
getAllBaseFiles() only returns the new file. The else-if branch in
getUpdatesToPartition that emits a REMOVE for the previous version never
fires, leaving a stale ADD in the downstream Delta log that causes
FILE_NOT_FOUND errors on read.

Fix: after the base-file loop, if an ADD was emitted but no REMOVE was
emitted and HoodieWriteStat.prevCommit indicates a prior version exists,
recover the old file path by reading the previous commit's metadata from
the visible Hudi timeline and emit a REMOVE. Gracefully degrades (no REMOVE
is emitted) if the previous commit has been archived; a warning is logged
only when the previous commit's metadata cannot be read.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
Review notes (free-form text between the "---" line and the first
"diff --git" line is not part of the change itself):

* NOTE(review): this copy of the patch appears to have lost text that was
  enclosed in angle brackets (for example "List> insertedRecords",
  "Set affectedFileIds", "Optional recoverRemovedFile" and the author
  e-mail addresses), and the continuation of the log.warn message in
  recoverRemovedFile shows no string-concatenation operator. Confirm
  against the original commit before applying.
* recoverRemovedFile passes HoodieWriteStat.getPath() straight into
  new HoodieBaseFile(...), and the unit test expects getPhysicalPath() to
  equal that string, which starts with the partition directory and does
  not include the table base path. NOTE(review): confirm this is the same
  path form used for the ADD entries built from file-system-view base
  files, so that targets can match the REMOVE to the earlier ADD.
* recoverRemovedFile calls timeline.getInstantDetails(prevInstant).get()
  without checking that a value is present. NOTE(review): presumably safe
  for instants returned by getInstants(); confirm.
* The new unit tests call String.getBytes() without an explicit charset,
  so the encoded commit metadata depends on the platform default.
* When several write stats in one partition share a fileId, the
  fileId -> prevCommit map keeps the first prevCommit it sees.

 .../xtable/hudi/HudiDataFileExtractor.java    |  85 +++++++++++-
 .../apache/xtable/ITConversionController.java |  45 ++++++
 .../hudi/TestHudiDataFileExtractor.java       | 129 ++++++++++++++++++
 3 files changed, 253 insertions(+), 6 deletions(-)
 create mode 100644 xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiDataFileExtractor.java

diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
index f3b73ec2f..0d397916d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
@@ -25,12 +25,15 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import lombok.Builder;
 import lombok.Value;
+import lombok.extern.log4j.Log4j2;
 
 import org.apache.hadoop.fs.Path;
 
@@ -71,6 +74,7 @@
 import org.apache.xtable.model.storage.PartitionFileGroup;
 
 /** Extracts all the files for Hudi table represented by {@link InternalTable}. */
+@Log4j2
 public class HudiDataFileExtractor implements AutoCloseable {
   private final HoodieTableMetadata tableMetadata;
   private final HoodieTableMetaClient metaClient;
@@ -161,16 +165,13 @@ private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo(
               .getPartitionToWriteStats()
               .forEach(
                   (partitionPath, writeStats) -> {
-                    Set affectedFileIds =
-                        writeStats.stream()
-                            .map(HoodieWriteStat::getFileId)
-                            .collect(Collectors.toSet());
                     AddedAndRemovedFiles addedAndRemovedFiles =
                         getUpdatesToPartition(
                             fsView,
+                            timeline,
                             instantToConsider,
                             partitionPath,
-                            affectedFileIds,
+                            writeStats,
                             partitioningFields);
                     addedFiles.addAll(addedAndRemovedFiles.getAdded());
                     removedFiles.addAll(addedAndRemovedFiles.getRemoved());
@@ -280,10 +281,26 @@ private List getRemovedFiles(
 
   private AddedAndRemovedFiles getUpdatesToPartition(
       TableFileSystemView fsView,
+      HoodieTimeline timeline,
       HoodieInstant instantToConsider,
       String partitionPath,
-      Set affectedFileIds,
+      List writeStats,
       List partitioningFields) {
+    Set affectedFileIds =
+        writeStats.stream().map(HoodieWriteStat::getFileId).collect(Collectors.toSet());
+    // Build a map from fileId → prevCommit for file groups that are UPSERTs (not brand-new).
+    // Used below to recover the old file path when CLEAN has already deleted it from the FS.
+    Map fileIdToPrevCommit =
+        writeStats.stream()
+            .filter(
+                ws ->
+                    ws.getPrevCommit() != null
+                        && !HoodieTimeline.INIT_INSTANT_TS.equals(ws.getPrevCommit()))
+            .collect(
+                Collectors.toMap(
+                    HoodieWriteStat::getFileId,
+                    HoodieWriteStat::getPrevCommit,
+                    (existing, replacement) -> existing));
     List filesToAdd = new ArrayList<>(affectedFileIds.size());
     List filesToRemove = new ArrayList<>(affectedFileIds.size());
     List partitionValues =
@@ -298,6 +315,7 @@ private AddedAndRemovedFiles getUpdatesToPartition(
               List baseFiles =
                   fileGroup.getAllBaseFiles().collect(Collectors.toList());
               boolean newBaseFileAdded = false;
+              boolean removeEmitted = false;
               for (HoodieBaseFile baseFile : baseFiles) {
                 if (baseFile.getCommitTime().equals(instantToConsider.getTimestamp())) {
                   newBaseFileAdded = true;
@@ -306,13 +324,68 @@ private AddedAndRemovedFiles getUpdatesToPartition(
                   // if a new base file was added, then the previous base file for the group needs
                   // to be removed
                   filesToRemove.add(buildFileWithoutStats(partitionValues, baseFile));
+                  removeEmitted = true;
                   break;
                 }
               }
+              // Recover old file path from timeline when CLEAN deleted it before this sync.
+              if (newBaseFileAdded && !removeEmitted) {
+                String fileId = fileGroup.getFileGroupId().getFileId();
+                String prevCommit = fileIdToPrevCommit.get(fileId);
+                if (prevCommit != null) {
+                  recoverRemovedFile(timeline, partitionPath, fileId, prevCommit, partitionValues)
+                      .ifPresent(filesToRemove::add);
+                }
+              }
             });
     return AddedAndRemovedFiles.builder().added(filesToAdd).removed(filesToRemove).build();
   }
 
+  /**
+   * Looks up the previous base file path from the timeline when CLEAN has already deleted it from
+   * storage, so the downstream format can emit a proper REMOVE for the stale file.
+   *
+   * @return the old {@link InternalDataFile}; empty if the previous commit is not in the timeline,
+   *     has no write stat for {@code fileId} in the partition, or its metadata cannot be read
+   */
+  Optional recoverRemovedFile(
+      HoodieTimeline timeline,
+      String partitionPath,
+      String fileId,
+      String prevCommitTime,
+      List partitionValues) {
+    return timeline.getInstants().stream()
+        .filter(instant -> prevCommitTime.equals(instant.getTimestamp()))
+        .findFirst()
+        .flatMap(
+            prevInstant -> {
+              try {
+                HoodieCommitMetadata prevMeta =
+                    HoodieCommitMetadata.fromBytes(
+                        timeline.getInstantDetails(prevInstant).get(), HoodieCommitMetadata.class);
+                return prevMeta
+                    .getPartitionToWriteStats()
+                    .getOrDefault(partitionPath, Collections.emptyList())
+                    .stream()
+                    .filter(ws -> fileId.equals(ws.getFileId()))
+                    .findFirst()
+                    .map(
+                        ws ->
+                            buildFileWithoutStats(
+                                partitionValues, new HoodieBaseFile(ws.getPath())));
+              } catch (IOException e) {
+                log.warn(
+                    "Unable to read previous commit {} to recover removed file for fileId {} "
+                        "in partition {}",
+                    prevCommitTime,
+                    fileId,
+                    partitionPath,
+                    e);
+                return Optional.empty();
+              }
+            });
+  }
+
   private AddedAndRemovedFiles getUpdatesToPartitionForReplaceCommit(
       TableFileSystemView fsView,
       HoodieInstant instantToConsider,
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 2da3078b6..d89240129 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -597,6 +597,51 @@ public void testPartitionedData(TableFormatPartitionDataHolder tableFormatPartit
     }
   }
 
+  /**
+   * Verifies that incremental sync emits a REMOVE when Hudi CLEAN deletes the previous base file
+   * version from storage before XTable processes the UPSERT commit.
+   */
+  @Test
+  public void testIncrementalSyncEmitsRemoveWhenHudiCleanRunsBeforeSync() {
+    String tableName = getTableName();
+    ConversionSourceProvider conversionSourceProvider = getConversionSourceProvider(HUDI);
+    try (TestJavaHudiTable table =
+        TestJavaHudiTable.forStandardSchema(
+            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              HUDI, SyncMode.INCREMENTAL, tableName, table, ImmutableList.of(DELTA), null, null);
+      ConversionController conversionController =
+          new ConversionController(jsc.hadoopConfiguration());
+
+      // Commit A: insert 100 records; sync so Delta has ADD actions for commitA files.
+      List> insertedRecords = table.insertRecords(100, true);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 100);
+
+      // Commit B: upsert 50 records, creating new file versions for those file groups.
+      // XTable does NOT sync here — simulating the lag between Hudi writes and XTable runs.
+      table.upsertRecords(insertedRecords.subList(0, 50), true);
+
+      // Commit C: insert 20 brand-new records. A third commit is required before CLEAN so
+      // that Hudi's clean plan is non-empty (retainCommits=1 needs N+1 commits to purge N).
+      table.insertRecords(20, true);
+
+      // CLEAN physically deletes commitA's file versions from disk — this is the race condition.
+      // After this point getAllBaseFiles() for the upserted file groups returns only commitB files.
+      table.clean();
+
+      // Incremental sync picks up commits B and C. For the upserted file groups in commit B, the
+      // fix recovers the commitA file path from prevCommit metadata and emits a REMOVE, preventing
+      // Delta from retaining a stale ADD that would cause FileNotFoundException on read.
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+
+      // 100 original + 20 new inserts = 120 records (50 upserted records are updated, not added).
+      // Reading via Spark would throw FileNotFoundException on stale commitA ADDs without the fix.
+      checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 120);
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(value = SyncMode.class)
   public void testSyncWithSingleFormat(SyncMode syncMode) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiDataFileExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiDataFileExtractor.java
new file mode 100644
index 000000000..762c795c5
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiDataFileExtractor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hudi;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.xtable.model.storage.InternalDataFile;
+
+class TestHudiDataFileExtractor {
+
+  @Test
+  void recoverRemovedFile_returnsFileWhenPreviousCommitInTimeline() throws Exception {
+    HudiDataFileExtractor extractor = buildExtractorWithBasePath("file:///tmp/test-table");
+    String partition = "year=2023";
+    String fileId = "fg-1";
+    String prevCommitTime = "20230101000000000";
+    String oldPath = partition + "/" + fileId + "_0-0-0_" + prevCommitTime + ".parquet";
+
+    HoodieInstant prevInstant =
+        new HoodieInstant(
+            HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, prevCommitTime);
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    when(timeline.getInstants()).thenReturn(Collections.singletonList(prevInstant));
+    when(timeline.getInstantDetails(prevInstant))
+        .thenReturn(
+            Option.of(singleStatCommit(partition, fileId, oldPath).toJsonString().getBytes()));
+
+    Optional result =
+        extractor.recoverRemovedFile(
+            timeline, partition, fileId, prevCommitTime, Collections.emptyList());
+
+    assertTrue(result.isPresent());
+    assertEquals(oldPath, result.get().getPhysicalPath());
+  }
+
+  @Test
+  void recoverRemovedFile_returnsEmptyWhenPreviousCommitArchived() {
+    HudiDataFileExtractor extractor = buildExtractorWithBasePath("file:///tmp/test-table");
+
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    when(timeline.getInstants()).thenReturn(Collections.emptyList());
+
+    Optional result =
+        extractor.recoverRemovedFile(
+            timeline, "year=2023", "fg-1", "20230101000000000", Collections.emptyList());
+
+    assertFalse(result.isPresent());
+  }
+
+  @Test
+  void recoverRemovedFile_returnsEmptyWhenFileIdNotInPrevCommit() throws Exception {
+    HudiDataFileExtractor extractor = buildExtractorWithBasePath("file:///tmp/test-table");
+    String partition = "year=2023";
+    String prevCommitTime = "20230101000000000";
+
+    HoodieInstant prevInstant =
+        new HoodieInstant(
+            HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, prevCommitTime);
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    when(timeline.getInstants()).thenReturn(Collections.singletonList(prevInstant));
+    when(timeline.getInstantDetails(prevInstant))
+        .thenReturn(
+            Option.of(
+                singleStatCommit(partition, "different-fg", partition + "/different-fg.parquet")
+                    .toJsonString()
+                    .getBytes()));
+
+    Optional result =
+        extractor.recoverRemovedFile(
+            timeline, partition, "fg-1", prevCommitTime, Collections.emptyList());
+
+    assertFalse(result.isPresent());
+  }
+
+  private static HoodieCommitMetadata singleStatCommit(
+      String partition, String fileId, String path) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setFileId(fileId);
+    stat.setPath(path);
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addWriteStat(partition, stat);
+    return metadata;
+  }
+
+  private static HudiDataFileExtractor buildExtractorWithBasePath(String basePathStr) {
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+    when(metaClient.getHadoopConf()).thenReturn(new Configuration());
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.isMetadataTableAvailable()).thenReturn(false);
+    when(metaClient.getBasePathV2()).thenReturn(new Path(basePathStr));
+    return new HudiDataFileExtractor(metaClient, null, null);
+  }
+}