Skip to content

Commit 2ae9642

Browse files
Handle empty EarliestCommitToRetain (#732)
* handle empty EarliestCommitToRetain * Fix build failures: qualify GREATER_THAN and fix test mocks * Use PathBasedPartitionSpecExtractor to align with main branch rename --------- Co-authored-by: Vinish Reddy <vinish@apache.org>
1 parent 62453c7 commit 2ae9642

2 files changed

Lines changed: 212 additions & 0 deletions

File tree

xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
4040
import org.apache.hudi.common.util.Option;
4141

42+
import com.google.common.base.Strings;
4243
import com.google.common.collect.Iterators;
4344
import com.google.common.collect.PeekingIterator;
4445

@@ -184,11 +185,34 @@ private boolean isAffectedByCleanupProcess(Instant instant) {
184185
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
185186
metaClient.getActiveTimeline().getInstantDetails(lastCleanInstant.get()).get());
186187
String earliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain();
188+
if (Strings.isNullOrEmpty(earliestCommitToRetain)) {
189+
return cleanInstantsOccurredSinceLastSyncedInstant(instant);
190+
}
187191
Instant earliestCommitToRetainInstant =
188192
HudiInstantUtils.parseFromInstantTime(earliestCommitToRetain);
189193
return earliestCommitToRetainInstant.isAfter(instant);
190194
}
191195

