diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md index 46822cd5487..bce7f8f54bd 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md @@ -433,6 +433,17 @@ Flink SQL> SELECT * FROM orders;
  • false(默认):所有类型的消息都保持原样下发。
  • + + scan.binlog.skip-unsubscribed-tables.enabled + optional + false + Boolean + + 在增量解析阶段,是否跳过未订阅表的增量数据(binlog)反序列化。
    + 建议在订阅部分表的场景下选择开启,能过滤非订阅表的增量数据解析,提升解析性能。
    + 这是一项实验性功能,默认关闭。 + + scan.incremental.snapshot.backfill.skip optional diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index fc38481fdbc..1ffaa6f8737 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -286,6 +286,17 @@ pipeline: scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。 + + scan.binlog.skip-unsubscribed-tables.enabled + optional + false + Boolean + + 在增量解析阶段,是否跳过未订阅表的增量数据(binlog)反序列化。
    + 建议在订阅部分表的场景下选择开启,能过滤非订阅表的增量数据解析,提升解析性能。
    + 这是一项实验性功能,默认关闭。 + + scan.parse.online.schema.changes.enabled optional diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index ad641ef0e93..7654e8e7384 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -458,6 +458,17 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
  • false (default): All types of messages are sent as is.
  • + + scan.binlog.skip-unsubscribed-tables.enabled + optional + false + Boolean + + During incremental reading, whether to skip deserialization of incremental data (binlog) for unsubscribed tables.
    + It is recommended to enable this option when only a subset of tables are subscribed. It can avoid parsing incremental events of unsubscribed tables and improve performance.
    + This is an experimental feature and is disabled by default. + + scan.incremental.snapshot.backfill.skip optional diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index a8964cdaffb..00b774a2767 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -293,6 +293,17 @@ pipeline: scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase. + + scan.binlog.skip-unsubscribed-tables.enabled + optional + false + Boolean + + During incremental reading, whether to skip deserialization of incremental data (binlog) for unsubscribed tables.
    + It is recommended to enable this option when only a subset of tables are subscribed. It can avoid parsing incremental events of unsubscribed tables and improve performance.
    + This is an experimental feature and is disabled by default. + + scan.parse.online.schema.changes.enabled optional diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 1b3540da0bb..6d08739f483 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -76,6 +76,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; @@ -167,6 +168,8 @@ public DataSource createDataSource(Context context) { boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT); boolean isAssignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); + boolean skipBinlogDeserializationOfUnsubscribedTables = + 
