Skip to content

Commit 12c4ec1

Browse files
make skip-unsubscribed-table binlog deserialization configurable
1 parent 9b3e395 commit 12c4ec1

18 files changed

Lines changed: 332 additions & 81 deletions

File tree

docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,17 @@ Flink SQL> SELECT * FROM orders;
423423
<li>false(默认):所有类型的消息都保持原样下发。</li>
424424
</td>
425425
</tr>
426+
<tr>
427+
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
428+
<td>optional</td>
429+
<td style="word-wrap: break-word;">false</td>
430+
<td>Boolean</td>
431+
<td>
432+
在增量解析阶段,是否跳过未订阅表的增量数据(binlog)反序列化。<br>
433+
建议在订阅部分表的场景下选择开启,能过滤非订阅表的增量数据解析,提升解析性能。<br>
434+
这是一项实验性功能,默认关闭。
435+
</td>
436+
</tr>
426437
<tr>
427438
<td>scan.incremental.snapshot.backfill.skip</td>
428439
<td>optional</td>

docs/content.zh/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,17 @@ pipeline:
286286
scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
287287
</td>
288288
</tr>
289+
<tr>
290+
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
291+
<td>optional</td>
292+
<td style="word-wrap: break-word;">false</td>
293+
<td>Boolean</td>
294+
<td>
295+
在增量解析阶段,是否跳过未订阅表的增量数据(binlog)反序列化。<br>
296+
建议在订阅部分表的场景下选择开启,能过滤非订阅表的增量数据解析,提升解析性能。<br>
297+
这是一项实验性功能,默认关闭。
298+
</td>
299+
</tr>
289300
<tr>
290301
<td>scan.parse.online.schema.changes.enabled</td>
291302
<td>optional</td>

docs/content/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,17 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
448448
<li>false (default): All types of messages are sent as is.</li>
449449
</td>
450450
</tr>
451+
<tr>
452+
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
453+
<td>optional</td>
454+
<td style="word-wrap: break-word;">false</td>
455+
<td>Boolean</td>
456+
<td>
457+
During incremental reading, whether to skip deserialization of incremental data (binlog) for unsubscribed tables.<br>
458+
It is recommended to enable this option when only a subset of tables are subscribed. It can avoid parsing incremental events of unsubscribed tables and improve performance.<br>
459+
This is an experimental feature and is disabled by default.
460+
</td>
461+
</tr>
451462
<tr>
452463
<td>scan.incremental.snapshot.backfill.skip</td>
453464
<td>optional</td>

