Skip to content

Commit 9ce897f

Browse files
committed
[flink] Normalize restored lake split state
1 parent e58b0cb commit 9ce897f

3 files changed

Lines changed: 59 additions & 48 deletions

File tree

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@
5252

5353
import javax.annotation.Nullable;
5454

55+
import java.util.Collections;
56+
import java.util.List;
57+
5558
import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
5659
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getClientScannerIoTmpDir;
5760

@@ -236,6 +239,14 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
236239
public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator(
237240
SplitEnumeratorContext<SourceSplitBase> splitEnumeratorContext,
238241
SourceEnumeratorState sourceEnumeratorState) {
242+
List<SourceSplitBase> remainingHybridLakeFlussSplits =
243+
sourceEnumeratorState.getRemainingHybridLakeFlussSplits();
244+
// On a fresh start, null means lake splits are not initialized yet. When restoring, null means
245+
// nothing is pending, so normalize it here to avoid generating lake splits later.
246+
if (remainingHybridLakeFlussSplits == null) {
247+
remainingHybridLakeFlussSplits = Collections.emptyList();
248+
}
249+
239250
return new FlinkSourceEnumerator(
240251
tablePath,
241252
flussConf,
@@ -244,7 +255,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
244255
splitEnumeratorContext,
245256
sourceEnumeratorState.getAssignedBuckets(),
246257
sourceEnumeratorState.getAssignedPartitions(),
247-
sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
258+
remainingHybridLakeFlussSplits,
248259
offsetsInitializer,
249260
scanPartitionDiscoveryIntervalMs,
250261
splitPerAssignmentBatchSize,

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -526,9 +526,8 @@ private void startInBatchMode() {
526526
private void startInStreamModeForNonPartitionedTable() {
527527
if (lakeSource != null) {
528528
// Generate lake splits synchronously so that they are available before the
529-
// first checkpoint. This is consistent with the partitioned-table path in
530-
// start() and ensures generateHybridLakeFlussSplits() can safely use
531-
// checkpointTriggeredBefore to distinguish fresh starts from restores.
529+
// first checkpoint. This is consistent with the partitioned-table path in
530+
// start().
532531
List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
533532
if (splits == null) {
534533
// no lake snapshot, fall back to normal Fluss splits
@@ -887,13 +886,6 @@ private List<SourceSplitBase> generateHybridLakeFlussSplits() {
887886
LOG.info("Still have pending lake fluss splits, shouldn't list splits again.");
888887
return pendingHybridLakeFlussSplits;
889888
}
890-
// Restored from checkpoint but pending lake split is null (e.g. the source was
891-
// originally started in Fluss-only mode without lake). Do not generate lake
892-
// splits for this restore; mark as initialized and return empty list.
893-
if (checkpointTriggeredBefore) {
894-
pendingHybridLakeFlussSplits = Collections.emptyList();
895-
return pendingHybridLakeFlussSplits;
896-
}
897889
try {
898890
LakeSplitGenerator lakeSplitGenerator =
899891
new LakeSplitGenerator(

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.fluss.flink.FlinkConnectorOptions;
2626
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
2727
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
28+
import org.apache.fluss.flink.source.FlinkSource;
29+
import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
2830
import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
2931
import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
3032
import org.apache.fluss.flink.source.reader.LeaseContext;
@@ -54,8 +56,10 @@
5456

5557
import org.apache.flink.api.connector.source.ReaderInfo;
5658
import org.apache.flink.api.connector.source.SourceEvent;
59+
import org.apache.flink.api.connector.source.SplitEnumerator;
5760
import org.apache.flink.api.connector.source.SplitsAssignment;
5861
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
62+
import org.apache.flink.table.data.RowData;
5963
import org.junit.jupiter.api.BeforeAll;
6064
import org.junit.jupiter.api.Test;
6165
import org.junit.jupiter.api.io.TempDir;
@@ -244,48 +248,52 @@ void testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits(@TempDir
244248

245249
SourceEnumeratorState checkpointState;
246250
try (MockSplitEnumeratorContext<SourceSplitBase> context =
247-
new MockSplitEnumeratorContext<>(1)) {
248-
FlinkSourceEnumerator enumerator =
249-
new FlinkSourceEnumerator(
250-
DEFAULT_TABLE_PATH,
251-
flussConf,
252-
true,
253-
false,
254-
context,
255-
OffsetsInitializer.timestamp(1000L),
256-
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
257-
streaming,
258-
null,
259-
null,
260-
LeaseContext.DEFAULT,
261-
false);
262-
251+
new MockSplitEnumeratorContext<>(1);
252+
SplitEnumerator<SourceSplitBase, SourceEnumeratorState> enumerator =
253+
new FlinkSource<RowData>(
254+
flussConf,
255+
DEFAULT_TABLE_PATH,
256+
false,
257+
true,
258+
DEFAULT_LOG_TABLE_SCHEMA.getRowType(),
259+
null,
260+
null,
261+
OffsetsInitializer.timestamp(1000L),
262+
0L,
263+
new RowDataDeserializationSchema(),
264+
streaming,
265+
null,
266+
LeaseContext.DEFAULT)
267+
.createEnumerator(context)) {
263268
checkpointState = enumerator.snapshotState(1L);
269+
assertThat(checkpointState.getRemainingHybridLakeFlussSplits()).isNull();
264270
}
265271

266272
try (MockSplitEnumeratorContext<SourceSplitBase> context =
267273
new MockSplitEnumeratorContext<>(DEFAULT_BUCKET_NUM);
268-
MockWorkExecutor workExecutor = new MockWorkExecutor(context);
269-
FlinkSourceEnumerator restoredEnumerator =
270-
new FlinkSourceEnumerator(
271-
DEFAULT_TABLE_PATH,
272-
flussConf,
273-
false,
274-
true,
275-
context,
276-
checkpointState.getAssignedBuckets(),
277-
checkpointState.getAssignedPartitions(),
278-
checkpointState.getRemainingHybridLakeFlussSplits(),
279-
OffsetsInitializer.full(),
280-
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
281-
streaming,
282-
null,
283-
lakeSource,
284-
workExecutor,
285-
LeaseContext.DEFAULT,
286-
true)) {
274+
SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoredEnumerator =
275+
new FlinkSource<RowData>(
276+
flussConf,
277+
DEFAULT_TABLE_PATH,
278+
false,
279+
true,
280+
DEFAULT_LOG_TABLE_SCHEMA.getRowType(),
281+
null,
282+
null,
283+
OffsetsInitializer.full(),
284+
0L,
285+
new RowDataDeserializationSchema(),
286+
streaming,
287+
null,
288+
lakeSource,
289+
LeaseContext.DEFAULT)
290+
.restoreEnumerator(context, checkpointState)) {
291+
assertThat(restoredEnumerator.snapshotState(1L).getRemainingHybridLakeFlussSplits())
292+
.isEmpty();
293+
287294
restoredEnumerator.start();
288-
runPeriodicPartitionDiscovery(workExecutor);
295+
context.runNextOneTimeCallable();
296+
context.runNextOneTimeCallable();
289297

290298
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
291299
registerReader(context, restoredEnumerator, i);
@@ -943,7 +951,7 @@ void testPartitionsExpiredInFlussButExistInLake(
943951
// ---------------------
944952
private void registerReader(
945953
MockSplitEnumeratorContext<SourceSplitBase> context,
946-
FlinkSourceEnumerator enumerator,
954+
SplitEnumerator<SourceSplitBase, SourceEnumeratorState> enumerator,
947955
int readerId) {
948956
context.registerReader(new ReaderInfo(readerId, "location " + readerId));
949957
enumerator.addReader(readerId);

0 commit comments

Comments
 (0)