[FLINK-39824][mysql] Cache table filter results to reduce regex matching#4422
taoran92 wants to merge 2 commits into
@lvyanquan Could you help review this PR?
Pull request overview
This PR reduces CPU overhead in the MySQL CDC connector by caching the result of Debezium table-filter checks per TableId, avoiding repeated regex-based matching on the binlog event hot path.
Changes:
- Wrap Debezium's `dataCollectionFilter()` with a cached `Tables.TableFilter` backed by a `ConcurrentHashMap`.
- Preserve existing include + `excludeTableList` semantics by caching the combined include/exclude decision.
- Add unit tests to validate cache reuse and unchanged `excludeTableList` behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java | Introduces a cached TableFilter via createCachedTableFilter(...) and installs it into Debezium table filters. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java | Adds tests verifying cached evaluation for repeated TableId checks and correct include/exclude semantics with excludeTableList. |
```java
static Tables.TableFilter createCachedTableFilter(
        Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) {
    Map<TableId, Boolean> tableFilterCache = new ConcurrentHashMap<>();
    return tableId ->
            tableFilterCache.computeIfAbsent(
                    tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id));
}
```
Good point. I replaced the unbounded ConcurrentHashMap with a bounded LoadingCache (max 1024 entries, expire after 1h access), so hot TableIds are still cached while long-running jobs won't retain unlimited dynamic table keys.
BTW, I kept the cache bounds as internal constants instead of adding new connector options. This cache is only an internal optimization: eviction may cause the table filter to be evaluated again, but it does not affect correctness. Exposing these values would expand the public connector API without a clear user-facing semantic need.
The default maximum size follows the existing selector cache convention in the codebase, and we can still expose/tune it later if real workloads show the defaults are insufficient.
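To illustrate why eviction only costs a re-evaluation and never changes the result, here is a minimal sketch of a size-bounded cache in front of a filter function. It is not the PR's code: it swaps the `LoadingCache` mentioned above for a plain access-ordered `LinkedHashMap` from the JDK, omits the time-based expiry entirely, and the names `BoundedCacheSketch`, `lruMap`, and `boundedCached` are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch: an LRU-bounded result cache (a stand-in for a LoadingCache, without expiry). */
final class BoundedCacheSketch {

    /** Access-ordered map that drops its least recently used entry once it exceeds maxEntries. */
    static <K, V> Map<K, V> lruMap(int maxEntries) {
        return Collections.synchronizedMap(
                new LinkedHashMap<K, V>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                        return size() > maxEntries;
                    }
                });
    }

    /** Caches delegate results; an evicted key is simply evaluated again on its next lookup. */
    static <K> Function<K, Boolean> boundedCached(Function<K, Boolean> delegate, int maxEntries) {
        Map<K, Boolean> cache = lruMap(maxEntries);
        return key -> cache.computeIfAbsent(key, delegate);
    }

    public static void main(String[] args) {
        AtomicInteger evaluations = new AtomicInteger();
        // Assumed filter logic for the demo: include names starting with "db.".
        Function<String, Boolean> filter =
                boundedCached(
                        name -> {
                            evaluations.incrementAndGet();
                            return name.startsWith("db.");
                        },
                        1024);

        System.out.println(filter.apply("db.orders"));
        System.out.println(filter.apply("db.orders")); // served from the cache
        System.out.println("evaluations: " + evaluations.get());
    }
}
```

Because the delegate is deterministic, a lookup after eviction returns the same answer as before; the only observable difference is an extra call to the underlying filter.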
```java
assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue();
assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue();
assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse();
assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse();
assertThat(filterInvocationCount).hasValue(2);
```
Good point. The cache is keyed by TableId. I have addressed it in a new commit.
Hi @leonardBang, all review comments have been addressed. PTAL, thanks.
7f2977a to 8edef8f
What is the purpose of the change
This PR reduces high CPU usage in the MySQL CDC source when synchronizing a large number of tables.
In large-table scenarios, MySQL binlog event processing may repeatedly check whether the same TableId should be included by the configured table filters. The hot path goes through Debezium's table filter predicates, which rely on regex matching.
[Figure: flame graph]
When the table list is large or the regex patterns are complex, repeatedly evaluating the same table filter result can consume significant CPU and cause TaskManager CPU usage to stay close to 100%.
This PR caches the table filter result by TableId after constructing the Debezium table filter. The cached filter preserves the existing semantics of the Debezium include filter and Flink CDC excludeTableList, while avoiding repeated regex evaluation for the same table.
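The idea can be sketched in isolation as follows. This is a simplified illustration rather than the connector's code: it uses plain `String` table names instead of Debezium's `TableId`, `java.util.function.Predicate` instead of `Tables.TableFilter`, and made-up include/exclude patterns; `CachedFilterSketch` and `cached` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Hypothetical sketch: memoize a combined include/exclude regex decision per table name. */
final class CachedFilterSketch {

    /** Wraps a predicate so that each distinct key is evaluated once and then served from a map. */
    static <K> Predicate<K> cached(Predicate<K> delegate) {
        Map<K, Boolean> cache = new ConcurrentHashMap<>();
        return key -> cache.computeIfAbsent(key, delegate::test);
    }

    public static void main(String[] args) {
        // Assumed patterns, for illustration only.
        Pattern include = Pattern.compile("db\\.orders_\\d+");
        Pattern exclude = Pattern.compile("db\\.orders_9\\d*");
        AtomicInteger evaluations = new AtomicInteger();

        // The combined decision is what gets cached, so exclude semantics are preserved.
        Predicate<String> combined =
                table -> {
                    evaluations.incrementAndGet();
                    return include.matcher(table).matches()
                            && !exclude.matcher(table).matches();
                };
        Predicate<String> filter = cached(combined);

        // Each table is checked twice, as repeated binlog events for the same table would do.
        System.out.println(filter.test("db.orders_1"));
        System.out.println(filter.test("db.orders_1"));
        System.out.println(filter.test("db.orders_90"));
        System.out.println(filter.test("db.orders_90"));
        System.out.println("regex evaluations: " + evaluations.get());
    }
}
```

Caching the combined result, rather than the include and exclude checks separately, keeps the lookup to a single map access per event and mirrors the PR's stated goal of leaving filter semantics unchanged.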
Brief change log
No configuration option is added because this only caches deterministic table filter results and does not change filtering semantics.
Verifying this change
This change is verified by unit tests:
Does this pull request potentially affect one of the following parts:
Documentation
Does this pull request introduce a new feature? no
If yes, how is the feature documented? not applicable