Skip to content

Commit 5491bbd

Browse files
authored
branch-4.1:[feature](fe) Show compute group for MTMV refresh task (#63206) (#63933)
pr: #63206 commitId: 66dbb85
1 parent 01bba92 commit 5491bbd

5 files changed

Lines changed: 237 additions & 7 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.doris.catalog.MTMV;
2525
import org.apache.doris.catalog.ScalarType;
2626
import org.apache.doris.catalog.TableIf;
27+
import org.apache.doris.cloud.qe.ComputeGroupException;
2728
import org.apache.doris.common.AnalysisException;
2829
import org.apache.doris.common.Config;
2930
import org.apache.doris.common.DdlException;
@@ -70,6 +71,7 @@
7071
import org.apache.doris.thrift.TStatusCode;
7172
import org.apache.doris.thrift.TUniqueId;
7273

74+
import com.google.common.base.Strings;
7375
import com.google.common.collect.ImmutableList;
7476
import com.google.common.collect.ImmutableMap;
7577
import com.google.common.collect.Lists;
@@ -114,7 +116,8 @@ public class MTMVTask extends AbstractTask {
114116
new Column("NeedRefreshPartitions", ScalarType.createStringType()),
115117
new Column("CompletedPartitions", ScalarType.createStringType()),
116118
new Column("Progress", ScalarType.createStringType()),
117-
new Column("LastQueryId", ScalarType.createStringType()));
119+
new Column("LastQueryId", ScalarType.createStringType()),
120+
new Column("ComputeGroup", ScalarType.createStringType()));
118121

119122
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
120123

@@ -152,6 +155,8 @@ public enum MTMVTaskRefreshMode {
152155
MTMVTaskRefreshMode refreshMode;
153156
@SerializedName("lastQueryId")
154157
String lastQueryId;
158+
@SerializedName("cg")
159+
private String computeGroup;
155160

156161
private MTMV mtmv;
157162
private MTMVRelation relation;
@@ -322,6 +327,8 @@ private void exec(Set<String> refreshPartitionNames,
322327
Map<TableIf, String> tableWithPartKey)
323328
throws Exception {
324329
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
330+
setComputeGroup(ctx);
331+
recordComputeGroup(ctx);
325332
StatementContext statementContext = new StatementContext();
326333
for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
327334
statementContext.setSnapshot(entry.getKey(), entry.getValue());
@@ -354,6 +361,26 @@ private void exec(Set<String> refreshPartitionNames,
354361
}
355362
}
356363

364+
private void setComputeGroup(ConnectContext ctx) {
365+
String taskComputeGroup = taskContext.getComputeGroup();
366+
if (Config.isCloudMode() && !Strings.isNullOrEmpty(taskComputeGroup)) {
367+
ctx.setCloudCluster(taskComputeGroup);
368+
}
369+
}
370+
371+
private void recordComputeGroup(ConnectContext ctx) {
372+
if (!Config.isCloudMode()) {
373+
computeGroup = FeConstants.null_string;
374+
return;
375+
}
376+
try {
377+
computeGroup = ctx.getCloudCluster(false);
378+
} catch (ComputeGroupException e) {
379+
computeGroup = FeConstants.null_string;
380+
LOG.warn("failed to resolve compute group for mtmv task, taskId: {}", getTaskId(), e);
381+
}
382+
}
383+
357384
private String getDummyStmt(Set<String> refreshPartitionNames) {
358385
String mvName = mtmv.getName();
359386
DatabaseIf database = mtmv.getDatabase();
@@ -532,6 +559,8 @@ public TRow getTvfInfo(String jobName) {
532559
new TCell().setStringVal(getProgress()));
533560
trow.addToColumnValue(
534561
new TCell().setStringVal(lastQueryId));
562+
trow.addToColumnValue(new TCell().setStringVal(
563+
computeGroup == null || computeGroup.isEmpty() ? FeConstants.null_string : computeGroup));
535564
return trow;
536565
}
537566

fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,19 @@ public class MTMVTaskContext {
3434
@SerializedName(value = "isComplete")
3535
private boolean isComplete;
3636

37+
@SerializedName(value = "computeGroup")
38+
private String computeGroup;
39+
3740
public MTMVTaskContext(MTMVTaskTriggerMode triggerMode) {
3841
this.triggerMode = triggerMode;
3942
}
4043

41-
public MTMVTaskContext(MTMVTaskTriggerMode triggerMode, List<String> partitions, boolean isComplete) {
44+
public MTMVTaskContext(MTMVTaskTriggerMode triggerMode, List<String> partitions, boolean isComplete,
45+
String computeGroup) {
4246
this.triggerMode = triggerMode;
4347
this.partitions = partitions;
4448
this.isComplete = isComplete;
49+
this.computeGroup = computeGroup;
4550
}
4651

4752
public List<String> getPartitions() {
@@ -56,12 +61,17 @@ public boolean isComplete() {
5661
return isComplete;
5762
}
5863

64+
public String getComputeGroup() {
65+
return computeGroup;
66+
}
67+
5968
@Override
6069
public String toString() {
6170
return "MTMVTaskContext{"
6271
+ "triggerMode=" + triggerMode
6372
+ ", partitions=" + partitions
6473
+ ", isComplete=" + isComplete
74+
+ ", computeGroup=" + computeGroup
6575
+ '}';
6676
}
6777
}

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.doris.catalog.MTMV;
2424
import org.apache.doris.catalog.Table;
2525
import org.apache.doris.catalog.TableIf.TableType;
26+
import org.apache.doris.cloud.qe.ComputeGroupException;
27+
import org.apache.doris.common.Config;
2628
import org.apache.doris.common.DdlException;
2729
import org.apache.doris.common.MetaNotFoundException;
2830
import org.apache.doris.common.util.TimeUtils;
@@ -42,6 +44,7 @@
4244
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
4345
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
4446
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
47+
import org.apache.doris.qe.ConnectContext;
4548

4649
import com.google.common.collect.Lists;
4750
import org.apache.commons.lang3.StringUtils;
@@ -64,7 +67,7 @@ public void postCreateMTMV(MTMV mtmv) {
6467
if (!mtmv.getRefreshInfo().getBuildMode().equals(BuildMode.IMMEDIATE)) {
6568
return;
6669
}
67-
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM, null, true);
70+
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM, null, true, null);
6871
try {
6972
Env.getCurrentEnv().getJobManager().triggerJob(mtmv.getId(), mtmvTaskContext);
7073
} catch (JobException e) {
@@ -155,10 +158,26 @@ public void alterJob(MTMV mtmv, boolean isReplay) {
155158
public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
156159
MTMVJob job = getJobByTableNameInfo(info.getMvName());
157160
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
158-
info.isComplete());
161+
info.isComplete(), getCurrentComputeGroup());
159162
Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext);
160163
}
161164

