Skip to content

Commit ec0d3f7

Browse files
committed
[FLINK-39824][mysql] Cache table filter results to reduce regex matching
1 parent be7d374 commit ec0d3f7

2 files changed

Lines changed: 92 additions & 5 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.List;
3636
import java.util.Map;
3737
import java.util.Properties;
38+
import java.util.concurrent.ConcurrentHashMap;
3839
import java.util.function.Predicate;
3940

4041
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -146,11 +147,7 @@ public class MySqlSourceConfig implements Serializable {
146147
Tables.TableFilter tableFilter = dbzMySqlConfig.getTableFilters().dataCollectionFilter();
147148
dbzMySqlConfig
148149
.getTableFilters()
149-
.setDataCollectionFilters(
150-
(TableId tableId) ->
151-
tableFilter.isIncluded(tableId)
152-
&& (excludeTableFilter == null
153-
|| !excludeTableFilter.isMatch(tableId)));
150+
.setDataCollectionFilters(createCachedTableFilter(tableFilter, excludeTableFilter));
154151
this.jdbcProperties = jdbcProperties;
155152
this.chunkKeyColumns = chunkKeyColumns;
156153
this.skipSnapshotBackfill = skipSnapshotBackfill;
@@ -284,6 +281,22 @@ public Predicate<TableId> getTableFilter() {
284281
return tableId -> tableFilters.dataCollectionFilter().isIncluded(tableId);
285282
}
286283

284+
static Tables.TableFilter createCachedTableFilter(
285+
Tables.TableFilter tableFilter, @Nullable Selectors excludeTableFilter) {
286+
Map<TableId, Boolean> tableFilterCache = new ConcurrentHashMap<>();
287+
return tableId ->
288+
tableFilterCache.computeIfAbsent(
289+
tableId, id -> isTableIncluded(tableFilter, excludeTableFilter, id));
290+
}
291+
292+
private static boolean isTableIncluded(
293+
Tables.TableFilter tableFilter,
294+
@Nullable Selectors excludeTableFilter,
295+
TableId tableId) {
296+
return tableFilter.isIncluded(tableId)
297+
&& (excludeTableFilter == null || !excludeTableFilter.isMatch(tableId));
298+
}
299+
287300
public Properties getJdbcProperties() {
288301
return jdbcProperties;
289302
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.mysql.source.config;
19+
20+
import io.debezium.relational.TableId;
21+
import io.debezium.relational.Tables;
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.function.Predicate;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
29+
/** Tests for {@link MySqlSourceConfig}. */
30+
class MySqlSourceConfigTest {
31+
32+
@Test
33+
void testCachesTableFilterResults() {
34+
AtomicInteger filterInvocationCount = new AtomicInteger();
35+
Tables.TableFilter cachedTableFilter =
36+
MySqlSourceConfig.createCachedTableFilter(
37+
tableId -> {
38+
filterInvocationCount.incrementAndGet();
39+
return tableId.table().startsWith("orders");
40+
},
41+
null);
42+
43+
TableId includedTable = new TableId("test_db", null, "orders_1");
44+
TableId unmatchedTable = new TableId("test_db", null, "customers");
45+
46+
assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue();
47+
assertThat(cachedTableFilter.isIncluded(includedTable)).isTrue();
48+
assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse();
49+
assertThat(cachedTableFilter.isIncluded(unmatchedTable)).isFalse();
50+
assertThat(filterInvocationCount).hasValue(2);
51+
}
52+
53+
@Test
54+
void testTableFilterWithExcludeTableList() {
55+
MySqlSourceConfig config =
56+
new MySqlSourceConfigFactory()
57+
.hostname("localhost")
58+
.username("user")
59+
.password("password")
60+
.databaseList("test_db")
61+
.tableList("test_db\\.orders_.*")
62+
.excludeTableList("test_db.orders_skip")
63+
.createConfig(0);
64+
65+
Predicate<TableId> tableFilter = config.getTableFilter();
66+
TableId includedTable = new TableId("test_db", null, "orders_1");
67+
TableId excludedTable = new TableId("test_db", null, "orders_skip");
68+
TableId unmatchedTable = new TableId("test_db", null, "customers");
69+
70+
assertThat(tableFilter.test(includedTable)).isTrue();
71+
assertThat(tableFilter.test(excludedTable)).isFalse();
72+
assertThat(tableFilter.test(unmatchedTable)).isFalse();
73+
}
74+
}

0 commit comments

Comments
 (0)