Skip to content

Commit d1e30df

Browse files
authored
[fix](cloud) Align colocate proc output and tablet health in cloud mode (#60944)
- Fix incorrect tablet health statistics in cloud mode for `SHOW PROC '/cluster_health/tablet_health'`: avoid reporting UNRECOVERABLE due to local-mode health checks.
- Add a cloud fallback for colocation group detail: when backend sequence metadata is empty, derive backend ids from the current tablets.
- In cloud mode, show ReplicaAllocation as null in `SHOW PROC '/colocation_group/{GroupId}'` for consistent output semantics.
- Keep local mode behavior unchanged.
1 parent 7a79dd8 commit d1e30df

9 files changed

Lines changed: 679 additions & 23 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,19 @@ public GroupId getGroup(long tableId) {
437437
}
438438
}
439439

440+
public Long getDbIdByTblIdNullable(GroupId groupId, long tableId) {
441+
readLock();
442+
try {
443+
GroupId tableGroupId = table2Group.get(tableId);
444+
if (tableGroupId == null || !tableGroupId.equals(groupId)) {
445+
return null;
446+
}
447+
return tableGroupId.tblId2DbId.get(tableId);
448+
} finally {
449+
readUnlock();
450+
}
451+
}
452+
440453
public Set<GroupId> getAllGroupIds() {
441454
readLock();
442455
try {
@@ -711,7 +724,11 @@ public List<List<String>> getInfos() {
711724
info.add(Joiner.on(", ").join(group2Tables.get(groupId)));
712725
ColocateGroupSchema groupSchema = group2Schema.get(groupId);
713726
info.add(String.valueOf(groupSchema.getBucketsNum()));
714-
info.add(String.valueOf(groupSchema.getReplicaAlloc().toCreateStmt()));
727+
if (Config.isCloudMode()) {
728+
info.add("null");
729+
} else {
730+
info.add(String.valueOf(groupSchema.getReplicaAlloc().toCreateStmt()));
731+
}
715732
List<String> cols = groupSchema.getDistributionColTypes().stream().map(
716733
e -> e.toSql()).collect(Collectors.toList());
717734
info.add(Joiner.on(", ").join(cols));

fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import org.apache.logging.log4j.Logger;
3030

3131
import java.util.Comparator;
32+
import java.util.HashMap;
33+
import java.util.List;
34+
import java.util.Map;
3235

3336
/**
3437
* This class represents the olap replica related metadata.
@@ -179,6 +182,24 @@ protected long getBackendIdValue() {
179182
return -1L;
180183
}
181184

185+
public long getBackendIdForProcDisplay() {
186+
return getBackendIdValue();
187+
}
188+
189+
// For proc display only. Returns a "scope key -> backendId" mapping used to render the
190+
// replica's placement. In local deployment there is a single scope (empty key) and the
191+
// cache is unused; in cloud deployment each compute group is a separate scope and the
192+
// cache lets a single proc call fetch each compute group's backends once (see
193+
// CloudReplica). The cache is keyed by compute group id.
194+
public Map<String, Long> getClusterToBackendForProcDisplay(Map<String, List<Backend>> computeGroupBackendCache) {
195+
Map<String, Long> result = new HashMap<>();
196+
long backendId = getBackendIdForProcDisplay();
197+
if (backendId != -1L) {
198+
result.put("", backendId);
199+
}
200+
return result;
201+
}
202+
182203
public void setBackendId(long backendId) {
183204
if (backendId != -1) {
184205
throw new UnsupportedOperationException("setBackendId is not supported in Replica");

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import java.io.IOException;
4646
import java.util.ArrayList;
47+
import java.util.HashMap;
4748
import java.util.List;
4849
import java.util.Map;
4950
import java.util.Random;
@@ -111,8 +112,17 @@ private boolean isColocated() {
111112
}
112113

113114
public long getColocatedBeId(String clusterId) throws ComputeGroupException {
115+
List<Backend> clusterBackends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
116+
.getBackendsByClusterId(clusterId);
117+
return getColocatedBeId(clusterId, clusterBackends);
118+
}
119+
120+
// Same as getColocatedBeId(clusterId) but reuses an already fetched backend list of
121+
// the compute group. Lets callers that resolve many replicas across the same compute
122+
// groups (e.g. the colocate proc display) fetch each group's backends only once.
123+
public long getColocatedBeId(String clusterId, List<Backend> clusterBackends) throws ComputeGroupException {
114124
CloudSystemInfoService infoService = ((CloudSystemInfoService) Env.getCurrentSystemInfo());
115-
List<Backend> bes = infoService.getBackendsByClusterId(clusterId).stream()
125+
List<Backend> bes = clusterBackends.stream()
116126
.filter(be -> be.isQueryAvailable()).collect(Collectors.toList());
117127
String clusterName = infoService.getClusterNameByClusterId(clusterId);
118128
if (bes.isEmpty()) {
@@ -216,6 +226,47 @@ public long getClusterPrimaryBackendId(String clusterId) {
216226
return primaryClusterToBackend.getOrDefault(clusterId, -1L);
217227
}
218228

229+
// For proc display only. In cloud mode a replica is hashed to a different BE in each
230+
// compute group, so expose a clusterId -> backendId mapping; the proc display builds
231+
// a separate bucket sequence per compute group from it so each group's sequence is
232+
// self-consistent. Do not collapse this into a single BE (e.g. the first one):
233+
// backends differ across compute groups and would not match.
234+
//
235+
// ATTN: colocated replicas do NOT use primaryClusterToBackend (see getBackendIdImpl /
236+
// getClusterPrimaryBackendId, which short-circuit to getColocatedBeId), so that cache
237+
// is empty for them. Resolve their placement per compute group on the fly instead.
238+
// This reads CloudSystemInfoService / the colocate index, so callers must invoke it
239+
// OUTSIDE any table lock to avoid nested lock acquisition. It does not auto-start any
240+
// compute group: getColocatedBeId only reads the already-known backends of a clusterId.
241+
// computeGroupBackendCache maps compute group id -> getBackendsByClusterId() result and
242+
// is shared across all replicas resolved in a single proc call, so each compute group's
243+
// backend list is fetched only once instead of once per replica.
244+
@Override
245+
public Map<String, Long> getClusterToBackendForProcDisplay(Map<String, List<Backend>> computeGroupBackendCache) {
246+
if (!isColocated()) {
247+
return new HashMap<>(primaryClusterToBackend);
248+
}
249+
Map<String, Long> result = new HashMap<>();
250+
CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo();
251+
for (String clusterId : infoService.getCloudClusterIds()) {
252+
try {
253+
List<Backend> clusterBackends =
254+
computeGroupBackendCache.computeIfAbsent(clusterId, infoService::getBackendsByClusterId);
255+
long backendId = getColocatedBeId(clusterId, clusterBackends);
256+
if (backendId != -1L) {
257+
result.put(clusterId, backendId);
258+
}
259+
} catch (ComputeGroupException e) {
260+
// Skip compute groups that currently have no available backend.
261+
if (LOG.isDebugEnabled()) {
262+
LOG.debug("skip compute group {} for colocate proc display, replica {}",
263+
clusterId, getId(), e);
264+
}
265+
}
266+
}
267+
return result;
268+
}
269+
219270
private String getCurrentClusterId() throws ComputeGroupException {
220271
// Not in a connect session
221272
String cluster = null;

fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupBackendSeqsProcNode.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.doris.common.proc;
1919

2020
import org.apache.doris.common.AnalysisException;
21-
import org.apache.doris.resource.Tag;
2221

2322
import com.google.common.base.Joiner;
2423
import com.google.common.collect.Lists;
@@ -30,10 +29,13 @@
3029
* show proc "/colocation_group/group_name";
3130
*/
3231
public class ColocationGroupBackendSeqsProcNode implements ProcNodeInterface {
33-
private Map<Tag, List<List<Long>>> backendsSeq;
32+
// Column name -> per-bucket backend id sequence. The column name is a resource Tag in
33+
// local mode, a compute group name in cloud mode, or "BackendIds" when there is no
34+
// per-scope breakdown.
35+
private Map<String, List<List<Long>>> backendsSeqByColumn;
3436

35-
public ColocationGroupBackendSeqsProcNode(Map<Tag, List<List<Long>>> backendsSeq) {
36-
this.backendsSeq = backendsSeq;
37+
public ColocationGroupBackendSeqsProcNode(Map<String, List<List<Long>>> backendsSeqByColumn) {
38+
this.backendsSeqByColumn = backendsSeqByColumn;
3739
}
3840

3941
@Override
@@ -42,21 +44,21 @@ public ProcResult fetchResult() throws AnalysisException {
4244
List<String> titleNames = Lists.newArrayList();
4345
titleNames.add("BucketIndex");
4446
int bucketNum = 0;
45-
for (Tag tag : backendsSeq.keySet()) {
46-
titleNames.add(tag.toString());
47+
for (String column : backendsSeqByColumn.keySet()) {
48+
titleNames.add(column);
4749
if (bucketNum == 0) {
48-
bucketNum = backendsSeq.get(tag).size();
49-
} else if (bucketNum != backendsSeq.get(tag).size()) {
50+
bucketNum = backendsSeqByColumn.get(column).size();
51+
} else if (bucketNum != backendsSeqByColumn.get(column).size()) {
5052
throw new AnalysisException("Invalid bucket number: "
51-
+ bucketNum + " vs. " + backendsSeq.get(tag).size());
53+
+ bucketNum + " vs. " + backendsSeqByColumn.get(column).size());
5254
}
5355
}
5456
result.setNames(titleNames);
5557
for (int i = 0; i < bucketNum; i++) {
5658
List<String> info = Lists.newArrayList();
5759
info.add(String.valueOf(i)); // bucket index
58-
for (Tag tag : backendsSeq.keySet()) {
59-
List<List<Long>> bucketBackends = backendsSeq.get(tag);
60+
for (String column : backendsSeqByColumn.keySet()) {
61+
List<List<Long>> bucketBackends = backendsSeqByColumn.get(column);
6062
info.add(Joiner.on(", ").join(bucketBackends.get(i)));
6163
}
6264
result.addRow(info);

fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupProcDir.java

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,30 @@
1919

2020
import org.apache.doris.catalog.ColocateTableIndex;
2121
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
22+
import org.apache.doris.catalog.Database;
2223
import org.apache.doris.catalog.Env;
24+
import org.apache.doris.catalog.MaterializedIndex;
25+
import org.apache.doris.catalog.OlapTable;
26+
import org.apache.doris.catalog.Partition;
27+
import org.apache.doris.catalog.Replica;
28+
import org.apache.doris.catalog.Table;
29+
import org.apache.doris.catalog.Tablet;
30+
import org.apache.doris.cloud.system.CloudSystemInfoService;
2331
import org.apache.doris.common.AnalysisException;
32+
import org.apache.doris.common.Config;
2433
import org.apache.doris.resource.Tag;
34+
import org.apache.doris.system.Backend;
2535

36+
import com.google.common.base.Strings;
2637
import com.google.common.collect.ImmutableList;
38+
import com.google.common.collect.Lists;
39+
import com.google.common.collect.Maps;
40+
import com.google.common.collect.Sets;
2741

42+
import java.util.ArrayList;
2843
import java.util.List;
2944
import java.util.Map;
45+
import java.util.Set;
3046

3147
/*
3248
* show proc "/colocation_group";
@@ -61,7 +77,22 @@ public ProcNodeInterface lookup(String groupIdStr) throws AnalysisException {
6177
GroupId groupId = new GroupId(dbId, grpId);
6278
ColocateTableIndex index = Env.getCurrentColocateIndex();
6379
Map<Tag, List<List<Long>>> beSeqs = index.getBackendsPerBucketSeq(groupId);
64-
return new ColocationGroupBackendSeqsProcNode(beSeqs);
80+
Map<String, List<List<Long>>> columns;
81+
if ((beSeqs == null || beSeqs.isEmpty()) && Config.isCloudMode()) {
82+
// In cloud mode, legacy backend sequence metadata may be empty. Derive the
83+
// sequence from current tablets, one column per compute group. This path must
84+
// not resolve cloud backends in a way that auto-starts a compute group.
85+
columns = getCloudBackendSeqsFromTablets(groupId, index);
86+
} else {
87+
// Local mode: one column per resource tag.
88+
columns = Maps.newLinkedHashMap();
89+
if (beSeqs != null) {
90+
for (Map.Entry<Tag, List<List<Long>>> entry : beSeqs.entrySet()) {
91+
columns.put(entry.getKey().toString(), entry.getValue());
92+
}
93+
}
94+
}
95+
return new ColocationGroupBackendSeqsProcNode(columns);
6596
}
6697

6798
@Override
@@ -74,4 +105,129 @@ public ProcResult fetchResult() throws AnalysisException {
74105
result.setRows(infos);
75106
return result;
76107
}
108+
109+
private Map<String, List<List<Long>>> getCloudBackendSeqsFromTablets(GroupId groupId, ColocateTableIndex index) {
110+
Map<String, List<List<Long>>> backendsSeq = Maps.newLinkedHashMap();
111+
List<Long> tableIds = index.getAllTableIds(groupId);
112+
for (Long tableId : tableIds) {
113+
long dbId = groupId.dbId;
114+
if (dbId == 0) {
115+
Long tableDbId = index.getDbIdByTblIdNullable(groupId, tableId);
116+
if (tableDbId == null) {
117+
continue;
118+
}
119+
dbId = tableDbId;
120+
}
121+
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
122+
if (db == null) {
123+
continue;
124+
}
125+
Table table = db.getTableNullable(tableId);
126+
if (!(table instanceof OlapTable)) {
127+
continue;
128+
}
129+
backendsSeq = getCloudBackendSeqsFromTable((OlapTable) table);
130+
if (!backendsSeq.isEmpty()) {
131+
return backendsSeq;
132+
}
133+
}
134+
return backendsSeq;
135+
}
136+
137+
private Map<String, List<List<Long>>> getCloudBackendSeqsFromTable(OlapTable olapTable) {
138+
// Snapshot replicas (ordered by bucket) under the table lock only. Resolving the
139+
// per-compute-group placement of colocate cloud replicas calls into
140+
// CloudSystemInfoService / the colocate index, which must run outside the table
141+
// lock to avoid nested lock acquisition.
142+
List<List<Replica>> bucketReplicas = Lists.newArrayList();
143+
olapTable.readLock();
144+
try {
145+
Partition firstPartition = null;
146+
for (Partition partition : olapTable.getAllPartitions()) {
147+
firstPartition = partition;
148+
break;
149+
}
150+
if (firstPartition == null) {
151+
return Maps.newLinkedHashMap();
152+
}
153+
MaterializedIndex baseIndex = firstPartition.getBaseIndex();
154+
for (Tablet tablet : baseIndex.getTablets()) {
155+
bucketReplicas.add(new ArrayList<>(tablet.getReplicas()));
156+
}
157+
} finally {
158+
olapTable.readUnlock();
159+
}
160+
161+
// Resolve each replica's per-compute-group placement outside the table lock. In
162+
// cloud mode a replica is hashed to a different BE in each compute group, so build
163+
// a separate bucket sequence per compute group. Merging across groups (picking an
164+
// arbitrary first BE) would mix BEs from different compute groups into one bucket
165+
// sequence, which is meaningless. For colocate cloud tables placement is computed
166+
// on the fly; otherwise it comes from the cached clusterId -> backendId map (or an
167+
// empty scope key for local-style replicas).
168+
List<List<Map<String, Long>>> tabletReplicaBackends = Lists.newArrayListWithCapacity(bucketReplicas.size());
169+
Set<String> scopeKeys = Sets.newLinkedHashSet();
170+
// Shared across all replicas in this proc call so each compute group's backend
171+
// list is fetched only once (colocate placement is resolved per compute group).
172+
Map<String, List<Backend>> computeGroupBackendCache = Maps.newHashMap();
173+
for (List<Replica> replicas : bucketReplicas) {
174+
List<Map<String, Long>> replicaBackends = new ArrayList<>();
175+
for (Replica replica : replicas) {
176+
Map<String, Long> clusterToBackend =
177+
replica.getClusterToBackendForProcDisplay(computeGroupBackendCache);
178+
replicaBackends.add(clusterToBackend);
179+
scopeKeys.addAll(clusterToBackend.keySet());
180+
}
181+
tabletReplicaBackends.add(replicaBackends);
182+
}
183+
184+
Map<String, List<List<Long>>> seqByScopeKey = Maps.newLinkedHashMap();
185+
for (String scopeKey : scopeKeys) {
186+
List<List<Long>> bucketSeq = Lists.newArrayListWithCapacity(tabletReplicaBackends.size());
187+
boolean hasBackend = false;
188+
for (List<Map<String, Long>> replicaBackends : tabletReplicaBackends) {
189+
List<Long> bucketBackends = new ArrayList<>();
190+
for (Map<String, Long> clusterToBackend : replicaBackends) {
191+
Long backendId = clusterToBackend.get(scopeKey);
192+
if (backendId == null || backendId < 0) {
193+
continue;
194+
}
195+
bucketBackends.add(backendId);
196+
hasBackend = true;
197+
}
198+
bucketSeq.add(bucketBackends);
199+
}
200+
if (hasBackend) {
201+
seqByScopeKey.put(scopeKey, bucketSeq);
202+
}
203+
}
204+
205+
// Resolve scope keys to display column names (also outside the table lock): name
206+
// resolution acquires CloudSystemInfoService's lock.
207+
Map<String, List<List<Long>>> backendsSeq = Maps.newLinkedHashMap();
208+
for (Map.Entry<String, List<List<Long>>> entry : seqByScopeKey.entrySet()) {
209+
backendsSeq.put(scopeKeyToColumnName(entry.getKey()), entry.getValue());
210+
}
211+
return backendsSeq;
212+
}
213+
214+
// Map a proc-display scope key to its column name. An empty key means there is no
215+
// per-compute-group breakdown (local-style replicas), shown as a single "BackendIds"
216+
// column. Otherwise the key is a cloud compute group id, shown by its compute group
217+
// name (falling back to the raw id when the name cannot be resolved).
218+
private String scopeKeyToColumnName(String scopeKey) {
219+
if (Strings.isNullOrEmpty(scopeKey)) {
220+
return "BackendIds";
221+
}
222+
try {
223+
String name = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
224+
.getClusterNameByClusterId(scopeKey);
225+
if (!Strings.isNullOrEmpty(name)) {
226+
return name;
227+
}
228+
} catch (Exception e) {
229+
// Fall back to the raw compute group id if name resolution is unavailable.
230+
}
231+
return scopeKey;
232+
}
77233
}

fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,12 @@ static class DBTabletStatistic {
199199
Tablet tablet = tablets.get(i);
200200
++tabletNum;
201201
Tablet.TabletStatus res = null;
202-
if (groupId != null) {
202+
if (Config.isCloudMode()) {
203+
// In cloud mode, tablet replica health is managed by cloud components.
204+
// getHealth/getColocateHealth follows local deployment logic and may
205+
// misclassify tablets as UNRECOVERABLE.
206+
res = Tablet.TabletStatus.HEALTHY;
207+
} else if (groupId != null) {
203208
ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId);
204209
if (groupSchema != null) {
205210
replicaAlloc = groupSchema.getReplicaAlloc();

0 commit comments

Comments
 (0)