Skip to content

Commit a97a5bc

Browse files
authored
[FLINK-39621] Fix incremental-snapshot based sources may get stuck in binlog-backfill stage (#4388)
* [FLINK-39621] Fix incremental-snapshot based sources may get stuck in binlog-backfill stage * Use AtomicInteger for thread-safe reference counting.
1 parent b3acffe commit a97a5bc

3 files changed

Lines changed: 121 additions & 5 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.concurrent.Executors;
4848
import java.util.concurrent.ThreadFactory;
4949
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.atomic.AtomicInteger;
5051

5152
import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
5253

@@ -57,9 +58,9 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
5758
private final FetchTask.Context taskContext;
5859
private final ExecutorService executorService;
5960
private final Set<TableId> pureStreamPhaseTables;
61+
private final AtomicInteger numberOfRunningTasks;
6062

6163
private volatile ChangeEventQueue<DataChangeEvent> queue;
62-
private volatile boolean currentTaskRunning;
6364
private volatile Throwable readException;
6465

6566
private FetchTask<SourceSplitBase> streamFetchTask;
@@ -77,10 +78,10 @@ public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTask
7778
ThreadFactory threadFactory =
7879
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
7980
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
80-
this.currentTaskRunning = true;
8181
this.pureStreamPhaseTables = new HashSet<>();
8282
this.isBackfillSkipped = taskContext.getSourceConfig().isSkipSnapshotBackfill();
8383
this.supportsSplitKeyOptimization = taskContext.supportsSplitKeyOptimization();
84+
this.numberOfRunningTasks = new AtomicInteger(0);
8485
}
8586

8687
@Override
@@ -90,6 +91,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
9091
configureFilter();
9192
taskContext.configure(currentStreamSplit);
9293
this.queue = taskContext.getQueue();
94+
startReadTask();
9395
executorService.submit(
9496
() -> {
9597
try {
@@ -107,7 +109,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
107109

108110
@Override
109111
public boolean isFinished() {
110-
return currentStreamSplit == null || !currentTaskRunning;
112+
return currentStreamSplit == null || numberOfRunningTasks.get() == 0;
111113
}
112114

113115
@Nullable
@@ -116,7 +118,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
116118
checkReadException();
117119
final List<SourceRecord> sourceRecords = new ArrayList<>();
118120
// what happens if currentTaskRunning
119-
if (currentTaskRunning) {
121+
if (numberOfRunningTasks.get() > 0) {
120122
List<DataChangeEvent> batch = queue.poll();
121123
for (DataChangeEvent event : batch) {
122124
if (isEndWatermarkEvent(event.getRecord())) {
@@ -282,8 +284,12 @@ private void configureFilter() {
282284
this.pureStreamPhaseTables.clear();
283285
}
284286

287+
public void startReadTask() {
288+
this.numberOfRunningTasks.incrementAndGet();
289+
}
290+
285291
public void stopReadTask() throws Exception {
286-
this.currentTaskRunning = false;
292+
this.numberOfRunningTasks.decrementAndGet();
287293

288294
if (taskContext != null) {
289295
taskContext.close();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,57 @@ void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
243243
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
244244
}
245245

246+
@Test
247+
void testMultipleSplitsWithBackfill() throws Exception {
248+
customDatabase.createAndInitialize();
249+
250+
TestTableId tableId = new TestTableId(schemaName, tableName);
251+
PostgresSourceConfigFactory sourceConfigFactory =
252+
getMockPostgresSourceConfigFactory(
253+
customDatabase, schemaName, tableName, null, 4, false);
254+
PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
255+
PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0));
256+
257+
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
258+
snapshotHooks.setPreHighWatermarkAction(
259+
(postgresSourceConfig, split) -> {
260+
try (PostgresConnection conn = postgresDialect.openJdbcConnection()) {
261+
conn.execute(
262+
"UPDATE "
263+
+ tableId.toSql()
264+
+ " SET address = 'Beijing' WHERE \"Id\" = 103");
265+
conn.commit();
266+
}
267+
});
268+
269+
final DataType dataType =
270+
DataTypes.ROW(
271+
DataTypes.FIELD("Id", DataTypes.BIGINT()),
272+
DataTypes.FIELD("Name", DataTypes.STRING()),
273+
DataTypes.FIELD("address", DataTypes.STRING()),
274+
DataTypes.FIELD("phone_number", DataTypes.STRING()));
275+
276+
PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
277+
new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect);
278+
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, postgresDialect);
279+
280+
List<String> actual =
281+
readTableSnapshotSplits(
282+
reOrderSnapshotSplits(snapshotSplits),
283+
postgresSourceFetchTaskContext,
284+
snapshotSplits.size(),
285+
dataType,
286+
snapshotHooks);
287+
288+
// Verify the ScanFetcher can successfully process all splits without getting stuck
289+
// (the FLINK-39207 bug would cause the reader to appear finished/stuck
290+
// when reusing a stopped ScanFetcher for the next split).
291+
// The preHighWatermark hook forces backfill phase for each split by making
292+
// highWatermark > lowWatermark.
293+
assertThat(actual).hasSize(21);
294+
assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
295+
}
296+
246297
@Test
247298
void testSnapshotFetchSize() throws Exception {
248299
customDatabase.createAndInitialize();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,65 @@ void testInsertDataInSnapshotScan() throws Exception {
202202
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
203203
}
204204

205+
@Test
206+
void testMultipleSplitsWithBackfill() throws Exception {
207+
String databaseName = "customer";
208+
String tableName = "dbo.customers";
209+
210+
initializeSqlServerTable(databaseName);
211+
212+
SqlServerSourceConfigFactory sourceConfigFactory =
213+
getConfigFactory(databaseName, new String[] {tableName}, 4);
214+
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
215+
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
216+
217+
String tableId = databaseName + "." + tableName;
218+
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
219+
hooks.setPreHighWatermarkAction(
220+
(config, split) -> {
221+
executeSql(
222+
(SqlServerSourceConfig) config,
223+
new String[] {
224+
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103"
225+
});
226+
try {
227+
Thread.sleep(10 * 1000);
228+
} catch (InterruptedException e) {
229+
throw new RuntimeException(e);
230+
}
231+
});
232+
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
233+
new SqlServerSourceFetchTaskContext(
234+
sourceConfig,
235+
sqlServerDialect,
236+
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
237+
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
238+
239+
final DataType dataType =
240+
DataTypes.ROW(
241+
DataTypes.FIELD("id", DataTypes.BIGINT()),
242+
DataTypes.FIELD("name", DataTypes.STRING()),
243+
DataTypes.FIELD("address", DataTypes.STRING()),
244+
DataTypes.FIELD("phone_number", DataTypes.STRING()));
245+
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
246+
247+
List<String> actual =
248+
readTableSnapshotSplits(
249+
reOrderSnapshotSplits(snapshotSplits),
250+
sqlServerSourceFetchTaskContext,
251+
snapshotSplits.size(),
252+
dataType,
253+
hooks);
254+
255+
// Verify the ScanFetcher can successfully process all splits without getting stuck
256+
// (the FLINK-39207 bug would cause the reader to appear finished/stuck
257+
// when reusing a stopped ScanFetcher for the next split).
258+
// The preHighWatermark hook forces backfill phase for each split by making
259+
// highWatermark > lowWatermark.
260+
Assertions.assertThat(actual).hasSize(21);
261+
Assertions.assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
262+
}
263+
205264
@Test
206265
void testDateTimePrimaryKey() throws Exception {
207266
String databaseName = "pk";

0 commit comments

Comments
 (0)