diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index e6e61e20720..d288ec1382d 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -170,6 +170,24 @@ pipeline: Integer The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table. + + scan.snapshot.hostname + optional + (none) + String + Optional IP address or hostname of a MySQL read replica to use for snapshot and metadata + queries (table discovery, chunk splitting). When set, all snapshot data reads and metadata + operations are routed to this instance, reducing load on the primary writer. Binlog + position tracking and changelog streaming always use the primary writer instance. +

+ ⚠ WARNING: At-least-once semantics only. When this option is set, exactly-once + guarantees cannot be maintained because the binlog positions recorded during snapshot + scanning originate from the primary writer while the data is read from the replica, and + storage replication lag (even in the millisecond range, as with Aurora/RDS) means the + two positions may not be perfectly consistent. Duplicate records are possible during + recovery. See Read Replica Snapshot for details. + + scan.snapshot.fetch.size optional @@ -405,6 +423,47 @@ source: # ... ``` +## Read Replica Snapshot + +The `scan.snapshot.hostname` option allows routing snapshot data reads and metadata queries +(table discovery, chunk splitting) to a MySQL read replica, while all binlog operations remain +on the primary writer instance. + +This is particularly useful in AWS Aurora/RDS deployments where the primary writer handles +high-throughput transactional workloads and you want to avoid the additional I/O pressure of +a full-table snapshot scan. + +### How it works + +During the snapshot phase, Flink CDC performs the following sequence for each chunk: + +1. **Record low watermark** — queries `SHOW MASTER STATUS` on the **primary writer** to capture the current binlog position. +2. **Scan chunk data** — executes `SELECT` queries on the **snapshot instance** (`scan.snapshot.hostname`). +3. **Record high watermark** — queries `SHOW MASTER STATUS` on the **primary writer** again. +4. **Backfill** — replays changelog events from the primary between low and high watermark to catch up any changes that occurred during the scan. + +Metadata operations (table discovery, `SELECT MIN/MAX` for chunk splitting) also run on the snapshot instance. + +### At-least-once semantics + +> **⚠ Warning:** When `scan.snapshot.hostname` is configured, exactly-once delivery cannot be guaranteed. + +The low/high watermark binlog positions are recorded from the primary writer, while the data is read from the replica. Even with storage-level replication (as used by Aurora), replication lag — even at the millisecond scale — means there is no strict guarantee that the replica's data at the moment of scanning corresponds exactly to the primary's binlog position recorded as the watermark. As a result, some events may be delivered more than once after a job restart or failover. + +This is a known and accepted trade-off: **reduced load on the primary writer at the cost of at-least-once semantics** during the snapshot phase. + +### Example + +```yaml +source: + type: mysql + hostname: writer.cluster.us-east-1.rds.amazonaws.com + scan.snapshot.hostname: reader.cluster-ro.us-east-1.rds.amazonaws.com + username: flink + password: secret + tables: mydb.orders +``` + ## Available Source metrics Metrics can help understand the progress of assignments, and the following are the supported [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/): diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 1b3540da0bb..6d7dabb6fa3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -83,6 +83,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET; @@ -120,6 +121,7 @@ public DataSource createDataSource(Context context) { final Configuration config = context.getFactoryConfiguration(); String hostname = config.get(HOSTNAME); + String scanSnapshotHostname = config.getOptional(SCAN_SNAPSHOT_HOSTNAME).orElse(null); int port = config.get(PORT); String username = config.get(USERNAME); @@ -194,6 +196,7 @@ public DataSource createDataSource(Context context) { MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(hostname) + .snapshotHostname(scanSnapshotHostname) .port(port) .username(username) .password(password) @@ -326,6 +329,7 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { Set> options = new HashSet<>(); + options.add(SCAN_SNAPSHOT_HOSTNAME); options.add(PORT); options.add(TABLES_EXCLUDE); options.add(SCHEMA_CHANGE_ENABLED); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 6aff556e7fa..0d727ad13ca 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -93,6 +93,21 @@ public class MySqlDataSourceOptions { .withDescription( "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table."); + public static final ConfigOption SCAN_SNAPSHOT_HOSTNAME = + ConfigOptions.key("scan.snapshot.hostname") + .stringType() + .noDefaultValue() + .withDescription( + "Optional IP address or hostname of a MySQL read replica to use for snapshot and metadata " + + "queries (table discovery, chunk splitting). When set, all snapshot data reads and metadata " + + "operations are routed to this instance, reducing load on the primary writer. " + + "Binlog position tracking and changelog streaming always use the primary writer instance. " + + "WARNING: At-least-once semantics only. When this option is set, exactly-once " + + "guarantees cannot be maintained because the binlog positions recorded during snapshot " + + "scanning originate from the primary writer while the data is read from the replica, and " + + "storage replication lag (even in the millisecond range, as with Aurora/RDS) means the " + + "two positions may not be perfectly consistent."); + public static final ConfigOption SCAN_SNAPSHOT_FETCH_SIZE = ConfigOptions.key("scan.snapshot.fetch.size") .intType() diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index f567dde7347..297a8e47d6b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -62,8 +62,9 @@ public class DebeziumUtils { private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class); - /** Creates and opens a new {@link JdbcConnection} backing connection pool. */ + /** Creates and opens a new {@link JdbcConnection} to the primary writer instance. */ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) { + LOG.info("Opening a new JDBC connection to MySQL server at {}", sourceConfig.getHostname()); JdbcConnection jdbc = new JdbcConnection( JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()), @@ -73,7 +74,32 @@ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) try { jdbc.connect(); } catch (Exception e) { - LOG.error("Failed to open MySQL connection", e); + LOG.error("Failed to open MySQL connection to {}", sourceConfig.getHostname(), e); + throw new FlinkRuntimeException(e); + } + return jdbc; + } + + /** + * Creates and opens a new {@link JdbcConnection} for metadata and snapshot queries. Routes to + * the snapshot instance when {@link MySqlSourceConfig#getSnapshotHostname()} is configured, + * otherwise falls back to the primary writer. + */ + public static JdbcConnection openSnapshotJdbcConnection(MySqlSourceConfig sourceConfig) { + String snapshotHostname = sourceConfig.getSnapshotHostname(); + String targetHost = + snapshotHostname != null ? snapshotHostname : sourceConfig.getHostname(); + LOG.info("Opening a new JDBC connection for metadata queries at {}", targetHost); + JdbcConnection jdbc = + new JdbcConnection( + JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()), + new JdbcConnectionFactory(sourceConfig, snapshotHostname), + QUOTED_CHARACTER, + QUOTED_CHARACTER); + try { + jdbc.connect(); + } catch (Exception e) { + LOG.error("Failed to open MySQL connection to {}", targetHost, e); throw new FlinkRuntimeException(e); } return jdbc; @@ -85,6 +111,24 @@ public static MySqlConnection createMySqlConnection(MySqlSourceConfig sourceConf sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties()); } + /** + * Creates a new {@link MySqlConnection} to the snapshot instance for snapshot queries. If no + * snapshot hostname is configured, returns a connection to the primary. The connection is not + * opened. + */ + public static MySqlConnection createSnapshotMySqlConnection(MySqlSourceConfig sourceConfig) { + String snapshotHostname = sourceConfig.getSnapshotHostname(); + if (snapshotHostname == null) { + LOG.debug("No snapshot hostname configured, using primary for snapshot queries"); + return createMySqlConnection(sourceConfig); + } + LOG.info("Creating MySQL connection for snapshot queries at {}", snapshotHostname); + Configuration dbzConfig = sourceConfig.getDbzConfiguration(); + Configuration snapshotConfig = + dbzConfig.edit().with("database.hostname", snapshotHostname).build(); + return createMySqlConnection(snapshotConfig, sourceConfig.getJdbcProperties()); + } + /** Creates a new {@link MySqlConnection}, but not open the connection. */ public static MySqlConnection createMySqlConnection( Configuration dbzConfiguration, Properties jdbcProperties) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index e3549237065..a455d0819f6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -69,6 +69,7 @@ import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient; import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; +import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createSnapshotMySqlConnection; /** * A snapshot reader that reads data from Table in split level, the split is assigned by primary key @@ -103,7 +104,8 @@ public SnapshotSplitReader( new StatefulTaskContext( sourceConfig, createBinaryClient(sourceConfig.getDbzConfiguration()), - createMySqlConnection(sourceConfig)), + createMySqlConnection(sourceConfig), + createSnapshotMySqlConnection(sourceConfig)), subtaskId, hooks); } @@ -143,6 +145,7 @@ public void submitSplit(MySqlSplit mySqlSplit) { statefulTaskContext.getSnapshotChangeEventSourceMetrics(), statefulTaskContext.getDatabaseSchema(), statefulTaskContext.getConnection(), + statefulTaskContext.getSnapshotConnection(), statefulTaskContext.getDispatcher(), statefulTaskContext.getTopicSelector(), statefulTaskContext.getSnapshotReceiver(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index f729f59d05f..57d398cec4d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -73,7 +73,8 @@ public class MySqlSnapshotSplitReadTask private final MySqlSourceConfig sourceConfig; private final MySqlDatabaseSchema databaseSchema; - private final MySqlConnection jdbcConnection; + private final MySqlConnection primaryConnection; + private final MySqlConnection snapshotConnection; private final EventDispatcherImpl dispatcher; private final Clock clock; private final MySqlSnapshotSplit snapshotSplit; @@ -84,12 +85,20 @@ public class MySqlSnapshotSplitReadTask private final SnapshotPhaseHooks hooks; private final boolean isBackfillSkipped; + /** + * Creates a new MySqlSnapshotSplitReadTask with separate writer and reader connections. + * + * @param primaryConnection Connection to the primary writer instance (used for binlog position) + * @param snapshotConnection Connection to the snapshot instance (used for snapshot data + * queries) + */ public MySqlSnapshotSplitReadTask( MySqlSourceConfig sourceConfig, MySqlConnectorConfig connectorConfig, SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics, MySqlDatabaseSchema databaseSchema, - MySqlConnection jdbcConnection, + MySqlConnection primaryConnection, + MySqlConnection snapshotConnection, EventDispatcherImpl dispatcher, TopicSelector topicSelector, EventDispatcher.SnapshotReceiver snapshotReceiver, @@ -100,7 +109,8 @@ public MySqlSnapshotSplitReadTask( super(connectorConfig, snapshotChangeEventSourceMetrics); this.sourceConfig = sourceConfig; this.databaseSchema = databaseSchema; - this.jdbcConnection = jdbcConnection; + this.primaryConnection = primaryConnection; + this.snapshotConnection = snapshotConnection; this.dispatcher = dispatcher; this.clock = clock; this.snapshotSplit = snapshotSplit; @@ -111,6 +121,39 @@ public MySqlSnapshotSplitReadTask( this.isBackfillSkipped = isBackfillSkipped; } + /** + * Legacy constructor for backward compatibility. Uses same connection for both writer and + * snapshot operations. + */ + public MySqlSnapshotSplitReadTask( + MySqlSourceConfig sourceConfig, + MySqlConnectorConfig connectorConfig, + SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics, + MySqlDatabaseSchema databaseSchema, + MySqlConnection jdbcConnection, + EventDispatcherImpl dispatcher, + TopicSelector topicSelector, + EventDispatcher.SnapshotReceiver snapshotReceiver, + Clock clock, + MySqlSnapshotSplit snapshotSplit, + SnapshotPhaseHooks hooks, + boolean isBackfillSkipped) { + this( + sourceConfig, + connectorConfig, + snapshotChangeEventSourceMetrics, + databaseSchema, + jdbcConnection, + jdbcConnection, + dispatcher, + topicSelector, + snapshotReceiver, + clock, + snapshotSplit, + hooks, + isBackfillSkipped); + } + @Override public SnapshotResult execute( ChangeEventSourceContext context, @@ -151,9 +194,10 @@ protected SnapshotResult doExecute( dispatcher.getQueue()); if (hooks.getPreLowWatermarkAction() != null) { - hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit); + hooks.getPreLowWatermarkAction().accept(primaryConnection, snapshotSplit); } - final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); + // Use writer connection for binlog position (low watermark) + final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(primaryConnection); LOG.info( "Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, @@ -164,14 +208,14 @@ protected SnapshotResult doExecute( snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW); if (hooks.getPostLowWatermarkAction() != null) { - hooks.getPostLowWatermarkAction().accept(jdbcConnection, snapshotSplit); + hooks.getPostLowWatermarkAction().accept(primaryConnection, snapshotSplit); } LOG.info("Snapshot step 2 - Snapshotting data"); createDataEvents(ctx, snapshotSplit.getTableId()); if (hooks.getPreHighWatermarkAction() != null) { - hooks.getPreHighWatermarkAction().accept(jdbcConnection, snapshotSplit); + hooks.getPreHighWatermarkAction().accept(primaryConnection, snapshotSplit); } BinlogOffset highWatermark; @@ -185,8 +229,8 @@ protected SnapshotResult doExecute( // phase. highWatermark = lowWatermark; } else { - // Get the current binlog offset as HW - highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); + // Use writer connection for binlog position (high watermark) + highWatermark = DebeziumUtils.currentBinlogOffset(primaryConnection); } LOG.info( @@ -199,7 +243,7 @@ protected SnapshotResult doExecute( .setHighWatermark(highWatermark); if (hooks.getPostHighWatermarkAction() != null) { - hooks.getPostHighWatermarkAction().accept(jdbcConnection, snapshotSplit); + hooks.getPostHighWatermarkAction().accept(primaryConnection, snapshotSplit); } return SnapshotResult.completed(ctx.offset); } @@ -254,9 +298,14 @@ private void createDataEventsForTable( table.id(), selectSql); + LOG.debug( + "Executing snapshot query for split '{}' of table {} via host {}", + snapshotSplit.splitId(), + table.id(), + snapshotConnection.connectionConfig().hostname()); try (PreparedStatement selectStatement = StatementUtils.readTableSplitDataStatement( - jdbcConnection, + snapshotConnection, selectSql, snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null, @@ -337,7 +386,7 @@ else if (actualColumn.jdbcType() == Types.TIMESTAMP) { return readTimestampField(rs, fieldNo, actualColumn, actualTable); } // JDBC's rs.GetObject() will return a Boolean for all TINYINT(1) columns. - // TINYINT columns are reprtoed as SMALLINT by JDBC driver + // TINYINT columns are reported as SMALLINT by JDBC driver else if (actualColumn.jdbcType() == Types.TINYINT || actualColumn.jdbcType() == Types.SMALLINT) { // It seems that rs.wasNull() returns false when default value is set and NULL is diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index 09a5fe8e85c..df2b6a77908 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -82,6 +82,7 @@ public class StatefulTaskContext implements AutoCloseable { private final MySqlEventMetadataProvider metadataProvider; private final SchemaNameAdjuster schemaNameAdjuster; private final MySqlConnection connection; + private final MySqlConnection snapshotConnection; private final BinaryLogClient binaryLogClient; private MySqlDatabaseSchema databaseSchema; @@ -97,16 +98,37 @@ public class StatefulTaskContext implements AutoCloseable { private ChangeEventQueue queue; private ErrorHandler errorHandler; + /** + * Creates a StatefulTaskContext with separate primary and replica connections. + * + * @param sourceConfig The MySQL source configuration + * @param binaryLogClient The binary log client for binlog streaming + * @param primaryConnection Connection to the primary writer instance + * @param snapshotConnection Connection to the snapshot instance for snapshot queries + */ public StatefulTaskContext( MySqlSourceConfig sourceConfig, BinaryLogClient binaryLogClient, - MySqlConnection connection) { + MySqlConnection primaryConnection, + MySqlConnection snapshotConnection) { this.sourceConfig = sourceConfig; this.connectorConfig = sourceConfig.getMySqlConnectorConfig(); this.schemaNameAdjuster = SchemaNameAdjuster.create(); this.metadataProvider = new MySqlEventMetadataProvider(); this.binaryLogClient = binaryLogClient; - this.connection = connection; + this.connection = primaryConnection; + this.snapshotConnection = snapshotConnection; + } + + /** + * Legacy constructor for backward compatibility. Uses same connection for both primary and + * snapshot operations. + */ + public StatefulTaskContext( + MySqlSourceConfig sourceConfig, + BinaryLogClient binaryLogClient, + MySqlConnection connection) { + this(sourceConfig, binaryLogClient, connection, connection); } public void configure(MySqlSplit mySqlSplit) { @@ -310,6 +332,10 @@ public void close() throws Exception { if (connection != null) { connection.close(); } + // Close snapshot connection only if it's a distinct object from the primary connection + if (snapshotConnection != null && snapshotConnection != connection) { + snapshotConnection.close(); + } if (binaryLogClient != null) { binaryLogClient.disconnect(); } @@ -388,6 +414,15 @@ public MySqlConnection getConnection() { return connection; } + /** + * Returns the snapshot connection for snapshot queries. If a snapshot hostname is configured, + * this connection points to the snapshot instance; otherwise, it's the same as the primary + * connection. + */ + public MySqlConnection getSnapshotConnection() { + return snapshotConnection; + } + public BinaryLogClient getBinaryLogClient() { return binaryLogClient; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index 5fcbae1a1e4..53322fb6805 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -199,7 +199,7 @@ public SplitEnumerator createEnumerator( final MySqlSplitAssigner splitAssigner; // In snapshot-only startup option, only split snapshots. if (sourceConfig.getStartupOptions().isSnapshotOnly()) { - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); splitAssigner = new MySqlSnapshotSplitAssigner( @@ -213,7 +213,7 @@ public SplitEnumerator createEnumerator( "Failed to discover captured tables for enumerator", e); } } else if (!sourceConfig.getStartupOptions().isStreamOnly()) { - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); splitAssigner = new MySqlHybridSplitAssigner( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 93fa2a0d36a..ba9b4fae4d6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -59,6 +59,18 @@ public MySqlSourceBuilder hostname(String hostname) { return this; } + /** + * Optional IP address or hostname of a MySQL read replica. When specified, snapshot data will + * be read from this replica instead of the primary writer instance, reducing load on the + * writer. The binlog position will still be obtained from the primary writer. This is + * particularly useful for Aurora/RDS deployments where you want to offload snapshot read + * traffic. + */ + public MySqlSourceBuilder snapshotHostname(String snapshotHostname) { + this.configFactory.snapshotHostname(snapshotHostname); + return this; + } + /** Integer port number of the MySQL database server. */ public MySqlSourceBuilder port(int port) { this.configFactory.port(port); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index b7efb9ab8ec..da8701790ad 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -106,7 +106,7 @@ private MySqlChunkSplitter( @Override public void open() { - this.jdbcConnection = DebeziumUtils.openJdbcConnection(sourceConfig); + this.jdbcConnection = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index f51df018d4d..f979d3a28e1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -197,7 +197,7 @@ private void discoveryCaptureTables() { if (needToDiscoveryTables()) { long start = System.currentTimeMillis(); LOG.debug("The remainingTables is empty, start to discovery tables"); - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { final List discoverTables = DebeziumUtils.discoverCapturedTables(jdbc, sourceConfig); this.remainingTables.addAll(discoverTables); @@ -213,7 +213,7 @@ private void discoveryCaptureTables() { // remaining tables, discovery remaining table here else if (!isRemainingTablesCheckpointed && !AssignerStatus.isSnapshotAssigningFinished(assignerStatus)) { - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { final List discoverTables = DebeziumUtils.discoverCapturedTables(jdbc, sourceConfig); discoverTables.removeAll(alreadyProcessedTables); @@ -231,7 +231,7 @@ private void captureNewlyAddedTables() { if (sourceConfig.isScanNewlyAddedTableEnabled() && !sourceConfig.getStartupOptions().isSnapshotOnly()) { // check whether we got newly added tables - try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { + try (JdbcConnection jdbc = DebeziumUtils.openSnapshotJdbcConnection(sourceConfig)) { final List currentCapturedTables = DebeziumUtils.discoverCapturedTables(jdbc, sourceConfig); final Set previousCapturedTables = new HashSet<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index cf456fcaed0..99d97bfcd2b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -44,6 +44,7 @@ public class MySqlSourceConfig implements Serializable { private static final long serialVersionUID = 1L; private final String hostname; + @Nullable private final String snapshotHostname; private final int port; private final String username; private final String password; @@ -83,6 +84,7 @@ public class MySqlSourceConfig implements Serializable { MySqlSourceConfig( String hostname, + @Nullable String snapshotHostname, int port, String username, String password, @@ -114,6 +116,7 @@ public class MySqlSourceConfig implements Serializable { boolean useLegacyJsonFormat, boolean assignUnboundedChunkFirst) { this.hostname = checkNotNull(hostname); + this.snapshotHostname = snapshotHostname; this.port = port; this.username = checkNotNull(username); this.password = password; @@ -164,6 +167,11 @@ public String getHostname() { return hostname; } + @Nullable + public String getSnapshotHostname() { + return snapshotHostname; + } + public int getPort() { return port; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 569b62232db..48fab4ed771 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -46,6 +46,7 @@ public class MySqlSourceConfigFactory implements Serializable { private int port = 3306; // default 3306 port private String hostname; + private String snapshotHostname; private String username; private String password; private ServerIdRange serverIdRange; @@ -84,6 +85,19 @@ public MySqlSourceConfigFactory hostname(String hostname) { return this; } + /** + * Optional hostname of a MySQL read replica to use for snapshot queries. When set, snapshot + * data will be read from this replica instead of the primary writer instance, reducing the load + * on the writer. The binlog position will still be obtained from the primary writer instance. + * + *