165+
private String getCurrentComputeGroup() {
166+
if (!Config.isCloudMode()) {
167+
return null;
168+
}
169+
ConnectContext ctx = ConnectContext.get();
170+
if (ctx == null) {
171+
return null;
172+
}
173+
try {
174+
return ctx.getCloudCluster(false);
175+
} catch (ComputeGroupException e) {
176+
LOG.warn("failed to resolve compute group for refresh mtmv", e);
177+
return null;
178+
}
179+
}
180+
162181
@Override
163182
public void refreshComplete(MTMV mtmv, MTMVRelation relation, MTMVTask task) {
164183

@@ -202,7 +221,7 @@ public void onCommit(MTMV mtmv) throws DdlException, JobException {
202221
return;
203222
}
204223
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.COMMIT, Lists.newArrayList(),
205-
false);
224+
false, null);
206225
Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext);
207226
}
208227

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.mtmv;
19+
20+
import org.apache.doris.catalog.Database;
21+
import org.apache.doris.catalog.Env;
22+
import org.apache.doris.catalog.MTMV;
23+
import org.apache.doris.catalog.TableIf.TableType;
24+
import org.apache.doris.common.Config;
25+
import org.apache.doris.datasource.InternalCatalog;
26+
import org.apache.doris.info.TableNameInfo;
27+
import org.apache.doris.job.extensions.mtmv.MTMVJob;
28+
import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
29+
import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
30+
import org.apache.doris.job.manager.JobManager;
31+
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
32+
import org.apache.doris.qe.ConnectContext;
33+
34+
import com.google.common.collect.Lists;
35+
import org.junit.Assert;
36+
import org.junit.Test;
37+
import org.mockito.ArgumentCaptor;
38+
import org.mockito.MockedStatic;
39+
import org.mockito.Mockito;
40+
41+
public class MTMVJobManagerTest {
42+
43+
@Test
44+
public void testRefreshMTMVPassesCurrentComputeGroupToTaskContext() throws Exception {
45+
String originCloudUniqueId = Config.cloud_unique_id;
46+
ConnectContext previousContext = ConnectContext.get();
47+
try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
48+
Config.cloud_unique_id = "test_cloud";
49+
Env env = Mockito.mock(Env.class);
50+
InternalCatalog internalCatalog = Mockito.mock(InternalCatalog.class);
51+
Database db = Mockito.mock(Database.class);
52+
MTMV mtmv = Mockito.mock(MTMV.class);
53+
MTMVJob job = Mockito.mock(MTMVJob.class);
54+
JobManager jobManager = Mockito.mock(JobManager.class);
55+
56+
mockedEnv.when(Env::getCurrentEnv).thenReturn(env);
57+
mockedEnv.when(Env::getCurrentInternalCatalog).thenReturn(internalCatalog);
58+
Mockito.when(internalCatalog.getDbOrDdlException("db1")).thenReturn(db);
59+
Mockito.when(db.getTableOrMetaException(Mockito.eq("mv1"), Mockito.eq(TableType.MATERIALIZED_VIEW)))
60+
.thenReturn(mtmv);
61+
Mockito.when(env.getJobManager()).thenReturn(jobManager);
62+
Mockito.when(jobManager.getJob(mtmv.getId())).thenReturn(job);
63+
Mockito.when(job.getJobId()).thenReturn(100L);
64+
65+
ConnectContext ctx = new ConnectContext();
66+
ctx.setCloudCluster("cg1");
67+
ctx.setThreadLocalInfo();
68+
69+
RefreshMTMVInfo info = new RefreshMTMVInfo(new TableNameInfo("db1", "mv1"),
70+
Lists.newArrayList("p1"), false);
71+
new MTMVJobManager().refreshMTMV(info);
72+
73+
ArgumentCaptor<MTMVTaskContext> captor = ArgumentCaptor.forClass(MTMVTaskContext.class);
74+
Mockito.verify(jobManager).triggerJob(Mockito.eq(100L), captor.capture());
75+
MTMVTaskContext taskContext = captor.getValue();
76+
Assert.assertEquals(MTMVTaskTriggerMode.MANUAL, taskContext.getTriggerMode());
77+
Assert.assertEquals(Lists.newArrayList("p1"), taskContext.getPartitions());
78+
Assert.assertFalse(taskContext.isComplete());
79+
Assert.assertEquals("cg1", taskContext.getComputeGroup());
80+
} finally {
81+
Config.cloud_unique_id = originCloudUniqueId;
82+
ConnectContext.remove();
83+
if (previousContext != null) {
84+
previousContext.setThreadLocalInfo();
85+
}
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)