1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+
18+ package org .apache .flink .cdc .connectors .base .source .reader .external ;
19+
20+ import org .apache .flink .cdc .common .annotation .VisibleForTesting ;
21+ import org .apache .flink .cdc .connectors .base .source .meta .split .SnapshotSplit ;
22+ import org .apache .flink .cdc .connectors .base .source .meta .split .SourceRecords ;
23+ import org .apache .flink .cdc .connectors .base .source .meta .split .SourceSplitBase ;
24+ import org .apache .flink .util .FlinkRuntimeException ;
25+
26+ import org .apache .flink .shaded .guava31 .com .google .common .util .concurrent .ThreadFactoryBuilder ;
27+
28+ import io .debezium .connector .base .ChangeEventQueue ;
29+ import io .debezium .pipeline .DataChangeEvent ;
30+ import io .debezium .relational .TableId ;
31+ import org .apache .kafka .connect .data .Struct ;
32+ import org .apache .kafka .connect .source .SourceRecord ;
33+ import org .slf4j .Logger ;
34+ import org .slf4j .LoggerFactory ;
35+
36+ import javax .annotation .Nullable ;
37+
38+ import java .util .ArrayList ;
39+ import java .util .Collections ;
40+ import java .util .HashMap ;
41+ import java .util .Iterator ;
42+ import java .util .List ;
43+ import java .util .Map ;
44+ import java .util .concurrent .ExecutorService ;
45+ import java .util .concurrent .Executors ;
46+ import java .util .concurrent .ThreadFactory ;
47+ import java .util .concurrent .TimeUnit ;
48+ import java .util .concurrent .atomic .AtomicBoolean ;
49+
50+ import static org .apache .flink .cdc .connectors .base .source .meta .wartermark .WatermarkEvent .isEndWatermarkEvent ;
51+ import static org .apache .flink .cdc .connectors .base .source .meta .wartermark .WatermarkEvent .isHighWatermarkEvent ;
52+ import static org .apache .flink .cdc .connectors .base .source .meta .wartermark .WatermarkEvent .isLowWatermarkEvent ;
53+ import static org .apache .flink .util .Preconditions .checkState ;
54+
55+ /**
56+ * Copy from https://github.com/apache/flink-cdc/blob/release-3.6.0/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
57+ * Fix with FLINK-39633, change to method isChangeRecordInChunkRange
58+ */
59+ public class IncrementalSourceScanFetcher implements Fetcher <SourceRecords , SourceSplitBase > {
60+
61+ private static final Logger LOG = LoggerFactory .getLogger (IncrementalSourceScanFetcher .class );
62+
63+ public AtomicBoolean hasNextElement ;
64+ public AtomicBoolean reachEnd ;
65+
66+ private final FetchTask .Context taskContext ;
67+ private final ExecutorService executorService ;
68+ private volatile ChangeEventQueue <DataChangeEvent > queue ;
69+ private volatile Throwable readException ;
70+
71+ // task to read snapshot for current split
72+ private FetchTask <SourceSplitBase > snapshotSplitReadTask ;
73+ private SnapshotSplit currentSnapshotSplit ;
74+
75+ private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L ;
76+
77+ public IncrementalSourceScanFetcher (FetchTask .Context taskContext , int subtaskId ) {
78+ this .taskContext = taskContext ;
79+ ThreadFactory threadFactory =
80+ new ThreadFactoryBuilder ()
81+ .setNameFormat ("debezium-snapshot-reader-" + subtaskId )
82+ .setUncaughtExceptionHandler (
83+ (thread , throwable ) -> setReadException (throwable ))
84+ .build ();
85+ this .executorService = Executors .newSingleThreadExecutor (threadFactory );
86+ this .hasNextElement = new AtomicBoolean (false );
87+ this .reachEnd = new AtomicBoolean (false );
88+ }
89+
90+ @ Override
91+ public void submitTask (FetchTask <SourceSplitBase > fetchTask ) {
92+ this .snapshotSplitReadTask = fetchTask ;
93+ this .currentSnapshotSplit = fetchTask .getSplit ().asSnapshotSplit ();
94+ taskContext .configure (currentSnapshotSplit );
95+ this .queue = taskContext .getQueue ();
96+ this .hasNextElement .set (true );
97+ this .reachEnd .set (false );
98+
99+ executorService .execute (
100+ () -> {
101+ try {
102+ snapshotSplitReadTask .execute (taskContext );
103+ } catch (Exception e ) {
104+ setReadException (e );
105+ }
106+ });
107+ }
108+
109+ @ Override
110+ public boolean isFinished () {
111+ return currentSnapshotSplit == null
112+ || (!snapshotSplitReadTask .isRunning () && !hasNextElement .get () && reachEnd .get ());
113+ }
114+
115+ @ Nullable
116+ @ Override
117+ public Iterator <SourceRecords > pollSplitRecords () throws InterruptedException {
118+ checkReadException ();
119+
120+ if (hasNextElement .get ()) {
121+ if (taskContext .getSourceConfig ().isSkipSnapshotBackfill ()) {
122+ return pollWithoutBuffer ();
123+ } else {
124+ return pollWithBuffer ();
125+ }
126+ }
127+ // the data has been polled, no more data
128+ reachEnd .compareAndSet (false , true );
129+ return null ;
130+ }
131+
132+ public Iterator <SourceRecords > pollWithoutBuffer () throws InterruptedException {
133+ checkReadException ();
134+ List <DataChangeEvent > batch = queue .poll ();
135+ final List <SourceRecord > records = new ArrayList <>();
136+ for (DataChangeEvent event : batch ) {
137+ if (isEndWatermarkEvent (event .getRecord ())) {
138+ hasNextElement .set (false );
139+ break ;
140+ }
141+ records .add (event .getRecord ());
142+ }
143+
144+ return Collections .singletonList (new SourceRecords (records )).iterator ();
145+ }
146+
147+ public Iterator <SourceRecords > pollWithBuffer () throws InterruptedException {
148+ // eg:
149+ // data input: [low watermark event][snapshot events][high watermark event][change
150+ // events][end watermark event]
151+ // data output: [low watermark event][normalized events][high watermark event]
152+ boolean reachChangeLogStart = false ;
153+ boolean reachChangeLogEnd = false ;
154+ SourceRecord lowWatermark = null ;
155+ SourceRecord highWatermark = null ;
156+ Map <Struct , SourceRecord > outputBuffer = new HashMap <>();
157+ while (!reachChangeLogEnd ) {
158+ checkReadException ();
159+ List <DataChangeEvent > batch = queue .poll ();
160+ for (DataChangeEvent event : batch ) {
161+ SourceRecord record = event .getRecord ();
162+ if (lowWatermark == null ) {
163+ lowWatermark = record ;
164+ assertLowWatermark (lowWatermark );
165+ continue ;
166+ }
167+
168+ if (highWatermark == null && isHighWatermarkEvent (record )) {
169+ highWatermark = record ;
170+ // snapshot events capture end and begin to capture stream events
171+ reachChangeLogStart = true ;
172+ continue ;
173+ }
174+
175+ if (reachChangeLogStart && isEndWatermarkEvent (record )) {
176+ // capture to end watermark events, stop the loop
177+ reachChangeLogEnd = true ;
178+ break ;
179+ }
180+
181+ if (!reachChangeLogStart ) {
182+ outputBuffer .put ((Struct ) record .key (), record );
183+ } else {
184+ if (isChangeRecordInChunkRange (record )) {
185+ // rewrite overlapping snapshot records through the record key
186+ taskContext .rewriteOutputBuffer (outputBuffer , record );
187+ }
188+ }
189+ }
190+ }
191+ // snapshot split return its data once
192+ hasNextElement .set (false );
193+
194+ final List <SourceRecord > normalizedRecords = new ArrayList <>();
195+ normalizedRecords .add (lowWatermark );
196+ normalizedRecords .addAll (taskContext .formatMessageTimestamp (outputBuffer .values ()));
197+ normalizedRecords .add (highWatermark );
198+
199+ final List <SourceRecords > sourceRecordsSet = new ArrayList <>();
200+ sourceRecordsSet .add (new SourceRecords (normalizedRecords ));
201+ return sourceRecordsSet .iterator ();
202+ }
203+
204+ private void checkReadException () {
205+ if (readException != null ) {
206+ throw new FlinkRuntimeException (
207+ String .format (
208+ "Read split %s error due to %s." ,
209+ currentSnapshotSplit , readException .getMessage ()),
210+ readException );
211+ }
212+ }
213+
214+ private void setReadException (Throwable throwable ) {
215+ LOG .error (
216+ String .format (
217+ "Execute snapshot read task for snapshot split %s fail" ,
218+ currentSnapshotSplit ),
219+ throwable );
220+ if (readException == null ) {
221+ readException = throwable ;
222+ } else {
223+ readException .addSuppressed (throwable );
224+ }
225+ }
226+
227+ @ Override
228+ public void close () {
229+ try {
230+ if (taskContext != null ) {
231+ taskContext .close ();
232+ }
233+
234+ if (snapshotSplitReadTask != null ) {
235+ snapshotSplitReadTask .close ();
236+ }
237+
238+ if (executorService != null ) {
239+ executorService .shutdown ();
240+ if (!executorService .awaitTermination (
241+ READER_CLOSE_TIMEOUT_SECONDS , TimeUnit .SECONDS )) {
242+ LOG .warn (
243+ "Failed to close the scan fetcher in {} seconds." ,
244+ READER_CLOSE_TIMEOUT_SECONDS );
245+ }
246+ }
247+ } catch (Exception e ) {
248+ LOG .error ("Close scan fetcher error" , e );
249+ }
250+ }
251+
252+ @ VisibleForTesting
253+ public ExecutorService getExecutorService () {
254+ return executorService ;
255+ }
256+
257+ private void assertLowWatermark (SourceRecord lowWatermark ) {
258+ checkState (
259+ isLowWatermarkEvent (lowWatermark ),
260+ String .format (
261+ "The first record should be low watermark signal event, but actual is %s" ,
262+ lowWatermark ));
263+ }
264+
265+ @ VisibleForTesting
266+ boolean isChangeRecordInChunkRange (SourceRecord record ) {
267+ if (!taskContext .isDataChangeRecord (record )) {
268+ return false ;
269+ }
270+ // Skip records of other captured tables; their schema may not be loaded yet
271+ // and their PKs do not align with this chunk's bounds.
272+ TableId recordTableId = taskContext .getTableId (record );
273+ if (recordTableId == null || !recordTableId .equals (currentSnapshotSplit .getTableId ())) {
274+ return false ;
275+ }
276+ return taskContext .isRecordBetween (
277+ record , currentSnapshotSplit .getSplitStart (), currentSnapshotSplit .getSplitEnd ());
278+ }
279+
280+ @ VisibleForTesting
281+ void setCurrentSnapshotSplit (SnapshotSplit currentSnapshotSplit ) {
282+ this .currentSnapshotSplit = currentSnapshotSplit ;
283+ }
284+ }
0 commit comments