10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/db2-cdc.md
@@ -274,6 +274,16 @@ Db2 server.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -347,6 +347,16 @@ MongoDB change event records do not carry the pre-update state of the message; therefore, we can only
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
每秒发出的最大记录数,默认值为 -1,表示不进行速率限制。(仅适用于 flink 2.x)<br>
警告:增量/二进制日志阶段:速率限制可能导致连接器落后于上游变更流,在连接器赶上之前,二进制日志/WAL 可能会被清除(MySQL 数据丢失,PostgreSQL 复制槽问题)。
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -454,6 +454,16 @@ Flink SQL> SELECT * FROM orders;
If 'use.legacy.json.format' = 'false' is set, this data is converted to {"key1": "value1", "key2": "value2"}, i.e. the spaces before keys and values are preserved.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
每秒发出的最大记录数,默认值为 -1,表示不进行速率限制。(仅适用于 flink 2.x)<br>
警告:增量/二进制日志阶段:速率限制可能导致连接器落后于上游变更流,在连接器赶上之前,二进制日志/WAL 可能会被清除(MySQL 数据丢失,PostgreSQL 复制槽问题)。
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/oceanbase-cdc.md
@@ -468,6 +468,16 @@ Flink SQL> SELECT * FROM orders;
If 'use.legacy.json.format' = 'false' is set, this data is converted to {"key1": "value1", "key2": "value2"}, i.e. the spaces before keys and values are preserved.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
每秒发出的最大记录数,默认值为 -1,表示不进行速率限制。(仅适用于 flink 2.x)<br>
警告:增量/二进制日志阶段:速率限制可能导致连接器落后于上游变更流,在连接器赶上之前,二进制日志/WAL 可能会被清除(MySQL 数据丢失,PostgreSQL 复制槽问题)。
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
@@ -454,6 +454,16 @@ Connector Options
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -292,6 +292,16 @@ Connector Options
(2) The table list (regex or predefined list) should only match the parent table name; if the table list matches both parent and child tables, snapshot data will be read twice.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
每秒发出的最大记录数,默认值为 -1,表示不进行速率限制。(仅适用于 flink 2.x)<br>
警告:增量/二进制日志阶段:速率限制可能导致连接器落后于上游变更流,在连接器赶上之前,二进制日志/WAL 可能会被清除(MySQL 数据丢失,PostgreSQL 复制槽问题)。
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/sqlserver-cdc.md
@@ -263,6 +263,16 @@ Connector Options
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -362,6 +362,16 @@ pipeline:
If 'use.legacy.json.format' = 'false' is set, this data is converted to {"key1": "value1", "key2": "value2"}, i.e. the spaces before keys and values are preserved.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
每秒发出的最大记录数,默认值为 -1,表示不进行速率限制。(仅适用于 flink 2.x)<br>
警告:增量/二进制日志阶段:速率限制可能导致连接器落后于上游变更流,在连接器赶上之前,二进制日志/WAL 可能会被清除(MySQL 数据丢失,PostgreSQL 复制槽问题)。
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/oracle.md
@@ -269,6 +269,16 @@ pipeline:
Whether to skip backfill in the snapshot reading phase. If backfill is skipped, changes on the captured tables during the snapshot phase will be consumed later in the incremental reading phase instead of being merged into the snapshot. WARNING: Skipping backfill may lead to data inconsistency, because some binlog events that happen during the snapshot phase may be replayed (only at-least-once semantics are guaranteed). For example, updating a value that was already updated in the snapshot, or deleting an entry that was already deleted in the snapshot. These replayed incremental events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
每秒发出的最大记录数,默认值为 -1,表示不进行速率限制。(仅适用于 flink 2.x)<br>
警告:增量/二进制日志阶段:速率限制可能导致连接器落后于上游变更流,在连接器赶上之前,二进制日志/WAL 可能会被清除(MySQL 数据丢失,PostgreSQL 复制槽问题)。
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -294,6 +294,16 @@ pipeline:
Defaults to false.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
每秒发出的最大记录数,默认值为 -1,表示不进行速率限制。(仅适用于 flink 2.x)<br>
警告:增量/二进制日志阶段:速率限制可能导致连接器落后于上游变更流,在连接器赶上之前,二进制日志/WAL 可能会被清除(MySQL 数据丢失,PostgreSQL 复制槽问题)。
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/flink-sources/db2-cdc.md
@@ -274,6 +274,16 @@ Db2 server.
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -372,6 +372,16 @@ Connector Options
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -468,6 +468,16 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
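As a quick usage sketch (not part of this change), the new option would be set together with the regular mysql-cdc connection options in the table DDL. The schema, host, credentials, and database/table names below are placeholders:

```sql
-- Illustrative sketch only: all values except 'records.per.second' are placeholders.
-- 'records.per.second' = '1000' caps emission at 1000 records per second;
-- the default -1 disables rate limiting (effective with Flink 2.x).
CREATE TABLE orders_source (
    order_id INT,
    order_date TIMESTAMP(3),
    customer_name STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    'records.per.second' = '1000'
);
```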
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/flink-sources/oceanbase-cdc.md
@@ -486,6 +486,16 @@ During a snapshot operation, the connector will query each included table to pro
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/flink-sources/oracle-cdc.md
@@ -455,6 +455,16 @@ Connector Options
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -289,6 +289,16 @@ SELECT * FROM shipments;
(2) The table list (regex or predefined list) should only match the parent table name; if the table list matches both parent and child tables, snapshot data will be read twice.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/flink-sources/sqlserver-cdc.md
@@ -263,6 +263,16 @@ Connector Options
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -371,6 +371,16 @@ pipeline:
List of readable metadata from SourceRecord to be passed downstream and used in the transform module, separated by `,`. Available readable metadata: op_ts.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
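As a usage sketch (not part of this change), the option would sit alongside the other source options in a pipeline definition. All connection values, table names, and the sink below are placeholders:

```yaml
# Illustrative sketch only: every value except records.per.second is a placeholder.
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: flinkuser
  password: flinkpw
  tables: mydb.orders
  records.per.second: 1000   # cap emission at 1000 records/s; -1 (default) disables limiting

sink:
  type: values

pipeline:
  name: MySQL to Values Pipeline
  parallelism: 1
```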
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/oracle.md
@@ -276,6 +276,16 @@ The max retry times that the connector should retry to build oracle database ser
Whether to skip backfill in the snapshot reading phase. If backfill is skipped, changes on captured tables during the snapshot phase will be consumed later in the incremental reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some incremental events that happened within the snapshot phase might be replayed (only at-least-once semantics are promised). For example, updating an already updated value in the snapshot, or deleting an already deleted entry in the snapshot. These replayed change events should be handled specially.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -286,6 +286,16 @@ pipeline:
Defaults to false.
</td>
</tr>
<tr>
<td>records.per.second</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Double</td>
<td>
The maximum number of records emitted per second, the default value: -1, means no rate limiting.(only for flink2.x)<br>
WARNING: Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).
</td>
</tr>
</tbody>
</table>
</div>
MySqlDataSourceFactory.java
@@ -75,6 +75,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.RECORDS_PER_SECOND;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
@@ -167,6 +168,7 @@ public DataSource createDataSource(Context context) {
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
double recordsPerSecond = config.get(RECORDS_PER_SECOND);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -220,7 +222,8 @@ public DataSource createDataSource(Context context) {
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.skipSnapshotBackfill(skipSnapshotBackfill);
.skipSnapshotBackfill(skipSnapshotBackfill)
.recordsPerSecond(recordsPerSecond);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

@@ -358,6 +361,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(RECORDS_PER_SECOND);
return options;
}

MySqlDataSourceOptions.java
@@ -330,4 +330,11 @@ public class MySqlDataSourceOptions
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");

public static final ConfigOption<Double> RECORDS_PER_SECOND =
ConfigOptions.key("records.per.second")
.doubleType()
.defaultValue(-1d)
.withDescription(
"The maximum number of records emitted per second, the default value: -1, means no rate limiting.");
}