Skip to content

Commit 10d2aae

Browse files
author
gitlab
committed
Merge branch '9875@@3' into 'master'
Fixes ZSTAC-9875 Closes ZSTAC-9875 See merge request zstackio/zstack!2339
2 parents f81b9d5 + 923a855 commit 10d2aae

38 files changed

Lines changed: 1961 additions & 580 deletions

File tree

compute/src/main/java/org/zstack/compute/cluster/ClusterApiInterceptor.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
package org.zstack.compute.cluster;
22

33
import org.springframework.beans.factory.annotation.Autowired;
4+
import org.zstack.core.Platform;
45
import org.zstack.core.cloudbus.CloudBus;
56
import org.zstack.core.db.DatabaseFacade;
7+
import org.zstack.core.db.SQLBatch;
68
import org.zstack.header.apimediator.ApiMessageInterceptionException;
79
import org.zstack.header.apimediator.ApiMessageInterceptor;
810
import org.zstack.header.apimediator.StopRoutingException;
911
import org.zstack.header.cluster.*;
12+
import org.zstack.header.host.HostState;
13+
import org.zstack.header.host.HostStatus;
14+
import org.zstack.header.host.HostVO;
15+
import org.zstack.header.host.HostVO_;
1016
import org.zstack.header.message.APIMessage;
1117

