Skip to content

Commit d89e1a2

Browse files
committed
[flink] Cover restore through FlinkSource
1 parent a593b91 commit d89e1a2

1 file changed

Lines changed: 41 additions & 71 deletions

File tree

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 41 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -248,48 +248,52 @@ void testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits(@TempDir
248248

249249
SourceEnumeratorState checkpointState;
250250
try (MockSplitEnumeratorContext<SourceSplitBase> context =
251-
new MockSplitEnumeratorContext<>(1)) {
252-
FlinkSourceEnumerator enumerator =
253-
new FlinkSourceEnumerator(
254-
DEFAULT_TABLE_PATH,
255-
flussConf,
256-
true,
257-
false,
258-
context,
259-
OffsetsInitializer.timestamp(1000L),
260-
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
261-
streaming,
262-
null,
263-
null,
264-
LeaseContext.DEFAULT,
265-
false);
266-
251+
new MockSplitEnumeratorContext<>(1);
252+
SplitEnumerator<SourceSplitBase, SourceEnumeratorState> enumerator =
253+
new FlinkSource<RowData>(
254+
flussConf,
255+
DEFAULT_TABLE_PATH,
256+
false,
257+
true,
258+
DEFAULT_LOG_TABLE_SCHEMA.getRowType(),
259+
null,
260+
null,
261+
OffsetsInitializer.timestamp(1000L),
262+
0L,
263+
new RowDataDeserializationSchema(),
264+
streaming,
265+
null,
266+
LeaseContext.DEFAULT)
267+
.createEnumerator(context)) {
267268
checkpointState = enumerator.snapshotState(1L);
269+
assertThat(checkpointState.getRemainingHybridLakeFlussSplits()).isNull();
268270
}
269271

270272
try (MockSplitEnumeratorContext<SourceSplitBase> context =
271273
new MockSplitEnumeratorContext<>(DEFAULT_BUCKET_NUM);
272-
MockWorkExecutor workExecutor = new MockWorkExecutor(context);
273-
FlinkSourceEnumerator restoredEnumerator =
274-
new FlinkSourceEnumerator(
275-
DEFAULT_TABLE_PATH,
276-
flussConf,
277-
false,
278-
true,
279-
context,
280-
checkpointState.getAssignedBuckets(),
281-
checkpointState.getAssignedPartitions(),
282-
Collections.emptyList(),
283-
OffsetsInitializer.full(),
284-
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
285-
streaming,
286-
null,
287-
lakeSource,
288-
workExecutor,
289-
LeaseContext.DEFAULT,
290-
true)) {
274+
SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoredEnumerator =
275+
new FlinkSource<RowData>(
276+
flussConf,
277+
DEFAULT_TABLE_PATH,
278+
false,
279+
true,
280+
DEFAULT_LOG_TABLE_SCHEMA.getRowType(),
281+
null,
282+
null,
283+
OffsetsInitializer.full(),
284+
0L,
285+
new RowDataDeserializationSchema(),
286+
streaming,
287+
null,
288+
lakeSource,
289+
LeaseContext.DEFAULT)
290+
.restoreEnumerator(context, checkpointState)) {
291+
assertThat(restoredEnumerator.snapshotState(1L).getRemainingHybridLakeFlussSplits())
292+
.isEmpty();
293+
291294
restoredEnumerator.start();
292-
runPeriodicPartitionDiscovery(workExecutor);
295+
context.runNextOneTimeCallable();
296+
context.runNextOneTimeCallable();
293297

294298
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
295299
registerReader(context, restoredEnumerator, i);
@@ -305,40 +309,6 @@ void testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits(@TempDir
305309
}
306310
}
307311

308-
@Test
309-
void testRestoreEnumeratorNormalizesNullLakeSplits() throws Exception {
310-
SourceEnumeratorState checkpointState =
311-
new SourceEnumeratorState(
312-
Collections.emptySet(),
313-
Collections.emptyMap(),
314-
null,
315-
LeaseContext.DEFAULT.getKvSnapshotLeaseId());
316-
FlinkSource<RowData> source =
317-
new FlinkSource<>(
318-
flussConf,
319-
DEFAULT_TABLE_PATH,
320-
false,
321-
true,
322-
DEFAULT_LOG_TABLE_SCHEMA.getRowType(),
323-
null,
324-
null,
325-
OffsetsInitializer.full(),
326-
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
327-
new RowDataDeserializationSchema(),
328-
streaming,
329-
null,
330-
new TestingLakeSource(DEFAULT_BUCKET_NUM, Collections.emptyList()),
331-
LeaseContext.DEFAULT);
332-
333-
try (MockSplitEnumeratorContext<SourceSplitBase> context =
334-
new MockSplitEnumeratorContext<>(1);
335-
SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoredEnumerator =
336-
source.restoreEnumerator(context, checkpointState)) {
337-
assertThat(restoredEnumerator.snapshotState(1L).getRemainingHybridLakeFlussSplits())
338-
.isEmpty();
339-
}
340-
}
341-
342312
@Test
343313
void testPkTableWithSnapshotSplits() throws Throwable {
344314
long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR);
@@ -981,7 +951,7 @@ void testPartitionsExpiredInFlussButExistInLake(
981951
// ---------------------
982952
private void registerReader(
983953
MockSplitEnumeratorContext<SourceSplitBase> context,
984-
FlinkSourceEnumerator enumerator,
954+
SplitEnumerator<SourceSplitBase, SourceEnumeratorState> enumerator,
985955
int readerId) {
986956
context.registerReader(new ReaderInfo(readerId, "location " + readerId));
987957
enumerator.addReader(readerId);

0 commit comments

Comments
 (0)