docs/content/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,17 @@ pipeline:
293293
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
294294
</td>
295295
</tr>
296+
<tr>
297+
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
298+
<td>optional</td>
299+
<td style="word-wrap: break-word;">false</td>
300+
<td>Boolean</td>
301+
<td>
302+
During incremental reading, whether to skip deserialization of incremental data (binlog) for unsubscribed tables.<br>
303+
It is recommended to enable this option when only a subset of tables are subscribed. It can avoid parsing incremental events of unsubscribed tables and improve performance.<br>
304+
This is an experimental feature and is disabled by default.
305+
</td>
306+
</tr>
296307
<tr>
297308
<td>scan.parse.online.schema.changes.enabled</td>
298309
<td>optional</td>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
7777
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
7878
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
79+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED;
7980
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
8081
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
8182
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
@@ -167,6 +168,8 @@ public DataSource createDataSource(Context context) {
167168
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
168169
boolean isAssignUnboundedChunkFirst =
169170
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
171+
boolean skipBinlogDeserializationOfUnsubscribedTables =
172+
config.get(SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED);
170173

171174
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
172175
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -220,6 +223,8 @@ public DataSource createDataSource(Context context) {
220223
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
221224
.useLegacyJsonFormat(useLegacyJsonFormat)
222225
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
226+
.skipBinlogDeserializationOfUnsubscribedTables(
227+
skipBinlogDeserializationOfUnsubscribedTables)
223228
.skipSnapshotBackfill(skipSnapshotBackfill);
224229

225230
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
@@ -351,6 +356,7 @@ public Set<ConfigOption<?>> optionalOptions() {
351356
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
352357
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
353358
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
359+
options.add(SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED);
354360
options.add(METADATA_LIST);
355361
options.add(INCLUDE_COMMENTS_ENABLED);
356362
options.add(USE_LEGACY_JSON_FORMAT);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,12 @@ public class MySqlDataSourceOptions {
330330
.defaultValue(false)
331331
.withDescription(
332332
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");
333+
334+
@Experimental
335+
public static final ConfigOption<Boolean> SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED =
336+
ConfigOptions.key("scan.binlog.skip-unsubscribed-tables.enabled")
337+
.booleanType()
338+
.defaultValue(false)
339+
.withDescription(
340+
"Whether to skip deserialization of binlog row events for unsubscribed tables.");
333341
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,16 @@
8888
* Copied from Debezium project(1.9.8.Final) to fix
8989
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
9090
*
91-
* <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously
91+
* <p>Line 1450-1466 : Adjust GTID merging logic to support recovering from job which previously
9292
* specifying starting offset on start.
9393
*
94-
* <p>Line 1485 : Add more error details for some exceptions.
94+
* <p>Line 1504-1522 : Add more error details for some exceptions.
9595
*
96-
* <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when
96+
* <p>Line 960-972 : Use iterator instead of index-based loop to avoid O(n²) complexity when
9797
* processing LinkedList rows in handleChange method. See FLINK-38846.
9898
*
99-
* <p>Line 317, 1358-1366 : Use a {@link TableIdFilter} to skip binlog deserialization of unmatched
100-
* tables.
99+
* <p>Line 319-325, 1361-1368 : Use a {@link TableIdFilter} to skip binlog deserialization of
100+
* unmatched tables.
101101
*/
102102
public class MySqlStreamingChangeEventSource
103103
implements StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> {
@@ -202,7 +202,8 @@ public MySqlStreamingChangeEventSource(
202202
ErrorHandler errorHandler,
203203
Clock clock,
204204
MySqlTaskContext taskContext,
205-
MySqlStreamingChangeEventSourceMetrics metrics) {
205+
MySqlStreamingChangeEventSourceMetrics metrics,
206+
boolean skipBinlogDeserializationOfUnsubscribedTables) {
206207

207208
this.taskContext = taskContext;
208209
this.connectorConfig = connectorConfig;
@@ -315,7 +316,13 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
315316
}
316317
};
317318

318-
final TableIdFilter tableIdFilter = getTableIdDeserializationFilter();
319+
LOGGER.info(
320+
"Skip binlog deserialization for unsubscribed tables: {}",
321+
skipBinlogDeserializationOfUnsubscribedTables);
322+
final TableIdFilter tableIdFilter =
323+
skipBinlogDeserializationOfUnsubscribedTables
324+
? getTableIdDeserializationFilter()
325+
: TableIdFilter.all();
319326

320327
// Add our custom deserializers ...
321328
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,10 @@ public void submitSplit(MySqlSplit mySqlSplit) {
134134
(MySqlStreamingChangeEventSourceMetrics)
135135
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
136136
currentBinlogSplit,
137-
createEventFilter());
137+
createEventFilter(),
138+
statefulTaskContext
139+
.getSourceConfig()
140+
.isSkipBinlogDeserializationOfUnsubscribedTables());
138141

139142
executorService.submit(
140143
() -> {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,10 @@ private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(
254254
(MySqlStreamingChangeEventSourceMetrics)
255255
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
256256
backfillBinlogSplit,
257-
event -> true);
257+
event -> true,
258+
statefulTaskContext
259+
.getSourceConfig()
260+
.isSkipBinlogDeserializationOfUnsubscribedTables());
258261
}
259262

260263
private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit)

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,17 @@ public MySqlBinlogSplitReadTask(
6868
MySqlTaskContext taskContext,
6969
MySqlStreamingChangeEventSourceMetrics metrics,
7070
MySqlBinlogSplit binlogSplit,
71-
Predicate<Event> eventFilter) {
72-
super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics);
71+
Predicate<Event> eventFilter,
72+
boolean skipBinlogDeserializationOfUnsubscribedTables) {
73+
super(
74+
connectorConfig,
75+
connection,
76+
dispatcher,
77+
errorHandler,
78+
clock,
79+
taskContext,
80+
metrics,
81+
skipBinlogDeserializationOfUnsubscribedTables);
7382
this.binlogSplit = binlogSplit;
7483
this.eventDispatcher = dispatcher;
7584
this.errorHandler = errorHandler;

0 commit comments

Comments
 (0)