previousCapturedTables = new HashSet<>();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index cf456fcaed0..99d97bfcd2b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -44,6 +44,7 @@ public class MySqlSourceConfig implements Serializable {
private static final long serialVersionUID = 1L;
private final String hostname;
+ @Nullable private final String snapshotHostname;
private final int port;
private final String username;
private final String password;
@@ -83,6 +84,7 @@ public class MySqlSourceConfig implements Serializable {
MySqlSourceConfig(
String hostname,
+ @Nullable String snapshotHostname,
int port,
String username,
String password,
@@ -114,6 +116,7 @@ public class MySqlSourceConfig implements Serializable {
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst) {
this.hostname = checkNotNull(hostname);
+ this.snapshotHostname = snapshotHostname;
this.port = port;
this.username = checkNotNull(username);
this.password = password;
@@ -164,6 +167,11 @@ public String getHostname() {
return hostname;
}
+ @Nullable
+ public String getSnapshotHostname() {
+ return snapshotHostname;
+ }
+
public int getPort() {
return port;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 569b62232db..48fab4ed771 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -46,6 +46,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private int port = 3306; // default 3306 port
private String hostname;
+ private String snapshotHostname;
private String username;
private String password;
private ServerIdRange serverIdRange;
@@ -84,6 +85,19 @@ public MySqlSourceConfigFactory hostname(String hostname) {
return this;
}
+ /**
+ * Optional hostname of a MySQL read replica to use for snapshot queries. When set, snapshot
+ * data will be read from this replica instead of the primary writer instance, reducing the load
+ * on the writer. The binlog position will still be obtained from the primary writer instance.
+ *
+ * An empty string is treated as {@code null}, falling back to the primary writer instance.
+ */
+ public MySqlSourceConfigFactory snapshotHostname(String snapshotHostname) {
+ this.snapshotHostname =
+ (snapshotHostname != null && snapshotHostname.trim().isEmpty()) ? null : snapshotHostname;
+ return this;
+ }
+
/** Integer port number of the MySQL database server. */
public MySqlSourceConfigFactory port(int port) {
this.port = port;
@@ -415,6 +429,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
return new MySqlSourceConfig(
hostname,
+ snapshotHostname,
port,
username,
password,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java
index 16c831b6488..7618757eeed 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java
@@ -34,6 +34,18 @@ public ConnectionPoolId(String host, int port, String username) {
this.username = username;
}
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java
index 320c8868692..535f5ff5a90 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java
@@ -35,24 +35,45 @@ public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory {
private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionFactory.class);
private final MySqlSourceConfig sourceConfig;
+ private final String hostnameOverride;
public JdbcConnectionFactory(MySqlSourceConfig sourceConfig) {
+ this(sourceConfig, null);
+ }
+
+ /**
+ * Creates a factory that connects to a specific hostname instead of the configured one.
+ *
+ * @param sourceConfig the source configuration
+ * @param hostnameOverride if non-null, connect to this host instead of
+ * sourceConfig.getHostname()
+ */
+ public JdbcConnectionFactory(MySqlSourceConfig sourceConfig, String hostnameOverride) {
this.sourceConfig = sourceConfig;
+ this.hostnameOverride = hostnameOverride;
+ }
+
+ String getHostnameOverride() {
+ return hostnameOverride;
}
@Override
public Connection connect(JdbcConfiguration config) throws SQLException {
final int connectRetryTimes = sourceConfig.getConnectMaxRetries();
+ final String targetHostname =
+ hostnameOverride != null ? hostnameOverride : sourceConfig.getHostname();
final ConnectionPoolId connectionPoolId =
new ConnectionPoolId(
- sourceConfig.getHostname(),
- sourceConfig.getPort(),
- sourceConfig.getUsername());
+ targetHostname, sourceConfig.getPort(), sourceConfig.getUsername());
HikariDataSource dataSource =
- JdbcConnectionPools.getInstance()
- .getOrCreateConnectionPool(connectionPoolId, sourceConfig);
+ hostnameOverride != null
+ ? JdbcConnectionPools.getInstance()
+ .getOrCreateConnectionPool(
+ connectionPoolId, sourceConfig, hostnameOverride)
+ : JdbcConnectionPools.getInstance()
+ .getOrCreateConnectionPool(connectionPoolId, sourceConfig);
int i = 0;
while (i < connectRetryTimes) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
index 9505a559ad0..93a4deb19eb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
@@ -44,10 +44,23 @@ public static JdbcConnectionPools getInstance() {
@Override
public HikariDataSource getOrCreateConnectionPool(
ConnectionPoolId poolId, MySqlSourceConfig sourceConfig) {
+ return getOrCreateConnectionPool(poolId, sourceConfig, poolId.getHost());
+ }
+
+ /**
+ * Gets or creates a connection pool for the specified pool ID with a hostname override. This is
+ * useful for creating reader connection pools that connect to a different host than the primary
+ * writer.
+ */
+ public HikariDataSource getOrCreateConnectionPool(
+ ConnectionPoolId poolId, MySqlSourceConfig sourceConfig, String hostname) {
synchronized (pools) {
if (!pools.containsKey(poolId)) {
- LOG.info("Create and register connection pool {}", poolId);
- pools.put(poolId, PooledDataSourceFactory.createPooledDataSource(sourceConfig));
+ LOG.info(
+ "Create and register connection pool {} for hostname {}", poolId, hostname);
+ pools.put(
+ poolId,
+ PooledDataSourceFactory.createPooledDataSource(sourceConfig, hostname));
}
return pools.get(poolId);
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java
index e5f6bcd37ab..c9997d2b5f8 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java
@@ -41,14 +41,22 @@ public class PooledDataSourceFactory {
private PooledDataSourceFactory() {}
public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceConfig) {
+ return createPooledDataSource(sourceConfig, sourceConfig.getHostname());
+ }
+
+ /**
+ * Creates a pooled data source with a specific hostname. This is useful when connecting to a
+ * read replica for snapshot queries while using the primary for binlog operations.
+ */
+ public static HikariDataSource createPooledDataSource(
+ MySqlSourceConfig sourceConfig, String hostname) {
final HikariConfig config = new HikariConfig();
- String hostName = sourceConfig.getHostname();
int port = sourceConfig.getPort();
Properties jdbcProperties = sourceConfig.getJdbcProperties();
- config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port);
- config.setJdbcUrl(formatJdbcUrl(hostName, port, jdbcProperties));
+ config.setPoolName(CONNECTION_POOL_PREFIX + hostname + ":" + port);
+ config.setJdbcUrl(formatJdbcUrl(hostname, port, jdbcProperties));
config.setUsername(sourceConfig.getUsername());
config.setPassword(sourceConfig.getPassword());
config.setMinimumIdle(MINIMUM_POOL_SIZE);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java
index eeae681ea7b..4f0698a9e06 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java
@@ -31,6 +31,10 @@
/** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils}. */
class DebeziumUtilsTest {
+
+ private static final String PRIMARY_HOSTNAME = "writer-host";
+ private static final String SNAPSHOT_HOSTNAME = "reader-host";
+
@Test
void testCreateMySqlConnection() {
// test without set useSSL
@@ -39,7 +43,7 @@ void testCreateMySqlConnection() {
MySqlSourceConfig configWithoutUseSSL = getConfig(jdbcProps);
MySqlConnection connection0 = DebeziumUtils.createMySqlConnection(configWithoutUseSSL);
assertJdbcUrl(
- "jdbc:mysql://localhost:3306/?useSSL=false&connectTimeout=20000&useInformationSchema=true"
+ "jdbc:mysql://writer-host:3306/?useSSL=false&connectTimeout=20000&useInformationSchema=true"
+ "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&onlyTest=test"
+ "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true",
connection0.connectionString());
@@ -49,7 +53,7 @@ void testCreateMySqlConnection() {
MySqlSourceConfig configNotUseSSL = getConfig(jdbcProps);
MySqlConnection connection1 = DebeziumUtils.createMySqlConnection(configNotUseSSL);
assertJdbcUrl(
- "jdbc:mysql://localhost:3306/?connectTimeout=20000&useInformationSchema=true"
+ "jdbc:mysql://writer-host:3306/?connectTimeout=20000&useInformationSchema=true"
+ "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&useSSL=false&onlyTest=test"
+ "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true",
connection1.connectionString());
@@ -59,19 +63,89 @@ void testCreateMySqlConnection() {
MySqlSourceConfig configUseSSL = getConfig(jdbcProps);
MySqlConnection connection2 = DebeziumUtils.createMySqlConnection(configUseSSL);
assertJdbcUrl(
- "jdbc:mysql://localhost:3306/?connectTimeout=20000&useInformationSchema=true"
+ "jdbc:mysql://writer-host:3306/?connectTimeout=20000&useInformationSchema=true"
+ "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&useSSL=true&onlyTest=test"
+ "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true",
connection2.connectionString());
}
+ @Test
+ void testCreateSnapshotMySqlConnectionWithSnapshotHostname() {
+ Properties jdbcProps = new Properties();
+ MySqlSourceConfig config = getConfigWithSnapshotHostname(jdbcProps, SNAPSHOT_HOSTNAME);
+
+ // Create reader connection - should use reader hostname
+ MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config);
+
+ // Create primary connection - should use primary hostname
+ MySqlConnection primaryConnection = DebeziumUtils.createMySqlConnection(config);
+
+ // Reader connection should point to reader hostname
+ Assertions.assertThat(readerConnection.connectionString()).contains(SNAPSHOT_HOSTNAME);
+ Assertions.assertThat(readerConnection.connectionString()).doesNotContain(PRIMARY_HOSTNAME);
+
+ // Primary connection should point to primary hostname
+ Assertions.assertThat(primaryConnection.connectionString()).contains(PRIMARY_HOSTNAME);
+ Assertions.assertThat(primaryConnection.connectionString())
+ .doesNotContain(SNAPSHOT_HOSTNAME);
+ }
+
+ @Test
+ void testCreateSnapshotMySqlConnectionFallbackToPrimary() {
+ Properties jdbcProps = new Properties();
+ MySqlSourceConfig config = getConfig(jdbcProps);
+
+ // When no reader hostname is configured, should fall back to primary
+ MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config);
+ MySqlConnection primaryConnection = DebeziumUtils.createMySqlConnection(config);
+
+ // Both should point to the primary hostname
+ Assertions.assertThat(readerConnection.connectionString()).contains(PRIMARY_HOSTNAME);
+ Assertions.assertThat(primaryConnection.connectionString()).contains(PRIMARY_HOSTNAME);
+ }
+
+ @Test
+ void testSnapshotHostnameConfigurationProperty() {
+ Properties jdbcProps = new Properties();
+
+ // Config without reader hostname
+ MySqlSourceConfig configWithoutReader = getConfig(jdbcProps);
+ Assertions.assertThat(configWithoutReader.getSnapshotHostname()).isNull();
+
+ // Config with reader hostname
+ MySqlSourceConfig configWithReader =
+ getConfigWithSnapshotHostname(jdbcProps, SNAPSHOT_HOSTNAME);
+ Assertions.assertThat(configWithReader.getSnapshotHostname()).isEqualTo(SNAPSHOT_HOSTNAME);
+ Assertions.assertThat(configWithReader.getHostname()).isEqualTo(PRIMARY_HOSTNAME);
+ }
+
private MySqlSourceConfig getConfig(Properties jdbcProperties) {
return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.initial())
.databaseList("fakeDb")
.tableList("fakeDb.fakeTable")
.includeSchemaChanges(false)
- .hostname("localhost")
+ .hostname(PRIMARY_HOSTNAME)
+ .port(3306)
+ .splitSize(10)
+ .fetchSize(2)
+ .connectTimeout(Duration.ofSeconds(20))
+ .username("fakeUser")
+ .password("fakePw")
+ .serverTimeZone(ZoneId.of("UTC").toString())
+ .jdbcProperties(jdbcProperties)
+ .createConfig(0);
+ }
+
+ private MySqlSourceConfig getConfigWithSnapshotHostname(
+ Properties jdbcProperties, String snapshotHostname) {
+ return new MySqlSourceConfigFactory()
+ .startupOptions(StartupOptions.initial())
+ .databaseList("fakeDb")
+ .tableList("fakeDb.fakeTable")
+ .includeSchemaChanges(false)
+ .hostname(PRIMARY_HOSTNAME)
+ .snapshotHostname(snapshotHostname)
.port(3306)
.splitSize(10)
.fetchSize(2)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java
new file mode 100644
index 00000000000..e205a11c088
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.debezium.task;
+
+import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import io.debezium.connector.mysql.MySqlConnection;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link MySqlSnapshotSplitReadTask} to verify correct usage of writer and reader
+ * connections.
+ *
+ *
The key behavior being tested:
+ *
+ *
+ * - Writer connection is used for binlog position retrieval (SHOW MASTER STATUS)
+ *
- Reader connection is used for snapshot data queries (SELECT statements)
+ *
- When reader hostname is not configured, reader connection falls back to writer
+ *
+ */
+class MySqlSnapshotSplitReadTaskConnectionTest {
+
+ private static final String PRIMARY_HOSTNAME = "writer-host";
+ private static final String SNAPSHOT_HOSTNAME = "reader-host";
+ private static final int PORT = 3306;
+
+ @Test
+ void testStatefulTaskContextWithSeparateConnections() {
+ MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME);
+ BinaryLogClient binaryLogClient = mock(BinaryLogClient.class);
+
+ // Create separate writer and reader connections
+ MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config);
+ MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config);
+
+ // Create StatefulTaskContext with both connections
+ StatefulTaskContext context =
+ new StatefulTaskContext(
+ config, binaryLogClient, writerConnection, readerConnection);
+
+ // Verify the context returns the correct connections
+ assertThat(context.getConnection()).isSameAs(writerConnection);
+ assertThat(context.getSnapshotConnection()).isSameAs(readerConnection);
+
+ // Verify writer and reader connections point to different hosts
+ assertThat(writerConnection.connectionString()).contains(PRIMARY_HOSTNAME);
+ assertThat(readerConnection.connectionString()).contains(SNAPSHOT_HOSTNAME);
+ }
+
+ @Test
+ void testStatefulTaskContextWithSameConnectionWhenNoReaderConfigured() {
+ MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, null);
+ BinaryLogClient binaryLogClient = mock(BinaryLogClient.class);
+
+ // When no reader hostname is configured, createReaderMySqlConnection falls back to writer
+ MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config);
+ MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config);
+
+ // Create StatefulTaskContext
+ StatefulTaskContext context =
+ new StatefulTaskContext(
+ config, binaryLogClient, writerConnection, readerConnection);
+
+ // Verify both connections point to the same host (writer)
+ assertThat(context.getConnection().connectionString()).contains(PRIMARY_HOSTNAME);
+ assertThat(context.getSnapshotConnection().connectionString()).contains(PRIMARY_HOSTNAME);
+ }
+
+ @Test
+ void testStatefulTaskContextLegacyConstructorUsesSameConnection() {
+ MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME);
+ BinaryLogClient binaryLogClient = mock(BinaryLogClient.class);
+
+ // Use the legacy constructor that takes only one connection
+ MySqlConnection connection = DebeziumUtils.createMySqlConnection(config);
+ StatefulTaskContext context = new StatefulTaskContext(config, binaryLogClient, connection);
+
+ // Both getConnection() and getSnapshotConnection() should return the same instance
+ assertThat(context.getConnection()).isSameAs(context.getSnapshotConnection());
+ }
+
+ @Test
+ void testWriterConnectionUsedForBinlogPosition() {
+ MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME);
+
+ // The writer connection should be used for binlog position queries
+ // (SHOW MASTER STATUS only works on the primary/writer)
+ MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config);
+ MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config);
+
+ // Verify writer points to writer host (required for binlog operations)
+ assertThat(writerConnection.connectionString()).contains(PRIMARY_HOSTNAME);
+ assertThat(writerConnection.connectionString()).doesNotContain(SNAPSHOT_HOSTNAME);
+
+ // Verify reader points to reader host (for snapshot data queries)
+ assertThat(readerConnection.connectionString()).contains(SNAPSHOT_HOSTNAME);
+ assertThat(readerConnection.connectionString()).doesNotContain(PRIMARY_HOSTNAME);
+ }
+
+ @Test
+ void testReaderConnectionFallbackWhenReaderHostnameIsNull() {
+ MySqlSourceConfig configWithoutReader = createConfig(PRIMARY_HOSTNAME, null);
+ MySqlSourceConfig configWithReader = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME);
+
+ // When scanSnapshotHostname is null, createReaderMySqlConnection should return a connection
+ // that uses the primary hostname
+ MySqlConnection readerConnectionWithoutConfig =
+ DebeziumUtils.createSnapshotMySqlConnection(configWithoutReader);
+ MySqlConnection readerConnectionWithConfig =
+ DebeziumUtils.createSnapshotMySqlConnection(configWithReader);
+
+ // Without reader configured: should use writer hostname
+ assertThat(readerConnectionWithoutConfig.connectionString()).contains(PRIMARY_HOSTNAME);
+
+ // With reader configured: should use reader hostname
+ assertThat(readerConnectionWithConfig.connectionString()).contains(SNAPSHOT_HOSTNAME);
+ assertThat(readerConnectionWithConfig.connectionString()).doesNotContain(PRIMARY_HOSTNAME);
+ }
+
+ /**
+ * Verifies that when snapshotConnection and connection are different objects (replica
+ * configured), both are closed independently.
+ */
+ @Test
+ void testCloseWithSeparateConnectionsClosesBoth() throws Exception {
+ MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME);
+ BinaryLogClient binaryLogClient = mock(BinaryLogClient.class);
+ MySqlConnection primaryConnection = mock(MySqlConnection.class);
+ MySqlConnection snapshotConnection = mock(MySqlConnection.class);
+
+ StatefulTaskContext context =
+ new StatefulTaskContext(
+ config, binaryLogClient, primaryConnection, snapshotConnection);
+ context.close();
+
+ verify(primaryConnection, times(1)).close();
+ verify(snapshotConnection, times(1)).close();
+ }
+
+ /**
+ * Verifies that when snapshotConnection and connection are the same object (no replica
+ * configured), close() is called exactly once — not twice.
+ */
+ @Test
+ void testCloseWithSharedConnectionClosesOnce() throws Exception {
+ MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, null);
+ BinaryLogClient binaryLogClient = mock(BinaryLogClient.class);
+ MySqlConnection sharedConnection = mock(MySqlConnection.class);
+
+ // Legacy constructor passes the same object for both connection fields
+ StatefulTaskContext context =
+ new StatefulTaskContext(config, binaryLogClient, sharedConnection);
+ context.close();
+
+ verify(sharedConnection, times(1)).close();
+ }
+
+ private MySqlSourceConfig createConfig(String hostname, String snapshotHostname) {
+ MySqlSourceConfigFactory factory =
+ new MySqlSourceConfigFactory()
+ .startupOptions(StartupOptions.initial())
+ .databaseList("testdb")
+ .tableList("testdb.testtable")
+ .includeSchemaChanges(false)
+ .hostname(hostname)
+ .port(PORT)
+ .splitSize(10)
+ .fetchSize(2)
+ .connectTimeout(Duration.ofSeconds(20))
+ .username("testuser")
+ .password("testpw")
+ .serverTimeZone(ZoneId.of("UTC").toString())
+ .jdbcProperties(new Properties());
+
+ if (snapshotHostname != null) {
+ factory.snapshotHostname(snapshotHostname);
+ }
+
+ return factory.createConfig(0);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java
new file mode 100644
index 00000000000..b194dcfdac7
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.connection;
+
+import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+
+import io.debezium.connector.mysql.MySqlConnection;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for reader hostname connection handling. Verifies that the reader connection is used when
+ * configured, and falls back to the writer connection when not configured.
+ */
+class ReaderConnectionTest {
+
+ private static final String WRITER_HOSTNAME = "writer-host";
+ private static final String READER_HOSTNAME = "reader-host";
+ private static final int PORT = 3306;
+
+ @Test
+ void testReaderHostnameConfigured() {
+ MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, READER_HOSTNAME);
+
+ assertThat(config.getHostname()).isEqualTo(WRITER_HOSTNAME);
+ assertThat(config.getSnapshotHostname()).isEqualTo(READER_HOSTNAME);
+ }
+
+ @Test
+ void testReaderHostnameNotConfigured() {
+ MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, null);
+
+ assertThat(config.getHostname()).isEqualTo(WRITER_HOSTNAME);
+ assertThat(config.getSnapshotHostname()).isNull();
+ }
+
+ @Test
+ void testCreateReaderMySqlConnectionWithReaderHostname() {
+ MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, READER_HOSTNAME);
+
+ // Create reader connection - should use reader hostname
+ MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config);
+ String readerConnectionString = readerConnection.connectionString();
+
+ // Create writer connection - should use writer hostname
+ MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config);
+ String writerConnectionString = writerConnection.connectionString();
+
+ // Reader connection should contain reader hostname
+ assertThat(readerConnectionString).contains(READER_HOSTNAME);
+ assertThat(readerConnectionString).doesNotContain(WRITER_HOSTNAME);
+
+ // Writer connection should contain writer hostname
+ assertThat(writerConnectionString).contains(WRITER_HOSTNAME);
+ assertThat(writerConnectionString).doesNotContain(READER_HOSTNAME);
+ }
+
+ @Test
+ void testCreateReaderMySqlConnectionFallsBackToWriterWhenNoReaderConfigured() {
+ MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, null);
+
+ // Create reader connection - should fall back to writer hostname
+ MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config);
+ String readerConnectionString = readerConnection.connectionString();
+
+ // Create writer connection
+ MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config);
+ String writerConnectionString = writerConnection.connectionString();
+
+ // Both should use the writer hostname when reader is not configured
+ assertThat(readerConnectionString).contains(WRITER_HOSTNAME);
+ assertThat(writerConnectionString).contains(WRITER_HOSTNAME);
+ }
+
+ @Test
+ void testJdbcConnectionFactoryUsesHostnameOverride() {
+ MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, null);
+
+ // Factory without hostname override - should use config hostname
+ JdbcConnectionFactory factoryWithoutOverride = new JdbcConnectionFactory(config);
+
+ // Factory with hostname override - should use override
+ JdbcConnectionFactory factoryWithOverride =
+ new JdbcConnectionFactory(config, READER_HOSTNAME);
+
+ // Verify the override hostname is stored correctly
+ assertThat(factoryWithoutOverride.getHostnameOverride()).isNull();
+ assertThat(factoryWithOverride.getHostnameOverride()).isEqualTo(READER_HOSTNAME);
+ assertThat(factoryWithOverride.getHostnameOverride())
+ .isNotEqualTo(config.getHostname());
+ }
+
+ @Test
+ void testConnectionPoolIdDifferentForWriterAndReader() {
+ // Pool IDs should be different for writer and reader to ensure separate pools
+ ConnectionPoolId writerPoolId = new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser");
+ ConnectionPoolId readerPoolId = new ConnectionPoolId(READER_HOSTNAME, PORT, "testuser");
+
+ assertThat(writerPoolId).isNotEqualTo(readerPoolId);
+ assertThat(writerPoolId.getHost()).isEqualTo(WRITER_HOSTNAME);
+ assertThat(readerPoolId.getHost()).isEqualTo(READER_HOSTNAME);
+ }
+
+ @Test
+ void testConnectionPoolIdSameForSameHostname() {
+ // Same hostname should result in equal pool IDs
+ ConnectionPoolId poolId1 = new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser");
+ ConnectionPoolId poolId2 = new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser");
+
+ assertThat(poolId1).isEqualTo(poolId2);
+ assertThat(poolId1.hashCode()).isEqualTo(poolId2.hashCode());
+ }
+
+ private MySqlSourceConfig createConfig(String hostname, String scanSnapshotHostname) {
+ MySqlSourceConfigFactory factory =
+ new MySqlSourceConfigFactory()
+ .startupOptions(StartupOptions.initial())
+ .databaseList("testdb")
+ .tableList("testdb.testtable")
+ .includeSchemaChanges(false)
+ .hostname(hostname)
+ .port(PORT)
+ .splitSize(10)
+ .fetchSize(2)
+ .connectTimeout(Duration.ofSeconds(20))
+ .username("testuser")
+ .password("testpw")
+ .serverTimeZone(ZoneId.of("UTC").toString())
+ .jdbcProperties(new Properties());
+
+ if (scanSnapshotHostname != null) {
+ factory.snapshotHostname(scanSnapshotHostname);
+ }
+
+ return factory.createConfig(0);
+ }
+}