From 12fe3de8daa8e782ade96c674533b4ab8399753f Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Thu, 25 Sep 2025 10:41:00 +0200 Subject: [PATCH 01/13] [FLINK-38100] feat: bump iceberg to 1.10.0 for iceberg-pipeline-connector --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0d9b53d7c95..0bfc039839b 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ limitations under the License. 2.12.16 3.6.0 - 1.6.1 + 1.10.0 2.3.9 3.3.4 From d5b6704c01466641baa186b4f75528c403265829 Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Thu, 25 Sep 2025 10:43:28 +0200 Subject: [PATCH 02/13] [FLINK-38100] feat: update docs for iceberg pipeline connector --- .../connectors/pipeline-connectors/iceberg.md | 12 ++++++----- .../pipeline-connectors/overview.md | 20 ++++++++++--------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/docs/content/docs/connectors/pipeline-connectors/iceberg.md b/docs/content/docs/connectors/pipeline-connectors/iceberg.md index 22fd0328439..67cd7851fbf 100644 --- a/docs/content/docs/connectors/pipeline-connectors/iceberg.md +++ b/docs/content/docs/connectors/pipeline-connectors/iceberg.md @@ -3,7 +3,7 @@ title: "Iceberg" weight: 9 type: docs aliases: -- /connectors/pipeline-connectors/iceberg + - /connectors/pipeline-connectors/iceberg --- 3.6.0 - 1.10.0 + 1.10.1 2.3.9 3.3.4 11 From e1c35570aae4d7b9c7eb3d75850d2c84f44c4215 Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Tue, 31 Mar 2026 21:35:48 +0200 Subject: [PATCH 05/13] add scan.snapshot.hostname optional parameter --- .../mysql/factory/MySqlDataSourceFactory.java | 4 ++++ .../mysql/source/MySqlDataSourceOptions.java | 15 +++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index b775cebb05e..ffca41d1cd4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -84,6 +84,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET; @@ -121,6 +122,7 @@ public DataSource createDataSource(Context context) { final Configuration config = context.getFactoryConfiguration(); String hostname = config.get(HOSTNAME); + String scanSnapshotHostname = config.getOptional(SCAN_SNAPSHOT_HOSTNAME).orElse(null); int port = config.get(PORT); String username = config.get(USERNAME); @@ -195,6 +197,7 @@ public DataSource createDataSource(Context context) { MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(hostname) + .snapshotHostname(scanSnapshotHostname) .port(port) .username(username) .password(password) @@ -327,6 
+330,7 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { Set> options = new HashSet<>(); + options.add(SCAN_SNAPSHOT_HOSTNAME); options.add(PORT); options.add(TABLES_EXCLUDE); options.add(SCHEMA_CHANGE_ENABLED); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 6aff556e7fa..45b050493e9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -93,6 +93,21 @@ public class MySqlDataSourceOptions { .withDescription( "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table."); + public static final ConfigOption SCAN_SNAPSHOT_HOSTNAME = + ConfigOptions.key("scan.snapshot.hostname") + .stringType() + .noDefaultValue() + .withDescription( + "Optional IP address or hostname of a MySQL read replica to use for snapshot and metadata " + + "queries (table discovery, chunk splitting). When set, all snapshot data reads and metadata " + + "operations are routed to this instance, reducing load on the primary writer. " + + "Binlog position tracking and changelog streaming always use the primary writer instance. " + + "WARNING: At-least-once semantics only. 
When this option is set, exactly-once " + + "guarantees cannot be maintained because the binlog positions recorded during snapshot " + + "scanning originate from the primary writer while the data is read from the replica, and " + + "storage replication lag (even in the millisecond range, as with Aurora/RDS) means the " + + "two positions may not be perfectly consistent."); + public static final ConfigOption SCAN_SNAPSHOT_FETCH_SIZE = ConfigOptions.key("scan.snapshot.fetch.size") .intType() From 4c3312823ab4f4042b54a8c7e1a326011c76e7b5 Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Tue, 31 Mar 2026 21:45:07 +0200 Subject: [PATCH 06/13] connection factory logic --- .../source/config/MySqlSourceConfig.java | 8 ++++++ .../config/MySqlSourceConfigFactory.java | 12 +++++++++ .../source/connection/ConnectionPoolId.java | 12 +++++++++ .../connection/JdbcConnectionFactory.java | 25 ++++++++++++++++--- .../connection/JdbcConnectionPools.java | 16 ++++++++++-- .../connection/PooledDataSourceFactory.java | 14 ++++++++--- 6 files changed, 78 insertions(+), 9 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 49570463b1f..571bc94d225 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -44,6 +44,7 @@ public class MySqlSourceConfig implements Serializable { private static final long serialVersionUID = 1L; private final String hostname; + @Nullable private final String snapshotHostname; private
final int port; private final String username; private final String password; @@ -82,6 +83,7 @@ public class MySqlSourceConfig implements Serializable { MySqlSourceConfig( String hostname, + @Nullable String snapshotHostname, int port, String username, String password, @@ -112,6 +114,7 @@ public class MySqlSourceConfig implements Serializable { boolean useLegacyJsonFormat, boolean assignUnboundedChunkFirst) { this.hostname = checkNotNull(hostname); + this.snapshotHostname = snapshotHostname; this.port = port; this.username = checkNotNull(username); this.password = password; @@ -161,6 +164,11 @@ public String getHostname() { return hostname; } + @Nullable + public String getSnapshotHostname() { + return snapshotHostname; + } + public int getPort() { return port; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 7010d732d33..9311e4262e0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -44,6 +44,7 @@ public class MySqlSourceConfigFactory implements Serializable { private int port = 3306; // default 3306 port private String hostname; + private String snapshotHostname; private String username; private String password; private ServerIdRange serverIdRange; @@ -81,6 +82,16 @@ public MySqlSourceConfigFactory hostname(String hostname) { return this; } + /** + * Optional hostname of a MySQL read replica to use for snapshot queries. 
When set, snapshot + * data will be read from this replica instead of the primary writer instance, reducing the load + * on the writer. The binlog position will still be obtained from the primary writer instance. + */ + public MySqlSourceConfigFactory snapshotHostname(String snapshotHostname) { + this.snapshotHostname = snapshotHostname; + return this; + } + /** Integer port number of the MySQL database server. */ public MySqlSourceConfigFactory port(int port) { this.port = port; @@ -401,6 +412,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { return new MySqlSourceConfig( hostname, + snapshotHostname, port, username, password, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java index 16c831b6488..7618757eeed 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java @@ -34,6 +34,18 @@ public ConnectionPoolId(String host, int port, String username) { this.username = username; } + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getUsername() { + return username; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java index 320c8868692..bda7eadc868 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java @@ -35,23 +35,40 @@ public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory { private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionFactory.class); private final MySqlSourceConfig sourceConfig; + private final String hostnameOverride; public JdbcConnectionFactory(MySqlSourceConfig sourceConfig) { + this(sourceConfig, null); + } + + /** + * Creates a factory that connects to a specific hostname instead of the configured one. + * + * @param sourceConfig the source configuration + * @param hostnameOverride if non-null, connect to this host instead of + * sourceConfig.getHostname() + */ + public JdbcConnectionFactory(MySqlSourceConfig sourceConfig, String hostnameOverride) { this.sourceConfig = sourceConfig; + this.hostnameOverride = hostnameOverride; } @Override public Connection connect(JdbcConfiguration config) throws SQLException { final int connectRetryTimes = sourceConfig.getConnectMaxRetries(); + final String targetHostname = + hostnameOverride != null ? hostnameOverride : sourceConfig.getHostname(); final ConnectionPoolId connectionPoolId = new ConnectionPoolId( - sourceConfig.getHostname(), - sourceConfig.getPort(), - sourceConfig.getUsername()); + targetHostname, sourceConfig.getPort(), sourceConfig.getUsername()); HikariDataSource dataSource = - JdbcConnectionPools.getInstance() + hostnameOverride != null + ? 
JdbcConnectionPools.getInstance() + .getOrCreateConnectionPool( + connectionPoolId, sourceConfig, hostnameOverride) + : JdbcConnectionPools.getInstance() .getOrCreateConnectionPool(connectionPoolId, sourceConfig); int i = 0; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java index 9505a559ad0..9e0f2d7656f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java @@ -44,10 +44,22 @@ public static JdbcConnectionPools getInstance() { @Override public HikariDataSource getOrCreateConnectionPool( ConnectionPoolId poolId, MySqlSourceConfig sourceConfig) { + return getOrCreateConnectionPool(poolId, sourceConfig, poolId.getHost()); + } + + /** + * Gets or creates a connection pool for the specified pool ID with a hostname override. This is + * useful for creating reader connection pools that connect to a different host than the primary + * writer. 
+ */ + public HikariDataSource getOrCreateConnectionPool( + ConnectionPoolId poolId, MySqlSourceConfig sourceConfig, String hostname) { synchronized (pools) { if (!pools.containsKey(poolId)) { - LOG.info("Create and register connection pool {}", poolId); - pools.put(poolId, PooledDataSourceFactory.createPooledDataSource(sourceConfig)); + LOG.info("Create and register connection pool {} for hostname {}", poolId, hostname); + pools.put( + poolId, + PooledDataSourceFactory.createPooledDataSource(sourceConfig, hostname)); } return pools.get(poolId); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java index e5f6bcd37ab..c9997d2b5f8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java @@ -41,14 +41,22 @@ public class PooledDataSourceFactory { private PooledDataSourceFactory() {} public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceConfig) { + return createPooledDataSource(sourceConfig, sourceConfig.getHostname()); + } + + /** + * Creates a pooled data source with a specific hostname. This is useful when connecting to a + * read replica for snapshot queries while using the primary for binlog operations. 
+ */ + public static HikariDataSource createPooledDataSource( + MySqlSourceConfig sourceConfig, String hostname) { final HikariConfig config = new HikariConfig(); - String hostName = sourceConfig.getHostname(); int port = sourceConfig.getPort(); Properties jdbcProperties = sourceConfig.getJdbcProperties(); - config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port); - config.setJdbcUrl(formatJdbcUrl(hostName, port, jdbcProperties)); + config.setPoolName(CONNECTION_POOL_PREFIX + hostname + ":" + port); + config.setJdbcUrl(formatJdbcUrl(hostname, port, jdbcProperties)); config.setUsername(sourceConfig.getUsername()); config.setPassword(sourceConfig.getPassword()); config.setMinimumIdle(MINIMUM_POOL_SIZE); From e8ae44e5c9fe3d74a7d226bc28b427badbd633a4 Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Tue, 31 Mar 2026 21:46:57 +0200 Subject: [PATCH 07/13] metadata queries in snapshot with fallback --- .../mysql/debezium/DebeziumUtils.java | 54 +++++++++++++- .../debezium/reader/SnapshotSplitReader.java | 5 +- .../task/MySqlSnapshotSplitReadTask.java | 72 +++++++++++++++---- .../task/context/StatefulTaskContext.java | 39 +++++++++- .../connectors/mysql/source/MySqlSource.java | 4 +- .../mysql/source/MySqlSourceBuilder.java | 12 ++++ .../source/assigners/MySqlChunkSplitter.java | 2 +- .../assigners/MySqlSnapshotSplitAssigner.java | 6 +- .../config/MySqlSourceConfigFactory.java | 2 +- 9 files changed, 172 insertions(+), 24 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 4e512a81c67..483fa6b8f22 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -62,8 +62,11 @@ public class DebeziumUtils { private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class); - /** Creates and opens a new {@link JdbcConnection} backing connection pool. */ + /** Creates and opens a new {@link JdbcConnection} to the primary writer instance. */ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) { + LOG.info( + "Opening a new JDBC connection to MySQL server at {}", + sourceConfig.getHostname()); JdbcConnection jdbc = new JdbcConnection( JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()), @@ -73,7 +76,33 @@ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) try { jdbc.connect(); } catch (Exception e) { - LOG.error("Failed to open MySQL connection", e); + LOG.error( + "Failed to open MySQL connection to {}", sourceConfig.getHostname(), e); + throw new FlinkRuntimeException(e); + } + return jdbc; + } + + /** + * Creates and opens a new {@link JdbcConnection} for metadata and snapshot queries. Routes to + * the snapshot instance when {@link MySqlSourceConfig#getSnapshotHostname()} is configured, + * otherwise falls back to the primary writer. + */ + public static JdbcConnection openSnapshotJdbcConnection(MySqlSourceConfig sourceConfig) { + String snapshotHostname = sourceConfig.getSnapshotHostname(); + String targetHost = + snapshotHostname != null ? 
snapshotHostname : sourceConfig.getHostname(); + LOG.info("Opening a new JDBC connection for metadata queries at {}", targetHost); + JdbcConnection jdbc = + new JdbcConnection( + JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()), + new JdbcConnectionFactory(sourceConfig, snapshotHostname), + QUOTED_CHARACTER, + QUOTED_CHARACTER); + try { + jdbc.connect(); + } catch (Exception e) { + LOG.error("Failed to open MySQL connection to {}", targetHost, e); throw new FlinkRuntimeException(e); } return jdbc; @@ -85,6 +114,27 @@ public static MySqlConnection createMySqlConnection(MySqlSourceConfig sourceConf sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties()); } + /** + * Creates a new {@link MySqlConnection} to the snapshot instance for snapshot queries. If no + * snapshot hostname is configured, returns a connection to the primary. The connection is not + * opened. + */ + public static MySqlConnection createSnapshotMySqlConnection(MySqlSourceConfig sourceConfig) { + String snapshotHostname = sourceConfig.getSnapshotHostname(); + if (snapshotHostname == null) { + LOG.debug( + "No snapshot hostname configured, using primary for snapshot queries"); + return createMySqlConnection(sourceConfig); + } + LOG.info( + "Creating MySQL connection for snapshot queries at {}", + snapshotHostname); + Configuration dbzConfig = sourceConfig.getDbzConfiguration(); + Configuration snapshotConfig = + dbzConfig.edit().with("database.hostname", snapshotHostname).build(); + return createMySqlConnection(snapshotConfig, sourceConfig.getJdbcProperties()); + } + /** Creates a new {@link MySqlConnection}, but not open the connection. 
*/ public static MySqlConnection createMySqlConnection( Configuration dbzConfiguration, Properties jdbcProperties) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index b7b23a18e0e..0c5bbb7c842 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -69,6 +69,7 @@ import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient; import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; +import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createSnapshotMySqlConnection; /** * A snapshot reader that reads data from Table in split level, the split is assigned by primary key @@ -103,7 +104,8 @@ public SnapshotSplitReader( new StatefulTaskContext( sourceConfig, createBinaryClient(sourceConfig.getDbzConfiguration()), - createMySqlConnection(sourceConfig)), + createMySqlConnection(sourceConfig), + createSnapshotMySqlConnection(sourceConfig)), subtaskId, hooks); } @@ -143,6 +145,7 @@ public void submitSplit(MySqlSplit mySqlSplit) { statefulTaskContext.getSnapshotChangeEventSourceMetrics(), statefulTaskContext.getDatabaseSchema(), statefulTaskContext.getConnection(), + statefulTaskContext.getSnapshotConnection(), statefulTaskContext.getDispatcher(), statefulTaskContext.getTopicSelector(), statefulTaskContext.getSnapshotReceiver(), diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index f729f59d05f..ebd331aeb38 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -73,7 +73,8 @@ public class MySqlSnapshotSplitReadTask private final MySqlSourceConfig sourceConfig; private final MySqlDatabaseSchema databaseSchema; - private final MySqlConnection jdbcConnection; + private final MySqlConnection primaryConnection; + private final MySqlConnection snapshotConnection; private final EventDispatcherImpl dispatcher; private final Clock clock; private final MySqlSnapshotSplit snapshotSplit; @@ -84,12 +85,19 @@ public class MySqlSnapshotSplitReadTask private final SnapshotPhaseHooks hooks; private final boolean isBackfillSkipped; + /** + * Creates a new MySqlSnapshotSplitReadTask with separate writer and reader connections. 
+ * + * @param primaryConnection Connection to the primary writer instance (used for binlog position) + * @param snapshotConnection Connection to the snapshot instance (used for snapshot data queries) + */ public MySqlSnapshotSplitReadTask( MySqlSourceConfig sourceConfig, MySqlConnectorConfig connectorConfig, SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics, MySqlDatabaseSchema databaseSchema, - MySqlConnection jdbcConnection, + MySqlConnection primaryConnection, + MySqlConnection snapshotConnection, EventDispatcherImpl dispatcher, TopicSelector topicSelector, EventDispatcher.SnapshotReceiver snapshotReceiver, @@ -100,7 +108,8 @@ public MySqlSnapshotSplitReadTask( super(connectorConfig, snapshotChangeEventSourceMetrics); this.sourceConfig = sourceConfig; this.databaseSchema = databaseSchema; - this.jdbcConnection = jdbcConnection; + this.primaryConnection = primaryConnection; + this.snapshotConnection = snapshotConnection; this.dispatcher = dispatcher; this.clock = clock; this.snapshotSplit = snapshotSplit; @@ -111,6 +120,39 @@ public MySqlSnapshotSplitReadTask( this.isBackfillSkipped = isBackfillSkipped; } + /** + * Legacy constructor for backward compatibility. Uses same connection for both writer and + * snapshot operations. 
+ */ + public MySqlSnapshotSplitReadTask( + MySqlSourceConfig sourceConfig, + MySqlConnectorConfig connectorConfig, + SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics, + MySqlDatabaseSchema databaseSchema, + MySqlConnection jdbcConnection, + EventDispatcherImpl dispatcher, + TopicSelector topicSelector, + EventDispatcher.SnapshotReceiver snapshotReceiver, + Clock clock, + MySqlSnapshotSplit snapshotSplit, + SnapshotPhaseHooks hooks, + boolean isBackfillSkipped) { + this( + sourceConfig, + connectorConfig, + snapshotChangeEventSourceMetrics, + databaseSchema, + jdbcConnection, + jdbcConnection, + dispatcher, + topicSelector, + snapshotReceiver, + clock, + snapshotSplit, + hooks, + isBackfillSkipped); + } + @Override public SnapshotResult execute( ChangeEventSourceContext context, @@ -151,9 +193,10 @@ protected SnapshotResult doExecute( dispatcher.getQueue()); if (hooks.getPreLowWatermarkAction() != null) { - hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit); + hooks.getPreLowWatermarkAction().accept(primaryConnection, snapshotSplit); } - final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); + // Use writer connection for binlog position (low watermark) + final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(primaryConnection); LOG.info( "Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, @@ -164,14 +207,14 @@ protected SnapshotResult doExecute( snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW); if (hooks.getPostLowWatermarkAction() != null) { - hooks.getPostLowWatermarkAction().accept(jdbcConnection, snapshotSplit); + hooks.getPostLowWatermarkAction().accept(primaryConnection, snapshotSplit); } LOG.info("Snapshot step 2 - Snapshotting data"); createDataEvents(ctx, snapshotSplit.getTableId()); if (hooks.getPreHighWatermarkAction() != null) { - hooks.getPreHighWatermarkAction().accept(jdbcConnection, snapshotSplit); + 
hooks.getPreHighWatermarkAction().accept(primaryConnection, snapshotSplit); } BinlogOffset highWatermark; @@ -185,8 +228,8 @@ protected SnapshotResult doExecute( // phase. highWatermark = lowWatermark; } else { - // Get the current binlog offset as HW - highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); + // Use writer connection for binlog position (high watermark) + highWatermark = DebeziumUtils.currentBinlogOffset(primaryConnection); } LOG.info( @@ -199,7 +242,7 @@ protected SnapshotResult doExecute( .setHighWatermark(highWatermark); if (hooks.getPostHighWatermarkAction() != null) { - hooks.getPostHighWatermarkAction().accept(jdbcConnection, snapshotSplit); + hooks.getPostHighWatermarkAction().accept(primaryConnection, snapshotSplit); } return SnapshotResult.completed(ctx.offset); } @@ -254,9 +297,14 @@ private void createDataEventsForTable( table.id(), selectSql); + LOG.debug( + "Executing snapshot query for split '{}' of table {} via host {}", + snapshotSplit.splitId(), + table.id(), + snapshotConnection.connectionConfig().hostname()); try (PreparedStatement selectStatement = StatementUtils.readTableSplitDataStatement( - jdbcConnection, + snapshotConnection, selectSql, snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null, @@ -337,7 +385,7 @@ else if (actualColumn.jdbcType() == Types.TIMESTAMP) { return readTimestampField(rs, fieldNo, actualColumn, actualTable); } // JDBC's rs.GetObject() will return a Boolean for all TINYINT(1) columns. 
- // TINYINT columns are reprtoed as SMALLINT by JDBC driver + // TINYINT columns are reported as SMALLINT by JDBC driver else if (actualColumn.jdbcType() == Types.TINYINT || actualColumn.jdbcType() == Types.SMALLINT) { // It seems that rs.wasNull() returns false when default value is set and NULL is diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index 09a5fe8e85c..df2b6a77908 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -82,6 +82,7 @@ public class StatefulTaskContext implements AutoCloseable { private final MySqlEventMetadataProvider metadataProvider; private final SchemaNameAdjuster schemaNameAdjuster; private final MySqlConnection connection; + private final MySqlConnection snapshotConnection; private final BinaryLogClient binaryLogClient; private MySqlDatabaseSchema databaseSchema; @@ -97,16 +98,37 @@ public class StatefulTaskContext implements AutoCloseable { private ChangeEventQueue queue; private ErrorHandler errorHandler; + /** + * Creates a StatefulTaskContext with separate primary and replica connections. 
+ * + * @param sourceConfig The MySQL source configuration + * @param binaryLogClient The binary log client for binlog streaming + * @param primaryConnection Connection to the primary writer instance + * @param snapshotConnection Connection to the snapshot instance for snapshot queries + */ public StatefulTaskContext( MySqlSourceConfig sourceConfig, BinaryLogClient binaryLogClient, - MySqlConnection connection) { + MySqlConnection primaryConnection, + MySqlConnection snapshotConnection) { this.sourceConfig = sourceConfig; this.connectorConfig = sourceConfig.getMySqlConnectorConfig(); this.schemaNameAdjuster = SchemaNameAdjuster.create(); this.metadataProvider = new MySqlEventMetadataProvider(); this.binaryLogClient = binaryLogClient; - this.connection = connection; + this.connection = primaryConnection; + this.snapshotConnection = snapshotConnection; + } + + /** + * Legacy constructor for backward compatibility. Uses same connection for both primary and + * snapshot operations. + */ + public StatefulTaskContext( + MySqlSourceConfig sourceConfig, + BinaryLogClient binaryLogClient, + MySqlConnection connection) { + this(sourceConfig, binaryLogClient, connection, connection); } public void configure(MySqlSplit mySqlSplit) { @@ -310,6 +332,10 @@ public void close() throws Exception { if (connection != null) { connection.close(); } + // Close snapshot connection only if it's a distinct object from the primary connection + if (snapshotConnection != null && snapshotConnection != connection) { + snapshotConnection.close(); + } if (binaryLogClient != null) { binaryLogClient.disconnect(); } @@ -388,6 +414,15 @@ public MySqlConnection getConnection() { return connection; } + /** + * Returns the snapshot connection for snapshot queries. If a snapshot hostname is configured, + * this connection points to the snapshot instance; otherwise, it's the same as the primary + * connection. 
+ */ + public MySqlConnection getSnapshotConnection() { + return snapshotConnection; + } + public BinaryLogClient getBinaryLogClient() { return binaryLogClient; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index d7f03057455..c65a690893c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -204,7 +204,7 @@ public SplitEnumerator createEnumerator( final MySqlSplitAssigner splitAssigner; // In snapshot-only startup option, only split snapshots. if (sourceConfig.getStartupOptions().isSnapshotOnly()) { - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); splitAssigner = new MySqlSnapshotSplitAssigner( @@ -218,7 +218,7 @@ public SplitEnumerator createEnumerator( "Failed to discover captured tables for enumerator", e); } } else if (!sourceConfig.getStartupOptions().isStreamOnly()) { - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); splitAssigner = new MySqlHybridSplitAssigner( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index b37ed19e63c..ff669a68c6e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -59,6 +59,18 @@ public MySqlSourceBuilder hostname(String hostname) { return this; } + /** + * Optional IP address or hostname of a MySQL read replica. When specified, snapshot data will + * be read from this replica instead of the primary writer instance, reducing load on the + * writer. The binlog position will still be obtained from the primary writer. This is + * particularly useful for Aurora/RDS deployments where you want to offload snapshot read + * traffic. + */ + public MySqlSourceBuilder snapshotHostname(String snapshotHostname) { + this.configFactory.snapshotHostname(snapshotHostname); + return this; + } + /** Integer port number of the MySQL database server. 
*/ public MySqlSourceBuilder port(int port) { this.configFactory.port(port); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index 4821eaba2ea..36b702351b3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -106,7 +106,7 @@ private MySqlChunkSplitter( @Override public void open() { - this.jdbcConnection = DebeziumUtils.openJdbcConnection(sourceConfig); + this.jdbcConnection = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 1acbeac941b..3800beb0981 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -192,7 +192,7 @@ private void discoveryCaptureTables() { if (needToDiscoveryTables()) { long start = System.currentTimeMillis(); LOG.debug("The remainingTables is empty, start to discovery 
tables"); - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { final List discoverTables = DebeziumUtils.discoverCapturedTables(jdbc, sourceConfig); this.remainingTables.addAll(discoverTables); @@ -208,7 +208,7 @@ private void discoveryCaptureTables() { // remaining tables, discovery remaining table here else if (!isRemainingTablesCheckpointed && !AssignerStatus.isSnapshotAssigningFinished(assignerStatus)) { - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { final List discoverTables = DebeziumUtils.discoverCapturedTables(jdbc, sourceConfig); discoverTables.removeAll(alreadyProcessedTables); @@ -227,7 +227,7 @@ private void captureNewlyAddedTables() { && !sourceConfig.getStartupOptions().isSnapshotOnly() && AssignerStatus.isAssigningFinished(assignerStatus)) { // check whether we got newly added tables - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { final List currentCapturedTables = DebeziumUtils.discoverCapturedTables(jdbc, sourceConfig); final Set previousCapturedTables = new HashSet<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 9311e4262e0..20662885544 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -1,4 +1,4 @@ -/* +w/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. From 7271336a55c7903292235f71931594c62271de5f Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Tue, 31 Mar 2026 21:48:01 +0200 Subject: [PATCH 08/13] add unit tests --- .../flink-connector-mysql-cdc/pom.xml | 7 + .../mysql/debezium/DebeziumUtilsTest.java | 83 ++++++- ...qlSnapshotSplitReadTaskConnectionTest.java | 210 ++++++++++++++++++ .../connection/ReaderConnectionTest.java | 162 ++++++++++++++ 4 files changed, 457 insertions(+), 5 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml index 96366a9af91..d2638975765 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml @@ -169,6 +169,13 @@ limitations under the License. 
+ + org.mockito + mockito-core + 3.4.6 + test + + org.testcontainers mysql diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java index eeae681ea7b..b9179cf5913 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java @@ -1,4 +1,4 @@ -/* +flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -31,6 +31,10 @@ /** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils}. 
*/ class DebeziumUtilsTest { + + private static final String PRIMARY_HOSTNAME = "writer-host"; + private static final String SNAPSHOT_HOSTNAME = "reader-host"; + @Test void testCreateMySqlConnection() { // test without set useSSL @@ -39,7 +43,7 @@ void testCreateMySqlConnection() { MySqlSourceConfig configWithoutUseSSL = getConfig(jdbcProps); MySqlConnection connection0 = DebeziumUtils.createMySqlConnection(configWithoutUseSSL); assertJdbcUrl( - "jdbc:mysql://localhost:3306/?useSSL=false&connectTimeout=20000&useInformationSchema=true" + "jdbc:mysql://writer-host:3306/?useSSL=false&connectTimeout=20000&useInformationSchema=true" + "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&onlyTest=test" + "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true", connection0.connectionString()); @@ -49,7 +53,7 @@ void testCreateMySqlConnection() { MySqlSourceConfig configNotUseSSL = getConfig(jdbcProps); MySqlConnection connection1 = DebeziumUtils.createMySqlConnection(configNotUseSSL); assertJdbcUrl( - "jdbc:mysql://localhost:3306/?connectTimeout=20000&useInformationSchema=true" + "jdbc:mysql://writer-host:3306/?connectTimeout=20000&useInformationSchema=true" + "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&useSSL=false&onlyTest=test" + "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true", connection1.connectionString()); @@ -59,19 +63,88 @@ void testCreateMySqlConnection() { MySqlSourceConfig configUseSSL = getConfig(jdbcProps); MySqlConnection connection2 = DebeziumUtils.createMySqlConnection(configUseSSL); assertJdbcUrl( - "jdbc:mysql://localhost:3306/?connectTimeout=20000&useInformationSchema=true" + "jdbc:mysql://writer-host:3306/?connectTimeout=20000&useInformationSchema=true" + "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&useSSL=true&onlyTest=test" + "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true", connection2.connectionString()); } + @Test + void 
testCreateSnapshotMySqlConnectionWithSnapshotHostname() { + Properties jdbcProps = new Properties(); + MySqlSourceConfig config = getConfigWithSnapshotHostname(jdbcProps, SNAPSHOT_HOSTNAME); + + // Create reader connection - should use reader hostname + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + + // Create primary connection - should use primary hostname + MySqlConnection primaryConnection = DebeziumUtils.createMySqlConnection(config); + + // Reader connection should point to reader hostname + Assertions.assertThat(readerConnection.connectionString()).contains(SNAPSHOT_HOSTNAME); + Assertions.assertThat(readerConnection.connectionString()).doesNotContain(PRIMARY_HOSTNAME); + + // Primary connection should point to primary hostname + Assertions.assertThat(primaryConnection.connectionString()).contains(PRIMARY_HOSTNAME); + Assertions.assertThat(primaryConnection.connectionString()).doesNotContain(SNAPSHOT_HOSTNAME); + } + + @Test + void testCreateSnapshotMySqlConnectionFallbackToPrimary() { + Properties jdbcProps = new Properties(); + MySqlSourceConfig config = getConfig(jdbcProps); + + // When no reader hostname is configured, should fall back to primary + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + MySqlConnection primaryConnection = DebeziumUtils.createMySqlConnection(config); + + // Both should point to the primary hostname + Assertions.assertThat(readerConnection.connectionString()).contains(PRIMARY_HOSTNAME); + Assertions.assertThat(primaryConnection.connectionString()).contains(PRIMARY_HOSTNAME); + } + + @Test + void testSnapshotHostnameConfigurationProperty() { + Properties jdbcProps = new Properties(); + + // Config without reader hostname + MySqlSourceConfig configWithoutReader = getConfig(jdbcProps); + Assertions.assertThat(configWithoutReader.getSnapshotHostname()).isNull(); + + // Config with reader hostname + MySqlSourceConfig configWithReader = + 
getConfigWithSnapshotHostname(jdbcProps, SNAPSHOT_HOSTNAME); + Assertions.assertThat(configWithReader.getSnapshotHostname()).isEqualTo(SNAPSHOT_HOSTNAME); + Assertions.assertThat(configWithReader.getHostname()).isEqualTo(PRIMARY_HOSTNAME); + } + private MySqlSourceConfig getConfig(Properties jdbcProperties) { return new MySqlSourceConfigFactory() .startupOptions(StartupOptions.initial()) .databaseList("fakeDb") .tableList("fakeDb.fakeTable") .includeSchemaChanges(false) - .hostname("localhost") + .hostname(PRIMARY_HOSTNAME) + .port(3306) + .splitSize(10) + .fetchSize(2) + .connectTimeout(Duration.ofSeconds(20)) + .username("fakeUser") + .password("fakePw") + .serverTimeZone(ZoneId.of("UTC").toString()) + .jdbcProperties(jdbcProperties) + .createConfig(0); + } + + private MySqlSourceConfig getConfigWithSnapshotHostname( + Properties jdbcProperties, String snapshotHostname) { + return new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList("fakeDb") + .tableList("fakeDb.fakeTable") + .includeSchemaChanges(false) + .hostname(PRIMARY_HOSTNAME) + .snapshotHostname(snapshotHostname) .port(3306) .splitSize(10) .fetchSize(2) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java new file mode 100644 index 00000000000..f9dabf5101b --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.debezium.task; + +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import io.debezium.connector.mysql.MySqlConnection; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.ZoneId; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for {@link MySqlSnapshotSplitReadTask} to verify correct usage of writer and reader + * connections. + * + *

<p>The key behavior being tested: + * + *
<ul> + *
  <li>Writer connection is used for binlog position retrieval (SHOW MASTER STATUS) + *
  <li>Reader connection is used for snapshot data queries (SELECT statements) + *
  <li>When reader hostname is not configured, reader connection falls back to writer + *
</ul>
+ */ +class MySqlSnapshotSplitReadTaskConnectionTest { + + private static final String PRIMARY_HOSTNAME = "writer-host"; + private static final String SNAPSHOT_HOSTNAME = "reader-host"; + private static final int PORT = 3306; + + @Test + void testStatefulTaskContextWithSeparateConnections() { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + + // Create separate writer and reader connections + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + + // Create StatefulTaskContext with both connections + StatefulTaskContext context = + new StatefulTaskContext(config, binaryLogClient, writerConnection, readerConnection); + + // Verify the context returns the correct connections + assertThat(context.getConnection()).isSameAs(writerConnection); + assertThat(context.getSnapshotConnection()).isSameAs(readerConnection); + + // Verify writer and reader connections point to different hosts + assertThat(writerConnection.connectionString()).contains(PRIMARY_HOSTNAME); + assertThat(readerConnection.connectionString()).contains(SNAPSHOT_HOSTNAME); + } + + @Test + void testStatefulTaskContextWithSameConnectionWhenNoReaderConfigured() { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, null); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + + // When no reader hostname is configured, createSnapshotMySqlConnection falls back to writer + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + + // Create StatefulTaskContext + StatefulTaskContext context = + new StatefulTaskContext(config, binaryLogClient, writerConnection, readerConnection); + + // Verify both connections point to the same host (writer) +
assertThat(context.getConnection().connectionString()).contains(PRIMARY_HOSTNAME); + assertThat(context.getSnapshotConnection().connectionString()).contains(PRIMARY_HOSTNAME); + } + + @Test + void testStatefulTaskContextLegacyConstructorUsesSameConnection() { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + + // Use the legacy constructor that takes only one connection + MySqlConnection connection = DebeziumUtils.createMySqlConnection(config); + StatefulTaskContext context = + new StatefulTaskContext(config, binaryLogClient, connection); + + // Both getConnection() and getSnapshotConnection() should return the same instance + assertThat(context.getConnection()).isSameAs(context.getSnapshotConnection()); + } + + @Test + void testWriterConnectionUsedForBinlogPosition() { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + + // The writer connection should be used for binlog position queries + // (SHOW MASTER STATUS only works on the primary/writer) + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + + // Verify writer points to writer host (required for binlog operations) + assertThat(writerConnection.connectionString()).contains(PRIMARY_HOSTNAME); + assertThat(writerConnection.connectionString()).doesNotContain(SNAPSHOT_HOSTNAME); + + // Verify reader points to reader host (for snapshot data queries) + assertThat(readerConnection.connectionString()).contains(SNAPSHOT_HOSTNAME); + assertThat(readerConnection.connectionString()).doesNotContain(PRIMARY_HOSTNAME); + } + + @Test + void testReaderConnectionFallbackWhenReaderHostnameIsNull() { + MySqlSourceConfig configWithoutReader = createConfig(PRIMARY_HOSTNAME, null); + MySqlSourceConfig configWithReader = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + + // When
snapshotHostname is null, createSnapshotMySqlConnection should return a connection + // that uses the primary hostname + MySqlConnection readerConnectionWithoutConfig = + DebeziumUtils.createSnapshotMySqlConnection(configWithoutReader); + MySqlConnection readerConnectionWithConfig = + DebeziumUtils.createSnapshotMySqlConnection(configWithReader); + + // Without reader configured: should use writer hostname + assertThat(readerConnectionWithoutConfig.connectionString()).contains(PRIMARY_HOSTNAME); + + // With reader configured: should use reader hostname + assertThat(readerConnectionWithConfig.connectionString()).contains(SNAPSHOT_HOSTNAME); + assertThat(readerConnectionWithConfig.connectionString()).doesNotContain(PRIMARY_HOSTNAME); + } + + /** + * Verifies that when snapshotConnection and connection are different objects (replica + * configured), both are closed independently. + */ + @Test + void testCloseWithSeparateConnectionsClosesBoth() throws Exception { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + MySqlConnection primaryConnection = mock(MySqlConnection.class); + MySqlConnection snapshotConnection = mock(MySqlConnection.class); + + StatefulTaskContext context = + new StatefulTaskContext( + config, binaryLogClient, primaryConnection, snapshotConnection); + context.close(); + + verify(primaryConnection, times(1)).close(); + verify(snapshotConnection, times(1)).close(); + } + + /** + * Verifies that when snapshotConnection and connection are the same object (no replica + * configured), close() is called exactly once — not twice.
+ */ + @Test + void testCloseWithSharedConnectionClosesOnce() throws Exception { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, null); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + MySqlConnection sharedConnection = mock(MySqlConnection.class); + + // Legacy constructor passes the same object for both connection fields + StatefulTaskContext context = + new StatefulTaskContext(config, binaryLogClient, sharedConnection); + context.close(); + + verify(sharedConnection, times(1)).close(); + } + + private MySqlSourceConfig createConfig(String hostname, String snapshotHostname) { + MySqlSourceConfigFactory factory = + new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList("testdb") + .tableList("testdb.testtable") + .includeSchemaChanges(false) + .hostname(hostname) + .port(PORT) + .splitSize(10) + .fetchSize(2) + .connectTimeout(Duration.ofSeconds(20)) + .username("testuser") + .password("testpw") + .serverTimeZone(ZoneId.of("UTC").toString()) + .jdbcProperties(new Properties()); + + if (snapshotHostname != null) { + factory.snapshotHostname(snapshotHostname); + } + + return factory.createConfig(0); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java new file mode 100644 index 00000000000..9a8809934a7 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.connection; + +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; + +import io.debezium.connector.mysql.MySqlConnection; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.ZoneId; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for reader hostname connection handling. Verifies that the reader connection is used when + * configured, and falls back to the writer connection when not configured. 
+ */ +class ReaderConnectionTest { + + private static final String WRITER_HOSTNAME = "writer-host"; + private static final String READER_HOSTNAME = "reader-host"; + private static final int PORT = 3306; + + @Test + void testReaderHostnameConfigured() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, READER_HOSTNAME); + + assertThat(config.getHostname()).isEqualTo(WRITER_HOSTNAME); + assertThat(config.getSnapshotHostname()).isEqualTo(READER_HOSTNAME); + } + + @Test + void testReaderHostnameNotConfigured() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, null); + + assertThat(config.getHostname()).isEqualTo(WRITER_HOSTNAME); + assertThat(config.getSnapshotHostname()).isNull(); + } + + @Test + void testCreateReaderMySqlConnectionWithReaderHostname() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, READER_HOSTNAME); + + // Create reader connection - should use reader hostname + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + String readerConnectionString = readerConnection.connectionString(); + + // Create writer connection - should use writer hostname + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + String writerConnectionString = writerConnection.connectionString(); + + // Reader connection should contain reader hostname + assertThat(readerConnectionString).contains(READER_HOSTNAME); + assertThat(readerConnectionString).doesNotContain(WRITER_HOSTNAME); + + // Writer connection should contain writer hostname + assertThat(writerConnectionString).contains(WRITER_HOSTNAME); + assertThat(writerConnectionString).doesNotContain(READER_HOSTNAME); + } + + @Test + void testCreateReaderMySqlConnectionFallsBackToWriterWhenNoReaderConfigured() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, null); + + // Create reader connection - should fall back to writer hostname + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + 
String readerConnectionString = readerConnection.connectionString(); + + // Create writer connection + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + String writerConnectionString = writerConnection.connectionString(); + + // Both should use the writer hostname when reader is not configured + assertThat(readerConnectionString).contains(WRITER_HOSTNAME); + assertThat(writerConnectionString).contains(WRITER_HOSTNAME); + } + + @Test + void testJdbcConnectionFactoryUsesHostnameOverride() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, null); + + // Factory without hostname override - should use config hostname + JdbcConnectionFactory factoryWithoutOverride = new JdbcConnectionFactory(config); + + // Factory with hostname override - should use override + JdbcConnectionFactory factoryWithOverride = + new JdbcConnectionFactory(config, READER_HOSTNAME); + + // Verify the factories are created with correct configuration + assertThat(factoryWithoutOverride).isNotNull(); + assertThat(factoryWithOverride).isNotNull(); + } + + @Test + void testConnectionPoolIdDifferentForWriterAndReader() { + // Pool IDs should be different for writer and reader to ensure separate pools + ConnectionPoolId writerPoolId = + new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); + ConnectionPoolId readerPoolId = + new ConnectionPoolId(READER_HOSTNAME, PORT, "testuser"); + + assertThat(writerPoolId).isNotEqualTo(readerPoolId); + assertThat(writerPoolId.getHost()).isEqualTo(WRITER_HOSTNAME); + assertThat(readerPoolId.getHost()).isEqualTo(READER_HOSTNAME); + } + + @Test + void testConnectionPoolIdSameForSameHostname() { + // Same hostname should result in equal pool IDs + ConnectionPoolId poolId1 = + new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); + ConnectionPoolId poolId2 = + new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); + + assertThat(poolId1).isEqualTo(poolId2); + assertThat(poolId1.hashCode()).isEqualTo(poolId2.hashCode()); + 
} + + private MySqlSourceConfig createConfig(String hostname, String scanSnapshotHostname) { + MySqlSourceConfigFactory factory = + new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList("testdb") + .tableList("testdb.testtable") + .includeSchemaChanges(false) + .hostname(hostname) + .port(PORT) + .splitSize(10) + .fetchSize(2) + .connectTimeout(Duration.ofSeconds(20)) + .username("testuser") + .password("testpw") + .serverTimeZone(ZoneId.of("UTC").toString()) + .jdbcProperties(new Properties()); + + if (scanSnapshotHostname != null) { + factory.snapshotHostname(scanSnapshotHostname); + } + + return factory.createConfig(0); + } +} From 3eba7fa8bf6c4ec72268d1fbabf23d679e270c56 Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Tue, 31 Mar 2026 21:48:11 +0200 Subject: [PATCH 09/13] update docs --- .../connectors/pipeline-connectors/mysql.md | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index a8964cdaffb..b3f4780dd78 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -163,6 +163,24 @@ pipeline: Integer The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table. + + scan.snapshot.hostname + optional + (none) + String + Optional IP address or hostname of a MySQL read replica to use for snapshot and metadata + queries (table discovery, chunk splitting). When set, all snapshot data reads and metadata + operations are routed to this instance, reducing load on the primary writer. Binlog + position tracking and changelog streaming always use the primary writer instance. +

+ ⚠ WARNING: At-least-once semantics only. When this option is set, exactly-once guarantees cannot be maintained because the binlog positions recorded during snapshot scanning originate from the primary writer while the data is read from the replica, and storage replication lag (even in the millisecond range, as with Aurora/RDS) means the two positions may not be perfectly consistent. Duplicate records are possible during recovery. See Read Replica Snapshot for details. + + scan.snapshot.fetch.size optional @@ -398,6 +416,47 @@ source: # ... ``` +## Read Replica Snapshot + +The `scan.snapshot.hostname` option allows routing snapshot data reads and metadata queries +(table discovery, chunk splitting) to a MySQL read replica, while all binlog operations remain +on the primary writer instance. + +This is particularly useful in AWS Aurora/RDS deployments where the primary writer handles +high-throughput transactional workloads and you want to avoid the additional I/O pressure of +a full-table snapshot scan. + +### How it works + +During the snapshot phase, Flink CDC performs the following sequence for each chunk: + +1. **Record low watermark** — queries `SHOW MASTER STATUS` on the **primary writer** to capture the current binlog position. +2. **Scan chunk data** — executes `SELECT` queries on the **snapshot instance** (`scan.snapshot.hostname`). +3. **Record high watermark** — queries `SHOW MASTER STATUS` on the **primary writer** again. +4. **Backfill** — replays changelog events from the primary between the low and high watermarks to catch up on any changes that occurred during the scan. + +Metadata operations (table discovery, `SELECT MIN/MAX` for chunk splitting) also run on the snapshot instance. + +### At-least-once semantics + +> **⚠ Warning:** When `scan.snapshot.hostname` is configured, exactly-once delivery cannot be guaranteed. + +The low/high watermark binlog positions are recorded from the primary writer, while the data is read from the replica. 
Even with storage-level replication (as used by Aurora), replication lag — even at the millisecond scale — means there is no strict guarantee that the replica's data at the moment of scanning corresponds exactly to the primary's binlog position recorded as the watermark. As a result, some events may be delivered more than once after a job restart or failover. + +This is a known and accepted trade-off: **reduced load on the primary writer at the cost of at-least-once semantics** during the snapshot phase. + +### Example + +```yaml +source: + type: mysql + hostname: writer.cluster.us-east-1.rds.amazonaws.com + scan.snapshot.hostname: reader.cluster-ro.us-east-1.rds.amazonaws.com + username: flink + password: secret + tables: mydb.orders +``` + ## Available Source metrics Metrics can help understand the progress of assignments, and the following are the supported [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/): From 37bda5c20255339add7cbb6d0ac5c05173059ab1 Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Tue, 31 Mar 2026 22:13:43 +0200 Subject: [PATCH 10/13] fix: typo --- .../mysql/source/config/MySqlSourceConfigFactory.java | 2 +- .../flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 01dbc17928b..c528984edf3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -1,4 +1,4 @@ -w/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java index b9179cf5913..e1dbd27094b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java @@ -1,4 +1,4 @@ -flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
From a754bd515b2a129aa049c455c3523841079868cf Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Wed, 1 Apr 2026 10:16:05 +0200 Subject: [PATCH 11/13] fix: spotless check --- .../connectors/mysql/debezium/DebeziumUtils.java | 14 ++++---------- .../debezium/task/MySqlSnapshotSplitReadTask.java | 3 ++- .../source/connection/JdbcConnectionFactory.java | 8 ++++---- .../source/connection/JdbcConnectionPools.java | 3 ++- .../mysql/debezium/DebeziumUtilsTest.java | 3 ++- .../MySqlSnapshotSplitReadTaskConnectionTest.java | 9 +++++---- .../source/connection/ReaderConnectionTest.java | 12 ++++-------- 7 files changed, 23 insertions(+), 29 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 14d2e9033ef..297a8e47d6b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -64,9 +64,7 @@ public class DebeziumUtils { /** Creates and opens a new {@link JdbcConnection} to the primary writer instance. 
*/ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) { - LOG.info( - "Opening a new JDBC connection to MySQL server at {}", - sourceConfig.getHostname()); + LOG.info("Opening a new JDBC connection to MySQL server at {}", sourceConfig.getHostname()); JdbcConnection jdbc = new JdbcConnection( JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()), @@ -76,8 +74,7 @@ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) try { jdbc.connect(); } catch (Exception e) { - LOG.error( - "Failed to open MySQL connection to {}", sourceConfig.getHostname(), e); + LOG.error("Failed to open MySQL connection to {}", sourceConfig.getHostname(), e); throw new FlinkRuntimeException(e); } return jdbc; @@ -122,13 +119,10 @@ public static MySqlConnection createMySqlConnection(MySqlSourceConfig sourceConf public static MySqlConnection createSnapshotMySqlConnection(MySqlSourceConfig sourceConfig) { String snapshotHostname = sourceConfig.getSnapshotHostname(); if (snapshotHostname == null) { - LOG.debug( - "No snapshot hostname configured, using primary for snapshot queries"); + LOG.debug("No snapshot hostname configured, using primary for snapshot queries"); return createMySqlConnection(sourceConfig); } - LOG.info( - "Creating MySQL connection for snapshot queries at {}", - snapshotHostname); + LOG.info("Creating MySQL connection for snapshot queries at {}", snapshotHostname); Configuration dbzConfig = sourceConfig.getDbzConfiguration(); Configuration snapshotConfig = dbzConfig.edit().with("database.hostname", snapshotHostname).build(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index ebd331aeb38..57d398cec4d 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -89,7 +89,8 @@ public class MySqlSnapshotSplitReadTask * Creates a new MySqlSnapshotSplitReadTask with separate writer and reader connections. * * @param primaryConnection Connection to the primary writer instance (used for binlog position) - * @param snapshotConnection Connection to the snapshot instance (used for snapshot data queries) + * @param snapshotConnection Connection to the snapshot instance (used for snapshot data + * queries) */ public MySqlSnapshotSplitReadTask( MySqlSourceConfig sourceConfig, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java index bda7eadc868..f36e5b08df9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java @@ -46,7 +46,7 @@ public JdbcConnectionFactory(MySqlSourceConfig sourceConfig) { * * @param sourceConfig the source configuration * @param hostnameOverride if non-null, connect to this host instead of - * sourceConfig.getHostname() + * sourceConfig.getHostname() */ public JdbcConnectionFactory(MySqlSourceConfig sourceConfig, String hostnameOverride) { this.sourceConfig = sourceConfig; @@ 
-66,10 +66,10 @@ public Connection connect(JdbcConfiguration config) throws SQLException { HikariDataSource dataSource = hostnameOverride != null ? JdbcConnectionPools.getInstance() - .getOrCreateConnectionPool( - connectionPoolId, sourceConfig, hostnameOverride) + .getOrCreateConnectionPool( + connectionPoolId, sourceConfig, hostnameOverride) : JdbcConnectionPools.getInstance() - .getOrCreateConnectionPool(connectionPoolId, sourceConfig); + .getOrCreateConnectionPool(connectionPoolId, sourceConfig); int i = 0; while (i < connectRetryTimes) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java index 9e0f2d7656f..93a4deb19eb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java @@ -56,7 +56,8 @@ public HikariDataSource getOrCreateConnectionPool( ConnectionPoolId poolId, MySqlSourceConfig sourceConfig, String hostname) { synchronized (pools) { if (!pools.containsKey(poolId)) { - LOG.info("Create and register connection pool {} for hostname {}", poolId, hostname); + LOG.info( + "Create and register connection pool {} for hostname {}", poolId, hostname); pools.put( poolId, PooledDataSourceFactory.createPooledDataSource(sourceConfig, hostname)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java index e1dbd27094b..4f0698a9e06 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java @@ -86,7 +86,8 @@ void testCreateSnapshotMySqlConnectionWithSnapshotHostname() { // Primary connection should point to primary hostname Assertions.assertThat(primaryConnection.connectionString()).contains(PRIMARY_HOSTNAME); - Assertions.assertThat(primaryConnection.connectionString()).doesNotContain(SNAPSHOT_HOSTNAME); + Assertions.assertThat(primaryConnection.connectionString()) + .doesNotContain(SNAPSHOT_HOSTNAME); } @Test diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java index f9dabf5101b..deab2851293 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java @@ -65,7 +65,8 @@ void testStatefulTaskContextWithSeparateConnections() { // Create StatefulTaskContext with both connections StatefulTaskContext context = - new StatefulTaskContext(config, binaryLogClient, writerConnection, readerConnection); + new 
StatefulTaskContext( + config, binaryLogClient, writerConnection, readerConnection); // Verify the context returns the correct connections assertThat(context.getConnection()).isSameAs(writerConnection); @@ -87,7 +88,8 @@ void testStatefulTaskContextWithSameConnectionWhenNoReaderConfigured() { // Create StatefulTaskContext StatefulTaskContext context = - new StatefulTaskContext(config, binaryLogClient, writerConnection, readerConnection); + new StatefulTaskContext( + config, binaryLogClient, writerConnection, readerConnection); // Verify both connections point to the same host (writer) assertThat(context.getConnection().connectionString()).contains(PRIMARY_HOSTNAME); @@ -101,8 +103,7 @@ void testStatefulTaskContextLegacyConstructorUsesSameConnection() { // Use the legacy constructor that takes only one connection MySqlConnection connection = DebeziumUtils.createMySqlConnection(config); - StatefulTaskContext context = - new StatefulTaskContext(config, binaryLogClient, connection); + StatefulTaskContext context = new StatefulTaskContext(config, binaryLogClient, connection); // Both getConnection() and getReaderConnection() should return the same instance assertThat(context.getConnection()).isSameAs(context.getSnapshotConnection()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java index 9a8809934a7..e8f4d39cde3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java @@ -114,10 
+114,8 @@ void testJdbcConnectionFactoryUsesHostnameOverride() { @Test void testConnectionPoolIdDifferentForWriterAndReader() { // Pool IDs should be different for writer and reader to ensure separate pools - ConnectionPoolId writerPoolId = - new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); - ConnectionPoolId readerPoolId = - new ConnectionPoolId(READER_HOSTNAME, PORT, "testuser"); + ConnectionPoolId writerPoolId = new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); + ConnectionPoolId readerPoolId = new ConnectionPoolId(READER_HOSTNAME, PORT, "testuser"); assertThat(writerPoolId).isNotEqualTo(readerPoolId); assertThat(writerPoolId.getHost()).isEqualTo(WRITER_HOSTNAME); @@ -127,10 +125,8 @@ void testConnectionPoolIdDifferentForWriterAndReader() { @Test void testConnectionPoolIdSameForSameHostname() { // Same hostname should result in equal pool IDs - ConnectionPoolId poolId1 = - new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); - ConnectionPoolId poolId2 = - new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); + ConnectionPoolId poolId1 = new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); + ConnectionPoolId poolId2 = new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); assertThat(poolId1).isEqualTo(poolId2); assertThat(poolId1.hashCode()).isEqualTo(poolId2.hashCode()); From c43e378b0cfdb995ecb59a628ee6c54fcf3bef33 Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Sun, 5 Apr 2026 22:51:32 +0200 Subject: [PATCH 12/13] fix: address PR review comments for scan.snapshot.hostname feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove duplicate mockito-core dependency (hardcoded 3.4.6) from pom.xml, keep single entry using ${mockito.version} (3.12.4) - Fix missing spaces in withDescription() string concatenations in MySqlDataSourceOptions for SCAN_SNAPSHOT_HOSTNAME option - Add getHostnameOverride() package-private getter to JdbcConnectionFactory and strengthen 
testJdbcConnectionFactoryUsesHostnameOverride() to assert the override hostname is correctly stored rather than just non-null - Fix comment in MySqlSnapshotSplitReadTaskConnectionTest referencing getReaderConnection() → getSnapshotConnection() - Normalize empty string snapshotHostname to null in MySqlSourceConfigFactory so downstream != null guards in DebeziumUtils work correctly Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../mysql/source/MySqlDataSourceOptions.java | 16 ++++++++-------- .../flink-connector-mysql-cdc/pom.xml | 7 ------- .../source/config/MySqlSourceConfigFactory.java | 5 ++++- .../source/connection/JdbcConnectionFactory.java | 4 ++++ ...MySqlSnapshotSplitReadTaskConnectionTest.java | 2 +- .../source/connection/ReaderConnectionTest.java | 8 +++++--- 6 files changed, 22 insertions(+), 20 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 45b050493e9..0d727ad13ca 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -98,14 +98,14 @@ public class MySqlDataSourceOptions { .stringType() .noDefaultValue() .withDescription( - "Optional IP address or hostname of a MySQL read replica to use for snapshot and metadata" - + "queries (table discovery, chunk splitting). When set, all snapshot data reads and metadata" - + "operations are routed to this instance, reducing load on the primary writer." 
- + "Binlog position tracking and changelog streaming always use the primary writer instance." - + "WARNING: At-least-once semantics only. When this option is set, exactly-once" - + "guarantees cannot be maintained because the binlog positions recorded during snapshot" - + "scanning originate from the primary writer while the data is read from the replica, and" - + "storage replication lag (even in the millisecond range, as with Aurora/RDS) means the" + "Optional IP address or hostname of a MySQL read replica to use for snapshot and metadata " + + "queries (table discovery, chunk splitting). When set, all snapshot data reads and metadata " + + "operations are routed to this instance, reducing load on the primary writer. " + + "Binlog position tracking and changelog streaming always use the primary writer instance. " + + "WARNING: At-least-once semantics only. When this option is set, exactly-once " + + "guarantees cannot be maintained because the binlog positions recorded during snapshot " + + "scanning originate from the primary writer while the data is read from the replica, and " + + "storage replication lag (even in the millisecond range, as with Aurora/RDS) means the " + "two positions may not be perfectly consistent."); public static final ConfigOption SCAN_SNAPSHOT_FETCH_SIZE = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml index 4b295f80cfa..026181af508 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml @@ -185,13 +185,6 @@ limitations under the License. 
- - org.mockito - mockito-core - 3.4.6 - test - - org.testcontainers mysql diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index c528984edf3..79800f7f37b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -89,9 +89,12 @@ public MySqlSourceConfigFactory hostname(String hostname) { * Optional hostname of a MySQL read replica to use for snapshot queries. When set, snapshot * data will be read from this replica instead of the primary writer instance, reducing the load * on the writer. The binlog position will still be obtained from the primary writer instance. + * + *

An empty string is treated as {@code null}, falling back to the primary writer instance. */ public MySqlSourceConfigFactory snapshotHostname(String snapshotHostname) { - this.snapshotHostname = snapshotHostname; + this.snapshotHostname = + (snapshotHostname != null && snapshotHostname.isEmpty()) ? null : snapshotHostname; return this; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java index f36e5b08df9..535f5ff5a90 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java @@ -53,6 +53,10 @@ public JdbcConnectionFactory(MySqlSourceConfig sourceConfig, String hostnameOver this.hostnameOverride = hostnameOverride; } + String getHostnameOverride() { + return hostnameOverride; + } + @Override public Connection connect(JdbcConfiguration config) throws SQLException { final int connectRetryTimes = sourceConfig.getConnectMaxRetries(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java index deab2851293..e205a11c088 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java @@ -105,7 +105,7 @@ void testStatefulTaskContextLegacyConstructorUsesSameConnection() { MySqlConnection connection = DebeziumUtils.createMySqlConnection(config); StatefulTaskContext context = new StatefulTaskContext(config, binaryLogClient, connection); - // Both getConnection() and getReaderConnection() should return the same instance + // Both getConnection() and getSnapshotConnection() should return the same instance assertThat(context.getConnection()).isSameAs(context.getSnapshotConnection()); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java index e8f4d39cde3..b194dcfdac7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java @@ -106,9 +106,11 @@ void testJdbcConnectionFactoryUsesHostnameOverride() { JdbcConnectionFactory factoryWithOverride = new JdbcConnectionFactory(config, READER_HOSTNAME); - // Verify the factories are created with correct configuration - assertThat(factoryWithoutOverride).isNotNull(); - assertThat(factoryWithOverride).isNotNull(); + // Verify the override hostname is stored 
correctly + assertThat(factoryWithoutOverride.getHostnameOverride()).isNull(); + assertThat(factoryWithOverride.getHostnameOverride()).isEqualTo(READER_HOSTNAME); + assertThat(factoryWithOverride.getHostnameOverride()) + .isNotEqualTo(config.getHostname()); } @Test From 6c86decee09da2789fa1c8fb341f1187e362abfa Mon Sep 17 00:00:00 2001 From: Luca Occhipinti Date: Sun, 5 Apr 2026 22:58:26 +0200 Subject: [PATCH 13/13] fix: trim strings for null-safety --- .../mysql/source/config/MySqlSourceConfigFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 79800f7f37b..48fab4ed771 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -94,7 +94,7 @@ public MySqlSourceConfigFactory hostname(String hostname) { */ public MySqlSourceConfigFactory snapshotHostname(String snapshotHostname) { this.snapshotHostname = - (snapshotHostname != null && snapshotHostname.isEmpty()) ? null : snapshotHostname; + (snapshotHostname != null && snapshotHostname.trim().isEmpty()) ? null : snapshotHostname; return this; }