SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED =
+ ConfigOptions.key("scan.binlog.skip-unsubscribed-tables.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to skip deserialization of binlog row events for unsubscribed tables.");
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/DeleteRowsEventDataDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/DeleteRowsEventDataDeserializer.java
new file mode 100644
index 00000000000..137c6bdddd1
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/DeleteRowsEventDataDeserializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.shyiko.mysql.binlog.event.deserialization;
+
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
+ *
+ * Line 52-56: Add a new constructor with {@link TableIdFilter} supplied.
+ *
+ * <p>Line 70-74: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted
+ * tables.
+ */
+public class DeleteRowsEventDataDeserializer
+ extends AbstractRowsEventDataDeserializer<DeleteRowsEventData> {
+
+ private boolean mayContainExtraInformation;
+
+ /** the table id filter to skip further deserialization of unsubscribed table ids. */
+ private final TableIdFilter tableIdFilter;
+
+ public DeleteRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+ this(tableMapEventByTableId, TableIdFilter.all());
+ }
+
+ public DeleteRowsEventDataDeserializer(
+ Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
+ super(tableMapEventByTableId);
+ this.tableIdFilter = tableIdFilter;
+ }
+
+ public DeleteRowsEventDataDeserializer setMayContainExtraInformation(
+ boolean mayContainExtraInformation) {
+ this.mayContainExtraInformation = mayContainExtraInformation;
+ return this;
+ }
+
+ @Override
+ public DeleteRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
+ DeleteRowsEventData eventData = new DeleteRowsEventData();
+ eventData.setTableId(inputStream.readLong(6));
+
+ // skip further deserialization if the table id is unsubscribed
+ if (!tableIdFilter.test(eventData.getTableId())) {
+ eventData.setIncludedColumns(null);
+ eventData.setRows(Collections.emptyList());
+ return eventData;
+ }
+
+ inputStream.readInteger(2); // reserved
+ if (mayContainExtraInformation) {
+ int extraInfoLength = inputStream.readInteger(2);
+ inputStream.skip(extraInfoLength - 2);
+ }
+ int numberOfColumns = inputStream.readPackedInteger();
+ eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true));
+ eventData.setRows(
+ deserializeRows(
+ eventData.getTableId(), eventData.getIncludedColumns(), inputStream));
+ return eventData;
+ }
+
+ private List<Serializable[]> deserializeRows(
+ long tableId, BitSet includedColumns, ByteArrayInputStream inputStream)
+ throws IOException {
+ List<Serializable[]> result = new LinkedList<Serializable[]>();
+ while (inputStream.available() > 0) {
+ result.add(deserializeRow(tableId, includedColumns, inputStream));
+ }
+ return result;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableIdFilter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableIdFilter.java
new file mode 100644
index 00000000000..20916a9054e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableIdFilter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.shyiko.mysql.binlog.event.deserialization;
+
+import java.util.function.Predicate;
+
+/** The filter used for skipping the binlog deserialization of unsubscribed table. */
+public interface TableIdFilter extends Predicate<Long> {
+
+ @Override
+ boolean test(Long tableId);
+
+ static TableIdFilter all() {
+ return tableId -> true;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/UpdateRowsEventDataDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/UpdateRowsEventDataDeserializer.java
new file mode 100644
index 00000000000..2e99b740ede
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/UpdateRowsEventDataDeserializer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.shyiko.mysql.binlog.event.deserialization;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
+ *
+ * Line 53-57: Add a new constructor with {@link TableIdFilter} supplied.
+ *
+ * <p>Line 71-75: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted
+ * tables.
+ */
+public class UpdateRowsEventDataDeserializer
+ extends AbstractRowsEventDataDeserializer<UpdateRowsEventData> {
+
+ private boolean mayContainExtraInformation;
+
+ /** the table id filter to skip further deserialization of unsubscribed table ids. */
+ private final TableIdFilter tableIdFilter;
+
+ public UpdateRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+ this(tableMapEventByTableId, TableIdFilter.all());
+ }
+
+ public UpdateRowsEventDataDeserializer(
+ Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
+ super(tableMapEventByTableId);
+ this.tableIdFilter = tableIdFilter;
+ }
+
+ public UpdateRowsEventDataDeserializer setMayContainExtraInformation(
+ boolean mayContainExtraInformation) {
+ this.mayContainExtraInformation = mayContainExtraInformation;
+ return this;
+ }
+
+ @Override
+ public UpdateRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
+ UpdateRowsEventData eventData = new UpdateRowsEventData();
+ eventData.setTableId(inputStream.readLong(6));
+
+ // skip further deserialization if the table id is unsubscribed
+ if (!tableIdFilter.test(eventData.getTableId())) {
+ eventData.setIncludedColumns(null);
+ eventData.setRows(Collections.emptyList());
+ return eventData;
+ }
+
+ inputStream.skip(2); // reserved
+ if (mayContainExtraInformation) {
+ int extraInfoLength = inputStream.readInteger(2);
+ inputStream.skip(extraInfoLength - 2);
+ }
+ int numberOfColumns = inputStream.readPackedInteger();
+ eventData.setIncludedColumnsBeforeUpdate(inputStream.readBitSet(numberOfColumns, true));
+ eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true));
+ eventData.setRows(deserializeRows(eventData, inputStream));
+ return eventData;
+ }
+
+ private List<Map.Entry<Serializable[], Serializable[]>> deserializeRows(
+ UpdateRowsEventData eventData, ByteArrayInputStream inputStream) throws IOException {
+ long tableId = eventData.getTableId();
+ BitSet includedColumnsBeforeUpdate = eventData.getIncludedColumnsBeforeUpdate(),
+ includedColumns = eventData.getIncludedColumns();
+ List<Map.Entry<Serializable[], Serializable[]>> rows =
+ new ArrayList<Map.Entry<Serializable[], Serializable[]>>();
+ while (inputStream.available() > 0) {
+ rows.add(
+ new AbstractMap.SimpleEntry<Serializable[], Serializable[]>(
+ deserializeRow(tableId, includedColumnsBeforeUpdate, inputStream),
+ deserializeRow(tableId, includedColumns, inputStream)));
+ }
+ return rows;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/WriteRowsEventDataDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/WriteRowsEventDataDeserializer.java
new file mode 100644
index 00000000000..1c7330b75b5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/WriteRowsEventDataDeserializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.shyiko.mysql.binlog.event.deserialization;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
+ *
+ * Line 52-56: Add a new constructor with {@link TableIdFilter} supplied.
+ *
+ * <p>Line 70-74: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted
+ * tables.
+ */
+public class WriteRowsEventDataDeserializer
+ extends AbstractRowsEventDataDeserializer<WriteRowsEventData> {
+
+ private boolean mayContainExtraInformation;
+
+ /** the table id filter to skip further deserialization of unsubscribed table ids. */
+ private final TableIdFilter tableIdFilter;
+
+ public WriteRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+ this(tableMapEventByTableId, TableIdFilter.all());
+ }
+
+ public WriteRowsEventDataDeserializer(
+ Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
+ super(tableMapEventByTableId);
+ this.tableIdFilter = tableIdFilter;
+ }
+
+ public WriteRowsEventDataDeserializer setMayContainExtraInformation(
+ boolean mayContainExtraInformation) {
+ this.mayContainExtraInformation = mayContainExtraInformation;
+ return this;
+ }
+
+ @Override
+ public WriteRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
+ WriteRowsEventData eventData = new WriteRowsEventData();
+ eventData.setTableId(inputStream.readLong(6));
+
+ // skip further deserialization if the table id is unsubscribed
+ if (!tableIdFilter.test(eventData.getTableId())) {
+ eventData.setIncludedColumns(null);
+ eventData.setRows(Collections.emptyList());
+ return eventData;
+ }
+
+ inputStream.skip(2); // reserved
+ if (mayContainExtraInformation) {
+ int extraInfoLength = inputStream.readInteger(2);
+ inputStream.skip(extraInfoLength - 2);
+ }
+ int numberOfColumns = inputStream.readPackedInteger();
+ eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true));
+ eventData.setRows(
+ deserializeRows(
+ eventData.getTableId(), eventData.getIncludedColumns(), inputStream));
+ return eventData;
+ }
+
+ private List<Serializable[]> deserializeRows(
+ long tableId, BitSet includedColumns, ByteArrayInputStream inputStream)
+ throws IOException {
+ List<Serializable[]> result = new LinkedList<Serializable[]>();
+ while (inputStream.available() > 0) {
+ result.add(deserializeRow(tableId, includedColumns, inputStream));
+ }
+ return result;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index 557a149a2d0..9bfbdbfe79d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -24,6 +24,7 @@
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.event.deserialization.TableIdFilter;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
@@ -87,18 +88,21 @@
* Copied from Debezium project(1.9.8.Final) to fix
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
*
- * Line 1432-1443 : Adjust GTID merging logic to support recovering from job which previously
+ * <p>Line 1454-1466 : Adjust GTID merging logic to support recovering from job which previously
* specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared
* EARLIEST/LATEST logic.
*
- * <p>Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
+ * <p>Line 1467-1475 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
* when checkpoint GTID has non-contiguous ranges. Delegates to {@link
* GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
*
- * <p>Line 1490 : Add more error details for some exceptions.
+ * <p>Line 1526 : Add more error details for some exceptions.
*
- * <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when
+ * <p>Line 965-977 : Use iterator instead of index-based loop to avoid O(n²) complexity when
* processing LinkedList rows in handleChange method. See FLINK-38846.
+ *
+ * <p>Line 324-330, 1366-1373 : Use a {@link TableIdFilter} to skip binlog deserialization of
+ * unmatched tables.
*/
public class MySqlStreamingChangeEventSource
 implements StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> {
@@ -203,7 +207,8 @@ public MySqlStreamingChangeEventSource(
ErrorHandler errorHandler,
Clock clock,
MySqlTaskContext taskContext,
- MySqlStreamingChangeEventSourceMetrics metrics) {
+ MySqlStreamingChangeEventSourceMetrics metrics,
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
this.taskContext = taskContext;
this.connectorConfig = connectorConfig;
@@ -316,29 +321,37 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
}
};
+ LOGGER.info(
+ "Skip binlog deserialization for unsubscribed tables: {}",
+ skipBinlogDeserializationOfUnsubscribedTables);
+ final TableIdFilter tableIdFilter =
+ skipBinlogDeserializationOfUnsubscribedTables
+ ? getTableIdDeserializationFilter()
+ : TableIdFilter.all();
+
// Add our custom deserializers ...
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(
EventType.WRITE_ROWS,
- new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
+ new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId, tableIdFilter));
eventDeserializer.setEventDataDeserializer(
EventType.UPDATE_ROWS,
- new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
+ new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId, tableIdFilter));
eventDeserializer.setEventDataDeserializer(
EventType.DELETE_ROWS,
- new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
+ new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId, tableIdFilter));
eventDeserializer.setEventDataDeserializer(
EventType.EXT_WRITE_ROWS,
- new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)
+ new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId, tableIdFilter)
.setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(
EventType.EXT_UPDATE_ROWS,
- new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)
+ new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId, tableIdFilter)
.setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(
EventType.EXT_DELETE_ROWS,
- new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)
+ new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId, tableIdFilter)
.setMayContainExtraInformation(true));
client.setEventDeserializer(eventDeserializer);
}
@@ -1350,6 +1363,15 @@ protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
return null;
}
+ private TableIdFilter getTableIdDeserializationFilter() {
+ return tableId -> {
+ // since only subscribed table is recording schema, the result could be null
+ TableId table = taskContext.getSchema().getTableId(tableId);
+ return table != null
+ && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(table);
+ };
+ }
+
private void logStreamingSourceState() {
logStreamingSourceState(Level.ERROR);
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/RowDeserializers.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/RowDeserializers.java
new file mode 100644
index 00000000000..fdd3c5356a0
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/RowDeserializers.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.event.deserialization.DeleteRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.event.deserialization.TableIdFilter;
+import com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Year;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Map;
+
+/**
+ * Copied from Debezium project(1.9.8.Final) to add constructors with {@link TableIdFilter}.
+ *
+ * <p>Line 64-67: add a constructor with {@link TableIdFilter} of {@link DeleteRowsDeserializer}.
+ *
+ * <p>Line 141-144: add a constructor with {@link TableIdFilter} of {@link
+ * UpdateRowsDeserializer}.
+ *
+ * <p>Line 218-221: add a constructor with {@link TableIdFilter} of {@link
+ * WriteRowsDeserializer}.
+ */
+public class RowDeserializers {
+
+ /**
+ * A specialization of {@link DeleteRowsEventDataDeserializer} that converts MySQL {@code DATE},
+ * {@code TIME}, {@code DATETIME}, and {@code TIMESTAMP} values to {@link LocalDate}, {@link
+ * LocalTime}, {@link LocalDateTime}, and {@link OffsetDateTime} objects, respectively.
+ */
+ public static class DeleteRowsDeserializer extends DeleteRowsEventDataDeserializer {
+
+ public DeleteRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+ super(tableMapEventByTableId);
+ }
+
+ public DeleteRowsDeserializer(
+ Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
+ super(tableMapEventByTableId, tableIdFilter);
+ }
+
+ @Override
+ protected Serializable deserializeString(int length, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeString(length, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeVarString(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeDate(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeDate(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeDatetime(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeDatetime(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeDatetimeV2(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTimeV2(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTime(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTime(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTimestamp(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTimestampV2(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeYear(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeYear(inputStream);
+ }
+ }
+
+ /**
+ * A specialization of {@link UpdateRowsEventDataDeserializer} that converts MySQL {@code DATE},
+ * {@code TIME}, {@code DATETIME}, and {@code TIMESTAMP} values to {@link LocalDate}, {@link
+ * LocalTime}, {@link LocalDateTime}, and {@link OffsetDateTime} objects, respectively.
+ */
+ public static class UpdateRowsDeserializer extends UpdateRowsEventDataDeserializer {
+
+ public UpdateRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+ super(tableMapEventByTableId);
+ }
+
+ public UpdateRowsDeserializer(
+ Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
+ super(tableMapEventByTableId, tableIdFilter);
+ }
+
+ @Override
+ protected Serializable deserializeString(int length, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeString(length, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeVarString(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeDate(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeDate(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeDatetime(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeDatetime(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeDatetimeV2(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTimeV2(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTime(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTime(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTimestamp(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTimestampV2(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeYear(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeYear(inputStream);
+ }
+ }
+
+ /**
+ * A specialization of {@link WriteRowsEventDataDeserializer} that converts MySQL {@code DATE},
+ * {@code TIME}, {@code DATETIME}, and {@code TIMESTAMP} values to {@link LocalDate}, {@link
+ * LocalTime}, {@link LocalDateTime}, and {@link OffsetDateTime} objects, respectively.
+ */
+ public static class WriteRowsDeserializer extends WriteRowsEventDataDeserializer {
+
+ public WriteRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
+ super(tableMapEventByTableId);
+ }
+
+ public WriteRowsDeserializer(
+ Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
+ super(tableMapEventByTableId, tableIdFilter);
+ }
+
+ @Override
+ protected Serializable deserializeString(int length, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeString(length, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeVarString(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeDate(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeDate(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeDatetime(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeDatetime(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeDatetimeV2(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTimeV2(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTime(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTime(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTimestamp(inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeTimestampV2(meta, inputStream);
+ }
+
+ @Override
+ protected Serializable deserializeYear(ByteArrayInputStream inputStream)
+ throws IOException {
+ return RowDeserializers.deserializeYear(inputStream);
+ }
+ }
+
+ private static final int MASK_10_BITS = (1 << 10) - 1;
+ private static final int MASK_6_BITS = (1 << 6) - 1;
+
+ /**
+ * Converts a MySQL string to a {@code byte[]}.
+ *
+ * @param length the number of bytes used to store the length of the string
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@code byte[]} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeString(int length, ByteArrayInputStream inputStream)
+ throws IOException {
+ // charset is not present in the binary log (meaning there is no way to distinguish between
+ // CHAR / BINARY)
+ // as a result - return byte[] instead of an actual String
+ int stringLength = length < 256 ? inputStream.readInteger(1) : inputStream.readInteger(2);
+ return inputStream.read(stringLength);
+ }
+
+ /**
+ * Converts a MySQL string to a {@code byte[]}.
+ *
+ * @param meta the {@code meta} value containing the number of bytes in the length field
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@code byte[]} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ int varcharLength = meta < 256 ? inputStream.readInteger(1) : inputStream.readInteger(2);
+ return inputStream.read(varcharLength);
+ }
+
+ /**
+ * Converts a MySQL {@code DATE} value to a {@link LocalDate}.
+ *
+ * <p>This method treats all zero values for
+ * {@code DATE} columns as NULL, since they cannot be accurately represented as valid {@link
+ * LocalDate} objects.
+ *
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@link LocalDate} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeDate(ByteArrayInputStream inputStream)
+ throws IOException {
+ int value = inputStream.readInteger(3);
+ int day = value % 32; // 1-based day of the month
+ value >>>= 5;
+ int month = value % 16; // 1-based month number
+ int year = value >> 4;
+ if (year == 0 || month == 0 || day == 0) {
+ return null;
+ }
+ return LocalDate.of(year, month, day);
+ }
+
+ /**
+ * Converts a MySQL {@code TIME} value without fractional seconds to a {@link
+ * java.time.Duration}.
+ *
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@link LocalTime} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeTime(ByteArrayInputStream inputStream)
+ throws IOException {
+ // Times are stored as an integer as `HHMMSS`, so we need to split out the digits ...
+ int value = inputStream.readInteger(3);
+ int[] split = split(value, 100, 3);
+ int hours = split[2];
+ int minutes = split[1];
+ int seconds = split[0];
+ return Duration.ofHours(hours).plusMinutes(minutes).plusSeconds(seconds);
+ }
+
+ /**
+ * Converts a MySQL {@code TIME} value with fractional seconds to a {@link
+ * java.time.Duration}.
+ *
+ * @param meta the {@code meta} value containing the fractional second precision, or {@code fsp}
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@link java.time.Duration} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeTimeV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ /*
+ * (in big endian)
+ *
+ * 1 bit sign (1= non-negative, 0= negative)
+ * 1 bit unused (reserved for future extensions)
+ * 10 bits hour (0-838)
+ * 6 bits minute (0-59)
+ * 6 bits second (0-59)
+ *
+ * (3 bytes in total)
+ *
+ * + fractional-seconds storage (size depends on meta)
+ */
+ long time = bigEndianLong(inputStream.read(3), 0, 3);
+ boolean is_negative = bitSlice(time, 0, 1, 24) == 0;
+ int hours = bitSlice(time, 2, 10, 24);
+ int minutes = bitSlice(time, 12, 6, 24);
+ int seconds = bitSlice(time, 18, 6, 24);
+ int nanoSeconds;
+ if (is_negative) { // mysql binary arithmetic for negative encoded values
+ hours = ~hours & MASK_10_BITS;
+ hours = hours & ~(1 << 10); // unset sign bit
+ minutes = ~minutes & MASK_6_BITS;
+ minutes = minutes & ~(1 << 6); // unset sign bit
+ seconds = ~seconds & MASK_6_BITS;
+ seconds = seconds & ~(1 << 6); // unset sign bit
+ nanoSeconds = deserializeFractionalSecondsInNanosNegative(meta, inputStream);
+ if (nanoSeconds == 0 && seconds < 59) { // weird java Duration behavior
+ ++seconds;
+ }
+ hours = -hours;
+ minutes = -minutes;
+ seconds = -seconds;
+ nanoSeconds = -nanoSeconds;
+ } else {
+ nanoSeconds = deserializeFractionalSecondsInNanos(meta, inputStream);
+ }
+ return Duration.ofHours(hours)
+ .plusMinutes(minutes)
+ .plusSeconds(seconds)
+ .plusNanos(nanoSeconds);
+ }
+
+ /**
+ * Converts a MySQL {@code DATETIME} value without fractional seconds to a {@link
+ * LocalDateTime}.
+ *
+ * <p>This method treats all zero values for
+ * {@code DATETIME} columns as NULL, since they cannot be accurately represented as valid {@link
+ * LocalDateTime} objects.
+ *
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@link LocalDateTime} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeDatetime(ByteArrayInputStream inputStream)
+ throws IOException {
+ int[] split = split(inputStream.readLong(8), 100, 6);
+ int year = split[5];
+ int month = split[4]; // 1-based month number
+ int day = split[3]; // 1-based day of the month
+ int hours = split[2];
+ int minutes = split[1];
+ int seconds = split[0];
+ int nanoOfSecond = 0; // This version does not support fractional seconds
+ if (year == 0 || month == 0 || day == 0) {
+ return null;
+ }
+ return LocalDateTime.of(year, month, day, hours, minutes, seconds, nanoOfSecond);
+ }
+
+ /**
+ * Converts a MySQL {@code DATETIME} value with fractional seconds to a {@link
+ * LocalDateTime}.
+ *
+ * <p>This method treats all zero values for
+ * {@code DATETIME} columns as NULL, since they cannot be accurately represented as valid {@link
+ * LocalDateTime} objects.
+ *
+ * @param meta the {@code meta} value containing the fractional second precision, or {@code fsp}
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@link LocalDateTime} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ /*
+ * (in big endian)
+ *
+ * 1 bit sign (1= non-negative, 0= negative)
+ * 17 bits year*13+month (year 0-9999, month 0-12)
+ * 5 bits day (0-31)
+ * 5 bits hour (0-23)
+ * 6 bits minute (0-59)
+ * 6 bits second (0-59)
+ *
+ * (5 bytes in total)
+ *
+ * + fractional-seconds storage (size depends on meta)
+ */
+ long datetime = bigEndianLong(inputStream.read(5), 0, 5);
+ int yearMonth = bitSlice(datetime, 1, 17, 40);
+ int year = yearMonth / 13;
+ int month = yearMonth % 13; // 1-based month number
+ int day = bitSlice(datetime, 18, 5, 40); // 1-based day of the month
+ int hours = bitSlice(datetime, 23, 5, 40);
+ int minutes = bitSlice(datetime, 28, 6, 40);
+ int seconds = bitSlice(datetime, 34, 6, 40);
+ int nanoOfSecond = deserializeFractionalSecondsInNanos(meta, inputStream);
+ if (year == 0 || month == 0 || day == 0) {
+ return null;
+ }
+ return LocalDateTime.of(year, month, day, hours, minutes, seconds, nanoOfSecond);
+ }
+
+ /**
+ * Converts a MySQL {@code TIMESTAMP} value without fractional seconds to a {@link
+ * OffsetDateTime}. MySQL stores the {@code TIMESTAMP} values as seconds past epoch in UTC, but
+ * the resulting {@link OffsetDateTime} will be in the local timezone.
+ *
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@link OffsetDateTime} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeTimestamp(ByteArrayInputStream inputStream)
+ throws IOException {
+ long epochSecond = inputStream.readLong(4);
+ int nanoSeconds = 0; // no fractional seconds
+ return ZonedDateTime.ofInstant(
+ Instant.ofEpochSecond(epochSecond, nanoSeconds), ZoneOffset.UTC);
+ }
+
+ /**
+ * Converts a MySQL {@code TIMESTAMP} value with fractional seconds to a {@link
+ * OffsetDateTime}. MySQL stores the {@code TIMESTAMP} values as seconds + fractional seconds
+ * past epoch in UTC, but the resulting {@link OffsetDateTime} will be in the local timezone.
+ *
+ * @param meta the {@code meta} value containing the fractional second precision, or {@code fsp}
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@link OffsetDateTime} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream)
+ throws IOException {
+ long epochSecond = bigEndianLong(inputStream.read(4), 0, 4);
+ int nanoSeconds = deserializeFractionalSecondsInNanos(meta, inputStream);
+ return ZonedDateTime.ofInstant(
+ Instant.ofEpochSecond(epochSecond, nanoSeconds), ZoneOffset.UTC);
+ }
+
+ /**
+ * Converts a MySQL {@code YEAR} value to a {@link Year} object.
+ *
+ * @param inputStream the binary stream containing the raw binlog event data for the value
+ * @return the {@link Year} object
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static Serializable deserializeYear(ByteArrayInputStream inputStream)
+ throws IOException {
+ return Year.of(1900 + inputStream.readInteger(1));
+ }
+
+ /**
+ * Split the integer into multiple integers.
+ *
+ * <p>We can't use/access the private {@code split} method in the {@link
+ * AbstractRowsEventDataDeserializer} class, so we replicate it here. Note the original is
+ * licensed under the same Apache Software License 2.0 as Debezium.
+ *
+ * @param value the long value
+ * @param divider the value used to separate the individual values (e.g., 10 to separate each
+ * digit into a separate value, 100 to separate each pair of digits into a separate value,
+ * 1000 to separate each 3 digits into a separate value, etc.)
+ * @param length the expected length of the integer array
+ * @return the integer values
+ * @author Stanley Shyiko
+ */
+ protected static int[] split(long value, int divider, int length) {
+ int[] result = new int[length];
+ for (int i = 0; i < length - 1; i++) {
+ result[i] = (int) (value % divider);
+ value /= divider;
+ }
+ result[length - 1] = (int) value;
+ return result;
+ }
+
+ /**
+ * Read a big-endian long value.
+ *
+ * <p>We can't use/access the private {@code bigEndianLong} method in the {@link
+ * AbstractRowsEventDataDeserializer} class, so we replicate it here. Note the original is
+ * licensed under the same Apache Software License 2.0 as Debezium.
+ *
+ * @param bytes the bytes containing the big-endian representation of the value
+ * @param offset the offset within the {@code bytes} byte array where the value starts
+ * @param length the length of the byte representation within the {@code bytes} byte array
+ * @return the long value
+ * @author Stanley Shyiko
+ */
+ protected static long bigEndianLong(byte[] bytes, int offset, int length) {
+ long result = 0;
+ for (int i = offset; i < (offset + length); i++) {
+ byte b = bytes[i];
+ result = (result << 8) | (b >= 0 ? (int) b : (b + 256));
+ }
+ return result;
+ }
+
+ /**
+ * Slice an integer out of a portion of long value.
+ *
+ * <p>We can't use/access the private {@code bitSlice} method in the {@link
+ * AbstractRowsEventDataDeserializer} class, so we replicate it here. Note the original is
+ * licensed under the same Apache Software License 2.0 as Debezium.
+ *
+ * @param value the long containing the integer encoded within it
+ * @param bitOffset the number of bits where the integer value starts
+ * @param numberOfBits the number of bits in the integer value
+ * @param payloadSize the total number of bits used in the {@code value}
+ * @return the integer value
+ */
+ protected static int bitSlice(long value, int bitOffset, int numberOfBits, int payloadSize) {
+ long result = value >> payloadSize - (bitOffset + numberOfBits);
+ return (int) (result & ((1 << numberOfBits) - 1));
+ }
+
+ /**
+ * Read the binary input stream to obtain the number of nanoseconds given the fractional
+ * seconds precision, or fsp.
+ *
+ * <p>We can't use/access the {@code deserializeFractionalSeconds} method in the {@link
+ * AbstractRowsEventDataDeserializer} class, so we replicate it here with modifications to
+ * support nanoseconds rather than microseconds. Note the original is licensed under the same
+ * Apache Software License 2.0 as Debezium.
+ *
+ * @param fsp the fractional seconds precision describing the number of digits precision used to
+ * store the fractional seconds (e.g., 1 for storing tenths of a second, 2 for storing
+ * hundredths, 3 for storing milliseconds, etc.)
+ * @param inputStream the binary data stream
+ * @return the number of nanoseconds
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static int deserializeFractionalSecondsInNanos(
+ int fsp, ByteArrayInputStream inputStream) throws IOException {
+ // Calculate the number of bytes to read, which is
+ // '1' when fsp=(1,2)
+ // '2' when fsp=(3,4) and
+ // '3' when fsp=(5,6)
+ int length = (fsp + 1) / 2;
+ if (length > 0) {
+ long fraction = bigEndianLong(inputStream.read(length), 0, length);
+ // Convert the fractional value (which has extra trailing digit for fsp=1,3, and 5) to
+ // nanoseconds ...
+ return (int) (fraction / (0.0000001 * Math.pow(100, length - 1)));
+ }
+ return 0;
+ }
+
+ /**
+ * Read the binary input stream to obtain the number of nanoseconds given the fractional
+ * seconds precision, or fsp.
+ *
+ * <p>We can't use/access the {@code deserializeFractionalSeconds} method in the {@link
+ * AbstractRowsEventDataDeserializer} class, so we replicate it here with modifications to
+ * support nanoseconds rather than microseconds and negative values. Note the original is
+ * licensed under the same Apache Software License 2.0 as Debezium.
+ *
+ * @param fsp the fractional seconds precision describing the number of digits precision used to
+ * store the fractional seconds (e.g., 1 for storing tenths of a second, 2 for storing
+ * hundredths, 3 for storing milliseconds, etc.)
+ * @param inputStream the binary data stream
+ * @return the number of nanoseconds
+ * @throws IOException if there is an error reading from the binlog event data
+ */
+ protected static int deserializeFractionalSecondsInNanosNegative(
+ int fsp, ByteArrayInputStream inputStream) throws IOException {
+ // Calculate the number of bytes to read, which is
+ // '1' when fsp=(1,2)
+ // '2' when fsp=(3,4) and
+ // '3' when fsp=(5,6)
+ int length = (fsp + 1) / 2;
+ if (length > 0) {
+ long fraction = bigEndianLong(inputStream.read(length), 0, length);
+ int maskBits = 0;
+ switch (length) { // mask bits according to field precision
+ case 1:
+ maskBits = 8;
+ break;
+ case 2:
+ maskBits = 15;
+ break;
+ case 3:
+ maskBits = 20;
+ break;
+ default:
+ break;
+ }
+ fraction = ~fraction & ((1 << maskBits) - 1);
+ fraction = (fraction & ~(1 << maskBits)) + 1; // unset sign bit
+ // Convert the fractional value (which has extra trailing digit for fsp=1,3, and 5) to
+ // nanoseconds ...
+ return (int) (fraction / (0.0000001 * Math.pow(100, length - 1)));
+ }
+ return 0;
+ }
+
+ private RowDeserializers() {}
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index b1e6d1dfc86..70ffac35e28 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -137,7 +137,10 @@ public void submitSplit(MySqlSplit mySqlSplit) {
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
currentBinlogSplit,
- createEventFilter());
+ createEventFilter(),
+ statefulTaskContext
+ .getSourceConfig()
+ .isSkipBinlogDeserializationOfUnsubscribedTables());
executorService.submit(
() -> {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
index e3549237065..2956afee4d3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
@@ -254,7 +254,10 @@ private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
backfillBinlogSplit,
- event -> true);
+ event -> true,
+ statefulTaskContext
+ .getSourceConfig()
+ .isSkipBinlogDeserializationOfUnsubscribedTables());
}
private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java
index 7a6e7757f7d..50a722462e9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java
@@ -68,8 +68,17 @@ public MySqlBinlogSplitReadTask(
MySqlTaskContext taskContext,
MySqlStreamingChangeEventSourceMetrics metrics,
MySqlBinlogSplit binlogSplit,
- Predicate<Event> eventFilter) {
- super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics);
+ Predicate<Event> eventFilter,
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
+ super(
+ connectorConfig,
+ connection,
+ dispatcher,
+ errorHandler,
+ clock,
+ taskContext,
+ metrics,
+ skipBinlogDeserializationOfUnsubscribedTables);
this.binlogSplit = binlogSplit;
this.eventDispatcher = dispatcher;
this.errorHandler = errorHandler;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index 93fa2a0d36a..18b1fc27e00 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -312,6 +312,14 @@ public MySqlSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedCh
return this;
}
+ /** Whether to skip deserialization of binlog row events for unsubscribed tables. */
+ public MySqlSourceBuilder<T> skipBinlogDeserializationOfUnsubscribedTables(
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
+ this.configFactory.skipBinlogDeserializationOfUnsubscribedTables(
+ skipBinlogDeserializationOfUnsubscribedTables);
+ return this;
+ }
+
/**
* Build the {@link MySqlSource}.
*
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index cf456fcaed0..fb6375fe025 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -72,6 +72,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean parseOnLineSchemaChanges;
public static boolean useLegacyJsonFormat = true;
private final boolean assignUnboundedChunkFirst;
+ private final boolean skipBinlogDeserializationOfUnsubscribedTables;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -112,7 +113,8 @@ public class MySqlSourceConfig implements Serializable {
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean,
boolean useLegacyJsonFormat,
- boolean assignUnboundedChunkFirst) {
+ boolean assignUnboundedChunkFirst,
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -158,6 +160,8 @@ public class MySqlSourceConfig implements Serializable {
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
+ this.skipBinlogDeserializationOfUnsubscribedTables =
+ skipBinlogDeserializationOfUnsubscribedTables;
}
public String getHostname() {
@@ -299,4 +303,8 @@ public boolean isSkipSnapshotBackfill() {
public boolean isTreatTinyInt1AsBoolean() {
return treatTinyInt1AsBoolean;
}
+
+ public boolean isSkipBinlogDeserializationOfUnsubscribedTables() {
+ return skipBinlogDeserializationOfUnsubscribedTables;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 569b62232db..ae8e436b2aa 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -78,6 +78,8 @@ public class MySqlSourceConfigFactory implements Serializable {
private boolean treatTinyInt1AsBoolean = true;
private boolean useLegacyJsonFormat = true;
private boolean assignUnboundedChunkFirst = false;
+ private boolean skipBinlogDeserializationOfUnsubscribedTables =
+ MySqlSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue();
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@@ -341,6 +343,14 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde
return this;
}
+ /** Whether to skip deserialization of binlog row events for unsubscribed tables. */
+ public MySqlSourceConfigFactory skipBinlogDeserializationOfUnsubscribedTables(
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
+ this.skipBinlogDeserializationOfUnsubscribedTables =
+ skipBinlogDeserializationOfUnsubscribedTables;
+ return this;
+ }
+
/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
public MySqlSourceConfig createConfig(int subtaskId) {
// hard code server name, because we don't need to distinguish it, docs:
@@ -444,6 +454,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean,
useLegacyJsonFormat,
- assignUnboundedChunkFirst);
+ assignUnboundedChunkFirst,
+ skipBinlogDeserializationOfUnsubscribedTables);
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index a8e143f5fc5..8dbf12f0092 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -292,4 +292,12 @@ public class MySqlSourceOptions {
.defaultValue(true)
.withDescription(
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.");
+
+ @Experimental
+ public static final ConfigOption<Boolean> SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED =
+ ConfigOptions.key("scan.binlog.skip-unsubscribed-tables.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to skip deserialization of binlog row events for unsubscribed tables.");
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 2a1f0519435..e8adbb8ce34 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -101,8 +101,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
final boolean parseOnlineSchemaChanges;
private final boolean useLegacyJsonFormat;
private final boolean assignUnboundedChunkFirst;
-
private final boolean appendOnly;
+ private final boolean skipBinlogDeserializationOfUnsubscribedTables;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -144,7 +144,8 @@ public MySqlTableSource(
boolean parseOnlineSchemaChanges,
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst,
- boolean appendOnly) {
+ boolean appendOnly,
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -178,6 +179,8 @@ public MySqlTableSource(
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.appendOnly = appendOnly;
+ this.skipBinlogDeserializationOfUnsubscribedTables =
+ skipBinlogDeserializationOfUnsubscribedTables;
}
@Override
@@ -241,6 +244,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.parseOnLineSchemaChanges(parseOnlineSchemaChanges)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(assignUnboundedChunkFirst)
+ .skipBinlogDeserializationOfUnsubscribedTables(
+ skipBinlogDeserializationOfUnsubscribedTables)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -330,7 +335,8 @@ public DynamicTableSource copy() {
parseOnlineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
- appendOnly);
+ appendOnly,
+ skipBinlogDeserializationOfUnsubscribedTables);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -376,7 +382,9 @@ public boolean equals(Object o) {
&& parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
&& useLegacyJsonFormat == that.useLegacyJsonFormat
&& assignUnboundedChunkFirst == that.assignUnboundedChunkFirst
- && Objects.equals(appendOnly, that.appendOnly);
+ && Objects.equals(appendOnly, that.appendOnly)
+ && skipBinlogDeserializationOfUnsubscribedTables
+ == that.skipBinlogDeserializationOfUnsubscribedTables;
}
@Override
@@ -413,7 +421,8 @@ public int hashCode() {
parseOnlineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
- appendOnly);
+ appendOnly,
+ skipBinlogDeserializationOfUnsubscribedTables);
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index 5ea430d94e7..3833cd95168 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -106,9 +106,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
boolean assignUnboundedChunkFirst =
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
-
boolean appendOnly =
config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
+ boolean skipBinlogDeserializationOfUnsubscribedTables =
+ config.get(MySqlSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED);
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
@@ -156,7 +157,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
parseOnLineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
- appendOnly);
+ appendOnly,
+ skipBinlogDeserializationOfUnsubscribedTables);
}
@Override
@@ -206,6 +208,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
+ options.add(MySqlSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED);
return options;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index e7fd2c4cbb9..78740f53fd0 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -66,6 +66,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.lifecycle.Startables;
import java.sql.Connection;
@@ -145,10 +147,16 @@ public void after() throws Exception {
customerDatabase.dropDatabase();
}
- @Test
- void testReadSingleBinlogSplit() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadSingleBinlogSplit(boolean skipBinlogDeserializationOfUnsubscribedTables)
+ throws Exception {
customerDatabase.createAndInitialize();
- MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
+ MySqlSourceConfig sourceConfig =
+ getConfig(
+ new String[] {"customers_even_dist"},
+ false,
+ skipBinlogDeserializationOfUnsubscribedTables);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
final DataType dataType =
@@ -180,28 +188,59 @@ void testReadSingleBinlogSplit() throws Exception {
sourceConfig,
1,
expected.length,
- splits.get(splits.size() - 1).getTableId());
+ splits.get(splits.size() - 1).getTableId(),
+ customerDatabase.qualifiedTableName("trivial"));
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
- @Test
- void testSnapshotScanSkipBackfillWithPostHighWatermark() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testSnapshotScanSkipBackfillWithPostHighWatermark(
+ boolean skipBinlogDeserializationOfUnsubscribedTables) throws Exception {
String tableName = "customers_even_dist";
testSnapshotScanSkipBackfill(
- getSnapshotPhaseHooksWithPostHighWatermark(tableName), tableName);
+ getSnapshotPhaseHooksWithPostHighWatermark(tableName),
+ tableName,
+ true,
+ skipBinlogDeserializationOfUnsubscribedTables);
}
- @Test
- void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testSnapshotScanSkipBackfillWithPreHighWatermark(
+ boolean skipBinlogDeserializationOfUnsubscribedTables) throws Exception {
String tableName = "customers_even_dist";
testSnapshotScanSkipBackfill(
- getSnapshotPhaseHooksWithPreHighWatermark(tableName), tableName);
+ getSnapshotPhaseHooksWithPreHighWatermark(tableName),
+ tableName,
+ true,
+ skipBinlogDeserializationOfUnsubscribedTables);
}
- void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String tableName)
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testSnapshotScanWithBackfillSkipUnsubscribedTableBinlogDeserialization(
+ boolean skipBinlogDeserializationOfUnsubscribedTables) throws Exception {
+ String tableName = "customers_even_dist";
+ testSnapshotScanSkipBackfill(
+ getSnapshotPhaseHooksWithPreHighWatermark(tableName),
+ tableName,
+ false,
+ skipBinlogDeserializationOfUnsubscribedTables);
+ }
+
+ void testSnapshotScanSkipBackfill(
+ SnapshotPhaseHooks snapshotHooks,
+ String tableName,
+ boolean skipSnapshotBackfill,
+ boolean skipBinlogDeserializationOfUnsubscribedTables)
throws Exception {
customerDatabase.createAndInitialize();
- MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, true);
+ MySqlSourceConfig sourceConfig =
+ getConfig(
+ new String[] {tableName},
+ skipSnapshotBackfill,
+ skipBinlogDeserializationOfUnsubscribedTables);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
@@ -212,32 +251,56 @@ void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String table
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List splits = getMySqlSplits(new String[] {tableName}, sourceConfig);
- String[] expected =
- new String[] {
- "+I[101, user_1, Shanghai, 123567891234]",
- "+I[102, user_2, Shanghai, 123567891234]",
- "+I[103, user_3, Shanghai, 123567891234]",
- "+I[104, user_4, Shanghai, 123567891234]",
- "+I[105, user_5, Shanghai, 123567891234]",
- "+I[106, user_6, Shanghai, 123567891234]",
- "+I[107, user_7, Shanghai, 123567891234]",
- "+I[108, user_8, Shanghai, 123567891234]",
- "-U[103, user_3, Shanghai, 123567891234]",
- "+U[103, user_3, Hangzhou, 123567891234]",
- "-D[102, user_2, Shanghai, 123567891234]",
- "+I[102, user_2, Hangzhou, 123567891234]",
- "-U[103, user_3, Hangzhou, 123567891234]",
- "+U[103, user_3, Shanghai, 123567891234]",
- "-U[103, user_3, Shanghai, 123567891234]",
- "+U[103, user_3, Hangzhou, 123567891234]",
- "-D[102, user_2, Hangzhou, 123567891234]",
- "+I[102, user_2, Shanghai, 123567891234]",
- "-U[103, user_3, Hangzhou, 123567891234]",
- "+U[103, user_3, Shanghai, 123567891234]",
- "+I[2001, user_22, Shanghai, 123567891234]",
- "+I[2002, user_23, Shanghai, 123567891234]",
- "+I[2003, user_24, Shanghai, 123567891234]"
- };
+ String[] expected = null;
+ if (skipSnapshotBackfill) {
+ expected =
+ new String[] {
+ "+I[101, user_1, Shanghai, 123567891234]",
+ "+I[102, user_2, Shanghai, 123567891234]",
+ "+I[103, user_3, Shanghai, 123567891234]",
+ "+I[104, user_4, Shanghai, 123567891234]",
+ "+I[105, user_5, Shanghai, 123567891234]",
+ "+I[106, user_6, Shanghai, 123567891234]",
+ "+I[107, user_7, Shanghai, 123567891234]",
+ "+I[108, user_8, Shanghai, 123567891234]",
+ "-U[103, user_3, Shanghai, 123567891234]",
+ "+U[103, user_3, Hangzhou, 123567891234]",
+ "-D[102, user_2, Shanghai, 123567891234]",
+ "+I[102, user_2, Hangzhou, 123567891234]",
+ "-U[103, user_3, Hangzhou, 123567891234]",
+ "+U[103, user_3, Shanghai, 123567891234]",
+ "-U[103, user_3, Shanghai, 123567891234]",
+ "+U[103, user_3, Hangzhou, 123567891234]",
+ "-D[102, user_2, Hangzhou, 123567891234]",
+ "+I[102, user_2, Shanghai, 123567891234]",
+ "-U[103, user_3, Hangzhou, 123567891234]",
+ "+U[103, user_3, Shanghai, 123567891234]",
+ "+I[2001, user_22, Shanghai, 123567891234]",
+ "+I[2002, user_23, Shanghai, 123567891234]",
+ "+I[2003, user_24, Shanghai, 123567891234]"
+ };
+ } else {
+ expected =
+ new String[] {
+ "+I[101, user_1, Shanghai, 123567891234]",
+ "+I[102, user_2, Hangzhou, 123567891234]",
+ "+I[103, user_3, Shanghai, 123567891234]",
+ "+I[104, user_4, Shanghai, 123567891234]",
+ "+I[105, user_5, Shanghai, 123567891234]",
+ "+I[106, user_6, Shanghai, 123567891234]",
+ "+I[107, user_7, Shanghai, 123567891234]",
+ "+I[108, user_8, Shanghai, 123567891234]",
+ "-U[103, user_3, Shanghai, 123567891234]",
+ "+U[103, user_3, Hangzhou, 123567891234]",
+ "-D[102, user_2, Hangzhou, 123567891234]",
+ "+I[102, user_2, Shanghai, 123567891234]",
+ "-U[103, user_3, Hangzhou, 123567891234]",
+ "+U[103, user_3, Shanghai, 123567891234]",
+ "+I[2001, user_22, Shanghai, 123567891234]",
+ "+I[2002, user_23, Shanghai, 123567891234]",
+ "+I[2003, user_24, Shanghai, 123567891234]"
+ };
+ }
// skip snapshot backfill makes highwatermark equal lowwatermark, so need 2 splits to
List actual =
@@ -246,8 +309,9 @@ void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String table
dataType,
sourceConfig,
2,
- 23,
+ expected.length,
splits.get(splits.size() - 1).getTableId(),
+ customerDatabase.qualifiedTableName("trivial"),
snapshotHooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@@ -255,16 +319,11 @@ void testSnapshotScanSkipBackfill(SnapshotPhaseHooks snapshotHooks, String table
@NotNull
private SnapshotPhaseHooks getSnapshotPhaseHooksWithPreHighWatermark(String tableName) {
String tableId = customerDatabase.getDatabaseName() + "." + tableName;
- String[] changingDataSql =
- new String[] {
- "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
- "DELETE FROM " + tableId + " where id = 102",
- "INSERT INTO " + tableId + " VALUES(102, 'user_2','Hangzhou','123567891234')",
- "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
- };
+ String trivialTableId = customerDatabase.getDatabaseName() + ".trivial";
+ String[] changingDataSql = produceBinlogSqls(tableId, trivialTableId);
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
- snapshotHooks.setPostHighWatermarkAction(
+ snapshotHooks.setPreHighWatermarkAction(
(mySqlConnection, split) -> {
if (split.splitId().equals(tableId + ":0")) {
mySqlConnection.execute(changingDataSql);
@@ -277,13 +336,8 @@ private SnapshotPhaseHooks getSnapshotPhaseHooksWithPreHighWatermark(String tabl
@NotNull
private SnapshotPhaseHooks getSnapshotPhaseHooksWithPostHighWatermark(String tableName) {
String tableId = customerDatabase.getDatabaseName() + "." + tableName;
- String[] changingDataSql =
- new String[] {
- "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
- "DELETE FROM " + tableId + " where id = 102",
- "INSERT INTO " + tableId + " VALUES(102, 'user_2','Hangzhou','123567891234')",
- "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
- };
+ String trivialTableId = customerDatabase.getDatabaseName() + ".trivial";
+ String[] changingDataSql = produceBinlogSqls(tableId, trivialTableId);
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
snapshotHooks.setPostHighWatermarkAction(
@@ -296,10 +350,16 @@ private SnapshotPhaseHooks getSnapshotPhaseHooksWithPostHighWatermark(String tab
return snapshotHooks;
}
- @Test
- void testReadAllBinlogSplitsForOneTable() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadAllBinlogSplitsForOneTable(boolean skipBinlogDeserializationOfUnsubscribedTables)
+ throws Exception {
customerDatabase.createAndInitialize();
- MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
+ MySqlSourceConfig sourceConfig =
+ getConfig(
+ new String[] {"customers_even_dist"},
+ false,
+ skipBinlogDeserializationOfUnsubscribedTables);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
final DataType dataType =
@@ -340,7 +400,8 @@ void testReadAllBinlogSplitsForOneTable() throws Exception {
sourceConfig,
splits.size(),
expected.length,
- splits.get(splits.size() - 1).getTableId());
+ splits.get(splits.size() - 1).getTableId(),
+ customerDatabase.qualifiedTableName("trivial"));
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@@ -375,7 +436,8 @@ void testReadAllBinlogForTableWithSingleLine() throws Exception {
sourceConfig,
splits.size(),
expected.length,
- splits.get(splits.size() - 1).getTableId());
+ splits.get(splits.size() - 1).getTableId(),
+ customerDatabase.qualifiedTableName("trivial"));
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@@ -432,15 +494,21 @@ void testReadAllBinlogSplitsForTables() throws Exception {
TableId.parse(
customerDatabase.getDatabaseName()
+ "."
- + "customer_card_single_line"));
+ + "customer_card_single_line"),
+ customerDatabase.qualifiedTableName("trivial"));
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
- @Test
- void testReadBinlogFromLatestOffset() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadBinlogFromLatestOffset(boolean skipBinlogDeserializationOfUnsubscribedTables)
+ throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig =
- getConfig(StartupOptions.latest(), new String[] {"customers"});
+ getConfig(
+ StartupOptions.latest(),
+ new String[] {"customers"},
+ skipBinlogDeserializationOfUnsubscribedTables);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
@@ -451,7 +519,10 @@ void testReadBinlogFromLatestOffset() throws Exception {
// Create some binlog events
makeCustomersBinlogEvents(
- mySqlConnection, customerDatabase.qualifiedTableName("customers"), false);
+ mySqlConnection,
+ customerDatabase.qualifiedTableName("customers"),
+ customerDatabase.qualifiedTableName("trivial"),
+ false);
final DataType dataType =
DataTypes.ROW(
@@ -479,15 +550,18 @@ void testReadBinlogFromLatestOffset() throws Exception {
reader.close();
}
- @Test
- void testReadBinlogWithoutGtidFromLatestOffset() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadBinlogWithoutGtidFromLatestOffset(
+ boolean skipBinlogDeserializationOfUnsubscribedTables) throws Exception {
customerDatabaseNoGtid.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(
MYSQL_CONTAINER_NOGTID,
customerDatabaseNoGtid,
StartupOptions.latest(),
- new String[] {"customers"});
+ new String[] {"customers"},
+ skipBinlogDeserializationOfUnsubscribedTables);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
@@ -498,7 +572,10 @@ void testReadBinlogWithoutGtidFromLatestOffset() throws Exception {
// Create some binlog events
makeCustomersBinlogEvents(
- mySqlConnection, customerDatabaseNoGtid.qualifiedTableName("customers"), false);
+ mySqlConnection,
+ customerDatabaseNoGtid.qualifiedTableName("customers"),
+ customerDatabaseNoGtid.qualifiedTableName("trivial"),
+ false);
final DataType dataType =
DataTypes.ROW(
@@ -526,11 +603,16 @@ void testReadBinlogWithoutGtidFromLatestOffset() throws Exception {
reader.close();
}
- @Test
- void testReadBinlogFromEarliestOffset() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadBinlogFromEarliestOffset(boolean skipBinlogDeserializationOfUnsubscribedTables)
+ throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig =
- getConfig(StartupOptions.earliest(), new String[] {"customers"});
+ getConfig(
+ StartupOptions.earliest(),
+ new String[] {"customers"},
+ skipBinlogDeserializationOfUnsubscribedTables);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
@@ -541,7 +623,10 @@ void testReadBinlogFromEarliestOffset() throws Exception {
// Create some binlog events
makeCustomersBinlogEvents(
- mySqlConnection, customerDatabase.qualifiedTableName("customers"), false);
+ mySqlConnection,
+ customerDatabase.qualifiedTableName("customers"),
+ customerDatabase.qualifiedTableName("trivial"),
+ false);
final DataType dataType =
DataTypes.ROW(
@@ -624,11 +709,17 @@ void testReadBinlogFromEarliestOffsetAfterSchemaChange() throws Exception {
+ "binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET");
}
- @Test
- void testReadBinlogFromBinlogFilePosition() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadBinlogFromBinlogFilePosition(boolean skipBinlogDeserializationOfUnsubscribedTables)
+ throws Exception {
// Preparations
customerDatabase.createAndInitialize();
- MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"});
+ MySqlSourceConfig connectionConfig =
+ getConfig(
+ new String[] {"customers"},
+ false,
+ skipBinlogDeserializationOfUnsubscribedTables);
binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);
DataType dataType =
@@ -646,7 +737,8 @@ void testReadBinlogFromBinlogFilePosition() throws Exception {
getConfig(
StartupOptions.specificOffset(
startingOffset.getFilename(), startingOffset.getPosition()),
- new String[] {"customers"});
+ new String[] {"customers"},
+ skipBinlogDeserializationOfUnsubscribedTables);
// Create reader and submit splits
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
@@ -655,7 +747,10 @@ void testReadBinlogFromBinlogFilePosition() throws Exception {
// Create some binlog events
makeCustomersBinlogEvents(
- mySqlConnection, customerDatabase.qualifiedTableName("customers"), false);
+ mySqlConnection,
+ customerDatabase.qualifiedTableName("customers"),
+ customerDatabase.qualifiedTableName("trivial"),
+ false);
// Read with binlog split reader and validate
String[] expected =
@@ -733,11 +828,17 @@ void testSkippingEvents() throws Exception {
assertEqualsInOrder(Arrays.asList(expected), actual);
}
- @Test
- void testReadBinlogFromGtidSet() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadBinlogFromGtidSet(boolean skipBinlogDeserializationOfUnsubscribedTables)
+ throws Exception {
// Preparations
customerDatabase.createAndInitialize();
- MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"});
+ MySqlSourceConfig connectionConfig =
+ getConfig(
+ new String[] {"customers"},
+ false,
+ skipBinlogDeserializationOfUnsubscribedTables);
binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);
DataType dataType =
@@ -754,7 +855,8 @@ void testReadBinlogFromGtidSet() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(
StartupOptions.specificOffset(startingOffset.getGtidSet()),
- new String[] {"customers"});
+ new String[] {"customers"},
+ skipBinlogDeserializationOfUnsubscribedTables);
// Create reader and submit splits
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
@@ -763,7 +865,10 @@ void testReadBinlogFromGtidSet() throws Exception {
// Create some binlog events
makeCustomersBinlogEvents(
- mySqlConnection, customerDatabase.qualifiedTableName("customers"), false);
+ mySqlConnection,
+ customerDatabase.qualifiedTableName("customers"),
+ customerDatabase.qualifiedTableName("trivial"),
+ false);
// Read with binlog split reader and validate
String[] expected =
@@ -869,11 +974,17 @@ void testRestoreFromCheckpointWithGtidSetAndSkippingEventsAndRows() throws Excep
assertEqualsInOrder(Arrays.asList(expected), actual);
}
- @Test
- void testReadBinlogFromTimestamp() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadBinlogFromTimestamp(boolean skipBinlogDeserializationOfUnsubscribedTables)
+ throws Exception {
// Preparations
customerDatabase.createAndInitialize();
- MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"});
+ MySqlSourceConfig connectionConfig =
+ getConfig(
+ new String[] {"customers"},
+ false,
+ skipBinlogDeserializationOfUnsubscribedTables);
binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);
DataType dataType =
@@ -892,7 +1003,10 @@ void testReadBinlogFromTimestamp() throws Exception {
// Create a new config to start reading from the offset captured above
MySqlSourceConfig sourceConfig =
- getConfig(StartupOptions.timestamp(startTimestamp), new String[] {"customers"});
+ getConfig(
+ StartupOptions.timestamp(startTimestamp),
+ new String[] {"customers"},
+ skipBinlogDeserializationOfUnsubscribedTables);
// Create reader and submit splits
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
@@ -901,7 +1015,10 @@ void testReadBinlogFromTimestamp() throws Exception {
// Create some binlog events
makeCustomersBinlogEvents(
- mySqlConnection, customerDatabase.qualifiedTableName("customers"), false);
+ mySqlConnection,
+ customerDatabase.qualifiedTableName("customers"),
+ customerDatabase.qualifiedTableName("trivial"),
+ false);
// Read with binlog split reader and validate
String[] expected =
@@ -984,8 +1101,10 @@ void testReadBinlogFromTimestampAfterSchemaChange() throws Exception {
assertEqualsInOrder(Arrays.asList(expected), actual);
}
- @Test
- void testHeartbeatEvent() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testHeartbeatEvent(boolean skipBinlogDeserializationOfUnsubscribedTables)
+ throws Exception {
// Initialize database
customerDatabase.createAndInitialize();
@@ -1006,7 +1125,8 @@ void testHeartbeatEvent() throws Exception {
MYSQL_CONTAINER,
customerDatabase,
new String[] {"customers"},
- false)
+ false,
+ skipBinlogDeserializationOfUnsubscribedTables)
.startupOptions(StartupOptions.latest())
.heartbeatInterval(heartbeatInterval)
.debeziumProperties(dbzProps)
@@ -1023,6 +1143,7 @@ void testHeartbeatEvent() throws Exception {
makeCustomersBinlogEvents(
mySqlConnection,
binlogSplit.getTableSchemas().keySet().iterator().next().toString(),
+ customerDatabase.qualifiedTableName("trivial"),
false);
// Keep polling until we receive heartbeat. We don't validate offset of heartbeat here
@@ -1330,7 +1451,8 @@ private List readBinlogSplitsFromSnapshotSplits(
MySqlSourceConfig sourceConfig,
int scanSplitsNum,
int expectedSize,
- TableId binlogChangeTableId)
+ TableId binlogChangeTableId,
+ String trivialTableName)
throws Exception {
return readBinlogSplitsFromSnapshotSplits(
sqlSplits,
@@ -1339,6 +1461,7 @@ private List readBinlogSplitsFromSnapshotSplits(
scanSplitsNum,
expectedSize,
binlogChangeTableId,
+ trivialTableName,
SnapshotPhaseHooks.empty());
}
@@ -1349,6 +1472,7 @@ private List readBinlogSplitsFromSnapshotSplits(
int scanSplitsNum,
int expectedSize,
TableId binlogChangeTableId,
+ String trivialTableName,
SnapshotPhaseHooks snapshotHooks)
throws Exception {
final StatefulTaskContext statefulTaskContext =
@@ -1405,6 +1529,7 @@ private List readBinlogSplitsFromSnapshotSplits(
makeCustomersBinlogEvents(
statefulTaskContext.getConnection(),
binlogChangeTableId.toString(),
+ trivialTableName,
scanSplitsNum == 1);
} else {
makeCustomerCardsBinlogEvents(
@@ -1438,14 +1563,29 @@ private void updateCustomersTableInBulk(JdbcConnection connection, String tableI
}
private void makeCustomersBinlogEvents(
- JdbcConnection connection, String tableId, boolean firstSplitOnly) throws SQLException {
+ JdbcConnection connection,
+ String tableId,
+ String trivialTableId,
+ boolean firstSplitOnly)
+ throws SQLException {
// make binlog events for the first split
connection.setAutoCommit(false);
connection.execute(
+ "UPDATE " + trivialTableId + " SET address = 'Chengdu' where id = 102",
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
+ "INSERT INTO "
+ + trivialTableId
+ + " VALUES(10001, 'user_11','Wuhan','123567891234')",
"DELETE FROM " + tableId + " where id = 102",
+ "UPDATE " + trivialTableId + " SET address = 'Beijing' where id = 103",
+ "DELETE FROM " + trivialTableId + " where id = 10001",
"INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
- "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
+ "INSERT INTO "
+ + trivialTableId
+ + " VALUES(10002, 'user_22','Guangzhou','123567891234')",
+ "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
+ "UPDATE " + trivialTableId + " SET address = 'Tianjin' where id = 10002",
+ "DELETE FROM " + trivialTableId + " where id = 10002");
connection.commit();
if (!firstSplitOnly) {
@@ -1453,6 +1593,10 @@ private void makeCustomersBinlogEvents(
connection.execute("UPDATE " + tableId + " SET name = 'Hangzhou' where id = 1010");
connection.commit();
+ connection.execute(
+ "UPDATE " + trivialTableId + " SET name = 'Shanghai' where id = 1010");
+ connection.commit();
+
// make binlog events for the last split
connection.execute(
"INSERT INTO "
@@ -1460,6 +1604,12 @@ private void makeCustomersBinlogEvents(
+ " VALUES(2001, 'user_22','Shanghai','123567891234')");
connection.commit();
+ connection.execute(
+ "INSERT INTO "
+ + trivialTableId
+ + " VALUES(2001, 'user_22','Beijing','123567891234')");
+ connection.commit();
+
// make schema change binlog events
connection.execute(
"ALTER TABLE "
@@ -1467,6 +1617,12 @@ private void makeCustomersBinlogEvents(
+ " ADD COLUMN email VARCHAR(128) DEFAULT 'user@flink.apache.org'");
connection.commit();
+ connection.execute(
+ "ALTER TABLE "
+ + trivialTableId
+ + " ADD COLUMN email VARCHAR(128) DEFAULT 'user@flink.apache.org'");
+ connection.commit();
+
// make binlog events after schema changed
connection.execute(
"INSERT INTO "
@@ -1474,12 +1630,24 @@ private void makeCustomersBinlogEvents(
+ " VALUES(2002, 'user_23','Shanghai','123567891234', 'test1@gmail.com')");
connection.commit();
+ connection.execute(
+ "INSERT INTO "
+ + trivialTableId
+ + " VALUES(2002, 'user_23','Shanghai','123567891234', 'test1@gmail.com')");
+ connection.commit();
+
// make binlog again
connection.execute(
"INSERT INTO "
+ tableId
+ " VALUES(2003, 'user_24','Shanghai','123567891234', 'test2@gmail.com')");
connection.commit();
+
+ connection.execute(
+ "INSERT INTO "
+ + trivialTableId
+ + " VALUES(2003, 'user_24','Shanghai','123567891234', 'test2@gmail.com')");
+ connection.commit();
}
}
@@ -1593,7 +1761,19 @@ private List getMySqlSplits(
}
private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] captureTables) {
- return getConfig(MYSQL_CONTAINER, customerDatabase, startupOptions, captureTables);
+ return getConfig(startupOptions, captureTables, false);
+ }
+
+ private MySqlSourceConfig getConfig(
+ StartupOptions startupOptions,
+ String[] captureTables,
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
+ return getConfig(
+ MYSQL_CONTAINER,
+ customerDatabase,
+ startupOptions,
+ captureTables,
+ skipBinlogDeserializationOfUnsubscribedTables);
}
private MySqlSourceConfig getConfig(
@@ -1601,31 +1781,53 @@ private MySqlSourceConfig getConfig(
UniqueDatabase database,
StartupOptions startupOptions,
String[] captureTables) {
- return getConfigFactory(container, database, captureTables, false)
+ return getConfig(container, database, startupOptions, captureTables, false);
+ }
+
+ private MySqlSourceConfig getConfig(
+ MySqlContainer container,
+ UniqueDatabase database,
+ StartupOptions startupOptions,
+ String[] captureTables,
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
+ return getConfigFactory(
+ container,
+ database,
+ captureTables,
+ false,
+ skipBinlogDeserializationOfUnsubscribedTables)
.startupOptions(startupOptions)
.createConfig(0);
}
private MySqlSourceConfig getConfig(String[] captureTables) {
- return getConfig(captureTables, false);
+ return getConfig(captureTables, false, false);
}
- private MySqlSourceConfig getConfig(String[] captureTables, boolean skipSnapshotBackfill) {
+ private MySqlSourceConfig getConfig(
+ String[] captureTables,
+ boolean skipSnapshotBackfill,
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
return getConfigFactory(
- MYSQL_CONTAINER, customerDatabase, captureTables, skipSnapshotBackfill)
+ MYSQL_CONTAINER,
+ customerDatabase,
+ captureTables,
+ skipSnapshotBackfill,
+ skipBinlogDeserializationOfUnsubscribedTables)
.createConfig(0);
}
private MySqlSourceConfig getConfig(
MySqlContainer container, UniqueDatabase database, String[] captureTables) {
- return getConfigFactory(container, database, captureTables, false).createConfig(0);
+ return getConfigFactory(container, database, captureTables, false, false).createConfig(0);
}
private MySqlSourceConfigFactory getConfigFactory(
MySqlContainer container,
UniqueDatabase database,
String[] captureTables,
- boolean skipSnapshotBackfill) {
+ boolean skipSnapshotBackfill,
+ boolean skipBinlogDeserializationOfUnsubscribedTables) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> database.getDatabaseName() + "." + tableName)
@@ -1640,6 +1842,8 @@ private MySqlSourceConfigFactory getConfigFactory(
.splitSize(4)
.fetchSize(2)
.skipSnapshotBackfill(skipSnapshotBackfill)
+ .skipBinlogDeserializationOfUnsubscribedTables(
+ skipBinlogDeserializationOfUnsubscribedTables)
.password(database.getPassword());
}
@@ -1649,6 +1853,32 @@ private void addColumnToTable(JdbcConnection connection, String tableId) throws
connection.commit();
}
+ /**
+ * Produces binlog events for both tableId and trivialTableId, where tableId is the target
+ * table to read and trivialTableId is an irrelevant table that is not subscribed.
+ *
+ * @param tableId target table to read.
+ * @param trivialTableId irrelevant table that is not subscribed.
+ * @return SQL statements for both tables to produce binlog data.
+ */
+ private static String[] produceBinlogSqls(String tableId, String trivialTableId) {
+ return new String[] {
+ "UPDATE " + trivialTableId + " SET address = 'Chengdu' where id = 102",
+ "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
+ "INSERT INTO " + trivialTableId + " VALUES(10001, 'user_11','Wuhan','123567891234')",
+ "DELETE FROM " + tableId + " where id = 102",
+ "UPDATE " + trivialTableId + " SET address = 'Beijing' where id = 103",
+ "DELETE FROM " + trivialTableId + " where id = 10001",
+ "INSERT INTO " + tableId + " VALUES(102, 'user_2','Hangzhou','123567891234')",
+ "INSERT INTO "
+ + trivialTableId
+ + " VALUES(10002, 'user_22','Guangzhou','123567891234')",
+ "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
+ "UPDATE " + trivialTableId + " SET address = 'Tianjin' where id = 10002",
+ "DELETE FROM " + trivialTableId + " where id = 10002",
+ };
+ }
+
/** This stateful task context will skip valid the starting offset. */
private static class TestStatefulTaskContext extends StatefulTaskContext {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 01c2dff84da..48c17e95efd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -52,6 +52,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
+import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
@@ -129,7 +130,8 @@ void testCommonProperties() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -179,7 +181,8 @@ void testEnableParallelReadSource() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -225,7 +228,8 @@ void testEnableParallelReadSourceWithSingleServerId() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -269,7 +273,8 @@ void testEnableParallelReadSourceLatestOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -330,7 +335,8 @@ void testOptionalProperties() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
true,
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource)
.isEqualTo(expectedSource)
.isInstanceOf(MySqlTableSource.class);
@@ -389,7 +395,8 @@ void testStartupFromSpecificOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -431,7 +438,8 @@ void testStartupFromInitial() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -474,7 +482,8 @@ void testStartupFromEarliestOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -518,7 +527,8 @@ void testStartupFromSpecificTimestamp() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -560,7 +570,8 @@ void testStartupFromLatestOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -607,7 +618,8 @@ void testMetadataColumns() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
@@ -810,7 +822,8 @@ void testEnablingExperimentalOptions() {
true,
true,
true,
- false);
+ false,
+ SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
index e4df63f1a33..d7c16e923fc 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
@@ -326,3 +326,34 @@ CREATE TABLE default_value_test (
INSERT INTO default_value_test
VALUES (1,'user1','Shanghai',123567),
(2,'user2','Shanghai',123567);
+
+-- Table that is deliberately NOT subscribed by the connector tests; its binlog
+CREATE TABLE trivial (
+ id INTEGER NOT NULL PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ address VARCHAR(1024),
+ phone_number VARCHAR(512)
+);
+
+INSERT INTO trivial
+VALUES (101,'user_1','Beijing','123567891234'),
+ (102,'user_2','Beijing','123567891234'),
+ (103,'user_3','Beijing','123567891234'),
+ (109,'user_4','Beijing','123567891234'),
+ (110,'user_5','Beijing','123567891234'),
+ (111,'user_6','Beijing','123567891234'),
+ (118,'user_7','Beijing','123567891234'),
+ (121,'user_8','Beijing','123567891234'),
+ (123,'user_9','Beijing','123567891234'),
+ (1009,'user_10','Beijing','123567891234'),
+ (1010,'user_11','Beijing','123567891234'),
+ (1011,'user_12','Beijing','123567891234'),
+ (1012,'user_13','Beijing','123567891234'),
+ (1013,'user_14','Beijing','123567891234'),
+ (1014,'user_15','Beijing','123567891234'),
+ (1015,'user_16','Beijing','123567891234'),
+ (1016,'user_17','Beijing','123567891234'),
+ (1017,'user_18','Beijing','123567891234'),
+ (1018,'user_19','Beijing','123567891234'),
+ (1019,'user_20','Beijing','123567891234'),
+ (2000,'user_21','Beijing','123567891234');
\ No newline at end of file