An empty string is treated as {@code null}, falling back to the primary writer instance. + */ + public MySqlSourceConfigFactory snapshotHostname(String snapshotHostname) { + this.snapshotHostname = + (snapshotHostname != null && snapshotHostname.trim().isEmpty()) ? null : snapshotHostname; + return this; + } + /** Integer port number of the MySQL database server. */ public MySqlSourceConfigFactory port(int port) { this.port = port; @@ -415,6 +429,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { return new MySqlSourceConfig( hostname, + snapshotHostname, port, username, password, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java index 16c831b6488..7618757eeed 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java @@ -34,6 +34,18 @@ public ConnectionPoolId(String host, int port, String username) { this.username = username; } + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getUsername() { + return username; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java index 320c8868692..535f5ff5a90 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java @@ -35,24 +35,45 @@ public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory { private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionFactory.class); private final MySqlSourceConfig sourceConfig; + private final String hostnameOverride; public JdbcConnectionFactory(MySqlSourceConfig sourceConfig) { + this(sourceConfig, null); + } + + /** + * Creates a factory that connects to a specific hostname instead of the configured one. + * + * @param sourceConfig the source configuration + * @param hostnameOverride if non-null, connect to this host instead of + * sourceConfig.getHostname() + */ + public JdbcConnectionFactory(MySqlSourceConfig sourceConfig, String hostnameOverride) { this.sourceConfig = sourceConfig; + this.hostnameOverride = hostnameOverride; + } + + String getHostnameOverride() { + return hostnameOverride; } @Override public Connection connect(JdbcConfiguration config) throws SQLException { final int connectRetryTimes = sourceConfig.getConnectMaxRetries(); + final String targetHostname = + hostnameOverride != null ? hostnameOverride : sourceConfig.getHostname(); final ConnectionPoolId connectionPoolId = new ConnectionPoolId( - sourceConfig.getHostname(), - sourceConfig.getPort(), - sourceConfig.getUsername()); + targetHostname, sourceConfig.getPort(), sourceConfig.getUsername()); HikariDataSource dataSource = - JdbcConnectionPools.getInstance() - .getOrCreateConnectionPool(connectionPoolId, sourceConfig); + hostnameOverride != null + ? JdbcConnectionPools.getInstance() + .getOrCreateConnectionPool( + connectionPoolId, sourceConfig, hostnameOverride) + : JdbcConnectionPools.getInstance() + .getOrCreateConnectionPool(connectionPoolId, sourceConfig); int i = 0; while (i < connectRetryTimes) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java index 9505a559ad0..93a4deb19eb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java @@ -44,10 +44,23 @@ public static JdbcConnectionPools getInstance() { @Override public HikariDataSource getOrCreateConnectionPool( ConnectionPoolId poolId, MySqlSourceConfig sourceConfig) { + return getOrCreateConnectionPool(poolId, sourceConfig, poolId.getHost()); + } + + /** + * Gets or creates a connection pool for the specified pool ID with a hostname override. This is + * useful for creating reader connection pools that connect to a different host than the primary + * writer. + */ + public HikariDataSource getOrCreateConnectionPool( + ConnectionPoolId poolId, MySqlSourceConfig sourceConfig, String hostname) { synchronized (pools) { if (!pools.containsKey(poolId)) { - LOG.info("Create and register connection pool {}", poolId); - pools.put(poolId, PooledDataSourceFactory.createPooledDataSource(sourceConfig)); + LOG.info( + "Create and register connection pool {} for hostname {}", poolId, hostname); + pools.put( + poolId, + PooledDataSourceFactory.createPooledDataSource(sourceConfig, hostname)); } return pools.get(poolId); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java index e5f6bcd37ab..c9997d2b5f8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java @@ -41,14 +41,22 @@ public class PooledDataSourceFactory { private PooledDataSourceFactory() {} public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceConfig) { + return createPooledDataSource(sourceConfig, sourceConfig.getHostname()); + } + + /** + * Creates a pooled data source with a specific hostname. This is useful when connecting to a + * read replica for snapshot queries while using the primary for binlog operations. + */ + public static HikariDataSource createPooledDataSource( + MySqlSourceConfig sourceConfig, String hostname) { final HikariConfig config = new HikariConfig(); - String hostName = sourceConfig.getHostname(); int port = sourceConfig.getPort(); Properties jdbcProperties = sourceConfig.getJdbcProperties(); - config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port); - config.setJdbcUrl(formatJdbcUrl(hostName, port, jdbcProperties)); + config.setPoolName(CONNECTION_POOL_PREFIX + hostname + ":" + port); + config.setJdbcUrl(formatJdbcUrl(hostname, port, jdbcProperties)); config.setUsername(sourceConfig.getUsername()); config.setPassword(sourceConfig.getPassword()); config.setMinimumIdle(MINIMUM_POOL_SIZE); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java index eeae681ea7b..4f0698a9e06 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java @@ -31,6 +31,10 @@ /** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils}. */ class DebeziumUtilsTest { + + private static final String PRIMARY_HOSTNAME = "writer-host"; + private static final String SNAPSHOT_HOSTNAME = "reader-host"; + @Test void testCreateMySqlConnection() { // test without set useSSL @@ -39,7 +43,7 @@ void testCreateMySqlConnection() { MySqlSourceConfig configWithoutUseSSL = getConfig(jdbcProps); MySqlConnection connection0 = DebeziumUtils.createMySqlConnection(configWithoutUseSSL); assertJdbcUrl( - "jdbc:mysql://localhost:3306/?useSSL=false&connectTimeout=20000&useInformationSchema=true" + "jdbc:mysql://writer-host:3306/?useSSL=false&connectTimeout=20000&useInformationSchema=true" + "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&onlyTest=test" + "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true", connection0.connectionString()); @@ -49,7 +53,7 @@ void testCreateMySqlConnection() { MySqlSourceConfig configNotUseSSL = getConfig(jdbcProps); MySqlConnection connection1 = DebeziumUtils.createMySqlConnection(configNotUseSSL); assertJdbcUrl( - "jdbc:mysql://localhost:3306/?connectTimeout=20000&useInformationSchema=true" + "jdbc:mysql://writer-host:3306/?connectTimeout=20000&useInformationSchema=true" + "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&useSSL=false&onlyTest=test" + "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true", connection1.connectionString()); @@ -59,19 +63,89 @@ void testCreateMySqlConnection() { MySqlSourceConfig configUseSSL = getConfig(jdbcProps); MySqlConnection connection2 = DebeziumUtils.createMySqlConnection(configUseSSL); assertJdbcUrl( - "jdbc:mysql://localhost:3306/?connectTimeout=20000&useInformationSchema=true" + "jdbc:mysql://writer-host:3306/?connectTimeout=20000&useInformationSchema=true" + "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&useSSL=true&onlyTest=test" + "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true", connection2.connectionString()); } + @Test + void testCreateSnapshotMySqlConnectionWithSnapshotHostname() { + Properties jdbcProps = new Properties(); + MySqlSourceConfig config = getConfigWithSnapshotHostname(jdbcProps, SNAPSHOT_HOSTNAME); + + // Create reader connection - should use reader hostname + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + + // Create primary connection - should use primary hostname + MySqlConnection primaryConnection = DebeziumUtils.createMySqlConnection(config); + + // Reader connection should point to reader hostname + Assertions.assertThat(readerConnection.connectionString()).contains(SNAPSHOT_HOSTNAME); + Assertions.assertThat(readerConnection.connectionString()).doesNotContain(PRIMARY_HOSTNAME); + + // Primary connection should point to primary hostname + Assertions.assertThat(primaryConnection.connectionString()).contains(PRIMARY_HOSTNAME); + Assertions.assertThat(primaryConnection.connectionString()) + .doesNotContain(SNAPSHOT_HOSTNAME); + } + + @Test + void testCreateSnapshotMySqlConnectionFallbackToPrimary() { + Properties jdbcProps = new Properties(); + MySqlSourceConfig config = getConfig(jdbcProps); + + // When no reader hostname is configured, should fall back to primary + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + MySqlConnection primaryConnection = DebeziumUtils.createMySqlConnection(config); + + // Both should point to the primary hostname + Assertions.assertThat(readerConnection.connectionString()).contains(PRIMARY_HOSTNAME); + Assertions.assertThat(primaryConnection.connectionString()).contains(PRIMARY_HOSTNAME); + } + + @Test + void testSnapshotHostnameConfigurationProperty() { + Properties jdbcProps = new Properties(); + + // Config without reader hostname + MySqlSourceConfig configWithoutReader = getConfig(jdbcProps); + Assertions.assertThat(configWithoutReader.getSnapshotHostname()).isNull(); + + // Config with reader hostname + MySqlSourceConfig configWithReader = + getConfigWithSnapshotHostname(jdbcProps, SNAPSHOT_HOSTNAME); + Assertions.assertThat(configWithReader.getSnapshotHostname()).isEqualTo(SNAPSHOT_HOSTNAME); + Assertions.assertThat(configWithReader.getHostname()).isEqualTo(PRIMARY_HOSTNAME); + } + private MySqlSourceConfig getConfig(Properties jdbcProperties) { return new MySqlSourceConfigFactory() .startupOptions(StartupOptions.initial()) .databaseList("fakeDb") .tableList("fakeDb.fakeTable") .includeSchemaChanges(false) - .hostname("localhost") + .hostname(PRIMARY_HOSTNAME) + .port(3306) + .splitSize(10) + .fetchSize(2) + .connectTimeout(Duration.ofSeconds(20)) + .username("fakeUser") + .password("fakePw") + .serverTimeZone(ZoneId.of("UTC").toString()) + .jdbcProperties(jdbcProperties) + .createConfig(0); + } + + private MySqlSourceConfig getConfigWithSnapshotHostname( + Properties jdbcProperties, String snapshotHostname) { + return new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList("fakeDb") + .tableList("fakeDb.fakeTable") + .includeSchemaChanges(false) + .hostname(PRIMARY_HOSTNAME) + .snapshotHostname(snapshotHostname) .port(3306) .splitSize(10) .fetchSize(2) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java new file mode 100644 index 00000000000..e205a11c088 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.debezium.task; + +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import io.debezium.connector.mysql.MySqlConnection; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.ZoneId; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for {@link MySqlSnapshotSplitReadTask} to verify correct usage of writer and reader + * connections. + * + *

The key behavior being tested: + * + *

    + *
  • Writer connection is used for binlog position retrieval (SHOW MASTER STATUS) + *
  • Reader connection is used for snapshot data queries (SELECT statements) + *
  • When reader hostname is not configured, reader connection falls back to writer + *
+ */ +class MySqlSnapshotSplitReadTaskConnectionTest { + + private static final String PRIMARY_HOSTNAME = "writer-host"; + private static final String SNAPSHOT_HOSTNAME = "reader-host"; + private static final int PORT = 3306; + + @Test + void testStatefulTaskContextWithSeparateConnections() { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + + // Create separate writer and reader connections + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + + // Create StatefulTaskContext with both connections + StatefulTaskContext context = + new StatefulTaskContext( + config, binaryLogClient, writerConnection, readerConnection); + + // Verify the context returns the correct connections + assertThat(context.getConnection()).isSameAs(writerConnection); + assertThat(context.getSnapshotConnection()).isSameAs(readerConnection); + + // Verify writer and reader connections point to different hosts + assertThat(writerConnection.connectionString()).contains(PRIMARY_HOSTNAME); + assertThat(readerConnection.connectionString()).contains(SNAPSHOT_HOSTNAME); + } + + @Test + void testStatefulTaskContextWithSameConnectionWhenNoReaderConfigured() { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, null); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + + // When no reader hostname is configured, createReaderMySqlConnection falls back to writer + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + + // Create StatefulTaskContext + StatefulTaskContext context = + new StatefulTaskContext( + config, binaryLogClient, writerConnection, readerConnection); + + // Verify both connections point to the same host (writer) + assertThat(context.getConnection().connectionString()).contains(PRIMARY_HOSTNAME); + assertThat(context.getSnapshotConnection().connectionString()).contains(PRIMARY_HOSTNAME); + } + + @Test + void testStatefulTaskContextLegacyConstructorUsesSameConnection() { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + + // Use the legacy constructor that takes only one connection + MySqlConnection connection = DebeziumUtils.createMySqlConnection(config); + StatefulTaskContext context = new StatefulTaskContext(config, binaryLogClient, connection); + + // Both getConnection() and getSnapshotConnection() should return the same instance + assertThat(context.getConnection()).isSameAs(context.getSnapshotConnection()); + } + + @Test + void testWriterConnectionUsedForBinlogPosition() { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + + // The writer connection should be used for binlog position queries + // (SHOW MASTER STATUS only works on the primary/writer) + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + + // Verify writer points to writer host (required for binlog operations) + assertThat(writerConnection.connectionString()).contains(PRIMARY_HOSTNAME); + assertThat(writerConnection.connectionString()).doesNotContain(SNAPSHOT_HOSTNAME); + + // Verify reader points to reader host (for snapshot data queries) + assertThat(readerConnection.connectionString()).contains(SNAPSHOT_HOSTNAME); + assertThat(readerConnection.connectionString()).doesNotContain(PRIMARY_HOSTNAME); + } + + @Test + void testReaderConnectionFallbackWhenReaderHostnameIsNull() { + MySqlSourceConfig configWithoutReader = createConfig(PRIMARY_HOSTNAME, null); + MySqlSourceConfig configWithReader = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + + // When scanSnapshotHostname is null, createReaderMySqlConnection should return a connection + // that uses the primary hostname + MySqlConnection readerConnectionWithoutConfig = + DebeziumUtils.createSnapshotMySqlConnection(configWithoutReader); + MySqlConnection readerConnectionWithConfig = + DebeziumUtils.createSnapshotMySqlConnection(configWithReader); + + // Without reader configured: should use writer hostname + assertThat(readerConnectionWithoutConfig.connectionString()).contains(PRIMARY_HOSTNAME); + + // With reader configured: should use reader hostname + assertThat(readerConnectionWithConfig.connectionString()).contains(SNAPSHOT_HOSTNAME); + assertThat(readerConnectionWithConfig.connectionString()).doesNotContain(PRIMARY_HOSTNAME); + } + + /** + * Verifies that when snapshotConnection and connection are different objects (replica + * configured), both are closed independently. + */ + @Test + void testCloseWithSeparateConnectionsClosesBoth() throws Exception { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, SNAPSHOT_HOSTNAME); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + MySqlConnection primaryConnection = mock(MySqlConnection.class); + MySqlConnection snapshotConnection = mock(MySqlConnection.class); + + StatefulTaskContext context = + new StatefulTaskContext( + config, binaryLogClient, primaryConnection, snapshotConnection); + context.close(); + + verify(primaryConnection, times(1)).close(); + verify(snapshotConnection, times(1)).close(); + } + + /** + * Verifies that when snapshotConnection and connection are the same object (no replica + * configured), close() is called exactly once — not twice. + */ + @Test + void testCloseWithSharedConnectionClosesOnce() throws Exception { + MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, null); + BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); + MySqlConnection sharedConnection = mock(MySqlConnection.class); + + // Legacy constructor passes the same object for both connection fields + StatefulTaskContext context = + new StatefulTaskContext(config, binaryLogClient, sharedConnection); + context.close(); + + verify(sharedConnection, times(1)).close(); + } + + private MySqlSourceConfig createConfig(String hostname, String snapshotHostname) { + MySqlSourceConfigFactory factory = + new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList("testdb") + .tableList("testdb.testtable") + .includeSchemaChanges(false) + .hostname(hostname) + .port(PORT) + .splitSize(10) + .fetchSize(2) + .connectTimeout(Duration.ofSeconds(20)) + .username("testuser") + .password("testpw") + .serverTimeZone(ZoneId.of("UTC").toString()) + .jdbcProperties(new Properties()); + + if (snapshotHostname != null) { + factory.snapshotHostname(snapshotHostname); + } + + return factory.createConfig(0); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java new file mode 100644 index 00000000000..b194dcfdac7 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.connection; + +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; + +import io.debezium.connector.mysql.MySqlConnection; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.ZoneId; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for reader hostname connection handling. Verifies that the reader connection is used when + * configured, and falls back to the writer connection when not configured. + */ +class ReaderConnectionTest { + + private static final String WRITER_HOSTNAME = "writer-host"; + private static final String READER_HOSTNAME = "reader-host"; + private static final int PORT = 3306; + + @Test + void testReaderHostnameConfigured() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, READER_HOSTNAME); + + assertThat(config.getHostname()).isEqualTo(WRITER_HOSTNAME); + assertThat(config.getSnapshotHostname()).isEqualTo(READER_HOSTNAME); + } + + @Test + void testReaderHostnameNotConfigured() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, null); + + assertThat(config.getHostname()).isEqualTo(WRITER_HOSTNAME); + assertThat(config.getSnapshotHostname()).isNull(); + } + + @Test + void testCreateReaderMySqlConnectionWithReaderHostname() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, READER_HOSTNAME); + + // Create reader connection - should use reader hostname + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + String readerConnectionString = readerConnection.connectionString(); + + // Create writer connection - should use writer hostname + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + String writerConnectionString = writerConnection.connectionString(); + + // Reader connection should contain reader hostname + assertThat(readerConnectionString).contains(READER_HOSTNAME); + assertThat(readerConnectionString).doesNotContain(WRITER_HOSTNAME); + + // Writer connection should contain writer hostname + assertThat(writerConnectionString).contains(WRITER_HOSTNAME); + assertThat(writerConnectionString).doesNotContain(READER_HOSTNAME); + } + + @Test + void testCreateReaderMySqlConnectionFallsBackToWriterWhenNoReaderConfigured() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, null); + + // Create reader connection - should fall back to writer hostname + MySqlConnection readerConnection = DebeziumUtils.createSnapshotMySqlConnection(config); + String readerConnectionString = readerConnection.connectionString(); + + // Create writer connection + MySqlConnection writerConnection = DebeziumUtils.createMySqlConnection(config); + String writerConnectionString = writerConnection.connectionString(); + + // Both should use the writer hostname when reader is not configured + assertThat(readerConnectionString).contains(WRITER_HOSTNAME); + assertThat(writerConnectionString).contains(WRITER_HOSTNAME); + } + + @Test + void testJdbcConnectionFactoryUsesHostnameOverride() { + MySqlSourceConfig config = createConfig(WRITER_HOSTNAME, null); + + // Factory without hostname override - should use config hostname + JdbcConnectionFactory factoryWithoutOverride = new JdbcConnectionFactory(config); + + // Factory with hostname override - should use override + JdbcConnectionFactory factoryWithOverride = + new JdbcConnectionFactory(config, READER_HOSTNAME); + + // Verify the override hostname is stored correctly + assertThat(factoryWithoutOverride.getHostnameOverride()).isNull(); + assertThat(factoryWithOverride.getHostnameOverride()).isEqualTo(READER_HOSTNAME); + assertThat(factoryWithOverride.getHostnameOverride()) + .isNotEqualTo(config.getHostname()); + } + + @Test + void testConnectionPoolIdDifferentForWriterAndReader() { + // Pool IDs should be different for writer and reader to ensure separate pools + ConnectionPoolId writerPoolId = new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); + ConnectionPoolId readerPoolId = new ConnectionPoolId(READER_HOSTNAME, PORT, "testuser"); + + assertThat(writerPoolId).isNotEqualTo(readerPoolId); + assertThat(writerPoolId.getHost()).isEqualTo(WRITER_HOSTNAME); + assertThat(readerPoolId.getHost()).isEqualTo(READER_HOSTNAME); + } + + @Test + void testConnectionPoolIdSameForSameHostname() { + // Same hostname should result in equal pool IDs + ConnectionPoolId poolId1 = new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); + ConnectionPoolId poolId2 = new ConnectionPoolId(WRITER_HOSTNAME, PORT, "testuser"); + + assertThat(poolId1).isEqualTo(poolId2); + assertThat(poolId1.hashCode()).isEqualTo(poolId2.hashCode()); + } + + private MySqlSourceConfig createConfig(String hostname, String scanSnapshotHostname) { + MySqlSourceConfigFactory factory = + new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList("testdb") + .tableList("testdb.testtable") + .includeSchemaChanges(false) + .hostname(hostname) + .port(PORT) + .splitSize(10) + .fetchSize(2) + .connectTimeout(Duration.ofSeconds(20)) + .username("testuser") + .password("testpw") + .serverTimeZone(ZoneId.of("UTC").toString()) + .jdbcProperties(new Properties()); + + if (scanSnapshotHostname != null) { + factory.snapshotHostname(scanSnapshotHostname); + } + + return factory.createConfig(0); + } +}