From ec0d3f772415e7e8773536280391fd9d652dc4e7 Mon Sep 17 00:00:00 2001 From: Ran Tao Date: Mon, 1 Jun 2026 17:37:30 +0800 Subject: [PATCH 1/2] [FLINK-39824][mysql] Cache table filter results to reduce regex matching --- .../source/config/MySqlSourceConfig.java | 23 ++++-- .../source/config/MySqlSourceConfigTest.java | 74 +++++++++++++++++++ 2 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index cf456fcaed0..13e3ffdd1f1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -146,11 +147,7 @@ public class MySqlSourceConfig implements Serializable { Tables.TableFilter tableFilter = dbzMySqlConfig.getTableFilters().dataCollectionFilter(); dbzMySqlConfig .getTableFilters() - .setDataCollectionFilters( - (TableId tableId) -> - tableFilter.isIncluded(tableId) - && (excludeTableFilter == null - || !excludeTableFilter.isMatch(tableId))); + .setDataCollectionFilters(createCachedTableFilter(tableFilter, excludeTableFilter)); this.jdbcProperties = jdbcProperties; this.chunkKeyColumns = chunkKeyColumns; this.skipSnapshotBackfill = skipSnapshotBackfill; @@ -284,6 +281,22 @@ public Predicate getTableFilter() { return tableId -> tableFilters.dataCollectionFilter().isIncluded(tableId); } + static Tables.TableFilter createCachedTableFilter( + Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) { + Map tableFilterCache = new ConcurrentHashMap<>(); + return tableId -> + tableFilterCache.computeIfAbsent( + tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id)); + } + + private static boolean isTableIncluded( + Tables.TableFilter tableFilter, + @Nullable Selectors excludeTableFilter, + TableId tableId) { + return tableFilter.isIncluded(tableId) + && (excludeTableFilter == null || !excludeTableFilter.isMatch(tableId)); + } + public Properties getJdbcProperties() { return jdbcProperties; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java new file mode 100644 index 00000000000..2ef1893f834 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.config; + +import io.debezium.relational.TableId; +import io.debezium.relational.Tables; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link MySqlSourceConfig}. */ +class MySqlSourceConfigTest { + + @Test + void testCachesTableFilterResults() { + AtomicInteger filterInvocationCount = new AtomicInteger(); + Tables.TableFilter cachedTableFilter = + MySqlSourceConfig.createCachedTableFilter( + tableId -> { + filterInvocationCount.incrementAndGet(); + return tableId.table().startsWith("orders"); + }, + null); + + TableId includedTable = new TableId("test_db", null, "orders_1"); + TableId unmatchedTable = new TableId("test_db", null, "customers"); + + assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue(); + assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue(); + assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse(); + assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse(); + assertThat(filterInvocationCount).hasValue(2); + } + + @Test + void testTableFilterWithExcludeTableList() { + MySqlSourceConfig config = + new MySqlSourceConfigFactory() + .hostname("localhost") + .username("user") + .password("password") + .databaseList("test_db") + .tableList("test_db\\.orders_.*") + .excludeTableList("test_db.orders_skip") + .createConfig(0); + + Predicate tableFilter = config.getTableFilter(); + TableId includedTable = new TableId("test_db", null, "orders_1"); + TableId excludedTable = new TableId("test_db", null, "orders_skip"); + TableId unmatchedTable = new TableId("test_db", null, "customers"); + + assertThat(tableFilter.test(includedTable)).isTrue(); + assertThat(tableFilter.test(excludedTable)).isFalse(); + assertThat(tableFilter.test(unmatchedTable)).isFalse(); + } +} From 8edef8f8dd91c6e96cc02329868b67f146e8553c Mon Sep 17 00:00:00 2001 From: Ran Tao Date: Wed, 17 Jun 2026 15:36:39 +0800 Subject: [PATCH 2/2] Fix code reviews --- .../source/config/MySqlSourceConfig.java | 24 +++++++++++++++---- .../source/config/MySqlSourceConfigTest.java | 4 ++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 13e3ffdd1f1..991f397601b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -22,6 +22,10 @@ import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache; + import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig; import io.debezium.relational.RelationalTableFilters; @@ -35,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -43,6 +46,8 @@ /** A MySql Source configuration which is used by {@link MySqlSource}. */ public class MySqlSourceConfig implements Serializable { private static final long serialVersionUID = 1L; + private static final Duration TABLE_FILTER_CACHE_EXPIRE_DURATION = Duration.ofHours(1); + private static final long TABLE_FILTER_CACHE_MAXIMUM_SIZE = 1024; private final String hostname; private final int port; @@ -283,10 +288,19 @@ public Predicate getTableFilter() { static Tables.TableFilter createCachedTableFilter( Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) { - Map tableFilterCache = new ConcurrentHashMap<>(); - return tableId -> - tableFilterCache.computeIfAbsent( - tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id)); + LoadingCache tableFilterCache = + CacheBuilder.newBuilder() + .expireAfterAccess(TABLE_FILTER_CACHE_EXPIRE_DURATION) + .maximumSize(TABLE_FILTER_CACHE_MAXIMUM_SIZE) + .build( + new CacheLoader() { + @Override + public Boolean load(TableId tableId) { + return isTableIncluded( + tableFilter, excludeTableFilter, tableId); + } + }); + return tableFilterCache::getUnchecked; } private static boolean isTableIncluded( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java index 2ef1893f834..0c400619244 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java @@ -41,12 +41,16 @@ void testCachesTableFilterResults() { null); TableId includedTable = new TableId("test_db", null, "orders_1"); + TableId sameIncludedTable = new TableId("test_db", null, "orders_1"); TableId unmatchedTable = new TableId("test_db", null, "customers"); + TableId sameUnmatchedTable = new TableId("test_db", null, "customers"); assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue(); assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue(); + assertThat(cachedTableFilter.isIncluded(sameIncludedTable)).isTrue(); assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse(); assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse(); + assertThat(cachedTableFilter.isIncluded(sameUnmatchedTable)).isFalse(); assertThat(filterInvocationCount).hasValue(2); }