config.get(SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -220,6 +223,8 @@ public DataSource createDataSource(Context context) { .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean) .useLegacyJsonFormat(useLegacyJsonFormat) .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) + .skipBinlogDeserializationOfUnsubscribedTables( + skipBinlogDeserializationOfUnsubscribedTables) .skipSnapshotBackfill(skipSnapshotBackfill); List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); @@ -351,6 +356,7 @@ public Set> optionalOptions() { options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); + options.add(SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED); options.add(METADATA_LIST); options.add(INCLUDE_COMMENTS_ENABLED); options.add(USE_LEGACY_JSON_FORMAT); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 6aff556e7fa..74227501724 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -330,4 +330,12 @@ public class MySqlDataSourceOptions { .defaultValue(false) .withDescription( "Whether to skip backfill in snapshot reading phase. 
If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially."); + + @Experimental + public static final ConfigOption SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED = + ConfigOptions.key("scan.binlog.skip-unsubscribed-tables.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to skip deserialization of binlog row events for unsubscribed tables."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/DeleteRowsEventDataDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/DeleteRowsEventDataDeserializer.java new file mode 100644 index 00000000000..137c6bdddd1 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/DeleteRowsEventDataDeserializer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; +import java.io.Serializable; +import java.util.BitSet; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}. + * + *

    Line 52-56: Add a new constructor with {@link TableIdFilter} supplied. + * + *

    Line 70-74: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables. + */ +public class DeleteRowsEventDataDeserializer + extends AbstractRowsEventDataDeserializer { + + private boolean mayContainExtraInformation; + + /** the table id filter to skip further deserialization of unsubscribed table ids. */ + private final TableIdFilter tableIdFilter; + + public DeleteRowsEventDataDeserializer(Map tableMapEventByTableId) { + this(tableMapEventByTableId, TableIdFilter.all()); + } + + public DeleteRowsEventDataDeserializer( + Map tableMapEventByTableId, TableIdFilter tableIdFilter) { + super(tableMapEventByTableId); + this.tableIdFilter = tableIdFilter; + } + + public DeleteRowsEventDataDeserializer setMayContainExtraInformation( + boolean mayContainExtraInformation) { + this.mayContainExtraInformation = mayContainExtraInformation; + return this; + } + + @Override + public DeleteRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + DeleteRowsEventData eventData = new DeleteRowsEventData(); + eventData.setTableId(inputStream.readLong(6)); + + // skip further deserialization if the table id is unsubscribed + if (!tableIdFilter.test(eventData.getTableId())) { + eventData.setIncludedColumns(null); + eventData.setRows(Collections.emptyList()); + return eventData; + } + + inputStream.readInteger(2); // reserved + if (mayContainExtraInformation) { + int extraInfoLength = inputStream.readInteger(2); + inputStream.skip(extraInfoLength - 2); + } + int numberOfColumns = inputStream.readPackedInteger(); + eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true)); + eventData.setRows( + deserializeRows( + eventData.getTableId(), eventData.getIncludedColumns(), inputStream)); + return eventData; + } + + private List deserializeRows( + long tableId, BitSet includedColumns, ByteArrayInputStream inputStream) + throws IOException { + List result = new LinkedList(); + while (inputStream.available() > 0) { + 
result.add(deserializeRow(tableId, includedColumns, inputStream)); + } + return result; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableIdFilter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableIdFilter.java new file mode 100644 index 00000000000..20916a9054e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableIdFilter.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.shyiko.mysql.binlog.event.deserialization; + +import java.util.function.Predicate; + +/** The filter used for skipping the binlog deserialization of unsubscribed table. 
*/ +public interface TableIdFilter extends Predicate { + + @Override + boolean test(Long tableId); + + static TableIdFilter all() { + return tableId -> true; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/UpdateRowsEventDataDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/UpdateRowsEventDataDeserializer.java new file mode 100644 index 00000000000..2e99b740ede --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/UpdateRowsEventDataDeserializer.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}. + * + *

    Line 53-57: Add a new constructor with {@link TableIdFilter} supplied. + * + *

    Line 71-75: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables. + */ +public class UpdateRowsEventDataDeserializer + extends AbstractRowsEventDataDeserializer { + + private boolean mayContainExtraInformation; + + /** the table id filter to skip further deserialization of unsubscribed table ids. */ + private final TableIdFilter tableIdFilter; + + public UpdateRowsEventDataDeserializer(Map tableMapEventByTableId) { + this(tableMapEventByTableId, TableIdFilter.all()); + } + + public UpdateRowsEventDataDeserializer( + Map tableMapEventByTableId, TableIdFilter tableIdFilter) { + super(tableMapEventByTableId); + this.tableIdFilter = tableIdFilter; + } + + public UpdateRowsEventDataDeserializer setMayContainExtraInformation( + boolean mayContainExtraInformation) { + this.mayContainExtraInformation = mayContainExtraInformation; + return this; + } + + @Override + public UpdateRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + UpdateRowsEventData eventData = new UpdateRowsEventData(); + eventData.setTableId(inputStream.readLong(6)); + + // skip further deserialization if the table id is unsubscribed + if (!tableIdFilter.test(eventData.getTableId())) { + eventData.setIncludedColumns(null); + eventData.setRows(Collections.emptyList()); + return eventData; + } + + inputStream.skip(2); // reserved + if (mayContainExtraInformation) { + int extraInfoLength = inputStream.readInteger(2); + inputStream.skip(extraInfoLength - 2); + } + int numberOfColumns = inputStream.readPackedInteger(); + eventData.setIncludedColumnsBeforeUpdate(inputStream.readBitSet(numberOfColumns, true)); + eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true)); + eventData.setRows(deserializeRows(eventData, inputStream)); + return eventData; + } + + private List> deserializeRows( + UpdateRowsEventData eventData, ByteArrayInputStream inputStream) throws IOException { + long tableId = eventData.getTableId(); + BitSet 
includedColumnsBeforeUpdate = eventData.getIncludedColumnsBeforeUpdate(), + includedColumns = eventData.getIncludedColumns(); + List> rows = + new ArrayList>(); + while (inputStream.available() > 0) { + rows.add( + new AbstractMap.SimpleEntry( + deserializeRow(tableId, includedColumnsBeforeUpdate, inputStream), + deserializeRow(tableId, includedColumns, inputStream))); + } + return rows; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/WriteRowsEventDataDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/WriteRowsEventDataDeserializer.java new file mode 100644 index 00000000000..1c7330b75b5 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/WriteRowsEventDataDeserializer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; +import java.io.Serializable; +import java.util.BitSet; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}. + * + *

    Line 52-56: Add a new constructor with {@link TableIdFilter} supplied. + * + *

    Line 70-74: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables. + */ +public class WriteRowsEventDataDeserializer + extends AbstractRowsEventDataDeserializer { + + private boolean mayContainExtraInformation; + + /** the table id filter to skip further deserialization of unsubscribed table ids. */ + private final TableIdFilter tableIdFilter; + + public WriteRowsEventDataDeserializer(Map tableMapEventByTableId) { + this(tableMapEventByTableId, TableIdFilter.all()); + } + + public WriteRowsEventDataDeserializer( + Map tableMapEventByTableId, TableIdFilter tableIdFilter) { + super(tableMapEventByTableId); + this.tableIdFilter = tableIdFilter; + } + + public WriteRowsEventDataDeserializer setMayContainExtraInformation( + boolean mayContainExtraInformation) { + this.mayContainExtraInformation = mayContainExtraInformation; + return this; + } + + @Override + public WriteRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + WriteRowsEventData eventData = new WriteRowsEventData(); + eventData.setTableId(inputStream.readLong(6)); + + // skip further deserialization if the table id is unsubscribed + if (!tableIdFilter.test(eventData.getTableId())) { + eventData.setIncludedColumns(null); + eventData.setRows(Collections.emptyList()); + return eventData; + } + + inputStream.skip(2); // reserved + if (mayContainExtraInformation) { + int extraInfoLength = inputStream.readInteger(2); + inputStream.skip(extraInfoLength - 2); + } + int numberOfColumns = inputStream.readPackedInteger(); + eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true)); + eventData.setRows( + deserializeRows( + eventData.getTableId(), eventData.getIncludedColumns(), inputStream)); + return eventData; + } + + private List deserializeRows( + long tableId, BitSet includedColumns, ByteArrayInputStream inputStream) + throws IOException { + List result = new LinkedList(); + while (inputStream.available() > 0) { + 
result.add(deserializeRow(tableId, includedColumns, inputStream)); + } + return result; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 557a149a2d0..9bfbdbfe79d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -24,6 +24,7 @@ import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.TableIdFilter; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import com.github.shyiko.mysql.binlog.network.AuthenticationException; import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory; @@ -87,18 +88,21 @@ * Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

    Line 1432-1443 : Adjust GTID merging logic to support recovering from job which previously + *

    Line 1454-1466 : Adjust GTID merging logic to support recovering from job which previously * specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared * EARLIEST/LATEST logic. * - *

    Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions + *

    Line 1467-1475 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions * when checkpoint GTID has non-contiguous ranges. Delegates to {@link * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149. * - *

    Line 1490 : Add more error details for some exceptions. + *

    Line 1526 : Add more error details for some exceptions. * - *

    Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when + *

    Line 965-977 : Use iterator instead of index-based loop to avoid O(n²) complexity when * processing LinkedList rows in handleChange method. See FLINK-38846. + * + *

    Line 324-330, 1366-1373 : Use a {@link TableIdFilter} to skip binlog deserialization of + * unmatched tables. */ public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource { @@ -203,7 +207,8 @@ public MySqlStreamingChangeEventSource( ErrorHandler errorHandler, Clock clock, MySqlTaskContext taskContext, - MySqlStreamingChangeEventSourceMetrics metrics) { + MySqlStreamingChangeEventSourceMetrics metrics, + boolean skipBinlogDeserializationOfUnsubscribedTables) { this.taskContext = taskContext; this.connectorConfig = connectorConfig; @@ -316,29 +321,37 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { } }; + LOGGER.info( + "Skip binlog deserialization for unsubscribed tables: {}", + skipBinlogDeserializationOfUnsubscribedTables); + final TableIdFilter tableIdFilter = + skipBinlogDeserializationOfUnsubscribedTables + ? getTableIdDeserializationFilter() + : TableIdFilter.all(); + // Add our custom deserializers ... eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer()); eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer()); eventDeserializer.setEventDataDeserializer( EventType.WRITE_ROWS, - new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)); + new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId, tableIdFilter)); eventDeserializer.setEventDataDeserializer( EventType.UPDATE_ROWS, - new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)); + new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId, tableIdFilter)); eventDeserializer.setEventDataDeserializer( EventType.DELETE_ROWS, - new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)); + new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId, tableIdFilter)); eventDeserializer.setEventDataDeserializer( EventType.EXT_WRITE_ROWS, - new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId) + new 
RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId, tableIdFilter) .setMayContainExtraInformation(true)); eventDeserializer.setEventDataDeserializer( EventType.EXT_UPDATE_ROWS, - new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId) + new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId, tableIdFilter) .setMayContainExtraInformation(true)); eventDeserializer.setEventDataDeserializer( EventType.EXT_DELETE_ROWS, - new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId) + new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId, tableIdFilter) .setMayContainExtraInformation(true)); client.setEventDeserializer(eventDeserializer); } @@ -1350,6 +1363,15 @@ protected void initSSLContext(SSLContext sc) throws GeneralSecurityException { return null; } + private TableIdFilter getTableIdDeserializationFilter() { + return tableId -> { + // since only subscribed table is recording schema, the result could be null + TableId table = taskContext.getSchema().getTableId(tableId); + return table != null + && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(table); + }; + } + private void logStreamingSourceState() { logStreamingSourceState(Level.ERROR); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/RowDeserializers.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/RowDeserializers.java new file mode 100644 index 00000000000..fdd3c5356a0 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/RowDeserializers.java @@ -0,0 +1,684 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.mysql; + +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.DeleteRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.TableIdFilter; +import com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.Year; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Map; + +/** + * Copied from Debezium project(1.9.8.Final) to add constructors with {@link TableIdFilter}. + * + *

    Line 64-67: add a constructor with {@link TableIdFilter} of {@link DeleteRowsDeserializer}. + * + *

    Line 141-144: add a constructor with {@link TableIdFilter} of {@link + * UpdateRowsDeserializer}. + * + *

    Line 218-221: add an constructors with {@link TableIdFilter} of {@link WriteRowsDeserializer}. + */ +public class RowDeserializers { + + /** + * A specialization of {@link DeleteRowsEventDataDeserializer} that converts MySQL {@code DATE}, + * {@code TIME}, {@code DATETIME}, and {@code TIMESTAMP} values to {@link LocalDate}, {@link + * LocalTime}, {@link LocalDateTime}, and {@link OffsetDateTime} objects, respectively. + */ + public static class DeleteRowsDeserializer extends DeleteRowsEventDataDeserializer { + + public DeleteRowsDeserializer(Map tableMapEventByTableId) { + super(tableMapEventByTableId); + } + + public DeleteRowsDeserializer( + Map tableMapEventByTableId, TableIdFilter tableIdFilter) { + super(tableMapEventByTableId, tableIdFilter); + } + + @Override + protected Serializable deserializeString(int length, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeString(length, inputStream); + } + + @Override + protected Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeVarString(meta, inputStream); + } + + @Override + protected Serializable deserializeDate(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeDate(inputStream); + } + + @Override + protected Serializable deserializeDatetime(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeDatetime(inputStream); + } + + @Override + protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeDatetimeV2(meta, inputStream); + } + + @Override + protected Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTimeV2(meta, inputStream); + } + + @Override + protected Serializable deserializeTime(ByteArrayInputStream inputStream) + 
throws IOException { + return RowDeserializers.deserializeTime(inputStream); + } + + @Override + protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTimestamp(inputStream); + } + + @Override + protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTimestampV2(meta, inputStream); + } + + @Override + protected Serializable deserializeYear(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeYear(inputStream); + } + } + + /** + * A specialization of {@link UpdateRowsEventDataDeserializer} that converts MySQL {@code DATE}, + * {@code TIME}, {@code DATETIME}, and {@code TIMESTAMP} values to {@link LocalDate}, {@link + * LocalTime}, {@link LocalDateTime}, and {@link OffsetDateTime} objects, respectively. + */ + public static class UpdateRowsDeserializer extends UpdateRowsEventDataDeserializer { + + public UpdateRowsDeserializer(Map tableMapEventByTableId) { + super(tableMapEventByTableId); + } + + public UpdateRowsDeserializer( + Map tableMapEventByTableId, TableIdFilter tableIdFilter) { + super(tableMapEventByTableId, tableIdFilter); + } + + @Override + protected Serializable deserializeString(int length, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeString(length, inputStream); + } + + @Override + protected Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeVarString(meta, inputStream); + } + + @Override + protected Serializable deserializeDate(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeDate(inputStream); + } + + @Override + protected Serializable deserializeDatetime(ByteArrayInputStream inputStream) + throws IOException { + return 
RowDeserializers.deserializeDatetime(inputStream); + } + + @Override + protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeDatetimeV2(meta, inputStream); + } + + @Override + protected Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTimeV2(meta, inputStream); + } + + @Override + protected Serializable deserializeTime(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTime(inputStream); + } + + @Override + protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTimestamp(inputStream); + } + + @Override + protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTimestampV2(meta, inputStream); + } + + @Override + protected Serializable deserializeYear(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeYear(inputStream); + } + } + + /** + * A specialization of {@link WriteRowsEventDataDeserializer} that converts MySQL {@code DATE}, + * {@code TIME}, {@code DATETIME}, and {@code TIMESTAMP} values to {@link LocalDate}, {@link + * LocalTime}, {@link LocalDateTime}, and {@link OffsetDateTime} objects, respectively. 
+ */ + public static class WriteRowsDeserializer extends WriteRowsEventDataDeserializer { + + public WriteRowsDeserializer(Map tableMapEventByTableId) { + super(tableMapEventByTableId); + } + + public WriteRowsDeserializer( + Map tableMapEventByTableId, TableIdFilter tableIdFilter) { + super(tableMapEventByTableId, tableIdFilter); + } + + @Override + protected Serializable deserializeString(int length, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeString(length, inputStream); + } + + @Override + protected Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeVarString(meta, inputStream); + } + + @Override + protected Serializable deserializeDate(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeDate(inputStream); + } + + @Override + protected Serializable deserializeDatetime(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeDatetime(inputStream); + } + + @Override + protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeDatetimeV2(meta, inputStream); + } + + @Override + protected Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTimeV2(meta, inputStream); + } + + @Override + protected Serializable deserializeTime(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTime(inputStream); + } + + @Override + protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeTimestamp(inputStream); + } + + @Override + protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + return 
RowDeserializers.deserializeTimestampV2(meta, inputStream); + } + + @Override + protected Serializable deserializeYear(ByteArrayInputStream inputStream) + throws IOException { + return RowDeserializers.deserializeYear(inputStream); + } + } + + private static final int MASK_10_BITS = (1 << 10) - 1; + private static final int MASK_6_BITS = (1 << 6) - 1; + + /** + * Converts a MySQL string to a {@code byte[]}. + * + * @param length the number of bytes used to store the length of the string + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@code byte[]} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeString(int length, ByteArrayInputStream inputStream) + throws IOException { + // charset is not present in the binary log (meaning there is no way to distinguish between + // CHAR / BINARY) + // as a result - return byte[] instead of an actual String + int stringLength = length < 256 ? inputStream.readInteger(1) : inputStream.readInteger(2); + return inputStream.read(stringLength); + } + + /** + * Converts a MySQL string to a {@code byte[]}. + * + * @param meta the {@code meta} value containing the number of bytes in the length field + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@code byte[]} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream) + throws IOException { + int varcharLength = meta < 256 ? inputStream.readInteger(1) : inputStream.readInteger(2); + return inputStream.read(varcharLength); + } + + /** + * Converts a MySQL {@code DATE} value to a {@link LocalDate}. + * + *

    This method treats all zero values for + * {@code DATE} columns as NULL, since they cannot be accurately represented as valid {@link + * LocalDate} objects. + * + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@link LocalDate} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeDate(ByteArrayInputStream inputStream) + throws IOException { + int value = inputStream.readInteger(3); + int day = value % 32; // 1-based day of the month + value >>>= 5; + int month = value % 16; // 1-based month number + int year = value >> 4; + if (year == 0 || month == 0 || day == 0) { + return null; + } + return LocalDate.of(year, month, day); + } + + /** + * Converts a MySQL {@code TIME} value without fractional seconds to a {@link + * java.time.Duration}. + * + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@link LocalTime} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeTime(ByteArrayInputStream inputStream) + throws IOException { + // Times are stored as an integer as `HHMMSS`, so we need to split out the digits ... + int value = inputStream.readInteger(3); + int[] split = split(value, 100, 3); + int hours = split[2]; + int minutes = split[1]; + int seconds = split[0]; + return Duration.ofHours(hours).plusMinutes(minutes).plusSeconds(seconds); + } + + /** + * Converts a MySQL {@code TIME} value with fractional seconds to a {@link + * java.time.Duration}. 
+ * + * @param meta the {@code meta} value containing the fractional second precision, or {@code fsp} + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@link java.time.Duration} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + /* + * (in big endian) + * + * 1 bit sign (1= non-negative, 0= negative) + * 1 bit unused (reserved for future extensions) + * 10 bits hour (0-838) + * 6 bits minute (0-59) + * 6 bits second (0-59) + * + * (3 bytes in total) + * + * + fractional-seconds storage (size depends on meta) + */ + long time = bigEndianLong(inputStream.read(3), 0, 3); + boolean is_negative = bitSlice(time, 0, 1, 24) == 0; + int hours = bitSlice(time, 2, 10, 24); + int minutes = bitSlice(time, 12, 6, 24); + int seconds = bitSlice(time, 18, 6, 24); + int nanoSeconds; + if (is_negative) { // mysql binary arithmetic for negative encoded values + hours = ~hours & MASK_10_BITS; + hours = hours & ~(1 << 10); // unset sign bit + minutes = ~minutes & MASK_6_BITS; + minutes = minutes & ~(1 << 6); // unset sign bit + seconds = ~seconds & MASK_6_BITS; + seconds = seconds & ~(1 << 6); // unset sign bit + nanoSeconds = deserializeFractionalSecondsInNanosNegative(meta, inputStream); + if (nanoSeconds == 0 && seconds < 59) { // weird java Duration behavior + ++seconds; + } + hours = -hours; + minutes = -minutes; + seconds = -seconds; + nanoSeconds = -nanoSeconds; + } else { + nanoSeconds = deserializeFractionalSecondsInNanos(meta, inputStream); + } + return Duration.ofHours(hours) + .plusMinutes(minutes) + .plusSeconds(seconds) + .plusNanos(nanoSeconds); + } + + /** + * Converts a MySQL {@code DATETIME} value without fractional seconds to a {@link + * LocalDateTime}. + * + *

    This method treats all zero values for + * {@code DATETIME} columns as NULL, since they cannot be accurately represented as valid {@link + * LocalDateTime} objects. + * + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@link LocalDateTime} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeDatetime(ByteArrayInputStream inputStream) + throws IOException { + int[] split = split(inputStream.readLong(8), 100, 6); + int year = split[5]; + int month = split[4]; // 1-based month number + int day = split[3]; // 1-based day of the month + int hours = split[2]; + int minutes = split[1]; + int seconds = split[0]; + int nanoOfSecond = 0; // This version does not support fractional seconds + if (year == 0 || month == 0 || day == 0) { + return null; + } + return LocalDateTime.of(year, month, day, hours, minutes, seconds, nanoOfSecond); + } + + /** + * Converts a MySQL {@code DATETIME} value with fractional seconds to a {@link + * LocalDateTime}. + * + *

    This method treats all zero values for + * {@code DATETIME} columns as NULL, since they cannot be accurately represented as valid {@link + * LocalDateTime} objects. + * + * @param meta the {@code meta} value containing the fractional second precision, or {@code fsp} + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@link LocalDateTime} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + /* + * (in big endian) + * + * 1 bit sign (1= non-negative, 0= negative) + * 17 bits year*13+month (year 0-9999, month 0-12) + * 5 bits day (0-31) + * 5 bits hour (0-23) + * 6 bits minute (0-59) + * 6 bits second (0-59) + * + * (5 bytes in total) + * + * + fractional-seconds storage (size depends on meta) + */ + long datetime = bigEndianLong(inputStream.read(5), 0, 5); + int yearMonth = bitSlice(datetime, 1, 17, 40); + int year = yearMonth / 13; + int month = yearMonth % 13; // 1-based month number + int day = bitSlice(datetime, 18, 5, 40); // 1-based day of the month + int hours = bitSlice(datetime, 23, 5, 40); + int minutes = bitSlice(datetime, 28, 6, 40); + int seconds = bitSlice(datetime, 34, 6, 40); + int nanoOfSecond = deserializeFractionalSecondsInNanos(meta, inputStream); + if (year == 0 || month == 0 || day == 0) { + return null; + } + return LocalDateTime.of(year, month, day, hours, minutes, seconds, nanoOfSecond); + } + + /** + * Converts a MySQL {@code TIMESTAMP} value without fractional seconds to a {@link + * OffsetDateTime}. MySQL stores the {@code TIMESTAMP} values as seconds past epoch in UTC, but + * the resulting {@link OffsetDateTime} will be in the local timezone. 
+ * + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@link OffsetDateTime} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeTimestamp(ByteArrayInputStream inputStream) + throws IOException { + long epochSecond = inputStream.readLong(4); + int nanoSeconds = 0; // no fractional seconds + return ZonedDateTime.ofInstant( + Instant.ofEpochSecond(epochSecond, nanoSeconds), ZoneOffset.UTC); + } + + /** + * Converts a MySQL {@code TIMESTAMP} value with fractional seconds to a {@link + * OffsetDateTime}. MySQL stores the {@code TIMESTAMP} values as seconds + fractional seconds + * past epoch in UTC, but the resulting {@link OffsetDateTime} will be in the local timezone. + * + * @param meta the {@code meta} value containing the fractional second precision, or {@code fsp} + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@link OffsetDateTime} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream) + throws IOException { + long epochSecond = bigEndianLong(inputStream.read(4), 0, 4); + int nanoSeconds = deserializeFractionalSecondsInNanos(meta, inputStream); + return ZonedDateTime.ofInstant( + Instant.ofEpochSecond(epochSecond, nanoSeconds), ZoneOffset.UTC); + } + + /** + * Converts a MySQL {@code YEAR} value to a {@link Year} object. 
+ * + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@link Year} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeYear(ByteArrayInputStream inputStream) + throws IOException { + return Year.of(1900 + inputStream.readInteger(1)); + } + + /** + * Split the integer into multiple integers. + * + *

    We can't use/access the private {@code split} method in the {@link + * AbstractRowsEventDataDeserializer} class, so we replicate it here. Note the original is + * licensed under the same Apache Software License 2.0 as Debezium. + * + * @param value the long value + * @param divider the value used to separate the individual values (e.g., 10 to separate each + * digit into a separate value, 100 to separate each pair of digits into a separate value, + * 1000 to separate each 3 digits into a separate value, etc.) + * @param length the expected length of the integer array + * @return the integer values + * @author Stanley Shyiko + */ + protected static int[] split(long value, int divider, int length) { + int[] result = new int[length]; + for (int i = 0; i < length - 1; i++) { + result[i] = (int) (value % divider); + value /= divider; + } + result[length - 1] = (int) value; + return result; + } + + /** + * Read a big-endian long value. + * + *

    We can't use/access the private {@code bigEndianLong} method in the {@link + * AbstractRowsEventDataDeserializer} class, so we replicate it here. Note the original is + * licensed under the same Apache Software License 2.0 as Debezium. + * + * @param bytes the bytes containing the big-endian representation of the value + * @param offset the offset within the {@code bytes} byte array where the value starts + * @param length the length of the byte representation within the {@code bytes} byte array + * @return the long value + * @author Stanley Shyiko + */ + protected static long bigEndianLong(byte[] bytes, int offset, int length) { + long result = 0; + for (int i = offset; i < (offset + length); i++) { + byte b = bytes[i]; + result = (result << 8) | (b >= 0 ? (int) b : (b + 256)); + } + return result; + } + + /** + * Slice an integer out of a portion of long value. + * + *

    We can't use/access the private {@code bitSlice} method in the {@link + * AbstractRowsEventDataDeserializer} class, so we replicate it here. Note the original is + * licensed under the same Apache Software License 2.0 as Debezium. + * + * @param value the long containing the integer encoded within it + * @param bitOffset the number of bits where the integer value starts + * @param numberOfBits the number of bits in the integer value + * @param payloadSize the total number of bits used in the {@code value} + * @return the integer value + */ + protected static int bitSlice(long value, int bitOffset, int numberOfBits, int payloadSize) { + long result = value >> payloadSize - (bitOffset + numberOfBits); + return (int) (result & ((1 << numberOfBits) - 1)); + } + + /** + * Read the binary input stream to obtain the number of nanoseconds given the fractional + * seconds precision, or fsp. + * + *

    We can't use/access the {@code deserializeFractionalSeconds} method in the {@link + * AbstractRowsEventDataDeserializer} class, so we replicate it here with modifications to + * support nanoseconds rather than microseconds. Note the original is licensed under the same + * Apache Software License 2.0 as Debezium. + * + * @param fsp the fractional seconds precision describing the number of digits precision used to + * store the fractional seconds (e.g., 1 for storing tenths of a second, 2 for storing + * hundredths, 3 for storing milliseconds, etc.) + * @param inputStream the binary data stream + * @return the number of nanoseconds + * @throws IOException if there is an error reading from the binlog event data + */ + protected static int deserializeFractionalSecondsInNanos( + int fsp, ByteArrayInputStream inputStream) throws IOException { + // Calculate the number of bytes to read, which is + // '1' when fsp=(1,2) -- 7 + // '2' when fsp=(3,4) and -- 12 + // '3' when fsp=(5,6) -- 21 + int length = (fsp + 1) / 2; + if (length > 0) { + long fraction = bigEndianLong(inputStream.read(length), 0, length); + // Convert the fractional value (which has extra trailing digit for fsp=1,3, and 5) to + // nanoseconds ... + return (int) (fraction / (0.0000001 * Math.pow(100, length - 1))); + } + return 0; + } + + /** + * Read the binary input stream to obtain the number of nanoseconds given the fractional + * seconds precision, or fsp. + * + *

    We can't use/access the {@code deserializeFractionalSeconds} method in the {@link + * AbstractRowsEventDataDeserializer} class, so we replicate it here with modifications to + * support nanoseconds rather than microseconds and negative values. Note the original is + * licensed under the same Apache Software License 2.0 as Debezium. + * + * @param fsp the fractional seconds precision describing the number of digits precision used to + * store the fractional seconds (e.g., 1 for storing tenths of a second, 2 for storing + * hundredths, 3 for storing milliseconds, etc.) + * @param inputStream the binary data stream + * @return the number of nanoseconds + * @throws IOException if there is an error reading from the binlog event data + */ + protected static int deserializeFractionalSecondsInNanosNegative( + int fsp, ByteArrayInputStream inputStream) throws IOException { + // Calculate the number of bytes to read, which is + // '1' when fsp=(1,2) + // '2' when fsp=(3,4) and + // '3' when fsp=(5,6) + int length = (fsp + 1) / 2; + if (length > 0) { + long fraction = bigEndianLong(inputStream.read(length), 0, length); + int maskBits = 0; + switch (length) { // mask bits according to field precision + case 1: + maskBits = 8; + break; + case 2: + maskBits = 15; + break; + case 3: + maskBits = 20; + break; + default: + break; + } + fraction = ~fraction & ((1 << maskBits) - 1); + fraction = (fraction & ~(1 << maskBits)) + 1; // unset sign bit + // Convert the fractional value (which has extra trailing digit for fsp=1,3, and 5) to + // nanoseconds ... 
+ return (int) (fraction / (0.0000001 * Math.pow(100, length - 1))); + } + return 0; + } + + private RowDeserializers() {} +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index b1e6d1dfc86..70ffac35e28 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -137,7 +137,10 @@ public void submitSplit(MySqlSplit mySqlSplit) { (MySqlStreamingChangeEventSourceMetrics) statefulTaskContext.getStreamingChangeEventSourceMetrics(), currentBinlogSplit, - createEventFilter()); + createEventFilter(), + statefulTaskContext + .getSourceConfig() + .isSkipBinlogDeserializationOfUnsubscribedTables()); executorService.submit( () -> { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index e3549237065..2956afee4d3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -254,7 +254,10 @@ private MySqlBinlogSplitReadTask 
createBackfillBinlogReadTask( (MySqlStreamingChangeEventSourceMetrics) statefulTaskContext.getStreamingChangeEventSourceMetrics(), backfillBinlogSplit, - event -> true); + event -> true, + statefulTaskContext + .getSourceConfig() + .isSkipBinlogDeserializationOfUnsubscribedTables()); } private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java index 7a6e7757f7d..50a722462e9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java @@ -68,8 +68,17 @@ public MySqlBinlogSplitReadTask( MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics, MySqlBinlogSplit binlogSplit, - Predicate eventFilter) { - super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics); + Predicate eventFilter, + boolean skipBinlogDeserializationOfUnsubscribedTables) { + super( + connectorConfig, + connection, + dispatcher, + errorHandler, + clock, + taskContext, + metrics, + skipBinlogDeserializationOfUnsubscribedTables); this.binlogSplit = binlogSplit; this.eventDispatcher = dispatcher; this.errorHandler = errorHandler; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 93fa2a0d36a..18b1fc27e00 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -312,6 +312,14 @@ public MySqlSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedCh return this; } + /** Whether to skip deserialization of binlog row events for unsubscribed tables. */ + public MySqlSourceBuilder skipBinlogDeserializationOfUnsubscribedTables( + boolean skipBinlogDeserializationOfUnsubscribedTables) { + this.configFactory.skipBinlogDeserializationOfUnsubscribedTables( + skipBinlogDeserializationOfUnsubscribedTables); + return this; + } + /** * Build the {@link MySqlSource}. 
* diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index cf456fcaed0..fb6375fe025 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -72,6 +72,7 @@ public class MySqlSourceConfig implements Serializable { private final boolean parseOnLineSchemaChanges; public static boolean useLegacyJsonFormat = true; private final boolean assignUnboundedChunkFirst; + private final boolean skipBinlogDeserializationOfUnsubscribedTables; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -112,7 +113,8 @@ public class MySqlSourceConfig implements Serializable { boolean parseOnLineSchemaChanges, boolean treatTinyInt1AsBoolean, boolean useLegacyJsonFormat, - boolean assignUnboundedChunkFirst) { + boolean assignUnboundedChunkFirst, + boolean skipBinlogDeserializationOfUnsubscribedTables) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -158,6 +160,8 @@ public class MySqlSourceConfig implements Serializable { this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; this.useLegacyJsonFormat = useLegacyJsonFormat; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; + this.skipBinlogDeserializationOfUnsubscribedTables = + skipBinlogDeserializationOfUnsubscribedTables; } public String getHostname() { @@ -299,4 +303,8 @@ public boolean isSkipSnapshotBackfill() { public boolean 
isTreatTinyInt1AsBoolean() { return treatTinyInt1AsBoolean; } + + public boolean isSkipBinlogDeserializationOfUnsubscribedTables() { + return skipBinlogDeserializationOfUnsubscribedTables; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 569b62232db..ae8e436b2aa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -78,6 +78,8 @@ public class MySqlSourceConfigFactory implements Serializable { private boolean treatTinyInt1AsBoolean = true; private boolean useLegacyJsonFormat = true; private boolean assignUnboundedChunkFirst = false; + private boolean skipBinlogDeserializationOfUnsubscribedTables = + MySqlSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue(); public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -341,6 +343,14 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde return this; } + /** Whether to skip deserialization of binlog row events for unsubscribed tables. */ + public MySqlSourceConfigFactory skipBinlogDeserializationOfUnsubscribedTables( + boolean skipBinlogDeserializationOfUnsubscribedTables) { + this.skipBinlogDeserializationOfUnsubscribedTables = + skipBinlogDeserializationOfUnsubscribedTables; + return this; + } + /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. 
*/ public MySqlSourceConfig createConfig(int subtaskId) { // hard code server name, because we don't need to distinguish it, docs: @@ -444,6 +454,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { parseOnLineSchemaChanges, treatTinyInt1AsBoolean, useLegacyJsonFormat, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + skipBinlogDeserializationOfUnsubscribedTables); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index a8e143f5fc5..8dbf12f0092 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -292,4 +292,12 @@ public class MySqlSourceOptions { .defaultValue(true) .withDescription( "Whether to assign the unbounded chunks first during snapshot reading phase. 
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk."); + + @Experimental + public static final ConfigOption SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED = + ConfigOptions.key("scan.binlog.skip-unsubscribed-tables.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to skip deserialization of binlog row events for unsubscribed tables."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java index 2a1f0519435..e8adbb8ce34 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java @@ -101,8 +101,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat final boolean parseOnlineSchemaChanges; private final boolean useLegacyJsonFormat; private final boolean assignUnboundedChunkFirst; - private final boolean appendOnly; + private final boolean skipBinlogDeserializationOfUnsubscribedTables; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -144,7 +144,8 @@ public MySqlTableSource( boolean parseOnlineSchemaChanges, boolean useLegacyJsonFormat, boolean assignUnboundedChunkFirst, - boolean appendOnly) { + boolean appendOnly, + boolean skipBinlogDeserializationOfUnsubscribedTables) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -178,6 +179,8 @@ public MySqlTableSource( 
this.useLegacyJsonFormat = useLegacyJsonFormat; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; this.appendOnly = appendOnly; + this.skipBinlogDeserializationOfUnsubscribedTables = + skipBinlogDeserializationOfUnsubscribedTables; } @Override @@ -241,6 +244,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .parseOnLineSchemaChanges(parseOnlineSchemaChanges) .useLegacyJsonFormat(useLegacyJsonFormat) .assignUnboundedChunkFirst(assignUnboundedChunkFirst) + .skipBinlogDeserializationOfUnsubscribedTables( + skipBinlogDeserializationOfUnsubscribedTables) .build(); return SourceProvider.of(parallelSource); } else { @@ -330,7 +335,8 @@ public DynamicTableSource copy() { parseOnlineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + skipBinlogDeserializationOfUnsubscribedTables); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -376,7 +382,9 @@ public boolean equals(Object o) { && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges && useLegacyJsonFormat == that.useLegacyJsonFormat && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst - && Objects.equals(appendOnly, that.appendOnly); + && Objects.equals(appendOnly, that.appendOnly) + && skipBinlogDeserializationOfUnsubscribedTables + == that.skipBinlogDeserializationOfUnsubscribedTables; } @Override @@ -413,7 +421,8 @@ public int hashCode() { parseOnlineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + skipBinlogDeserializationOfUnsubscribedTables); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 5ea430d94e7..3833cd95168 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -106,9 +106,10 @@ public DynamicTableSource createDynamicTableSource(Context context) { boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); boolean assignUnboundedChunkFirst = config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST); - boolean appendOnly = config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); + boolean skipBinlogDeserializationOfUnsubscribedTables = + config.get(MySqlSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED); if (enableParallelRead) { validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn); @@ -156,7 +157,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { parseOnLineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + skipBinlogDeserializationOfUnsubscribedTables); } @Override @@ -206,6 +208,7 @@ public Set> optionalOptions() { options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST); options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); + options.add(MySqlSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index e7fd2c4cbb9..78740f53fd0 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -66,6 +66,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.lifecycle.Startables; import java.sql.Connection; @@ -145,10 +147,16 @@ public void after() throws Exception { customerDatabase.dropDatabase(); } - @Test - void testReadSingleBinlogSplit() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testReadSingleBinlogSplit(boolean skipBinlogDeserializationOfUnsubscribedTables) + throws Exception { customerDatabase.createAndInitialize(); - MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}); + MySqlSourceConfig sourceConfig = + getConfig( + new String[] {"customers_even_dist"}, + false, + skipBinlogDeserializationOfUnsubscribedTables); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); final DataType dataType = @@ -180,28 +188,59 @@ void testReadSingleBinlogSplit() throws Exception { sourceConfig, 1, expected.length, - splits.get(splits.size() - 1).getTableId()); + splits.get(splits.size() - 1).getTableId(), + customerDatabase.qualifiedTableName("trivial")); assertEqualsInAnyOrder(Arrays.asList(expected), actual); } - @Test - void testSnapshotScanSkipBackfillWithPostHighWatermark() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testSnapshotScanSkipBackfillWithPostHighWatermark( + boolean 
skipBinlogDeserializationOfUnsubscribedTables) throws Exception { String tableName = "customers_even_dist"; testSnapshotScanSkipBackfill( - getSnapshotPhaseHooksWithPostHighWatermark(tableName), tableName); + getSnapshotPhaseHooksWithPostHighWatermark(tableName), + tableName, + true, + skipBinlogDeserializationOfUnsubscribedTables); } - @Test - void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testSnapshotScanSkipBackfillWithPreHighWatermark( + boolean skipBinlogDeserializationOfUnsubscribedTables) throws Exception { String tableName = "customers_even_dist"; testSnapshotScanSkipBackfill( - getSnapshotPhaseHooksWithPreHighWatermark(tableName), tableName); + getSnapshotPhaseHooksWithPreHighWatermark(tableName), + tableName, + true, + skipBinlogDeserializationOfUnsubscribedTables); } - void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String tableName) + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testSnapshotScanWithBackfillSkipUnsubscribedTableBinlogDeserialization( + boolean skipBinlogDeserializationOfUnsubscribedTables) throws Exception { + String tableName = "customers_even_dist"; + testSnapshotScanSkipBackfill( + getSnapshotPhaseHooksWithPreHighWatermark(tableName), + tableName, + false, + skipBinlogDeserializationOfUnsubscribedTables); + } + + void testSnapshotScanSkipBackfill( + SnapshotPhaseHooks snapshotHooks, + String tableName, + boolean skipSnapshotBackfill, + boolean skipBinlogDeserializationOfUnsubscribedTables) throws Exception { customerDatabase.createAndInitialize(); - MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, true); + MySqlSourceConfig sourceConfig = + getConfig( + new String[] {tableName}, + skipSnapshotBackfill, + skipBinlogDeserializationOfUnsubscribedTables); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = 
DebeziumUtils.createMySqlConnection(sourceConfig); @@ -212,32 +251,56 @@ void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String table DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())); List splits = getMySqlSplits(new String[] {tableName}, sourceConfig); - String[] expected = - new String[] { - "+I[101, user_1, Shanghai, 123567891234]", - "+I[102, user_2, Shanghai, 123567891234]", - "+I[103, user_3, Shanghai, 123567891234]", - "+I[104, user_4, Shanghai, 123567891234]", - "+I[105, user_5, Shanghai, 123567891234]", - "+I[106, user_6, Shanghai, 123567891234]", - "+I[107, user_7, Shanghai, 123567891234]", - "+I[108, user_8, Shanghai, 123567891234]", - "-U[103, user_3, Shanghai, 123567891234]", - "+U[103, user_3, Hangzhou, 123567891234]", - "-D[102, user_2, Shanghai, 123567891234]", - "+I[102, user_2, Hangzhou, 123567891234]", - "-U[103, user_3, Hangzhou, 123567891234]", - "+U[103, user_3, Shanghai, 123567891234]", - "-U[103, user_3, Shanghai, 123567891234]", - "+U[103, user_3, Hangzhou, 123567891234]", - "-D[102, user_2, Hangzhou, 123567891234]", - "+I[102, user_2, Shanghai, 123567891234]", - "-U[103, user_3, Hangzhou, 123567891234]", - "+U[103, user_3, Shanghai, 123567891234]", - "+I[2001, user_22, Shanghai, 123567891234]", - "+I[2002, user_23, Shanghai, 123567891234]", - "+I[2003, user_24, Shanghai, 123567891234]" - }; + String[] expected = null; + if (skipSnapshotBackfill) { + expected = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[104, user_4, Shanghai, 123567891234]", + "+I[105, user_5, Shanghai, 123567891234]", + "+I[106, user_6, Shanghai, 123567891234]", + "+I[107, user_7, Shanghai, 123567891234]", + "+I[108, user_8, Shanghai, 123567891234]", + "-U[103, user_3, Shanghai, 123567891234]", + "+U[103, user_3, Hangzhou, 123567891234]", + "-D[102, user_2, Shanghai, 123567891234]", + 
"+I[102, user_2, Hangzhou, 123567891234]", + "-U[103, user_3, Hangzhou, 123567891234]", + "+U[103, user_3, Shanghai, 123567891234]", + "-U[103, user_3, Shanghai, 123567891234]", + "+U[103, user_3, Hangzhou, 123567891234]", + "-D[102, user_2, Hangzhou, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "-U[103, user_3, Hangzhou, 123567891234]", + "+U[103, user_3, Shanghai, 123567891234]", + "+I[2001, user_22, Shanghai, 123567891234]", + "+I[2002, user_23, Shanghai, 123567891234]", + "+I[2003, user_24, Shanghai, 123567891234]" + }; + } else { + expected = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Hangzhou, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[104, user_4, Shanghai, 123567891234]", + "+I[105, user_5, Shanghai, 123567891234]", + "+I[106, user_6, Shanghai, 123567891234]", + "+I[107, user_7, Shanghai, 123567891234]", + "+I[108, user_8, Shanghai, 123567891234]", + "-U[103, user_3, Shanghai, 123567891234]", + "+U[103, user_3, Hangzhou, 123567891234]", + "-D[102, user_2, Hangzhou, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "-U[103, user_3, Hangzhou, 123567891234]", + "+U[103, user_3, Shanghai, 123567891234]", + "+I[2001, user_22, Shanghai, 123567891234]", + "+I[2002, user_23, Shanghai, 123567891234]", + "+I[2003, user_24, Shanghai, 123567891234]" + }; + } // skip snapshot backfill makes highwatermark equal lowwatermark, so need 2 splits to List actual = @@ -246,8 +309,9 @@ void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String table dataType, sourceConfig, 2, - 23, + expected.length, splits.get(splits.size() - 1).getTableId(), + customerDatabase.qualifiedTableName("trivial"), snapshotHooks); assertEqualsInAnyOrder(Arrays.asList(expected), actual); } @@ -255,16 +319,11 @@ void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String table @NotNull private SnapshotPhaseHooks getSnapshotPhaseHooksWithPreHighWatermark(String tableName) { String 
tableId = customerDatabase.getDatabaseName() + "." + tableName; - String[] changingDataSql = - new String[] { - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", - "DELETE FROM " + tableId + " where id = 102", - "INSERT INTO " + tableId + " VALUES(102, 'user_2','Hangzhou','123567891234')", - "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103", - }; + String trivialTableId = customerDatabase.getDatabaseName() + ".trivial"; + String[] changingDataSql = produceBinlogSqls(tableId, trivialTableId); SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks(); - snapshotHooks.setPostHighWatermarkAction( + snapshotHooks.setPreHighWatermarkAction( (mySqlConnection, split) -> { if (split.splitId().equals(tableId + ":0")) { mySqlConnection.execute(changingDataSql); @@ -277,13 +336,8 @@ private SnapshotPhaseHooks getSnapshotPhaseHooksWithPreHighWatermark(String tabl @NotNull private SnapshotPhaseHooks getSnapshotPhaseHooksWithPostHighWatermark(String tableName) { String tableId = customerDatabase.getDatabaseName() + "." 
+ tableName; - String[] changingDataSql = - new String[] { - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", - "DELETE FROM " + tableId + " where id = 102", - "INSERT INTO " + tableId + " VALUES(102, 'user_2','Hangzhou','123567891234')", - "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103", - }; + String trivialTableId = customerDatabase.getDatabaseName() + ".trivial"; + String[] changingDataSql = produceBinlogSqls(tableId, trivialTableId); SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks(); snapshotHooks.setPostHighWatermarkAction( @@ -296,10 +350,16 @@ private SnapshotPhaseHooks getSnapshotPhaseHooksWithPostHighWatermark(String tab return snapshotHooks; } - @Test - void testReadAllBinlogSplitsForOneTable() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testReadAllBinlogSplitsForOneTable(boolean skipBinlogDeserializationOfUnsubscribedTables) + throws Exception { customerDatabase.createAndInitialize(); - MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}); + MySqlSourceConfig sourceConfig = + getConfig( + new String[] {"customers_even_dist"}, + false, + skipBinlogDeserializationOfUnsubscribedTables); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); final DataType dataType = @@ -340,7 +400,8 @@ void testReadAllBinlogSplitsForOneTable() throws Exception { sourceConfig, splits.size(), expected.length, - splits.get(splits.size() - 1).getTableId()); + splits.get(splits.size() - 1).getTableId(), + customerDatabase.qualifiedTableName("trivial")); assertEqualsInAnyOrder(Arrays.asList(expected), actual); } @@ -375,7 +436,8 @@ void testReadAllBinlogForTableWithSingleLine() throws Exception { sourceConfig, splits.size(), expected.length, - splits.get(splits.size() - 1).getTableId()); + splits.get(splits.size() - 1).getTableId(), + 
customerDatabase.qualifiedTableName("trivial")); assertEqualsInAnyOrder(Arrays.asList(expected), actual); } @@ -432,15 +494,21 @@ void testReadAllBinlogSplitsForTables() throws Exception { TableId.parse( customerDatabase.getDatabaseName() + "." - + "customer_card_single_line")); + + "customer_card_single_line"), + customerDatabase.qualifiedTableName("trivial")); assertEqualsInAnyOrder(Arrays.asList(expected), actual); } - @Test - void testReadBinlogFromLatestOffset() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testReadBinlogFromLatestOffset(boolean skipBinlogDeserializationOfUnsubscribedTables) + throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = - getConfig(StartupOptions.latest(), new String[] {"customers"}); + getConfig( + StartupOptions.latest(), + new String[] {"customers"}, + skipBinlogDeserializationOfUnsubscribedTables); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); @@ -451,7 +519,10 @@ void testReadBinlogFromLatestOffset() throws Exception { // Create some binlog events makeCustomersBinlogEvents( - mySqlConnection, customerDatabase.qualifiedTableName("customers"), false); + mySqlConnection, + customerDatabase.qualifiedTableName("customers"), + customerDatabase.qualifiedTableName("trivial"), + false); final DataType dataType = DataTypes.ROW( @@ -479,15 +550,18 @@ void testReadBinlogFromLatestOffset() throws Exception { reader.close(); } - @Test - void testReadBinlogWithoutGtidFromLatestOffset() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testReadBinlogWithoutGtidFromLatestOffset( + boolean skipBinlogDeserializationOfUnsubscribedTables) throws Exception { customerDatabaseNoGtid.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig( MYSQL_CONTAINER_NOGTID, customerDatabaseNoGtid, StartupOptions.latest(), - new 
String[] {"customers"}); + new String[] {"customers"}, + skipBinlogDeserializationOfUnsubscribedTables); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); @@ -498,7 +572,10 @@ void testReadBinlogWithoutGtidFromLatestOffset() throws Exception { // Create some binlog events makeCustomersBinlogEvents( - mySqlConnection, customerDatabaseNoGtid.qualifiedTableName("customers"), false); + mySqlConnection, + customerDatabaseNoGtid.qualifiedTableName("customers"), + customerDatabaseNoGtid.qualifiedTableName("trivial"), + false); final DataType dataType = DataTypes.ROW( @@ -526,11 +603,16 @@ void testReadBinlogWithoutGtidFromLatestOffset() throws Exception { reader.close(); } - @Test - void testReadBinlogFromEarliestOffset() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testReadBinlogFromEarliestOffset(boolean skipBinlogDeserializationOfUnsubscribedTables) + throws Exception { customerDatabase.createAndInitialize(); MySqlSourceConfig sourceConfig = - getConfig(StartupOptions.earliest(), new String[] {"customers"}); + getConfig( + StartupOptions.earliest(), + new String[] {"customers"}, + skipBinlogDeserializationOfUnsubscribedTables); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); @@ -541,7 +623,10 @@ void testReadBinlogFromEarliestOffset() throws Exception { // Create some binlog events makeCustomersBinlogEvents( - mySqlConnection, customerDatabase.qualifiedTableName("customers"), false); + mySqlConnection, + customerDatabase.qualifiedTableName("customers"), + customerDatabase.qualifiedTableName("trivial"), + false); final DataType dataType = DataTypes.ROW( @@ -624,11 +709,17 @@ void testReadBinlogFromEarliestOffsetAfterSchemaChange() throws Exception { + "binlog offset, which is not supported when startup mode is set 
to EARLIEST_OFFSET"); } - @Test - void testReadBinlogFromBinlogFilePosition() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testReadBinlogFromBinlogFilePosition(boolean skipBinlogDeserializationOfUnsubscribedTables) + throws Exception { // Preparations customerDatabase.createAndInitialize(); - MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); + MySqlSourceConfig connectionConfig = + getConfig( + new String[] {"customers"}, + false, + skipBinlogDeserializationOfUnsubscribedTables); binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); DataType dataType = @@ -646,7 +737,8 @@ void testReadBinlogFromBinlogFilePosition() throws Exception { getConfig( StartupOptions.specificOffset( startingOffset.getFilename(), startingOffset.getPosition()), - new String[] {"customers"}); + new String[] {"customers"}, + skipBinlogDeserializationOfUnsubscribedTables); // Create reader and submit splits MySqlBinlogSplit split = createBinlogSplit(sourceConfig); @@ -655,7 +747,10 @@ void testReadBinlogFromBinlogFilePosition() throws Exception { // Create some binlog events makeCustomersBinlogEvents( - mySqlConnection, customerDatabase.qualifiedTableName("customers"), false); + mySqlConnection, + customerDatabase.qualifiedTableName("customers"), + customerDatabase.qualifiedTableName("trivial"), + false); // Read with binlog split reader and validate String[] expected = @@ -733,11 +828,17 @@ void testSkippingEvents() throws Exception { assertEqualsInOrder(Arrays.asList(expected), actual); } - @Test - void testReadBinlogFromGtidSet() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testReadBinlogFromGtidSet(boolean skipBinlogDeserializationOfUnsubscribedTables) + throws Exception { // Preparations customerDatabase.createAndInitialize(); - MySqlSourceConfig connectionConfig = 
getConfig(new String[] {"customers"}); + MySqlSourceConfig connectionConfig = + getConfig( + new String[] {"customers"}, + false, + skipBinlogDeserializationOfUnsubscribedTables); binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); DataType dataType = @@ -754,7 +855,8 @@ void testReadBinlogFromGtidSet() throws Exception { MySqlSourceConfig sourceConfig = getConfig( StartupOptions.specificOffset(startingOffset.getGtidSet()), - new String[] {"customers"}); + new String[] {"customers"}, + skipBinlogDeserializationOfUnsubscribedTables); // Create reader and submit splits MySqlBinlogSplit split = createBinlogSplit(sourceConfig); @@ -763,7 +865,10 @@ void testReadBinlogFromGtidSet() throws Exception { // Create some binlog events makeCustomersBinlogEvents( - mySqlConnection, customerDatabase.qualifiedTableName("customers"), false); + mySqlConnection, + customerDatabase.qualifiedTableName("customers"), + customerDatabase.qualifiedTableName("trivial"), + false); // Read with binlog split reader and validate String[] expected = @@ -869,11 +974,17 @@ void testRestoreFromCheckpointWithGtidSetAndSkippingEventsAndRows() throws Excep assertEqualsInOrder(Arrays.asList(expected), actual); } - @Test - void testReadBinlogFromTimestamp() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testReadBinlogFromTimestamp(boolean skipBinlogDeserializationOfUnsubscribedTables) + throws Exception { // Preparations customerDatabase.createAndInitialize(); - MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); + MySqlSourceConfig connectionConfig = + getConfig( + new String[] {"customers"}, + false, + skipBinlogDeserializationOfUnsubscribedTables); binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); DataType 
dataType = @@ -892,7 +1003,10 @@ void testReadBinlogFromTimestamp() throws Exception { // Create a new config to start reading from the offset captured above MySqlSourceConfig sourceConfig = - getConfig(StartupOptions.timestamp(startTimestamp), new String[] {"customers"}); + getConfig( + StartupOptions.timestamp(startTimestamp), + new String[] {"customers"}, + skipBinlogDeserializationOfUnsubscribedTables); // Create reader and submit splits MySqlBinlogSplit split = createBinlogSplit(sourceConfig); @@ -901,7 +1015,10 @@ void testReadBinlogFromTimestamp() throws Exception { // Create some binlog events makeCustomersBinlogEvents( - mySqlConnection, customerDatabase.qualifiedTableName("customers"), false); + mySqlConnection, + customerDatabase.qualifiedTableName("customers"), + customerDatabase.qualifiedTableName("trivial"), + false); // Read with binlog split reader and validate String[] expected = @@ -984,8 +1101,10 @@ void testReadBinlogFromTimestampAfterSchemaChange() throws Exception { assertEqualsInOrder(Arrays.asList(expected), actual); } - @Test - void testHeartbeatEvent() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testHeartbeatEvent(boolean skipBinlogDeserializationOfUnsubscribedTables) + throws Exception { // Initialize database customerDatabase.createAndInitialize(); @@ -1006,7 +1125,8 @@ void testHeartbeatEvent() throws Exception { MYSQL_CONTAINER, customerDatabase, new String[] {"customers"}, - false) + false, + skipBinlogDeserializationOfUnsubscribedTables) .startupOptions(StartupOptions.latest()) .heartbeatInterval(heartbeatInterval) .debeziumProperties(dbzProps) @@ -1023,6 +1143,7 @@ void testHeartbeatEvent() throws Exception { makeCustomersBinlogEvents( mySqlConnection, binlogSplit.getTableSchemas().keySet().iterator().next().toString(), + customerDatabase.qualifiedTableName("trivial"), false); // Keep polling until we receive heartbeat. 
We don't validate offset of heartbeat here @@ -1330,7 +1451,8 @@ private List readBinlogSplitsFromSnapshotSplits( MySqlSourceConfig sourceConfig, int scanSplitsNum, int expectedSize, - TableId binlogChangeTableId) + TableId binlogChangeTableId, + String trivialTableName) throws Exception { return readBinlogSplitsFromSnapshotSplits( sqlSplits, @@ -1339,6 +1461,7 @@ private List readBinlogSplitsFromSnapshotSplits( scanSplitsNum, expectedSize, binlogChangeTableId, + trivialTableName, SnapshotPhaseHooks.empty()); } @@ -1349,6 +1472,7 @@ private List readBinlogSplitsFromSnapshotSplits( int scanSplitsNum, int expectedSize, TableId binlogChangeTableId, + String trivialTableName, SnapshotPhaseHooks snapshotHooks) throws Exception { final StatefulTaskContext statefulTaskContext = @@ -1405,6 +1529,7 @@ private List readBinlogSplitsFromSnapshotSplits( makeCustomersBinlogEvents( statefulTaskContext.getConnection(), binlogChangeTableId.toString(), + trivialTableName, scanSplitsNum == 1); } else { makeCustomerCardsBinlogEvents( @@ -1438,14 +1563,29 @@ private void updateCustomersTableInBulk(JdbcConnection connection, String tableI } private void makeCustomersBinlogEvents( - JdbcConnection connection, String tableId, boolean firstSplitOnly) throws SQLException { + JdbcConnection connection, + String tableId, + String trivialTableId, + boolean firstSplitOnly) + throws SQLException { // make binlog events for the first split connection.setAutoCommit(false); connection.execute( + "UPDATE " + trivialTableId + " SET address = 'Chengdu' where id = 102", "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", + "INSERT INTO " + + trivialTableId + + " VALUES(10001, 'user_11','Wuhan','123567891234')", "DELETE FROM " + tableId + " where id = 102", + "UPDATE " + trivialTableId + " SET address = 'Beijing' where id = 103", + "DELETE FROM " + trivialTableId + " where id = 10001", "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')", - "UPDATE " + tableId + " 
SET address = 'Shanghai' where id = 103"); + "INSERT INTO " + + trivialTableId + + " VALUES(10002, 'user_22','Guangzhou','123567891234')", + "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103", + "UPDATE " + trivialTableId + " SET address = 'Tianjin' where id = 10002", + "DELETE FROM " + trivialTableId + " where id = 10002"); connection.commit(); if (!firstSplitOnly) { @@ -1453,6 +1593,10 @@ private void makeCustomersBinlogEvents( connection.execute("UPDATE " + tableId + " SET name = 'Hangzhou' where id = 1010"); connection.commit(); + connection.execute( + "UPDATE " + trivialTableId + " SET name = 'Shanghai' where id = 1010"); + connection.commit(); + // make binlog events for the last split connection.execute( "INSERT INTO " @@ -1460,6 +1604,12 @@ private void makeCustomersBinlogEvents( + " VALUES(2001, 'user_22','Shanghai','123567891234')"); connection.commit(); + connection.execute( + "INSERT INTO " + + trivialTableId + + " VALUES(2001, 'user_22','Beijing','123567891234')"); + connection.commit(); + // make schema change binlog events connection.execute( "ALTER TABLE " @@ -1467,6 +1617,12 @@ private void makeCustomersBinlogEvents( + " ADD COLUMN email VARCHAR(128) DEFAULT 'user@flink.apache.org'"); connection.commit(); + connection.execute( + "ALTER TABLE " + + trivialTableId + + " ADD COLUMN email VARCHAR(128) DEFAULT 'user@flink.apache.org'"); + connection.commit(); + // make binlog events after schema changed connection.execute( "INSERT INTO " @@ -1474,12 +1630,24 @@ private void makeCustomersBinlogEvents( + " VALUES(2002, 'user_23','Shanghai','123567891234', 'test1@gmail.com')"); connection.commit(); + connection.execute( + "INSERT INTO " + + trivialTableId + + " VALUES(2002, 'user_23','Shanghai','123567891234', 'test1@gmail.com')"); + connection.commit(); + // make binlog again connection.execute( "INSERT INTO " + tableId + " VALUES(2003, 'user_24','Shanghai','123567891234', 'test2@gmail.com')"); connection.commit(); + + connection.execute( + 
"INSERT INTO " + + trivialTableId + + " VALUES(2003, 'user_24','Shanghai','123567891234', 'test2@gmail.com')"); + connection.commit(); } } @@ -1593,7 +1761,19 @@ private List getMySqlSplits( } private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] captureTables) { - return getConfig(MYSQL_CONTAINER, customerDatabase, startupOptions, captureTables); + return getConfig(startupOptions, captureTables, false); + } + + private MySqlSourceConfig getConfig( + StartupOptions startupOptions, + String[] captureTables, + boolean skipBinlogDeserializationOfUnsubscribedTables) { + return getConfig( + MYSQL_CONTAINER, + customerDatabase, + startupOptions, + captureTables, + skipBinlogDeserializationOfUnsubscribedTables); } private MySqlSourceConfig getConfig( @@ -1601,31 +1781,53 @@ private MySqlSourceConfig getConfig( UniqueDatabase database, StartupOptions startupOptions, String[] captureTables) { - return getConfigFactory(container, database, captureTables, false) + return getConfig(container, database, startupOptions, captureTables, false); + } + + private MySqlSourceConfig getConfig( + MySqlContainer container, + UniqueDatabase database, + StartupOptions startupOptions, + String[] captureTables, + boolean skipBinlogDeserializationOfUnsubscribedTables) { + return getConfigFactory( + container, + database, + captureTables, + false, + skipBinlogDeserializationOfUnsubscribedTables) .startupOptions(startupOptions) .createConfig(0); } private MySqlSourceConfig getConfig(String[] captureTables) { - return getConfig(captureTables, false); + return getConfig(captureTables, false, false); } - private MySqlSourceConfig getConfig(String[] captureTables, boolean skipSnapshotBackfill) { + private MySqlSourceConfig getConfig( + String[] captureTables, + boolean skipSnapshotBackfill, + boolean skipBinlogDeserializationOfUnsubscribedTables) { return getConfigFactory( - MYSQL_CONTAINER, customerDatabase, captureTables, skipSnapshotBackfill) + MYSQL_CONTAINER, + 
customerDatabase, + captureTables, + skipSnapshotBackfill, + skipBinlogDeserializationOfUnsubscribedTables) .createConfig(0); } private MySqlSourceConfig getConfig( MySqlContainer container, UniqueDatabase database, String[] captureTables) { - return getConfigFactory(container, database, captureTables, false).createConfig(0); + return getConfigFactory(container, database, captureTables, false, false).createConfig(0); } private MySqlSourceConfigFactory getConfigFactory( MySqlContainer container, UniqueDatabase database, String[] captureTables, - boolean skipSnapshotBackfill) { + boolean skipSnapshotBackfill, + boolean skipBinlogDeserializationOfUnsubscribedTables) { String[] captureTableIds = Arrays.stream(captureTables) .map(tableName -> database.getDatabaseName() + "." + tableName) @@ -1640,6 +1842,8 @@ private MySqlSourceConfigFactory getConfigFactory( .splitSize(4) .fetchSize(2) .skipSnapshotBackfill(skipSnapshotBackfill) + .skipBinlogDeserializationOfUnsubscribedTables( + skipBinlogDeserializationOfUnsubscribedTables) .password(database.getPassword()); } @@ -1649,6 +1853,32 @@ private void addColumnToTable(JdbcConnection connection, String tableId) throws connection.commit(); } + /** + * To produce binlog of both tableId and trivialTableId, where table id is the target read table + * and trivialTableId is not relevant. + * + * @param tableId target table to read. + * @param trivialTableId irrelevant table that not being subscribed. + * @return sqls of both table to produce binlog data. 
+ */ + private static String[] produceBinlogSqls(String tableId, String trivialTableId) { + return new String[] { + "UPDATE " + trivialTableId + " SET address = 'Chengdu' where id = 102", + "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", + "INSERT INTO " + trivialTableId + " VALUES(10001, 'user_11','Wuhan','123567891234')", + "DELETE FROM " + tableId + " where id = 102", + "UPDATE " + trivialTableId + " SET address = 'Beijing' where id = 103", + "DELETE FROM " + trivialTableId + " where id = 10001", + "INSERT INTO " + tableId + " VALUES(102, 'user_2','Hangzhou','123567891234')", + "INSERT INTO " + + trivialTableId + + " VALUES(10002, 'user_22','Guangzhou','123567891234')", + "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103", + "UPDATE " + trivialTableId + " SET address = 'Tianjin' where id = 10002", + "DELETE FROM " + trivialTableId + " where id = 10002", + }; + } + /** This stateful task context will skip valid the starting offset. */ private static class TestStatefulTaskContext extends StatefulTaskContext { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 01c2dff84da..48c17e95efd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -52,6 +52,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL; 
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES; +import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; @@ -129,7 +130,8 @@ void testCommonProperties() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -179,7 +181,8 @@ void testEnableParallelReadSource() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -225,7 +228,8 @@ void testEnableParallelReadSourceWithSingleServerId() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -269,7 +273,8 @@ void testEnableParallelReadSourceLatestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); 
Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -330,7 +335,8 @@ void testOptionalProperties() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), true, SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource) .isEqualTo(expectedSource) .isInstanceOf(MySqlTableSource.class); @@ -389,7 +395,8 @@ void testStartupFromSpecificOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -431,7 +438,8 @@ void testStartupFromInitial() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -474,7 +482,8 @@ void testStartupFromEarliestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -518,7 +527,8 @@ void testStartupFromSpecificTimestamp() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -560,7 +570,8 @@ void testStartupFromLatestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), 
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -607,7 +618,8 @@ void testMetadataColumns() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); @@ -810,7 +822,8 @@ void testEnablingExperimentalOptions() { true, true, true, - false); + false, + SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql index e4df63f1a33..d7c16e923fc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql @@ -326,3 +326,34 @@ CREATE TABLE default_value_test ( INSERT INTO default_value_test VALUES (1,'user1','Shanghai',123567), (2,'user2','Shanghai',123567); + +-- table that is not subscribed +CREATE TABLE trivial ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO trivial +VALUES (101,"user_1","Beijing","123567891234"), + (102,"user_2","Beijing","123567891234"), + (103,"user_3","Beijing","123567891234"), + (109,"user_4","Beijing","123567891234"), + (110,"user_5","Beijing","123567891234"), + 
(111,"user_6","Beijing","123567891234"), + (118,"user_7","Beijing","123567891234"), + (121,"user_8","Beijing","123567891234"), + (123,"user_9","Beijing","123567891234"), + (1009,"user_10","Beijing","123567891234"), + (1010,"user_11","Beijing","123567891234"), + (1011,"user_12","Beijing","123567891234"), + (1012,"user_13","Beijing","123567891234"), + (1013,"user_14","Beijing","123567891234"), + (1014,"user_15","Beijing","123567891234"), + (1015,"user_16","Beijing","123567891234"), + (1016,"user_17","Beijing","123567891234"), + (1017,"user_18","Beijing","123567891234"), + (1018,"user_19","Beijing","123567891234"), + (1019,"user_20","Beijing","123567891234"), + (2000,"user_21","Beijing","123567891234"); \ No newline at end of file