196+
// When clean instants have empty earliestCommitToRetain, trigger full snapshot sync if any
197+
// clean instants occurred after the last synced instant to err on the side of caution
198+
private boolean cleanInstantsOccurredSinceLastSyncedInstant(Instant instant) {
199+
String lastSyncedCommitTime = HudiInstantUtils.convertInstantToCommit(instant);
200+
List<HoodieInstant> cleanInstantsAfterLastSync =
201+
metaClient
202+
.getActiveTimeline()
203+
.getCleanerTimeline()
204+
.filterCompletedInstants()
205+
.filter(
206+
cleanInstant ->
207+
HoodieTimeline.compareTimestamps(
208+
cleanInstant.getTimestamp(),
209+
HoodieTimeline.GREATER_THAN,
210+
lastSyncedCommitTime))
211+
.getInstants();
212+
213+
return !cleanInstantsAfterLastSync.isEmpty();
214+
}
215+
192216
private CommitsPair getCompletedAndPendingCommitsForInstants(List<Instant> lastPendingInstants) {
193217
List<HoodieInstant> lastPendingHoodieInstants = getCommitsForInstants(lastPendingInstants);
194218
List<HoodieInstant> lastPendingHoodieInstantsCompleted =
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.hudi;
20+
21+
import static org.junit.jupiter.api.Assertions.assertFalse;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.when;
26+
27+
import java.time.Instant;
28+
import java.util.Arrays;
29+
import java.util.Collections;
30+
import java.util.HashMap;
31+
32+
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.hadoop.fs.Path;
34+
import org.junit.jupiter.api.Test;
35+
36+
import org.apache.hudi.avro.model.HoodieCleanMetadata;
37+
import org.apache.hudi.common.table.HoodieTableConfig;
38+
import org.apache.hudi.common.table.HoodieTableMetaClient;
39+
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
40+
import org.apache.hudi.common.table.timeline.HoodieInstant;
41+
import org.apache.hudi.common.table.timeline.HoodieTimeline;
42+
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
43+
import org.apache.hudi.common.util.Option;
44+
45+
class TestHudiConversionSource {
46+
47+
private static byte[] serializeCleanMetadata(String earliestCommitToRetain) throws Exception {
48+
HoodieCleanMetadata metadata =
49+
HoodieCleanMetadata.newBuilder()
50+
.setStartCleanTime("000")
51+
.setTimeTakenInMillis(0L)
52+
.setTotalFilesDeleted(0)
53+
.setEarliestCommitToRetain(earliestCommitToRetain)
54+
.setBootstrapPartitionMetadata(new HashMap<>())
55+
.setPartitionMetadata(new HashMap<>())
56+
.build();
57+
return TimelineMetadataUtils.serializeCleanMetadata(metadata).get();
58+
}
59+
60+
@Test
61+
void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainNoCleanInstants()
62+
throws Exception {
63+
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
64+
HoodieActiveTimeline mockActiveTimeline = mock(HoodieActiveTimeline.class);
65+
HoodieTimeline mockCleanerTimeline = mock(HoodieTimeline.class);
66+
HoodieTimeline mockFilteredCleanerTimeline = mock(HoodieTimeline.class);
67+
HoodieTimeline mockCompletedCommitsTimeline = mock(HoodieTimeline.class);
68+
HoodieInstant mockCleanInstant = mock(HoodieInstant.class);
69+
HoodieInstant mockCommitInstant = mock(HoodieInstant.class);
70+
71+
HoodieTableConfig mockTableConfig = mock(HoodieTableConfig.class);
72+
when(mockMetaClient.getActiveTimeline()).thenReturn(mockActiveTimeline);
73+
when(mockMetaClient.getHadoopConf()).thenReturn(new Configuration());
74+
when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig);
75+
when(mockTableConfig.isMetadataTableAvailable()).thenReturn(false);
76+
when(mockMetaClient.getBasePathV2()).thenReturn(new Path("/tmp/test-table"));
77+
when(mockActiveTimeline.getCleanerTimeline()).thenReturn(mockCleanerTimeline);
78+
when(mockCleanerTimeline.filterCompletedInstants()).thenReturn(mockCleanerTimeline);
79+
when(mockCleanerTimeline.lastInstant()).thenReturn(Option.of(mockCleanInstant));
80+
// Use empty string — Strings.isNullOrEmpty("") is true, same behavior as null
81+
when(mockActiveTimeline.getInstantDetails(mockCleanInstant))
82+
.thenReturn(Option.of(serializeCleanMetadata("")));
83+
84+
when(mockCleanerTimeline.filter(any())).thenReturn(mockFilteredCleanerTimeline);
85+
when(mockFilteredCleanerTimeline.getInstants()).thenReturn(Collections.emptyList());
86+
87+
when(mockActiveTimeline.filterCompletedInstants()).thenReturn(mockCompletedCommitsTimeline);
88+
when(mockCompletedCommitsTimeline.findInstantsBeforeOrEquals(any(String.class)))
89+
.thenReturn(mockCompletedCommitsTimeline);
90+
when(mockCommitInstant.getTimestamp()).thenReturn("20200101120000000");
91+
when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant));
92+
93+
HudiConversionSource hudiConversionSource =
94+
new HudiConversionSource(mockMetaClient, mock(PathBasedPartitionSpecExtractor.class));
95+
96+
Instant testInstant = Instant.now().minusSeconds(3600);
97+
assertTrue(
98+
hudiConversionSource.isIncrementalSyncSafeFrom(testInstant),
99+
"isIncrementalSyncSafeFrom should return true when earliestCommitToRetain is null and no clean instants after last sync");
100+
}
101+
102+
@Test
103+
void testIsIncrementalSyncSafeFromWithNullEarliestCommitToRetainWithCleanInstants()
104+
throws Exception {
105+
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
106+
HoodieActiveTimeline mockActiveTimeline = mock(HoodieActiveTimeline.class);
107+
HoodieTimeline mockCleanerTimeline = mock(HoodieTimeline.class);
108+
HoodieTimeline mockFilteredCleanerTimeline = mock(HoodieTimeline.class);
109+
HoodieTimeline mockCompletedCommitsTimeline = mock(HoodieTimeline.class);
110+
HoodieInstant mockCleanInstant = mock(HoodieInstant.class);
111+
HoodieInstant mockCleanInstantAfterSync = mock(HoodieInstant.class);
112+
HoodieInstant mockCommitInstant = mock(HoodieInstant.class);
113+
114+
HoodieTableConfig mockTableConfig = mock(HoodieTableConfig.class);
115+
when(mockMetaClient.getActiveTimeline()).thenReturn(mockActiveTimeline);
116+
when(mockMetaClient.getHadoopConf()).thenReturn(new Configuration());
117+
when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig);
118+
when(mockTableConfig.isMetadataTableAvailable()).thenReturn(false);
119+
when(mockMetaClient.getBasePathV2()).thenReturn(new Path("/tmp/test-table"));
120+
when(mockActiveTimeline.getCleanerTimeline()).thenReturn(mockCleanerTimeline);
121+
when(mockCleanerTimeline.filterCompletedInstants()).thenReturn(mockCleanerTimeline);
122+
when(mockCleanerTimeline.lastInstant()).thenReturn(Option.of(mockCleanInstant));
123+
// Use empty string — Strings.isNullOrEmpty("") is true, same behavior as null
124+
when(mockActiveTimeline.getInstantDetails(mockCleanInstant))
125+
.thenReturn(Option.of(serializeCleanMetadata("")));
126+
127+
when(mockCleanerTimeline.filter(any())).thenReturn(mockFilteredCleanerTimeline);
128+
when(mockFilteredCleanerTimeline.getInstants())
129+
.thenReturn(Arrays.asList(mockCleanInstantAfterSync));
130+
131+
when(mockActiveTimeline.filterCompletedInstants()).thenReturn(mockCompletedCommitsTimeline);
132+
when(mockCompletedCommitsTimeline.findInstantsBeforeOrEquals(any(String.class)))
133+
.thenReturn(mockCompletedCommitsTimeline);
134+
when(mockCommitInstant.getTimestamp()).thenReturn("20200101120000000");
135+
when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant));
136+
137+
HudiConversionSource hudiConversionSource =
138+
new HudiConversionSource(mockMetaClient, mock(PathBasedPartitionSpecExtractor.class));
139+
140+
Instant testInstant = Instant.now().minusSeconds(3600);
141+
assertFalse(
142+
hudiConversionSource.isIncrementalSyncSafeFrom(testInstant),
143+
"isIncrementalSyncSafeFrom should return false when earliestCommitToRetain is null but clean instants exist after last sync");
144+
}
145+
146+
@Test
147+
void testIsIncrementalSyncSafeFromWithEmptyEarliestCommitToRetainWithCleanInstants()
148+
throws Exception {
149+
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
150+
HoodieActiveTimeline mockActiveTimeline = mock(HoodieActiveTimeline.class);
151+
HoodieTimeline mockCleanerTimeline = mock(HoodieTimeline.class);
152+
HoodieTimeline mockFilteredCleanerTimeline = mock(HoodieTimeline.class);
153+
HoodieTimeline mockCompletedCommitsTimeline = mock(HoodieTimeline.class);
154+
HoodieInstant mockCleanInstant = mock(HoodieInstant.class);
155+
HoodieInstant mockCleanInstantAfterSync = mock(HoodieInstant.class);
156+
HoodieInstant mockCommitInstant = mock(HoodieInstant.class);
157+
158+
HoodieTableConfig mockTableConfig = mock(HoodieTableConfig.class);
159+
when(mockMetaClient.getActiveTimeline()).thenReturn(mockActiveTimeline);
160+
when(mockMetaClient.getHadoopConf()).thenReturn(new Configuration());
161+
when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig);
162+
when(mockTableConfig.isMetadataTableAvailable()).thenReturn(false);
163+
when(mockMetaClient.getBasePathV2()).thenReturn(new Path("/tmp/test-table"));
164+
when(mockActiveTimeline.getCleanerTimeline()).thenReturn(mockCleanerTimeline);
165+
when(mockCleanerTimeline.filterCompletedInstants()).thenReturn(mockCleanerTimeline);
166+
when(mockCleanerTimeline.lastInstant()).thenReturn(Option.of(mockCleanInstant));
167+
when(mockActiveTimeline.getInstantDetails(mockCleanInstant))
168+
.thenReturn(Option.of(serializeCleanMetadata("")));
169+
170+
when(mockCleanerTimeline.filter(any())).thenReturn(mockFilteredCleanerTimeline);
171+
when(mockFilteredCleanerTimeline.getInstants())
172+
.thenReturn(Arrays.asList(mockCleanInstantAfterSync));
173+
174+
when(mockActiveTimeline.filterCompletedInstants()).thenReturn(mockCompletedCommitsTimeline);
175+
when(mockCompletedCommitsTimeline.findInstantsBeforeOrEquals(any(String.class)))
176+
.thenReturn(mockCompletedCommitsTimeline);
177+
when(mockCommitInstant.getTimestamp()).thenReturn("20200101120000000");
178+
when(mockCompletedCommitsTimeline.lastInstant()).thenReturn(Option.of(mockCommitInstant));
179+
180+
HudiConversionSource hudiConversionSource =
181+
new HudiConversionSource(mockMetaClient, mock(PathBasedPartitionSpecExtractor.class));
182+
183+
Instant testInstant = Instant.now().minusSeconds(3600);
184+
assertFalse(
185+
hudiConversionSource.isIncrementalSyncSafeFrom(testInstant),
186+
"isIncrementalSyncSafeFrom should return false when earliestCommitToRetain is empty and clean instants exist after last sync");
187+
}
188+
}

0 commit comments

Comments
 (0)