1218
/**
@@ -34,11 +40,55 @@ public APIMessage intercept(APIMessage msg) throws ApiMessageInterceptionExcepti
3440

3541
if (msg instanceof APIDeleteClusterMsg) {
3642
validate((APIDeleteClusterMsg) msg);
43+
} else if (msg instanceof APIUpdateClusterOSMsg) {
44+
validate((APIUpdateClusterOSMsg) msg);
3745
}
3846

3947
return msg;
4048
}
4149

50+
private void validate(APIUpdateClusterOSMsg msg) {
51+
new SQLBatch() {
52+
@Override
53+
protected void scripts() {
54+
// we can only update os that uses rpm/yum for now, and we assume all KvmHost are using ZStack ISO
55+
String type = q(ClusterVO.class)
56+
.select(ClusterVO_.hypervisorType)
57+
.eq(ClusterVO_.uuid, msg.getUuid())
58+
.findValue();
59+
if (type != null && !type.equals("KVM")) {
60+
throw new ApiMessageInterceptionException(Platform.argerr(
61+
"only kvm hosts' operating system can be updated, for now"
62+
));
63+
}
64+
65+
// all hosts in the cluster must not be in the premaintenance state
66+
Long premaintain = q(HostVO.class)
67+
.eq(HostVO_.clusterUuid, msg.getUuid())
68+
.eq(HostVO_.state, HostState.PreMaintenance)
69+
.count();
70+
if (premaintain != 0) {
71+
throw new ApiMessageInterceptionException(Platform.argerr(
72+
"there are hosts in cluster[uuid:%s] in the PreMaintenance state, cannot update cluster os right now",
73+
msg.getUuid()
74+
));
75+
}
76+
77+
// all hosts in the cluster must be connected
78+
Long notConnected = q(HostVO.class)
79+
.eq(HostVO_.clusterUuid, msg.getUuid())
80+
.notEq(HostVO_.status, HostStatus.Connected)
81+
.count();
82+
if (notConnected != 0) {
83+
throw new ApiMessageInterceptionException(Platform.argerr(
84+
"not all hosts in cluster[uuid:%s] are in the Connected status, cannot update cluster os right now",
85+
msg.getUuid()
86+
));
87+
}
88+
}
89+
}.execute();
90+
}
91+
4292
private void validate(APIDeleteClusterMsg msg) {
4393
if (!dbf.isExist(msg.getUuid(), ClusterVO.class)) {
4494
APIDeleteClusterEvent evt = new APIDeleteClusterEvent(msg.getId());

compute/src/main/java/org/zstack/compute/cluster/ClusterBase.java

Lines changed: 116 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,48 @@
11
package org.zstack.compute.cluster;
22

3+
import org.apache.logging.log4j.ThreadContext;
34
import org.springframework.beans.factory.annotation.Autowire;
45
import org.springframework.beans.factory.annotation.Autowired;
56
import org.springframework.beans.factory.annotation.Configurable;
7+
import org.zstack.core.asyncbatch.While;
68
import org.zstack.core.cascade.CascadeConstant;
79
import org.zstack.core.cascade.CascadeFacade;
810
import org.zstack.core.cloudbus.CloudBus;
11+
import org.zstack.core.cloudbus.CloudBusCallBack;
912
import org.zstack.core.db.DatabaseFacade;
13+
import org.zstack.core.db.Q;
1014
import org.zstack.core.errorcode.ErrorFacade;
11-
import org.zstack.header.core.NopeCompletion;
12-
import org.zstack.header.core.workflow.*;
13-
import org.zstack.header.errorcode.SysErrors;
15+
import org.zstack.core.progress.ProgressReportService;
16+
import org.zstack.core.thread.ChainTask;
1417
import org.zstack.core.thread.SyncTask;
18+
import org.zstack.core.thread.SyncTaskChain;
1519
import org.zstack.core.thread.ThreadFacade;
16-
import org.zstack.core.workflow.*;
20+
import org.zstack.core.workflow.FlowChainBuilder;
1721
import org.zstack.header.cluster.*;
1822
import org.zstack.header.core.Completion;
23+
import org.zstack.header.core.NoErrorCompletion;
24+
import org.zstack.header.core.NopeCompletion;
25+
import org.zstack.header.core.workflow.*;
1926
import org.zstack.header.errorcode.ErrorCode;
27+
import org.zstack.header.errorcode.SysErrors;
28+
import org.zstack.header.host.HostConstant;
29+
import org.zstack.header.host.HostVO;
30+
import org.zstack.header.host.HostVO_;
31+
import org.zstack.header.host.UpdateHostOSMsg;
2032
import org.zstack.header.message.APIDeleteMessage;
2133
import org.zstack.header.message.APIMessage;
2234
import org.zstack.header.message.Message;
35+
import org.zstack.header.message.MessageReply;
2336
import org.zstack.utils.Utils;
2437
import org.zstack.utils.logging.CLogger;
2538

2639
import java.util.Arrays;
2740
import java.util.List;
2841
import java.util.Map;
42+
import java.util.concurrent.ConcurrentHashMap;
43+
44+
import static org.zstack.header.Constants.THREAD_CONTEXT_API;
45+
import static org.zstack.header.Constants.THREAD_CONTEXT_TASK_NAME;
2946

3047
@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
3148
public class ClusterBase extends AbstractCluster {
@@ -71,11 +88,33 @@ protected void handleApiMessage(APIMessage msg) {
7188
handle((APIDeleteClusterMsg) msg);
7289
} else if (msg instanceof APIUpdateClusterMsg) {
7390
handle((APIUpdateClusterMsg) msg);
91+
} else if (msg instanceof APIUpdateClusterOSMsg) {
92+
handle((APIUpdateClusterOSMsg) msg);
7493
} else {
7594
bus.dealWithUnknownMessage(msg);
7695
}
7796
}
7897

98+
private void handle(APIUpdateClusterOSMsg msg) {
99+
APIUpdateClusterOSEvent evt = new APIUpdateClusterOSEvent(msg.getId());
100+
101+
UpdateClusterOSMsg umsg = new UpdateClusterOSMsg();
102+
umsg.setUuid(msg.getUuid());
103+
bus.makeTargetServiceIdByResourceUuid(umsg, ClusterConstant.SERVICE_ID, msg.getUuid());
104+
bus.send(umsg, new CloudBusCallBack(msg) {
105+
@Override
106+
public void run(MessageReply reply) {
107+
if (reply.isSuccess()) {
108+
UpdateClusterOSReply rly = reply.castReply();
109+
evt.setResults(rly.getResults());
110+
} else {
111+
evt.setError(reply.getError());
112+
}
113+
bus.publish(evt);
114+
}
115+
});
116+
}
117+
79118
private void handle(APIUpdateClusterMsg msg) {
80119
boolean update = false;
81120
if (msg.getName() != null) {
@@ -219,12 +258,84 @@ protected void handleLocalMessage(Message msg) {
219258
handle((ChangeClusterStateMsg) msg);
220259
} else if (msg instanceof ClusterDeletionMsg) {
221260
handle((ClusterDeletionMsg) msg);
261+
} else if (msg instanceof UpdateClusterOSMsg) {
262+
handle((UpdateClusterOSMsg) msg);
222263
} else {
223264
bus.dealWithUnknownMessage(msg);
224265
}
225266
}
226267

227-
private void handle(ClusterDeletionMsg msg) {
268+
private void handle(UpdateClusterOSMsg msg) {
269+
UpdateClusterOSReply reply = new UpdateClusterOSReply();
270+
reply.setResults(new ConcurrentHashMap<>());
271+
272+
ErrorCode error = extpEmitter.preUpdateOS(self);
273+
if (error != null) {
274+
reply.setError(error);
275+
bus.reply(msg, reply);
276+
return;
277+
}
278+
279+
thdf.chainSubmit(new ChainTask(msg) {
280+
@Override
281+
public String getSyncSignature() {
282+
return "update-cluster-os";
283+
}
284+
285+
@Override
286+
public int getSyncLevel() {
287+
return ClusterGlobalConfig.CLUSTER_UPDATE_OS_PARALLELISM_DEGREE.value(Integer.class);
288+
}
289+
290+
@Override
291+
public void run(SyncTaskChain chain) {
292+
String apiId = ThreadContext.get(THREAD_CONTEXT_API);
293+
String taskName = ThreadContext.get(THREAD_CONTEXT_TASK_NAME);
294+
extpEmitter.beforeUpdateOS(self);
295+
296+
// update each hosts os in the cluster
297+
List<String> hostUuids = Q.New(HostVO.class)
298+
.select(HostVO_.uuid)
299+
.eq(HostVO_.clusterUuid, msg.getUuid())
300+
.listValues();
301+
new While<>(hostUuids).all((hostUuid, completion) -> {
302+
UpdateHostOSMsg umsg = new UpdateHostOSMsg();
303+
umsg.setUuid(hostUuid);
304+
umsg.setClusterUuid(msg.getUuid());
305+
bus.makeTargetServiceIdByResourceUuid(umsg, HostConstant.SERVICE_ID, hostUuid);
306+
bus.send(umsg, new CloudBusCallBack(completion) {
307+
@Override
308+
public void run(MessageReply rly) {
309+
if (rly.isSuccess()) {
310+
reply.getResults().put(hostUuid, "success");
311+
} else {
312+
reply.getResults().put(hostUuid, rly.getError().getDetails());
313+
}
314+
// progress info
315+
ThreadContext.put(THREAD_CONTEXT_API, apiId);
316+
ThreadContext.put(THREAD_CONTEXT_TASK_NAME, taskName);
317+
ProgressReportService.reportProgress(String.valueOf(100 * reply.getResults().size() / hostUuids.size()));
318+
completion.done();
319+
}
320+
});
321+
}).run(new NoErrorCompletion() {
322+
@Override
323+
public void done() {
324+
extpEmitter.afterUpdateOS(self);
325+
bus.reply(msg, reply);
326+
chain.next();
327+
}
328+
});
329+
}
330+
331+
@Override
332+
public String getName() {
333+
return getSyncSignature();
334+
}
335+
});
336+
}
337+
338+
private void handle(ClusterDeletionMsg msg) {
228339
ClusterInventory inv = ClusterInventory.valueOf(self);
229340
extpEmitter.beforeDelete(inv);
230341
deleteHook();

compute/src/main/java/org/zstack/compute/cluster/ClusterExtensionPointEmitter.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
package org.zstack.compute.cluster;
22

33
import org.springframework.beans.factory.annotation.Autowired;
4-
import org.zstack.core.componentloader.PluginExtension;
54
import org.zstack.core.componentloader.PluginRegistry;
65
import org.zstack.header.Component;
76
import org.zstack.header.cluster.*;
7+
import org.zstack.header.errorcode.ErrorCode;
88
import org.zstack.utils.CollectionUtils;
99
import org.zstack.utils.Utils;
1010
import org.zstack.utils.function.ForEachFunction;
1111
import org.zstack.utils.logging.CLogger;
1212

1313
import java.util.List;
1414

15+
import static org.zstack.core.Platform.operr;
16+
1517
class ClusterExtensionPointEmitter implements Component {
1618
private static final CLogger logger = Utils.getLogger(ClusterExtensionPointEmitter.class);
1719

@@ -20,8 +22,7 @@ class ClusterExtensionPointEmitter implements Component {
2022

2123
private List<ClusterDeleteExtensionPoint> deleteExts;
2224
private List<ClusterChangeStateExtensionPoint> changeExts;
23-
24-
25+
private List<ClusterUpdateOSExtensionPoint> updateOSExts;
2526

2627
void preDelete(ClusterInventory cinv) throws ClusterException {
2728
for (ClusterDeleteExtensionPoint extp : deleteExts) {
@@ -70,7 +71,7 @@ public void run(ClusterChangeStateExtensionPoint extp) {
7071
}
7172
});
7273
}
73-
74+
7475
void afterChange(ClusterVO vo, final ClusterStateEvent event, final ClusterState prevState) {
7576
final ClusterInventory cinv = ClusterInventory.valueOf(vo);
7677
CollectionUtils.safeForEach(changeExts, new ForEachFunction<ClusterChangeStateExtensionPoint>() {
@@ -80,7 +81,7 @@ public void run(ClusterChangeStateExtensionPoint extp) {
8081
}
8182
});
8283
}
83-
84+
8485
void beforeDelete(final ClusterInventory cinv) {
8586
CollectionUtils.safeForEach(deleteExts, new ForEachFunction<ClusterDeleteExtensionPoint>() {
8687
@Override
@@ -89,7 +90,7 @@ public void run(ClusterDeleteExtensionPoint arg) {
8990
}
9091
});
9192
}
92-
93+
9394
void afterDelete(final ClusterInventory cinv) {
9495
CollectionUtils.safeForEach(deleteExts, new ForEachFunction<ClusterDeleteExtensionPoint>() {
9596
@Override
@@ -99,9 +100,42 @@ public void run(ClusterDeleteExtensionPoint arg) {
99100
});
100101
}
101102

103+
ErrorCode preUpdateOS(final ClusterVO cls) {
104+
if (updateOSExts == null || updateOSExts.isEmpty()) {
105+
return null;
106+
}
107+
108+
for (ClusterUpdateOSExtensionPoint ext : updateOSExts) {
109+
String error = ext.preUpdateClusterOS(cls);
110+
if (error != null) {
111+
return operr(error);
112+
}
113+
}
114+
return null;
115+
}
116+
117+
void beforeUpdateOS(final ClusterVO cls) {
118+
CollectionUtils.safeForEach(updateOSExts, new ForEachFunction<ClusterUpdateOSExtensionPoint>() {
119+
@Override
120+
public void run(ClusterUpdateOSExtensionPoint arg) {
121+
arg.beforeUpdateClusterOS(cls);
122+
}
123+
});
124+
}
125+
126+
void afterUpdateOS(final ClusterVO cls) {
127+
CollectionUtils.safeForEach(updateOSExts, new ForEachFunction<ClusterUpdateOSExtensionPoint>() {
128+
@Override
129+
public void run(ClusterUpdateOSExtensionPoint arg) {
130+
arg.afterUpdateClusterOS(cls);
131+
}
132+
});
133+
}
134+
102135
private void populateExtensions() {
103136
deleteExts = pluginRgty.getExtensionList(ClusterDeleteExtensionPoint.class);
104137
changeExts = pluginRgty.getExtensionList(ClusterChangeStateExtensionPoint.class);
138+
updateOSExts = pluginRgty.getExtensionList(ClusterUpdateOSExtensionPoint.class);
105139
}
106140

107141
@Override
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.zstack.compute.cluster;
2+
3+
import org.zstack.core.config.GlobalConfig;
4+
import org.zstack.core.config.GlobalConfigDefinition;
5+
import org.zstack.core.config.GlobalConfigValidation;
6+
7+
/**
8+
* Created by GuoYi on 3/13/18
9+
*/
10+
@GlobalConfigDefinition
11+
public class ClusterGlobalConfig {
12+
public static final String CATEGORY = "cluster";
13+
14+
@GlobalConfigValidation
15+
public static GlobalConfig CLUSTER_UPDATE_OS_PARALLELISM_DEGREE = new GlobalConfig(CATEGORY, "update.os.parallelismDegree");
16+
}

0 commit comments

Comments
 (0)