Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.Builder;
import lombok.Value;
import lombok.extern.log4j.Log4j2;

import org.apache.hadoop.fs.Path;

Expand Down Expand Up @@ -71,6 +74,7 @@
import org.apache.xtable.model.storage.PartitionFileGroup;

/** Extracts all the files for Hudi table represented by {@link InternalTable}. */
@Log4j2
public class HudiDataFileExtractor implements AutoCloseable {
private final HoodieTableMetadata tableMetadata;
private final HoodieTableMetaClient metaClient;
Expand Down Expand Up @@ -161,16 +165,13 @@ private AddedAndRemovedFiles getAddedAndRemovedPartitionInfo(
.getPartitionToWriteStats()
.forEach(
(partitionPath, writeStats) -> {
Set<String> affectedFileIds =
writeStats.stream()
.map(HoodieWriteStat::getFileId)
.collect(Collectors.toSet());
AddedAndRemovedFiles addedAndRemovedFiles =
getUpdatesToPartition(
fsView,
timeline,
instantToConsider,
partitionPath,
affectedFileIds,
writeStats,
partitioningFields);
addedFiles.addAll(addedAndRemovedFiles.getAdded());
removedFiles.addAll(addedAndRemovedFiles.getRemoved());
Expand Down Expand Up @@ -280,10 +281,26 @@ private List<InternalDataFile> getRemovedFiles(

private AddedAndRemovedFiles getUpdatesToPartition(
TableFileSystemView fsView,
HoodieTimeline timeline,
HoodieInstant instantToConsider,
String partitionPath,
Set<String> affectedFileIds,
List<HoodieWriteStat> writeStats,
List<InternalPartitionField> partitioningFields) {
Set<String> affectedFileIds =
writeStats.stream().map(HoodieWriteStat::getFileId).collect(Collectors.toSet());
// Build a map from fileId → prevCommit for file groups that are UPSERTs (not brand-new).
// Used below to recover the old file path when CLEAN has already deleted it from the FS.
Map<String, String> fileIdToPrevCommit =
writeStats.stream()
.filter(
ws ->
ws.getPrevCommit() != null
&& !HoodieTimeline.INIT_INSTANT_TS.equals(ws.getPrevCommit()))
.collect(
Collectors.toMap(
HoodieWriteStat::getFileId,
HoodieWriteStat::getPrevCommit,
(existing, replacement) -> existing));
List<InternalDataFile> filesToAdd = new ArrayList<>(affectedFileIds.size());
List<InternalDataFile> filesToRemove = new ArrayList<>(affectedFileIds.size());
List<PartitionValue> partitionValues =
Expand All @@ -298,6 +315,7 @@ private AddedAndRemovedFiles getUpdatesToPartition(
List<HoodieBaseFile> baseFiles =
fileGroup.getAllBaseFiles().collect(Collectors.toList());
boolean newBaseFileAdded = false;
boolean removeEmitted = false;
for (HoodieBaseFile baseFile : baseFiles) {
if (baseFile.getCommitTime().equals(instantToConsider.getTimestamp())) {
newBaseFileAdded = true;
Expand All @@ -306,13 +324,68 @@ private AddedAndRemovedFiles getUpdatesToPartition(
// if a new base file was added, then the previous base file for the group needs
// to be removed
filesToRemove.add(buildFileWithoutStats(partitionValues, baseFile));
removeEmitted = true;
break;
}
}
// Recover old file path from timeline when CLEAN deleted it before this sync.
if (newBaseFileAdded && !removeEmitted) {
String fileId = fileGroup.getFileGroupId().getFileId();
String prevCommit = fileIdToPrevCommit.get(fileId);
if (prevCommit != null) {
recoverRemovedFile(timeline, partitionPath, fileId, prevCommit, partitionValues)
.ifPresent(filesToRemove::add);
}
}
});
return AddedAndRemovedFiles.builder().added(filesToAdd).removed(filesToRemove).build();
}

/**
* Looks up the previous base file path from the timeline when CLEAN has already deleted it from
* storage, so the downstream format can emit a proper REMOVE for the stale file.
*
* <p>The lookup takes the first instant in {@code timeline} whose timestamp equals {@code
* prevCommitTime}, parses that instant's details as {@link HoodieCommitMetadata}, and uses the
* first write stat recorded under {@code partitionPath} whose file ID equals {@code fileId}. The
* result is built without column stats.
*
* @param timeline timeline that is searched for the previous commit and used to read its details
* @param partitionPath key into the previous commit's partition-to-write-stats map
* @param fileId ID of the file group whose earlier base file should be recovered
* @param prevCommitTime timestamp of the commit expected to have written the earlier base file
* @param partitionValues partition values attached to the returned file
* @return the old {@link InternalDataFile}, or empty if the previous commit is no longer in the
* visible timeline (e.g. archived before this sync ran), if that commit holds no write stat for
* {@code fileId} under {@code partitionPath}, or if reading the commit metadata fails with an
* {@link IOException} (logged at WARN level)
*/
Optional<InternalDataFile> recoverRemovedFile(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will parse the clean metadata per file. Can we create some lightweight object that maintains an in-memory cache of the clean plans for this sync so it is more performant when this case is hit?

HoodieTimeline timeline,
String partitionPath,
String fileId,
String prevCommitTime,
List<PartitionValue> partitionValues) {
// Scan the timeline for the instant that carries the previous commit's timestamp.
return timeline.getInstants().stream()
.filter(instant -> prevCommitTime.equals(instant.getTimestamp()))
.findFirst()
.flatMap(
prevInstant -> {
try {
// NOTE(review): the instant details are unwrapped with get() without a presence
// check - confirm they are always present for instants in this timeline.
HoodieCommitMetadata prevMeta =
HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(prevInstant).get(), HoodieCommitMetadata.class);
// Only the requested partition is inspected; a partition missing from the previous
// commit is treated like an empty list of write stats.
return prevMeta
.getPartitionToWriteStats()
.getOrDefault(partitionPath, Collections.emptyList())
.stream()
.filter(ws -> fileId.equals(ws.getFileId()))
.findFirst()
.map(
ws ->
// NOTE(review): the write stat path is passed through unchanged - confirm
// whether it is relative to the table base path and whether the REMOVE
// emitted downstream needs the same form as the originally added path.
buildFileWithoutStats(
partitionValues, new HoodieBaseFile(ws.getPath())));
} catch (IOException e) {
// Best effort: a metadata read failure is logged and reported as "nothing recovered"
// instead of being propagated to the caller.
log.warn(
"Unable to read previous commit {} to recover removed file for fileId {} "
+ "in partition {}",
prevCommitTime,
fileId,
partitionPath,
e);
return Optional.empty();
}
});
}

private AddedAndRemovedFiles getUpdatesToPartitionForReplaceCommit(
TableFileSystemView fsView,
HoodieInstant instantToConsider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,51 @@ public void testPartitionedData(TableFormatPartitionDataHolder tableFormatPartit
}
}

/**
 * Regression test for the race between Hudi CLEAN and a lagging incremental sync: the previous
 * base file version is physically deleted before XTable processes the UPSERT commit, and the
 * sync must still emit a REMOVE for it.
 */
@Test
public void testIncrementalSyncEmitsRemoveWhenHudiCleanRunsBeforeSync() {
  final int initialRowCount = 100;
  final int upsertedRowCount = 50;
  final int newRowCount = 20;

  String tableName = getTableName();
  ConversionSourceProvider<?> sourceProvider = getConversionSourceProvider(HUDI);
  try (TestJavaHudiTable hudiTable =
      TestJavaHudiTable.forStandardSchema(
          tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
    ConversionConfig syncConfig =
        getTableSyncConfig(
            HUDI,
            SyncMode.INCREMENTAL,
            tableName,
            hudiTable,
            ImmutableList.of(DELTA),
            null,
            null);
    ConversionController controller = new ConversionController(jsc.hadoopConfiguration());

    // Commit A: seed the table and sync once, so Delta holds ADD actions for commit A's files.
    List<HoodieRecord<HoodieAvroPayload>> seedRecords =
        hudiTable.insertRecords(initialRowCount, true);
    controller.sync(syncConfig, sourceProvider);
    checkDatasetEquivalence(HUDI, hudiTable, Collections.singletonList(DELTA), initialRowCount);

    // Commit B: rewrite half of the seeded records, producing new file versions for their
    // file groups. No sync happens here on purpose; XTable is lagging behind the Hudi writer.
    hudiTable.upsertRecords(seedRecords.subList(0, upsertedRowCount), true);

    // Commit C: brand-new records. The extra commit is what gives CLEAN something to purge
    // (retainCommits=1 needs N+1 commits to purge N).
    hudiTable.insertRecords(newRowCount, true);

    // CLEAN removes commit A's file versions from disk. From here on, getAllBaseFiles() for
    // the upserted file groups only returns the commit B versions.
    hudiTable.clean();

    // The incremental sync now covers commits B and C. For the file groups touched by commit
    // B, the commit A path has to be recovered from prevCommit metadata and emitted as a
    // REMOVE; otherwise Delta keeps a stale ADD that points at a deleted file.
    controller.sync(syncConfig, sourceProvider);

    // Upserts update rows in place, so the expected total is the seed rows plus the new
    // inserts. Without the REMOVE, the Spark read behind this check would hit
    // FileNotFoundException on the stale commit A files.
    checkDatasetEquivalence(
        HUDI, hudiTable, Collections.singletonList(DELTA), initialRowCount + newRowCount);
  }
}

@ParameterizedTest
@EnumSource(value = SyncMode.class)
public void testSyncWithSingleFormat(SyncMode syncMode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.hudi;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

import org.apache.xtable.model.storage.InternalDataFile;

/**
 * Unit tests for {@code HudiDataFileExtractor.recoverRemovedFile}, driven by a mocked {@link
 * HoodieTimeline} so that no table has to exist on disk.
 */
class TestHudiDataFileExtractor {

  /** A write stat for the requested file group in the previous commit yields that stat's path. */
  @Test
  void recoverRemovedFile_returnsFileWhenPreviousCommitInTimeline() throws Exception {
    HudiDataFileExtractor extractor = buildExtractorWithBasePath("file:///tmp/test-table");
    String partition = "year=2023";
    String fileId = "fg-1";
    String prevCommitTime = "20230101000000000";
    String oldPath = partition + "/" + fileId + "_0-0-0_" + prevCommitTime + ".parquet";
    HoodieTimeline timeline =
        timelineWithCommit(prevCommitTime, singleStatCommit(partition, fileId, oldPath));

    Optional<InternalDataFile> result =
        extractor.recoverRemovedFile(
            timeline, partition, fileId, prevCommitTime, Collections.emptyList());

    assertTrue(result.isPresent());
    assertEquals(oldPath, result.get().getPhysicalPath());
  }

  /** An empty timeline (previous commit no longer visible) yields no recovered file. */
  @Test
  void recoverRemovedFile_returnsEmptyWhenPreviousCommitArchived() {
    HudiDataFileExtractor extractor = buildExtractorWithBasePath("file:///tmp/test-table");

    HoodieTimeline timeline = mock(HoodieTimeline.class);
    when(timeline.getInstants()).thenReturn(Collections.emptyList());

    Optional<InternalDataFile> result =
        extractor.recoverRemovedFile(
            timeline, "year=2023", "fg-1", "20230101000000000", Collections.emptyList());

    assertFalse(result.isPresent());
  }

  /** A previous commit that only wrote a different file group yields no recovered file. */
  @Test
  void recoverRemovedFile_returnsEmptyWhenFileIdNotInPrevCommit() throws Exception {
    HudiDataFileExtractor extractor = buildExtractorWithBasePath("file:///tmp/test-table");
    String partition = "year=2023";
    String prevCommitTime = "20230101000000000";
    HoodieTimeline timeline =
        timelineWithCommit(
            prevCommitTime,
            singleStatCommit(partition, "different-fg", partition + "/different-fg.parquet"));

    Optional<InternalDataFile> result =
        extractor.recoverRemovedFile(
            timeline, partition, "fg-1", prevCommitTime, Collections.emptyList());

    assertFalse(result.isPresent());
  }

  /**
   * Builds a mocked timeline holding exactly one completed commit instant at {@code commitTime},
   * whose details are the JSON form of {@code metadata}.
   *
   * <p>The JSON is encoded explicitly as UTF-8 rather than with the JVM's default charset, so the
   * bytes handed to the code under test are the same on every platform.
   *
   * @param commitTime timestamp of the single instant in the timeline
   * @param metadata commit metadata served as that instant's details
   * @return the mocked timeline
   * @throws Exception if the metadata cannot be serialized to JSON
   */
  private static HoodieTimeline timelineWithCommit(
      String commitTime, HoodieCommitMetadata metadata) throws Exception {
    HoodieInstant instant =
        new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, commitTime);
    byte[] details = metadata.toJsonString().getBytes(StandardCharsets.UTF_8);

    HoodieTimeline timeline = mock(HoodieTimeline.class);
    when(timeline.getInstants()).thenReturn(Collections.singletonList(instant));
    when(timeline.getInstantDetails(instant)).thenReturn(Option.of(details));
    return timeline;
  }

  /**
   * Creates commit metadata containing a single write stat.
   *
   * @param partition partition the write stat is registered under
   * @param fileId file ID set on the write stat
   * @param path path set on the write stat
   * @return commit metadata with exactly that one write stat
   */
  private static HoodieCommitMetadata singleStatCommit(
      String partition, String fileId, String path) {
    HoodieWriteStat stat = new HoodieWriteStat();
    stat.setFileId(fileId);
    stat.setPath(path);
    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
    metadata.addWriteStat(partition, stat);
    return metadata;
  }

  /**
   * Creates an extractor backed by a mocked meta client that reports the given base path and a
   * table config with the metadata table unavailable.
   *
   * @param basePathStr base path returned by the mocked meta client
   * @return an extractor constructed with {@code null} for its two remaining constructor
   *     arguments
   */
  private static HudiDataFileExtractor buildExtractorWithBasePath(String basePathStr) {
    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
    when(metaClient.getHadoopConf()).thenReturn(new Configuration());
    when(metaClient.getTableConfig()).thenReturn(tableConfig);
    when(tableConfig.isMetadataTableAvailable()).thenReturn(false);
    when(metaClient.getBasePathV2()).thenReturn(new Path(basePathStr));
    return new HudiDataFileExtractor(metaClient, null, null);
  }
}