[FLINK-39824][mysql] Cache table filter results to reduce regex matching#4422

Open
taoran92 wants to merge 2 commits into apache:master from taoran92:cache-mysql-table-filter

@taoran92 taoran92 commented Jun 2, 2026

What is the purpose of the change

This PR reduces high CPU usage in MySQL CDC source when synchronizing a large number of tables.

In large-table scenarios, MySQL binlog event processing may repeatedly check whether the same TableId should be included by the configured table filters. The hot path goes through Debezium's table filter predicates, which rely on regex matching:

java.util.regex.Matcher.match
java.util.regex.Matcher.matches
io.debezium.relational.RelationalTableFilters
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired

Flame graph: [image: 20260602-203520 (1)]

When the table list is large or the regex patterns are complex, repeatedly evaluating the same table filter result can consume significant CPU and cause TaskManager CPU usage to stay close to 100%.

This PR caches the table filter result by TableId after constructing the Debezium table filter. The cached filter preserves the existing semantics of the Debezium include filter and Flink CDC excludeTableList, while avoiding repeated regex evaluation for the same table.
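The caching idea described above can be illustrated with a minimal, self-contained sketch. It is not the PR's code: plain strings stand in for Debezium's TableId, a java.util.function.Predicate stands in for Tables.TableFilter, and the class name CachedFilterSketch and the regex are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical stand-in for the cached filter; strings replace Debezium's TableId.
final class CachedFilterSketch {

    /** Wraps a deterministic predicate so each distinct key is evaluated at most once. */
    static <K> Predicate<K> cached(Predicate<K> delegate) {
        Map<K, Boolean> cache = new ConcurrentHashMap<>();
        // computeIfAbsent only calls the delegate when the key has no cached result yet.
        return key -> cache.computeIfAbsent(key, delegate::test);
    }

    public static void main(String[] args) {
        // Illustrative include pattern; real patterns come from the connector config.
        Pattern include = Pattern.compile("db_\\d+\\.orders_\\d+");
        AtomicInteger regexCalls = new AtomicInteger();

        // The uncached filter runs the regex on every call and counts invocations.
        Predicate<String> regexFilter = id -> {
            regexCalls.incrementAndGet();
            return include.matcher(id).matches();
        };

        Predicate<String> filter = cached(regexFilter);
        for (int i = 0; i < 1000; i++) {
            filter.test("db_1.orders_7"); // same table on every simulated binlog event
        }
        System.out.println(regexCalls.get()); // prints 1
    }
}
```

The wrapper is only safe because the underlying filter is deterministic for a given key, which is the same assumption the PR description relies on.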

Brief change log

  • Cache MySQL CDC table filter results by TableId in MySqlSourceConfig
  • Preserve existing include/exclude table filter semantics when using the cached filter
  • Add unit tests to verify repeated checks for the same table reuse the cached result
  • Add unit tests to verify excludeTableList behavior is unchanged

No configuration option is added because this only caches deterministic table filter results and does not change filtering semantics.
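As a rough illustration of the include/exclude semantics that the cached filter is meant to preserve, the sketch below combines an include predicate with an optional exclude list. It assumes, without confirmation from the PR diff, that a table is accepted only when the include filter matches and no exclude pattern matches; the names IncludeExcludeSketch and combine are hypothetical, and strings stand in for TableId and Selectors.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical sketch; the real connector uses Debezium's TableFilter and Flink CDC Selectors.
final class IncludeExcludeSketch {

    /**
     * Assumed semantics: a table passes only if the include filter accepts it
     * and no exclude pattern matches. A null exclude list means "exclude nothing".
     */
    static Predicate<String> combine(Predicate<String> include, List<Pattern> excludes) {
        return id -> include.test(id)
                && (excludes == null
                        || excludes.stream().noneMatch(p -> p.matcher(id).matches()));
    }

    public static void main(String[] args) {
        Predicate<String> include = id -> id.startsWith("shop.");
        List<Pattern> excludes = Arrays.asList(Pattern.compile("shop\\.tmp_.*"));

        Predicate<String> filter = combine(include, excludes);
        System.out.println(filter.test("shop.orders"));  // prints true
        System.out.println(filter.test("shop.tmp_1"));   // prints false
        System.out.println(filter.test("other.orders")); // prints false
    }
}
```

Caching the combined decision, rather than the include and exclude results separately, means a cache hit skips both checks.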

Verifying this change

This change is verified by unit tests:

  • MySqlSourceConfigTest#testCachesTableFilterResults
  • MySqlSourceConfigTest#testTableFilterWithExcludeTableList

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e. is any changed class annotated with @Public(@PublicEvolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: no

Documentation

Does this pull request introduce a new feature? no

If yes, how is the feature documented? not applicable

taoran92 commented Jun 2, 2026

@lvyanquan Could you help review this PR?

@leonardBang leonardBang requested review from Copilot and lvyanquan and removed request for lvyanquan June 15, 2026 04:10

Copilot AI left a comment

Pull request overview

This PR reduces CPU overhead in the MySQL CDC connector by caching the result of Debezium table-filter checks per TableId, avoiding repeated regex-based matching on the binlog event hot path.

Changes:

  • Wrap Debezium’s dataCollectionFilter() with a cached Tables.TableFilter backed by a ConcurrentHashMap.
  • Preserve existing include + excludeTableList semantics by caching the combined include/exclude decision.
  • Add unit tests to validate cache reuse and unchanged excludeTableList behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

File: flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
Description: Introduces a cached TableFilter via createCachedTableFilter(...) and installs it into Debezium table filters.

File: flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java
Description: Adds tests verifying cached evaluation for repeated TableId checks and correct include/exclude semantics with excludeTableList.

@taoran92 taoran92 requested a review from Copilot June 17, 2026 06:51

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

Comment on lines +284 to +290
    static Tables.TableFilter createCachedTableFilter(
            Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) {
        Map<TableId, Boolean> tableFilterCache = new ConcurrentHashMap<>();
        return tableId ->
                tableFilterCache.computeIfAbsent(
                        tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id));
    }

@taoran92 taoran92 Jun 17, 2026

Good point. I replaced the unbounded ConcurrentHashMap with a bounded LoadingCache (at most 1024 entries, expiring 1 hour after last access), so hot TableIds stay cached while long-running jobs no longer retain an unbounded number of dynamic table keys.

BTW, I kept the cache bounds as internal constants instead of adding new connector options. This cache is only an internal optimization: eviction may cause the table filter to be evaluated again, but it does not affect correctness. Exposing these values would expand the public connector API without a clear user-facing semantic need.

The default maximum size follows the existing selector cache convention in the codebase, and we can still expose/tune it later if real workloads show the defaults are insufficient.
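To make the point about eviction concrete, here is a JDK-only sketch of a size-bounded filter cache. It deliberately swaps in an access-ordered LinkedHashMap for the LoadingCache mentioned above, covers only the size bound (not the 1-hour expiry), and uses hypothetical names (BoundedFilterCacheSketch, boundedCached) with strings in place of TableId.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical JDK-only approximation; the PR itself uses a LoadingCache, not this class.
final class BoundedFilterCacheSketch {

    /** Caches results for at most maxEntries keys, evicting the least recently used one. */
    static <K> Predicate<K> boundedCached(Predicate<K> delegate, int maxEntries) {
        Map<K, Boolean> lru = Collections.synchronizedMap(
                new LinkedHashMap<K, Boolean>(16, 0.75f, true) { // true = access order
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
                        return size() > maxEntries;
                    }
                });
        return key -> lru.computeIfAbsent(key, delegate::test);
    }

    public static void main(String[] args) {
        AtomicInteger evaluations = new AtomicInteger();
        Predicate<String> filter = boundedCached(id -> {
            evaluations.incrementAndGet();
            return id.startsWith("db.");
        }, 2);

        filter.test("db.a"); // evaluated
        filter.test("db.b"); // evaluated
        filter.test("db.a"); // cache hit, db.a becomes most recently used
        filter.test("db.c"); // evaluated, evicts db.b
        boolean again = filter.test("db.b"); // evaluated again after eviction

        System.out.println(again);             // prints true
        System.out.println(evaluations.get()); // prints 4
    }
}
```

Eviction only costs a repeated evaluation: the answer for db.b is the same before and after it is evicted, which is why the bounds can stay internal constants without affecting correctness.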

Comment on lines +46 to +50
assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue();
assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue();
assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse();
assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse();
assertThat(filterInvocationCount).hasValue(2);

Good point. The cache is keyed by TableId. I have addressed it in a new commit.

taoran92 commented Jun 17, 2026

Hi @leonardBang, all review comments have been addressed. PTAL, thanks.

@taoran92 taoran92 force-pushed the cache-mysql-table-filter branch from 7f2977a to 8edef8f Compare June 17, 2026 08:22