diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index f3b73ec2f..0d397916d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -25,12 +25,15 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.Builder; import lombok.Value; +import lombok.extern.log4j.Log4j2; import org.apache.hadoop.fs.Path; @@ -71,6 +74,7 @@ import org.apache.xtable.model.storage.PartitionFileGroup; /** Extracts all the files for Hudi table represented by {@link InternalTable}. */ +@Log4j2 public class HudiDataFileExtractor implements AutoCloseable { private final HoodieTableMetadata tableMetadata; private final HoodieTableMetaClient metaClient; @@ -161,16 +165,13 @@ private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo( .getPartitionToWriteStats() .forEach( (partitionPath, writeStats) -> { - Set affectedFileIds = - writeStats.stream() - .map(HoodieWriteStat::getFileId) - .collect(Collectors.toSet()); AddedAndRemovedFiles addedAndRemovedFiles = getUpdatesToPartition( fsView, + timeline, instantToConsider, partitionPath, - affectedFileIds, + writeStats, partitioningFields); addedFiles.addAll(addedAndRemovedFiles.getAdded()); removedFiles.addAll(addedAndRemovedFiles.getRemoved()); @@ -280,10 +281,26 @@ private List getRemovedFiles( private AddedAndRemovedFiles getUpdatesToPartition( TableFileSystemView fsView, + HoodieTimeline timeline, HoodieInstant instantToConsider, String partitionPath, - Set affectedFileIds, + List writeStats, List partitioningFields) { + Set affectedFileIds = + writeStats.stream().map(HoodieWriteStat::getFileId).collect(Collectors.toSet()); + // Build 
a map from fileId → prevCommit for file groups that are UPSERTs (not brand-new). + // Used below to recover the old file path when CLEAN has already deleted it from the FS. + Map fileIdToPrevCommit = + writeStats.stream() + .filter( + ws -> + ws.getPrevCommit() != null + && !HoodieTimeline.INIT_INSTANT_TS.equals(ws.getPrevCommit())) + .collect( + Collectors.toMap( + HoodieWriteStat::getFileId, + HoodieWriteStat::getPrevCommit, + (existing, replacement) -> existing)); List filesToAdd = new ArrayList<>(affectedFileIds.size()); List filesToRemove = new ArrayList<>(affectedFileIds.size()); List partitionValues = @@ -298,6 +315,7 @@ private AddedAndRemovedFiles getUpdatesToPartition( List baseFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList()); boolean newBaseFileAdded = false; + boolean removeEmitted = false; for (HoodieBaseFile baseFile : baseFiles) { if (baseFile.getCommitTime().equals(instantToConsider.getTimestamp())) { newBaseFileAdded = true; @@ -306,13 +324,68 @@ private AddedAndRemovedFiles getUpdatesToPartition( // if a new base file was added, then the previous base file for the group needs // to be removed filesToRemove.add(buildFileWithoutStats(partitionValues, baseFile)); + removeEmitted = true; break; } } + // Recover old file path from timeline when CLEAN deleted it before this sync. + if (newBaseFileAdded && !removeEmitted) { + String fileId = fileGroup.getFileGroupId().getFileId(); + String prevCommit = fileIdToPrevCommit.get(fileId); + if (prevCommit != null) { + recoverRemovedFile(timeline, partitionPath, fileId, prevCommit, partitionValues) + .ifPresent(filesToRemove::add); + } + } }); return AddedAndRemovedFiles.builder().added(filesToAdd).removed(filesToRemove).build(); } + /** + * Looks up the previous base file path from the timeline when CLEAN has already deleted it from + * storage, so the downstream format can emit a proper REMOVE for the stale file. 
+ * + * @return the old {@link InternalDataFile}, or empty if the previous commit is not in the + * visible timeline (e.g. archived), has no write stat for this file group, or is unreadable + */ + Optional recoverRemovedFile( + HoodieTimeline timeline, + String partitionPath, + String fileId, + String prevCommitTime, + List partitionValues) { + return timeline.getInstants().stream() + .filter(instant -> prevCommitTime.equals(instant.getTimestamp())) + .findFirst() + .flatMap( + prevInstant -> { + try { + HoodieCommitMetadata prevMeta = + HoodieCommitMetadata.fromBytes( + timeline.getInstantDetails(prevInstant).get(), HoodieCommitMetadata.class); + return prevMeta + .getPartitionToWriteStats() + .getOrDefault(partitionPath, Collections.emptyList()) + .stream() + .filter(ws -> fileId.equals(ws.getFileId())) + .findFirst() + .map( + ws -> + buildFileWithoutStats( + partitionValues, new HoodieBaseFile(ws.getPath()))); + } catch (IOException e) { + log.warn( + "Unable to read previous commit {} to recover removed file for fileId {} " + + "in partition {}", + prevCommitTime, + fileId, + partitionPath, + e); + return Optional.empty(); + } + }); + } + private AddedAndRemovedFiles getUpdatesToPartitionForReplaceCommit( TableFileSystemView fsView, HoodieInstant instantToConsider, diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 2da3078b6..d89240129 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -597,6 +597,51 @@ public void testPartitionedData(TableFormatPartitionDataHolder tableFormatPartit } } + /** + * Verifies that incremental sync emits a REMOVE when Hudi CLEAN deletes the previous base file + * version from storage before XTable processes the UPSERT commit.
+ */ + @Test + public void testIncrementalSyncEmitsRemoveWhenHudiCleanRunsBeforeSync() { + String tableName = getTableName(); + ConversionSourceProvider conversionSourceProvider = getConversionSourceProvider(HUDI); + try (TestJavaHudiTable table = + TestJavaHudiTable.forStandardSchema( + tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { + ConversionConfig conversionConfig = + getTableSyncConfig( + HUDI, SyncMode.INCREMENTAL, tableName, table, ImmutableList.of(DELTA), null, null); + ConversionController conversionController = + new ConversionController(jsc.hadoopConfiguration()); + + // Commit A: insert 100 records; sync so Delta has ADD actions for commitA files. + List> insertedRecords = table.insertRecords(100, true); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 100); + + // Commit B: upsert 50 records, creating new file versions for those file groups. + // XTable does NOT sync here — simulating the lag between Hudi writes and XTable runs. + table.upsertRecords(insertedRecords.subList(0, 50), true); + + // Commit C: insert 20 brand-new records. A third commit is required before CLEAN so + // that Hudi's clean plan is non-empty (retainCommits=1 needs N+1 commits to purge N). + table.insertRecords(20, true); + + // CLEAN physically deletes commitA's file versions from disk — this is the race condition. + // After this point getAllBaseFiles() for the upserted file groups returns only commitB files. + table.clean(); + + // Incremental sync picks up commits B and C. For the upserted file groups in commit B, the + // fix recovers the commitA file path from prevCommit metadata and emits a REMOVE, preventing + // Delta from retaining a stale ADD that would cause FileNotFoundException on read. + conversionController.sync(conversionConfig, conversionSourceProvider); + + // 100 original + 20 new inserts = 120 records (50 upserted records are updated, not added). 
+ // Reading via Spark would throw FileNotFoundException on stale commitA ADDs without the fix. + checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 120); + } + } + @ParameterizedTest @EnumSource(value = SyncMode.class) public void testSyncWithSingleFormat(SyncMode syncMode) { diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiDataFileExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiDataFileExtractor.java new file mode 100644 index 000000000..762c795c5 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiDataFileExtractor.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.hudi; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Optional; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.Test; + +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; + +import org.apache.xtable.model.storage.InternalDataFile; + +class TestHudiDataFileExtractor { + + @Test + void recoverRemovedFile_returnsFileWhenPreviousCommitInTimeline() throws Exception { + HudiDataFileExtractor extractor = buildExtractorWithBasePath("file:///tmp/test-table"); + String partition = "year=2023"; + String fileId = "fg-1"; + String prevCommitTime = "20230101000000000"; + String oldPath = partition + "/" + fileId + "_0-0-0_" + prevCommitTime + ".parquet"; + + HoodieInstant prevInstant = + new HoodieInstant( + HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, prevCommitTime); + HoodieTimeline timeline = mock(HoodieTimeline.class); + when(timeline.getInstants()).thenReturn(Collections.singletonList(prevInstant)); + when(timeline.getInstantDetails(prevInstant)) + .thenReturn( + Option.of(singleStatCommit(partition, fileId, oldPath).toJsonString().getBytes())); + + Optional result = + extractor.recoverRemovedFile( + timeline, partition, fileId, prevCommitTime, Collections.emptyList()); + + assertTrue(result.isPresent()); + assertEquals(oldPath, 
result.get().getPhysicalPath()); + } + + @Test + void recoverRemovedFile_returnsEmptyWhenPreviousCommitArchived() { + HudiDataFileExtractor extractor = buildExtractorWithBasePath("file:///tmp/test-table"); + + HoodieTimeline timeline = mock(HoodieTimeline.class); + when(timeline.getInstants()).thenReturn(Collections.emptyList()); + + Optional result = + extractor.recoverRemovedFile( + timeline, "year=2023", "fg-1", "20230101000000000", Collections.emptyList()); + + assertFalse(result.isPresent()); + } + + @Test + void recoverRemovedFile_returnsEmptyWhenFileIdNotInPrevCommit() throws Exception { + HudiDataFileExtractor extractor = buildExtractorWithBasePath("file:///tmp/test-table"); + String partition = "year=2023"; + String prevCommitTime = "20230101000000000"; + + HoodieInstant prevInstant = + new HoodieInstant( + HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, prevCommitTime); + HoodieTimeline timeline = mock(HoodieTimeline.class); + when(timeline.getInstants()).thenReturn(Collections.singletonList(prevInstant)); + when(timeline.getInstantDetails(prevInstant)) + .thenReturn( + Option.of( + singleStatCommit(partition, "different-fg", partition + "/different-fg.parquet") + .toJsonString() + .getBytes())); + + Optional result = + extractor.recoverRemovedFile( + timeline, partition, "fg-1", prevCommitTime, Collections.emptyList()); + + assertFalse(result.isPresent()); + } + + private static HoodieCommitMetadata singleStatCommit( + String partition, String fileId, String path) { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setFileId(fileId); + stat.setPath(path); + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + metadata.addWriteStat(partition, stat); + return metadata; + } + + private static HudiDataFileExtractor buildExtractorWithBasePath(String basePathStr) { + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieTableConfig tableConfig = mock(HoodieTableConfig.class); + 
when(metaClient.getHadoopConf()).thenReturn(new Configuration()); + when(metaClient.getTableConfig()).thenReturn(tableConfig); + when(tableConfig.isMetadataTableAvailable()).thenReturn(false); + when(metaClient.getBasePathV2()).thenReturn(new Path(basePathStr)); + return new HudiDataFileExtractor(metaClient, null, null); + } +}