Skip to content

Commit 431bbcd

Browse files
bobhan1Copilot
andcommitted
refactor: derive tableFilterExpr from rules, show db.table names in MatchedTables
- Make tableFilterExpr transient (not persisted), computed from tableFilterRules via canonicalize() as single source of truth - Add computeTableFilterExpr() helper, called in constructor and rebuildOnTablesFilter() - Remove setTableFilterExpr() from Builder - Change currentTableIds from Set<Long> to Map<Long, String> mapping table ID to 'db.table' qualified name - Update getMatchedTablesString() to show sorted db.table names (e.g., 'ods.orders, ods.products') instead of numeric IDs - Update resolveTableIds() in CacheHotspotManager to return Map<Long, String> and update all callers - Add 3 new tests, update all existing tests for new API Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 0179b4d commit 431bbcd

File tree

4 files changed

+149
-89
lines changed

4 files changed

+149
-89
lines changed

fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import java.util.Collections;
7070
import java.util.Comparator;
7171
import java.util.HashMap;
72-
import java.util.HashSet;
7372
import java.util.Iterator;
7473
import java.util.List;
7574
import java.util.Map;
@@ -873,9 +872,7 @@ public long createJob(WarmUpClusterCommand stmt) throws AnalysisException {
873872
pr.pattern = rule.getRawPattern();
874873
persistedRules.add(pr);
875874
}
876-
String tableFilterExpr = CloudWarmUpJob.canonicalize(persistedRules);
877-
builder.setTableFilterExpr(tableFilterExpr)
878-
.setTableFilterRules(persistedRules);
875+
builder.setTableFilterRules(persistedRules);
879876
}
880877
} else {
881878
builder.setSyncMode(SyncMode.ONCE);
@@ -885,9 +882,9 @@ public long createJob(WarmUpClusterCommand stmt) throws AnalysisException {
885882
// For event-driven jobs with ON TABLES, rebuild filter and resolve initial table IDs
886883
if (warmUpJob.hasTableFilter()) {
887884
warmUpJob.rebuildOnTablesFilter();
888-
Set<Long> initialTableIds = resolveTableIds(warmUpJob.getOnTablesFilter());
889-
warmUpJob.setCurrentTableIds(initialTableIds);
890-
if (initialTableIds.isEmpty()) {
885+
Map<Long, String> initialTableIdNames = resolveTableIds(warmUpJob.getOnTablesFilter());
886+
warmUpJob.setCurrentTableIdNames(initialTableIdNames);
887+
if (initialTableIdNames.isEmpty()) {
891888
LOG.warn("ON TABLES rules do not match any existing table for job {}", jobId);
892889
}
893890
}
@@ -952,8 +949,8 @@ public void replayCloudWarmUpJob(CloudWarmUpJob cloudWarmUpJob) throws Exception
952949
// Rebuild transient ON TABLES filter from persisted rules
953950
if (cloudWarmUpJob.hasTableFilter()) {
954951
cloudWarmUpJob.rebuildOnTablesFilter();
955-
Set<Long> tableIds = resolveTableIds(cloudWarmUpJob.getOnTablesFilter());
956-
cloudWarmUpJob.setCurrentTableIds(tableIds);
952+
Map<Long, String> tableIdNames = resolveTableIds(cloudWarmUpJob.getOnTablesFilter());
953+
cloudWarmUpJob.setCurrentTableIdNames(tableIdNames);
957954
}
958955

959956
if (cloudWarmUpJob.isDone()) {
@@ -974,11 +971,11 @@ public void replayCloudWarmUpJob(CloudWarmUpJob cloudWarmUpJob) throws Exception
974971
}
975972

976973
/**
977-
* Resolve glob-based ON TABLES filter to a set of matching table IDs
974+
* Resolve glob-based ON TABLES filter to a map of matching table ID → "db.table" name
978975
* by iterating all databases and tables in the internal catalog.
979976
*/
980-
public Set<Long> resolveTableIds(OnTablesFilter filter) {
981-
Set<Long> result = new HashSet<>();
977+
public Map<Long, String> resolveTableIds(OnTablesFilter filter) {
978+
Map<Long, String> result = new HashMap<>();
982979
if (filter == null) {
983980
return result;
984981
}
@@ -992,7 +989,7 @@ public Set<Long> resolveTableIds(OnTablesFilter filter) {
992989
}
993990
for (TableIf tableIf : dbIf.getTables()) {
994991
if (filter.shouldWarmUp(dbName, tableIf.getName())) {
995-
result.add(tableIf.getId());
992+
result.put(tableIf.getId(), dbName + "." + tableIf.getName());
996993
}
997994
}
998995
}
@@ -1009,14 +1006,14 @@ public void refreshAllTableFilters() {
10091006
continue;
10101007
}
10111008
try {
1012-
Set<Long> newTableIds = resolveTableIds(job.getOnTablesFilter());
1009+
Map<Long, String> newTableIdNames = resolveTableIds(job.getOnTablesFilter());
10131010
Set<Long> oldTableIds = job.getCurrentTableIds();
1014-
if (!newTableIds.equals(oldTableIds)) {
1015-
job.setCurrentTableIds(newTableIds);
1011+
if (!newTableIdNames.keySet().equals(oldTableIds)) {
1012+
job.setCurrentTableIdNames(newTableIdNames);
10161013
LOG.info("refreshed table filter for job {}: {} -> {} tables",
10171014
job.getJobId(),
10181015
oldTableIds == null ? 0 : oldTableIds.size(),
1019-
newTableIds.size());
1016+
newTableIdNames.size());
10201017
}
10211018
} catch (Exception e) {
10221019
LOG.warn("failed to refresh table filter for job {}", job.getJobId(), e);

fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import java.io.IOException;
5959
import java.util.ArrayList;
6060
import java.util.HashMap;
61-
import java.util.HashSet;
6261
import java.util.Iterator;
6362
import java.util.List;
6463
import java.util.Map;
@@ -143,14 +142,14 @@ public enum SyncEvent {
143142
@SerializedName(value = "syncEvent")
144143
protected SyncEvent syncEvent;
145144

146-
@SerializedName(value = "tableFilterExpr")
147-
protected String tableFilterExpr = "";
148-
149145
@SerializedName(value = "tableFilterRules")
150146
protected List<PersistedTableFilterRule> tableFilterRules = new ArrayList<>();
151147

148+
// Computed from tableFilterRules via canonicalize(); not persisted.
149+
private transient String tableFilterExpr = "";
152150
private transient OnTablesFilter onTablesFilter;
153-
private transient Set<Long> currentTableIds = new HashSet<>();
151+
// Maps table ID → "db.table" qualified name for matched tables.
152+
private transient Map<Long, String> currentTableIdNames = new HashMap<>();
154153

155154
/**
156155
* Serializable rule for GSON persistence.
@@ -182,7 +181,6 @@ public static class Builder {
182181
private SyncMode syncMode = SyncMode.ONCE;
183182
private SyncEvent syncEvent;
184183
private long syncInterval;
185-
private String tableFilterExpr = "";
186184
private List<PersistedTableFilterRule> tableFilterRules = new ArrayList<>();
187185

188186
public Builder() {}
@@ -222,11 +220,6 @@ public Builder setSyncInterval(long syncInterval) {
222220
return this;
223221
}
224222

225-
public Builder setTableFilterExpr(String tableFilterExpr) {
226-
this.tableFilterExpr = tableFilterExpr;
227-
return this;
228-
}
229-
230223
public Builder setTableFilterRules(List<PersistedTableFilterRule> tableFilterRules) {
231224
this.tableFilterRules = tableFilterRules;
232225
return this;
@@ -249,8 +242,8 @@ private CloudWarmUpJob(Builder builder) {
249242
this.syncMode = builder.syncMode;
250243
this.syncEvent = builder.syncEvent;
251244
this.syncInterval = builder.syncInterval;
252-
this.tableFilterExpr = builder.tableFilterExpr;
253245
this.tableFilterRules = builder.tableFilterRules;
246+
this.tableFilterExpr = computeTableFilterExpr();
254247
this.createTimeMs = System.currentTimeMillis();
255248
}
256249

@@ -443,11 +436,10 @@ public List<String> getJobInfo() {
443436
}
444437

445438
private String getMatchedTablesString() {
446-
if (currentTableIds == null || currentTableIds.isEmpty()) {
439+
if (currentTableIdNames == null || currentTableIdNames.isEmpty()) {
447440
return "";
448441
}
449-
return currentTableIds.stream()
450-
.map(String::valueOf)
442+
return currentTableIdNames.values().stream()
451443
.sorted()
452444
.collect(Collectors.joining(", "));
453445
}
@@ -526,12 +518,29 @@ public OnTablesFilter getOnTablesFilter() {
526518
return onTablesFilter;
527519
}
528520

521+
/**
522+
* Returns the set of currently matched table IDs.
523+
*/
529524
public Set<Long> getCurrentTableIds() {
530-
return currentTableIds;
525+
return currentTableIdNames.keySet();
531526
}
532527

533-
public void setCurrentTableIds(Set<Long> tableIds) {
534-
this.currentTableIds = tableIds;
528+
/**
529+
* Sets the current matched table ID-to-name mapping.
530+
*/
531+
public void setCurrentTableIdNames(Map<Long, String> idNames) {
532+
this.currentTableIdNames = idNames;
533+
}
534+
535+
/**
536+
* Compute the canonical table filter expression from persisted rules.
537+
* Returns empty string when no table filter rules exist.
538+
*/
539+
private String computeTableFilterExpr() {
540+
if (tableFilterRules == null || tableFilterRules.isEmpty()) {
541+
return "";
542+
}
543+
return canonicalize(tableFilterRules);
535544
}
536545

537546
/**
@@ -565,13 +574,14 @@ public static String canonicalize(List<PersistedTableFilterRule> rules) {
565574
}
566575

567576
/**
568-
* Rebuild the transient OnTablesFilter from persisted tableFilterRules.
577+
* Rebuild the transient OnTablesFilter and tableFilterExpr from persisted tableFilterRules.
569578
* Called after deserialization (EditLog replay, FE restart).
570579
*/
571580
public void rebuildOnTablesFilter() {
572581
if (tableFilterRules == null || tableFilterRules.isEmpty()) {
573582
return;
574583
}
584+
this.tableFilterExpr = computeTableFilterExpr();
575585
List<OnTablesFilter.TableFilterRule> rules = tableFilterRules.stream()
576586
.map(r -> new OnTablesFilter.TableFilterRule(
577587
"INCLUDE".equals(r.ruleType)
@@ -821,13 +831,13 @@ private void runEventDrivenJob() throws Exception {
821831
}
822832
request.setEvent(event);
823833
if (hasTableFilter()) {
824-
request.setTableIds(currentTableIds != null
825-
? new ArrayList<>(currentTableIds) : new ArrayList<>());
834+
request.setTableIds(currentTableIdNames != null
835+
? new ArrayList<>(currentTableIdNames.keySet()) : new ArrayList<>());
826836
}
827837
LOG.debug("send warm up request to BE {} ({}). job_id={}, event={}, "
828838
+ "request_type=SET_JOB(EVENT), table_ids_count={}",
829839
entry.getKey(), getBackendEndpoint(entry.getKey()), jobId, syncEvent,
830-
hasTableFilter() ? currentTableIds.size() : "all");
840+
hasTableFilter() ? currentTableIdNames.size() : "all");
831841
TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
832842
if (response.getStatus().getStatusCode() != TStatusCode.OK) {
833843
if (!response.getStatus().getErrorMsgs().isEmpty()) {

0 commit comments

Comments
 (0)