Skip to content

Commit 8edef8f

Browse files
committed
Fix code reviews
1 parent ec0d3f7 commit 8edef8f

2 files changed

Lines changed: 23 additions & 5 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
2323
import org.apache.flink.table.catalog.ObjectPath;
2424

25+
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
26+
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
27+
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
28+
2529
import io.debezium.config.Configuration;
2630
import io.debezium.connector.mysql.MySqlConnectorConfig;
2731
import io.debezium.relational.RelationalTableFilters;
@@ -35,14 +39,15 @@
3539
import java.util.List;
3640
import java.util.Map;
3741
import java.util.Properties;
38-
import java.util.concurrent.ConcurrentHashMap;
3942
import java.util.function.Predicate;
4043

4144
import static org.apache.flink.util.Preconditions.checkNotNull;
4245

4346
/** A MySql Source configuration which is used by {@link MySqlSource}. */
4447
public class MySqlSourceConfig implements Serializable {
4548
private static final long serialVersionUID = 1L;
49+
private static final Duration TABLE_FILTER_CACHE_EXPIRE_DURATION = Duration.ofHours(1);
50+
private static final long TABLE_FILTER_CACHE_MAXIMUM_SIZE = 1024;
4651

4752
private final String hostname;
4853
private final int port;
@@ -283,10 +288,19 @@ public Predicate<TableId> getTableFilter() {
283288

284289
static Tables.TableFilter createCachedTableFilter(
285290
Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) {
286-
Map<TableId, Boolean> tableFilterCache = new ConcurrentHashMap<>();
287-
return tableId ->
288-
tableFilterCache.computeIfAbsent(
289-
tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id));
291+
LoadingCache<TableId, Boolean> tableFilterCache =
292+
CacheBuilder.newBuilder()
293+
.expireAfterAccess(TABLE_FILTER_CACHE_EXPIRE_DURATION)
294+
.maximumSize(TABLE_FILTER_CACHE_MAXIMUM_SIZE)
295+
.build(
296+
new CacheLoader<TableId, Boolean>() {
297+
@Override
298+
public Boolean load(TableId tableId) {
299+
return isTableIncluded(
300+
tableFilter, excludeTableFilter, tableId);
301+
}
302+
});
303+
return tableFilterCache::getUnchecked;
290304
}
291305

292306
private static boolean isTableIncluded(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,16 @@ void testCachesTableFilterResults() {
4141
null);
4242

4343
TableId includedTable = new TableId("test_db", null, "orders_1");
44+
TableId sameIncludedTable = new TableId("test_db", null, "orders_1");
4445
TableId unmatchedTable = new TableId("test_db", null, "customers");
46+
TableId sameUnmatchedTable = new TableId("test_db", null, "customers");
4547

4648
assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue();
4749
assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue();
50+
assertThat(cachedTableFilter.isIncluded(sameIncludedTable)).isTrue();
4851
assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse();
4952
assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse();
53+
assertThat(cachedTableFilter.isIncluded(sameUnmatchedTable)).isFalse();
5054
assertThat(filterInvocationCount).hasValue(2);
5155
}
5256

0 commit comments

Comments
 (0)