Skip to content

Commit 71b01aa

Browse files
committed
[flink] Preserve log-only source restore mode
1 parent c108476 commit 71b01aa

2 files changed

Lines changed: 111 additions & 1 deletion

File tree

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,19 @@ public class FlinkSourceEnumerator
134134
/** Buckets that have been assigned to readers. */
135135
private final Set<TableBucket> assignedTableBuckets;
136136

137+
/**
138+
* Remaining lake snapshot and hybrid lake/Fluss splits to assign.
139+
*
140+
* <p>The field has three states:
141+
*
142+
* <ul>
143+
* <li>{@code null}: lake split initialization has not run yet.
144+
* <li>empty list: lake split initialization has run, or this enumerator was started in
145+
* Fluss-only (non-lake) mode and must not initialize lake splits after restore.
146+
* <li>non-empty list: lake split initialization has run and these splits still need to be
147+
* assigned.
148+
* </ul>
149+
*/
137150
@Nullable private List<SourceSplitBase> pendingHybridLakeFlussSplits;
138151

139152
private final long scanPartitionDiscoveryIntervalMs;
@@ -1207,11 +1220,16 @@ public void addReader(int subtaskId) {
12071220

12081221
@Override
12091222
public SourceEnumeratorState snapshotState(long checkpointId) {
1223+
List<SourceSplitBase> remainingHybridLakeFlussSplits =
1224+
// Preserve Fluss-only (non-lake) startup across restore. Otherwise a restored
1225+
// enumerator with a non-null lakeSource would treat null as "not initialized yet"
1226+
// and generate lake snapshot splits.
1227+
lakeSource == null ? Collections.emptyList() : pendingHybridLakeFlussSplits;
12101228
final SourceEnumeratorState enumeratorState =
12111229
new SourceEnumeratorState(
12121230
assignedTableBuckets,
12131231
assignedPartitions,
1214-
pendingHybridLakeFlussSplits,
1232+
remainingHybridLakeFlussSplits,
12151233
leaseContext.getKvSnapshotLeaseId());
12161234
LOG.debug("Source Checkpoint is {}", enumeratorState);
12171235
return enumeratorState;

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.flink.source.split.LogSplit;
3333
import org.apache.fluss.flink.source.split.SnapshotSplit;
3434
import org.apache.fluss.flink.source.split.SourceSplitBase;
35+
import org.apache.fluss.flink.source.state.SourceEnumeratorState;
3536
import org.apache.fluss.flink.utils.FlinkTestBase;
3637
import org.apache.fluss.lake.source.LakeSource;
3738
import org.apache.fluss.lake.source.LakeSplit;
@@ -210,6 +211,97 @@ void testInvalidSplitAssignmentBatchSize() throws Exception {
210211
}
211212
}
212213

214+
@Test
215+
void testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits(@TempDir Path tempDir)
216+
throws Throwable {
217+
long tableId =
218+
createTable(DEFAULT_TABLE_PATH, DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR);
219+
ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
220+
Map<Long, String> partitionNameByIds =
221+
waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
222+
Long partitionId = partitionNameByIds.keySet().stream().sorted().findFirst().get();
223+
String partitionName = partitionNameByIds.get(partitionId);
224+
225+
LakeTableSnapshot lakeTableSnapshot =
226+
new LakeTableSnapshot(
227+
0,
228+
ImmutableMap.of(
229+
new TableBucket(tableId, partitionId, 0), 50L,
230+
new TableBucket(tableId, partitionId, 1), 50L,
231+
new TableBucket(tableId, partitionId, 2), 50L));
232+
LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString());
233+
lakeTableHelper.registerLakeTableSnapshotV1(tableId, lakeTableSnapshot);
234+
235+
ResolvedPartitionSpec partitionSpec =
236+
ResolvedPartitionSpec.fromPartitionName(
237+
Collections.singletonList("name"), partitionName);
238+
LakeSource<LakeSplit> lakeSource =
239+
new TestingLakeSource(
240+
DEFAULT_BUCKET_NUM,
241+
Collections.singletonList(
242+
new PartitionInfo(
243+
partitionId, partitionSpec, DEFAULT_REMOTE_DATA_DIR)));
244+
245+
SourceEnumeratorState checkpointState;
246+
try (MockSplitEnumeratorContext<SourceSplitBase> context =
247+
new MockSplitEnumeratorContext<>(1)) {
248+
FlinkSourceEnumerator enumerator =
249+
new FlinkSourceEnumerator(
250+
DEFAULT_TABLE_PATH,
251+
flussConf,
252+
true,
253+
false,
254+
context,
255+
OffsetsInitializer.timestamp(1000L),
256+
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
257+
streaming,
258+
null,
259+
null,
260+
LeaseContext.DEFAULT,
261+
false);
262+
263+
checkpointState = enumerator.snapshotState(1L);
264+
assertThat(checkpointState.getRemainingHybridLakeFlussSplits()).isNotNull().isEmpty();
265+
}
266+
267+
try (MockSplitEnumeratorContext<SourceSplitBase> context =
268+
new MockSplitEnumeratorContext<>(DEFAULT_BUCKET_NUM);
269+
MockWorkExecutor workExecutor = new MockWorkExecutor(context);
270+
FlinkSourceEnumerator restoredEnumerator =
271+
new FlinkSourceEnumerator(
272+
DEFAULT_TABLE_PATH,
273+
flussConf,
274+
false,
275+
true,
276+
context,
277+
checkpointState.getAssignedBuckets(),
278+
checkpointState.getAssignedPartitions(),
279+
checkpointState.getRemainingHybridLakeFlussSplits(),
280+
OffsetsInitializer.full(),
281+
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
282+
streaming,
283+
null,
284+
lakeSource,
285+
workExecutor,
286+
LeaseContext.DEFAULT,
287+
true)) {
288+
restoredEnumerator.start();
289+
runPeriodicPartitionDiscovery(workExecutor);
290+
291+
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
292+
registerReader(context, restoredEnumerator, i);
293+
}
294+
295+
List<SourceSplitBase> assignedSplits =
296+
getReadersAssignments(context).values().stream()
297+
.flatMap(List::stream)
298+
.collect(Collectors.toList());
299+
assertThat(assignedSplits).isNotEmpty();
300+
assertThat(assignedSplits).allMatch(split -> split instanceof LogSplit);
301+
assertThat(assignedSplits).noneMatch(split -> split instanceof LakeSnapshotSplit);
302+
}
303+
}
304+
213305
@Test
214306
void testPkTableWithSnapshotSplits() throws Throwable {
215307
long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR);

0 commit comments

Comments
 (0)