59 changes: 59 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
@@ -170,6 +170,24 @@ pipeline:
<td>Integer</td>
<td>The chunk size (number of rows) of the table snapshot; captured tables are split into multiple chunks when reading the table snapshot.</td>
</tr>
<tr>
<td>scan.snapshot.hostname</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Optional IP address or hostname of a MySQL read replica to use for snapshot and metadata
queries (table discovery, chunk splitting). When set, all snapshot data reads and metadata
operations are routed to this instance, reducing load on the primary writer. Binlog
position tracking and changelog streaming always use the primary writer instance.
<br/><br/>
<strong>⚠ WARNING: At-least-once semantics only.</strong> When this option is set, exactly-once
guarantees cannot be maintained because the binlog positions recorded during snapshot
scanning originate from the primary writer while the data is read from the replica, and
storage replication lag (even in the millisecond range, as with Aurora/RDS) means the
two positions may not be perfectly consistent. Duplicate records are possible during
recovery. See <a href="#read-replica-snapshot">Read Replica Snapshot</a> for details.
</td>
</tr>
<tr>
<td>scan.snapshot.fetch.size</td>
<td>optional</td>
@@ -405,6 +423,47 @@ source:
# ...
```

## Read Replica Snapshot

The `scan.snapshot.hostname` option allows routing snapshot data reads and metadata queries
(table discovery, chunk splitting) to a MySQL read replica, while all binlog operations remain
on the primary writer instance.

This is particularly useful in AWS Aurora/RDS deployments where the primary writer handles
high-throughput transactional workloads and you want to avoid the additional I/O pressure of
a full-table snapshot scan.

### How it works

During the snapshot phase, Flink CDC performs the following sequence for each chunk:

1. **Record low watermark** — queries `SHOW MASTER STATUS` on the **primary writer** to capture the current binlog position.
2. **Scan chunk data** — executes `SELECT` queries on the **snapshot instance** (`scan.snapshot.hostname`).
3. **Record high watermark** — queries `SHOW MASTER STATUS` on the **primary writer** again.
4. **Backfill** — replays changelog events from the primary between the low and high watermarks to catch up on any changes that occurred during the scan.

Metadata operations (table discovery, `SELECT MIN/MAX` for chunk splitting) also run on the snapshot instance.
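
The per-chunk sequence above can be sketched as follows. This is a hedged illustration, not the Flink CDC implementation: `Primary` and `Replica` are hypothetical stand-ins for the two JDBC connections, and the backfill step is only indicated in a comment.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Sketch of the per-chunk snapshot sequence, showing where each query is
 * routed. Hypothetical stand-ins only; not the actual Flink CDC classes.
 */
public class ChunkSnapshotSketch {

    /** Stand-in for a connection to the primary writer. */
    static class Primary {
        private long binlogPos = 100;

        /** Models SHOW MASTER STATUS; the position advances as writes occur. */
        long currentBinlogOffset() {
            return binlogPos++;
        }
    }

    /** Stand-in for a connection to the read replica. */
    static class Replica {
        /** Models the chunk SELECT; the replica's data may lag the primary. */
        List<String> scanChunk(String chunkId) {
            return Arrays.asList(chunkId + ":row1", chunkId + ":row2");
        }
    }

    static List<String> readChunk(Primary primary, Replica replica, String chunkId) {
        long low = primary.currentBinlogOffset();       // step 1: low watermark (primary)
        List<String> rows = replica.scanChunk(chunkId); // step 2: chunk data (replica)
        long high = primary.currentBinlogOffset();      // step 3: high watermark (primary)
        if (high > low) {
            // step 4: backfill, i.e. replay primary changelog events in
            // (low, high] over the scanned rows; omitted in this sketch.
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(readChunk(new Primary(), new Replica(), "chunk-0"));
    }
}
```

Note that both watermark reads go to the primary; only the bulk `SELECT` in step 2 touches the replica.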

### At-least-once semantics

> **⚠ Warning:** When `scan.snapshot.hostname` is configured, exactly-once delivery cannot be guaranteed.

The low and high watermark binlog positions are recorded on the primary writer, while the data itself is read from the replica. Even with storage-level replication (as used by Aurora), replication lag at even the millisecond scale means there is no strict guarantee that the replica's data at the moment of scanning corresponds exactly to the primary's binlog position recorded as the watermark. As a result, some events may be delivered more than once after a job restart or failover.

This is a known and accepted trade-off: **reduced load on the primary writer at the cost of at-least-once semantics** during the snapshot phase.
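
In practice this means the sink, or a stage in front of it, should be idempotent with respect to replayed records. A minimal sketch of primary-key upsert state that absorbs duplicates (a hypothetical illustration, not part of the Flink CDC API):

```java
import java.util.HashMap;
import java.util.Map;

/** Primary-key upsert state: applying the same record twice is a no-op. */
public class UpsertState {
    // Rows keyed by primary key; a replayed record overwrites the same entry.
    private final Map<Long, String> rowsByPk = new HashMap<>();

    /** Insert or overwrite the row for the given primary key. */
    public void upsert(long pk, String row) {
        rowsByPk.put(pk, row);
    }

    public int size() {
        return rowsByPk.size();
    }
}
```

With such a sink, the duplicates produced by at-least-once snapshot delivery converge to the same final state.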

### Example

```yaml
source:
type: mysql
hostname: writer.cluster.us-east-1.rds.amazonaws.com
scan.snapshot.hostname: reader.cluster-ro.us-east-1.rds.amazonaws.com
username: flink
password: secret
tables: mydb.orders
```

## Available Source metrics

Metrics can help in understanding the progress of assignments; the following [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/) are supported:
@@ -83,6 +83,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET;
@@ -120,6 +121,7 @@ public DataSource createDataSource(Context context) {

final Configuration config = context.getFactoryConfiguration();
String hostname = config.get(HOSTNAME);
String scanSnapshotHostname = config.getOptional(SCAN_SNAPSHOT_HOSTNAME).orElse(null);
int port = config.get(PORT);

String username = config.get(USERNAME);
@@ -194,6 +196,7 @@ public DataSource createDataSource(Context context) {
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(hostname)
.snapshotHostname(scanSnapshotHostname)
.port(port)
.username(username)
.password(password)
@@ -326,6 +329,7 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(SCAN_SNAPSHOT_HOSTNAME);
options.add(PORT);
options.add(TABLES_EXCLUDE);
options.add(SCHEMA_CHANGE_ENABLED);
@@ -93,6 +93,21 @@ public class MySqlDataSourceOptions {
.withDescription(
"The chunk size (number of rows) of the table snapshot; captured tables are split into multiple chunks when reading the table snapshot.");

public static final ConfigOption<String> SCAN_SNAPSHOT_HOSTNAME =
ConfigOptions.key("scan.snapshot.hostname")
.stringType()
.noDefaultValue()
.withDescription(
"Optional IP address or hostname of a MySQL read replica to use for snapshot and metadata "
+ "queries (table discovery, chunk splitting). When set, all snapshot data reads and metadata "
+ "operations are routed to this instance, reducing load on the primary writer. "
+ "Binlog position tracking and changelog streaming always use the primary writer instance. "
+ "WARNING: At-least-once semantics only. When this option is set, exactly-once "
+ "guarantees cannot be maintained because the binlog positions recorded during snapshot "
+ "scanning originate from the primary writer while the data is read from the replica, and "
+ "storage replication lag (even in the millisecond range, as with Aurora/RDS) means the "
+ "two positions may not be perfectly consistent.");

public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
ConfigOptions.key("scan.snapshot.fetch.size")
.intType()
@@ -62,8 +62,9 @@ public class DebeziumUtils {

private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class);

/** Creates and opens a new {@link JdbcConnection} backing connection pool. */
/** Creates and opens a new {@link JdbcConnection} to the primary writer instance. */
public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) {
LOG.info("Opening a new JDBC connection to MySQL server at {}", sourceConfig.getHostname());
JdbcConnection jdbc =
new JdbcConnection(
JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()),
@@ -73,7 +74,32 @@ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig)
try {
jdbc.connect();
} catch (Exception e) {
LOG.error("Failed to open MySQL connection", e);
LOG.error("Failed to open MySQL connection to {}", sourceConfig.getHostname(), e);
throw new FlinkRuntimeException(e);
}
return jdbc;
}

/**
* Creates and opens a new {@link JdbcConnection} for metadata and snapshot queries. Routes to
* the snapshot instance when {@link MySqlSourceConfig#getSnapshotHostname()} is configured,
* otherwise falls back to the primary writer.
*/
public static JdbcConnection openSnapshotJdbcConnection(MySqlSourceConfig sourceConfig) {
String snapshotHostname = sourceConfig.getSnapshotHostname();
String targetHost =
snapshotHostname != null ? snapshotHostname : sourceConfig.getHostname();
LOG.info("Opening a new JDBC connection for metadata queries at {}", targetHost);
JdbcConnection jdbc =
new JdbcConnection(
JdbcConfiguration.adapt(sourceConfig.getDbzConfiguration()),
new JdbcConnectionFactory(sourceConfig, snapshotHostname),
QUOTED_CHARACTER,
QUOTED_CHARACTER);
try {
jdbc.connect();
} catch (Exception e) {
LOG.error("Failed to open MySQL connection to {}", targetHost, e);
throw new FlinkRuntimeException(e);
}
return jdbc;
@@ -85,6 +111,24 @@ public static MySqlConnection createMySqlConnection(MySqlSourceConfig sourceConf
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties());
}

/**
* Creates a new {@link MySqlConnection} to the snapshot instance for snapshot queries. If no
* snapshot hostname is configured, returns a connection to the primary. The connection is not
* opened.
*/
public static MySqlConnection createSnapshotMySqlConnection(MySqlSourceConfig sourceConfig) {
String snapshotHostname = sourceConfig.getSnapshotHostname();
if (snapshotHostname == null) {
LOG.debug("No snapshot hostname configured, using primary for snapshot queries");
return createMySqlConnection(sourceConfig);
}
LOG.info("Creating MySQL connection for snapshot queries at {}", snapshotHostname);
Configuration dbzConfig = sourceConfig.getDbzConfiguration();
Configuration snapshotConfig =
dbzConfig.edit().with("database.hostname", snapshotHostname).build();
return createMySqlConnection(snapshotConfig, sourceConfig.getJdbcProperties());
}

/** Creates a new {@link MySqlConnection}, but not open the connection. */
public static MySqlConnection createMySqlConnection(
Configuration dbzConfiguration, Properties jdbcProperties) {
@@ -69,6 +69,7 @@

import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createSnapshotMySqlConnection;

/**
* A snapshot reader that reads data from Table in split level, the split is assigned by primary key
@@ -103,7 +104,8 @@ public SnapshotSplitReader(
new StatefulTaskContext(
sourceConfig,
createBinaryClient(sourceConfig.getDbzConfiguration()),
createMySqlConnection(sourceConfig)),
createMySqlConnection(sourceConfig),
createSnapshotMySqlConnection(sourceConfig)),
subtaskId,
hooks);
}
@@ -143,6 +145,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
statefulTaskContext.getDatabaseSchema(),
statefulTaskContext.getConnection(),
statefulTaskContext.getSnapshotConnection(),
statefulTaskContext.getDispatcher(),
statefulTaskContext.getTopicSelector(),
statefulTaskContext.getSnapshotReceiver(),
@@ -73,7 +73,8 @@ public class MySqlSnapshotSplitReadTask

private final MySqlSourceConfig sourceConfig;
private final MySqlDatabaseSchema databaseSchema;
private final MySqlConnection jdbcConnection;
private final MySqlConnection primaryConnection;
private final MySqlConnection snapshotConnection;
private final EventDispatcherImpl<TableId> dispatcher;
private final Clock clock;
private final MySqlSnapshotSplit snapshotSplit;
@@ -84,12 +85,20 @@ public class MySqlSnapshotSplitReadTask
private final SnapshotPhaseHooks hooks;
private final boolean isBackfillSkipped;

/**
* Creates a new MySqlSnapshotSplitReadTask with separate writer and reader connections.
*
* @param primaryConnection Connection to the primary writer instance (used for binlog position)
* @param snapshotConnection Connection to the snapshot instance (used for snapshot data
* queries)
*/
public MySqlSnapshotSplitReadTask(
MySqlSourceConfig sourceConfig,
MySqlConnectorConfig connectorConfig,
SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics,
MySqlDatabaseSchema databaseSchema,
MySqlConnection jdbcConnection,
MySqlConnection primaryConnection,
MySqlConnection snapshotConnection,
EventDispatcherImpl<TableId> dispatcher,
TopicSelector<TableId> topicSelector,
EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver,
@@ -100,7 +109,8 @@ public MySqlSnapshotSplitReadTask(
super(connectorConfig, snapshotChangeEventSourceMetrics);
this.sourceConfig = sourceConfig;
this.databaseSchema = databaseSchema;
this.jdbcConnection = jdbcConnection;
this.primaryConnection = primaryConnection;
this.snapshotConnection = snapshotConnection;
this.dispatcher = dispatcher;
this.clock = clock;
this.snapshotSplit = snapshotSplit;
@@ -111,6 +121,39 @@
this.isBackfillSkipped = isBackfillSkipped;
}

/**
* Legacy constructor for backward compatibility. Uses same connection for both writer and
* snapshot operations.
*/
public MySqlSnapshotSplitReadTask(
MySqlSourceConfig sourceConfig,
MySqlConnectorConfig connectorConfig,
SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics,
MySqlDatabaseSchema databaseSchema,
MySqlConnection jdbcConnection,
EventDispatcherImpl<TableId> dispatcher,
TopicSelector<TableId> topicSelector,
EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver,
Clock clock,
MySqlSnapshotSplit snapshotSplit,
SnapshotPhaseHooks hooks,
boolean isBackfillSkipped) {
this(
sourceConfig,
connectorConfig,
snapshotChangeEventSourceMetrics,
databaseSchema,
jdbcConnection,
jdbcConnection,
dispatcher,
topicSelector,
snapshotReceiver,
clock,
snapshotSplit,
hooks,
isBackfillSkipped);
}

@Override
public SnapshotResult<MySqlOffsetContext> execute(
ChangeEventSourceContext context,
@@ -151,9 +194,10 @@ protected SnapshotResult<MySqlOffsetContext> doExecute(
dispatcher.getQueue());

if (hooks.getPreLowWatermarkAction() != null) {
hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit);
hooks.getPreLowWatermarkAction().accept(primaryConnection, snapshotSplit);
}
final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
// Use writer connection for binlog position (low watermark)
final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(primaryConnection);
LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
lowWatermark,
@@ -164,14 +208,14 @@
snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

if (hooks.getPostLowWatermarkAction() != null) {
hooks.getPostLowWatermarkAction().accept(jdbcConnection, snapshotSplit);
hooks.getPostLowWatermarkAction().accept(primaryConnection, snapshotSplit);
}

LOG.info("Snapshot step 2 - Snapshotting data");
createDataEvents(ctx, snapshotSplit.getTableId());

if (hooks.getPreHighWatermarkAction() != null) {
hooks.getPreHighWatermarkAction().accept(jdbcConnection, snapshotSplit);
hooks.getPreHighWatermarkAction().accept(primaryConnection, snapshotSplit);
}

BinlogOffset highWatermark;
@@ -185,8 +229,8 @@
// phase.
highWatermark = lowWatermark;
} else {
// Get the current binlog offset as HW
highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
// Use writer connection for binlog position (high watermark)
highWatermark = DebeziumUtils.currentBinlogOffset(primaryConnection);
}

LOG.info(
Expand All @@ -199,7 +243,7 @@ protected SnapshotResult<MySqlOffsetContext> doExecute(
.setHighWatermark(highWatermark);

if (hooks.getPostHighWatermarkAction() != null) {
hooks.getPostHighWatermarkAction().accept(jdbcConnection, snapshotSplit);
hooks.getPostHighWatermarkAction().accept(primaryConnection, snapshotSplit);
}
return SnapshotResult.completed(ctx.offset);
}
@@ -254,9 +298,14 @@ private void createDataEventsForTable(
table.id(),
selectSql);

LOG.debug(
"Executing snapshot query for split '{}' of table {} via host {}",
snapshotSplit.splitId(),
table.id(),
snapshotConnection.connectionConfig().hostname());
try (PreparedStatement selectStatement =
StatementUtils.readTableSplitDataStatement(
jdbcConnection,
snapshotConnection,
selectSql,
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null,
@@ -337,7 +386,7 @@ else if (actualColumn.jdbcType() == Types.TIMESTAMP) {
return readTimestampField(rs, fieldNo, actualColumn, actualTable);
}
// JDBC's rs.GetObject() will return a Boolean for all TINYINT(1) columns.
// TINYINT columns are reprtoed as SMALLINT by JDBC driver
// TINYINT columns are reported as SMALLINT by JDBC driver
else if (actualColumn.jdbcType() == Types.TINYINT
|| actualColumn.jdbcType() == Types.SMALLINT) {
// It seems that rs.wasNull() returns false when default value is set and NULL is