Skip to content

Commit fa57a21

Browse files
authored
[flink][cdc] Track schema event progress in CDC source reader (#8311)
The CDC source reader restored split progress only from data row `recordsToSkip`. Schema change events can be emitted before any data rows are consumed, but that progress was not checkpointed independently. After recovery, the reader could emit the same schema change events again. This PR adds explicit schema event progress to `TableAwareFileStoreSourceSplit` and tracks it with a CDC-specific split state. The split reader skips already emitted schema change events on restore, while preserving V1 checkpoint compatibility through a legacy progress marker.
1 parent 71b815d commit fa57a21

9 files changed

Lines changed: 552 additions & 27 deletions

File tree

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/TableAwareFileStoreSourceSplit.java

Lines changed: 99 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public class TableAwareFileStoreSourceSplit extends FileStoreSourceSplit {
4343
private final Identifier identifier;
4444
private final @Nullable Long lastSchemaId;
4545
private final long schemaId;
46+
private final long schemaChangeEventsToSkip;
47+
private final boolean legacySchemaProgress;
4648

4749
public TableAwareFileStoreSourceSplit(
4850
String id,
@@ -51,10 +53,43 @@ public TableAwareFileStoreSourceSplit(
5153
Identifier identifier,
5254
@Nullable Long lastSchemaId,
5355
long schemaId) {
56+
this(id, split, recordsToSkip, identifier, lastSchemaId, schemaId, 0L);
57+
}
58+
59+
/**
 * Creates a split that carries an explicit schema change event skip count.
 *
 * <p>Delegates to the private constructor with {@code legacySchemaProgress} set to {@code
 * false}.
 *
 * @param id forwarded to the superclass constructor
 * @param split forwarded to the superclass constructor
 * @param recordsToSkip forwarded to the superclass constructor
 * @param identifier value stored as this split's identifier
 * @param lastSchemaId value stored as this split's last schema id, may be {@code null}
 * @param schemaId value stored as this split's schema id
 * @param schemaChangeEventsToSkip value stored as this split's schema change event skip count
 */
public TableAwareFileStoreSourceSplit(
        String id,
        Split split,
        long recordsToSkip,
        Identifier identifier,
        @Nullable Long lastSchemaId,
        long schemaId,
        long schemaChangeEventsToSkip) {
    this(
            id, split, recordsToSkip,
            identifier, lastSchemaId, schemaId,
            schemaChangeEventsToSkip, false);
}
77+
78+
/**
 * Full constructor that every other constructor and copy method of this class funnels into.
 *
 * <p>{@code id}, {@code split} and {@code recordsToSkip} are handed to the superclass; the
 * remaining arguments are stored in the corresponding fields unchanged.
 *
 * @param schemaChangeEventsToSkip value stored as the schema change event skip count
 * @param legacySchemaProgress value stored as the legacy schema progress flag
 */
private TableAwareFileStoreSourceSplit(
        String id,
        Split split,
        long recordsToSkip,
        Identifier identifier,
        @Nullable Long lastSchemaId,
        long schemaId,
        long schemaChangeEventsToSkip,
        boolean legacySchemaProgress) {
    super(id, split, recordsToSkip);
    // Schema progress bookkeeping.
    this.legacySchemaProgress = legacySchemaProgress;
    this.schemaChangeEventsToSkip = schemaChangeEventsToSkip;
    // Table and schema identity.
    this.schemaId = schemaId;
    this.lastSchemaId = lastSchemaId;
    this.identifier = identifier;
}
5994

6095
public Identifier getIdentifier() {
@@ -69,10 +104,38 @@ public long getSchemaId() {
69104
return schemaId;
70105
}
71106

107+
public long schemaChangeEventsToSkip() {
108+
return schemaChangeEventsToSkip;
109+
}
110+
111+
public TableAwareFileStoreSourceSplit updateWithProgress(
112+
long recordsToSkip, long schemaChangeEventsToSkip) {
113+
return new TableAwareFileStoreSourceSplit(
114+
splitId(),
115+
split(),
116+
recordsToSkip,
117+
identifier,
118+
lastSchemaId,
119+
schemaId,
120+
schemaChangeEventsToSkip,
121+
legacySchemaProgress);
122+
}
123+
124+
public boolean isLegacySchemaProgress() {
125+
return legacySchemaProgress;
126+
}
127+
72128
@Override
73129
public TableAwareFileStoreSourceSplit updateWithRecordsToSkip(long recordsToSkip) {
74130
return new TableAwareFileStoreSourceSplit(
75-
splitId(), split(), recordsToSkip, identifier, lastSchemaId, schemaId);
131+
splitId(),
132+
split(),
133+
recordsToSkip,
134+
identifier,
135+
lastSchemaId,
136+
schemaId,
137+
schemaChangeEventsToSkip,
138+
legacySchemaProgress);
76139
}
77140

78141
@Override
@@ -87,13 +150,22 @@ public boolean equals(Object o) {
87150
&& recordsToSkip() == other.recordsToSkip()
88151
&& identifier.equals(other.identifier)
89152
&& Objects.equals(lastSchemaId, other.lastSchemaId)
90-
&& schemaId == other.schemaId;
153+
&& schemaId == other.schemaId
154+
&& schemaChangeEventsToSkip == other.schemaChangeEventsToSkip
155+
&& legacySchemaProgress == other.legacySchemaProgress;
91156
}
92157

93158
@Override
94159
public int hashCode() {
95160
return Objects.hash(
96-
splitId(), split(), recordsToSkip(), identifier, lastSchemaId, schemaId);
161+
splitId(),
162+
split(),
163+
recordsToSkip(),
164+
identifier,
165+
lastSchemaId,
166+
schemaId,
167+
schemaChangeEventsToSkip,
168+
legacySchemaProgress);
97169
}
98170

99171
@Override
@@ -112,17 +184,23 @@ public String toString() {
112184
+ lastSchemaId
113185
+ ", schemaId="
114186
+ schemaId
187+
+ ", schemaChangeEventsToSkip="
188+
+ schemaChangeEventsToSkip
189+
+ ", legacySchemaProgress="
190+
+ legacySchemaProgress
115191
+ '}';
116192
}
117193

118194
/** The serializer for {@link TableAwareFileStoreSourceSplit}. */
119195
public static class Serializer
120196
implements SimpleVersionedSerializer<TableAwareFileStoreSourceSplit> {
197+
private static final int VERSION_1 = 1;
198+
private static final int VERSION_2 = 2;
121199
private static final Long NULL_SCHEMA_ID = -1L;
122200

123201
@Override
124202
public int getVersion() {
125-
return 1;
203+
return VERSION_2;
126204
}
127205

128206
@Override
@@ -136,12 +214,18 @@ public byte[] serialize(TableAwareFileStoreSourceSplit split) throws IOException
136214
view.writeLong(
137215
split.getLastSchemaId() == null ? NULL_SCHEMA_ID : split.getLastSchemaId());
138216
view.writeLong(split.getSchemaId());
217+
view.writeLong(split.schemaChangeEventsToSkip());
218+
view.writeBoolean(split.isLegacySchemaProgress());
139219
return out.toByteArray();
140220
}
141221

142222
@Override
143223
public TableAwareFileStoreSourceSplit deserialize(int version, byte[] serialized)
144224
throws IOException {
225+
if (version != VERSION_1 && version != VERSION_2) {
226+
throw new IOException(
227+
"Unsupported TableAwareFileStoreSourceSplit version: " + version);
228+
}
145229
ByteArrayInputStream in = new ByteArrayInputStream(serialized);
146230
DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
147231
String splitId = view.readUTF();
@@ -158,8 +242,18 @@ public TableAwareFileStoreSourceSplit deserialize(int version, byte[] serialized
158242
lastSchemaId = null;
159243
}
160244
long schemaId = view.readLong();
245+
long schemaChangeEventsToSkip = version == VERSION_2 ? view.readLong() : 0L;
246+
boolean legacySchemaProgress =
247+
version == VERSION_2 ? view.readBoolean() : version == VERSION_1;
161248
return new TableAwareFileStoreSourceSplit(
162-
splitId, split, recordsToSkip, identifier, lastSchemaId, schemaId);
249+
splitId,
250+
split,
251+
recordsToSkip,
252+
identifier,
253+
lastSchemaId,
254+
schemaId,
255+
schemaChangeEventsToSkip,
256+
legacySchemaProgress);
163257
}
164258
}
165259
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCRecordsWithSplitIds.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.paimon.flink.pipeline.cdc.source.reader;
2020

21-
import org.apache.paimon.flink.source.FileStoreSourceSplitState;
2221
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
2322
import org.apache.paimon.utils.Reference;
2423

@@ -109,7 +108,7 @@ public static void emitRecord(
109108
SourceReaderContext context,
110109
RecordIterator<Event> element,
111110
SourceOutput<Event> output,
112-
FileStoreSourceSplitState state,
111+
CDCSourceSplitState state,
113112
FileStoreSourceReaderMetrics metrics) {
114113
long timestamp = TimestampAssigner.NO_TIMESTAMP;
115114
if (metrics.getLatestFileCreationTime() != FileStoreSourceReaderMetrics.UNDEFINED) {

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceReader.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.paimon.disk.IOManager;
2222
import org.apache.paimon.flink.pipeline.cdc.source.CDCSource;
2323
import org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
24-
import org.apache.paimon.flink.source.FileStoreSourceSplitState;
2524
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
2625

2726
import org.apache.flink.api.connector.source.SourceReader;
@@ -37,10 +36,7 @@
3736
/** A {@link SourceReader} that read records from {@link TableAwareFileStoreSourceSplit}. */
3837
public class CDCSourceReader
3938
extends SingleThreadMultiplexSourceReaderBase<
40-
RecordIterator<Event>,
41-
Event,
42-
TableAwareFileStoreSourceSplit,
43-
FileStoreSourceSplitState> {
39+
RecordIterator<Event>, Event, TableAwareFileStoreSourceSplit, CDCSourceSplitState> {
4440
private static final Logger LOG = LoggerFactory.getLogger(CDCSourceReader.class);
4541

4642
private final IOManager ioManager;
@@ -69,7 +65,7 @@ public void start() {
6965
}
7066

7167
@Override
72-
protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSplitIds) {
68+
protected void onSplitFinished(Map<String, CDCSourceSplitState> finishedSplitIds) {
7369
// this method is called each time when we consume one split
7470
// it is possible that one response from the coordinator contains multiple splits
7571
// we should only require for more splits after we've consumed all given splits
@@ -79,16 +75,16 @@ protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSp
7975
}
8076

8177
@Override
82-
protected FileStoreSourceSplitState initializedState(TableAwareFileStoreSourceSplit split) {
78+
protected CDCSourceSplitState initializedState(TableAwareFileStoreSourceSplit split) {
8379
LOG.info("Initializing split {}", split);
84-
return new FileStoreSourceSplitState(split);
80+
return new CDCSourceSplitState(split);
8581
}
8682

8783
@Override
8884
protected TableAwareFileStoreSourceSplit toSplitType(
89-
String splitId, FileStoreSourceSplitState splitState) {
85+
String splitId, CDCSourceSplitState splitState) {
9086
LOG.info("Converting split state {} with id {} to split", splitState, splitId);
91-
return (TableAwareFileStoreSourceSplit) splitState.toSourceSplit();
87+
return splitState.toSourceSplit();
9288
}
9389

9490
@Override

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,15 +231,33 @@ private void checkSplitOrStartNext() throws IOException {
231231
currentReader = createLazyRecordReader(nextSplit.split());
232232
currentDataRowsRead = nextSplit.recordsToSkip();
233233
currentSchemaChangeEvents.clear();
234-
if (currentDataRowsRead == 0) {
235-
currentSchemaChangeEvents.addAll(schemaChangeEvents);
234+
long schemaChangeEventsToSkip =
235+
schemaChangeEventsToSkip(nextSplit, schemaChangeEvents.size());
236+
for (int i = (int) schemaChangeEventsToSkip; i < schemaChangeEvents.size(); i++) {
237+
currentSchemaChangeEvents.add(schemaChangeEvents.get(i));
236238
}
237239

238240
if (currentDataRowsRead > 0) {
239241
seek(currentDataRowsRead);
240242
}
241243
}
242244

245+
private long schemaChangeEventsToSkip(
246+
TableAwareFileStoreSourceSplit split, int schemaChangeEventCount) throws IOException {
247+
long schemaChangeEventsToSkip =
248+
split.isLegacySchemaProgress() && split.recordsToSkip() > 0
249+
? schemaChangeEventCount
250+
: split.schemaChangeEventsToSkip();
251+
if (schemaChangeEventsToSkip < 0 || schemaChangeEventsToSkip > schemaChangeEventCount) {
252+
throw new IOException(
253+
String.format(
254+
"Invalid schema change event skip count %s for split %s. "
255+
+ "The split has only %s schema change events.",
256+
schemaChangeEventsToSkip, split.splitId(), schemaChangeEventCount));
257+
}
258+
return schemaChangeEventsToSkip;
259+
}
260+
243261
@VisibleForTesting
244262
protected LazyRecordReader createLazyRecordReader(Split split) {
245263
return new LazyRecordReader(split, currentTableReaderInfo, tableManager);
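paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitState.java

Lines changed: 78 additions & 0 deletions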
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.pipeline.cdc.source.reader;
20+
21+
import org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
22+
23+
import org.apache.flink.cdc.common.event.Event;
24+
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
25+
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
26+
import org.apache.flink.connector.file.src.util.RecordAndPosition;
27+
28+
import static org.apache.paimon.utils.Preconditions.checkArgument;
29+
import static org.apache.paimon.utils.Preconditions.checkNotNull;
30+
31+
/** State of the CDC source reader with independent data row and schema event progress. */
32+
public class CDCSourceSplitState {
33+
34+
private final TableAwareFileStoreSourceSplit split;
35+
36+
private long recordsToSkip;
37+
38+
private long schemaChangeEventsToSkip;
39+
40+
public CDCSourceSplitState(TableAwareFileStoreSourceSplit split) {
41+
this.split = checkNotNull(split);
42+
this.recordsToSkip = split.recordsToSkip();
43+
this.schemaChangeEventsToSkip = split.schemaChangeEventsToSkip();
44+
}
45+
46+
public void setPosition(RecordAndPosition<Event> position) {
47+
checkArgument(position.getOffset() == CheckpointedPosition.NO_OFFSET);
48+
if (position.getRecord() instanceof SchemaChangeEvent) {
49+
schemaChangeEventsToSkip++;
50+
} else {
51+
recordsToSkip = position.getRecordSkipCount();
52+
}
53+
}
54+
55+
public long recordsToSkip() {
56+
return recordsToSkip;
57+
}
58+
59+
public long schemaChangeEventsToSkip() {
60+
return schemaChangeEventsToSkip;
61+
}
62+
63+
public TableAwareFileStoreSourceSplit toSourceSplit() {
64+
return split.updateWithProgress(recordsToSkip, schemaChangeEventsToSkip);
65+
}
66+
67+
@Override
68+
public String toString() {
69+
return "CDCSourceSplitState{"
70+
+ "split="
71+
+ split
72+
+ ", recordsToSkip="
73+
+ recordsToSkip
74+
+ ", schemaChangeEventsToSkip="
75+
+ schemaChangeEventsToSkip
76+
+ '}';
77+
}
78+
}

0 commit comments

Comments
 (0)