Skip to content

Commit bbd3785

Browse files
adding parallel execution core workflow
1 parent fa2d333 commit bbd3785

9 files changed

Lines changed: 309 additions & 97 deletions

File tree

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
package org.apache.cloudstack.api.command;
18+
19+
import org.apache.cloudstack.api.ApiConstants;
20+
import org.apache.cloudstack.api.BaseAsyncCmd;
21+
import org.apache.cloudstack.api.Parameter;
22+
import org.apache.cloudstack.service.NimbleService;
23+
24+
import javax.inject.Inject;
25+
26+
public class DeployIacTemplateCmd extends BaseAsyncCmd {
27+
@Inject
28+
private NimbleService nimbleService;
29+
30+
@Parameter(name = ApiConstants.IAC_RESOURCE_TYPE_CONTENT, type = CommandType.STRING, length = 65535, description = "")
31+
private String iacTemplateContent;
32+
33+
@Override
34+
public void execute() {
35+
nimbleService.deployIacTemplate(iacTemplateContent);
36+
}
37+
38+
@Override
39+
public String getEventType() {
40+
return "";
41+
}
42+
43+
@Override
44+
public String getEventDescription() {
45+
return "";
46+
}
47+
48+
@Override
49+
public long getEntityOwnerId() {
50+
return 0;
51+
}
52+
}

plugins/iac/nimble/src/main/java/org/apache/cloudstack/service/NimbleManagerImpl.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,32 +25,25 @@
2525
import org.apache.cloudstack.framework.config.ConfigKey;
2626
import org.apache.cloudstack.persistence.iactemplatesprofile.IacResourceTypeDao;
2727
import org.apache.cloudstack.persistence.iactemplatesprofile.IacResourceTypeVO;
28-
import org.apache.cloudstack.tosca.model.ToscaNodeType;
29-
import org.apache.cloudstack.tosca.parser.ToscaParser;
28+
import org.apache.cloudstack.tosca.ToscaOrchestrator;
3029

3130
import javax.inject.Inject;
3231
import javax.naming.ConfigurationException;
3332
import java.util.ArrayList;
3433
import java.util.List;
3534
import java.util.Map;
36-
import java.util.concurrent.ExecutorService;
37-
import java.util.concurrent.Executors;
3835
import java.util.stream.Collectors;
3936

4037
public class NimbleManagerImpl extends ManagerBase implements NimbleService {
4138
@Inject
42-
private ToscaParser toscaParser;
39+
private ToscaOrchestrator toscaOrchestrator;
4340

4441
@Inject
4542
private IacResourceTypeDao iacResourceTypeDao;
4643

4744
@Inject
4845
private NimbleResponseBuilder responseBuilder;
4946

50-
private ExecutorService nimbleExecutorPool;
51-
52-
private Map<String, ToscaNodeType> toscaProfile;
53-
5447
@Override
5548
public ListResponse<IacResourceTypeResponse> listIacResourceTypes(ListIacResourceTypesCmd cmd) {
5649
Pair<List<IacResourceTypeVO>, Integer> iacResourceTypes = iacResourceTypeDao.listIacResourceTypes(cmd.getId(), cmd.getName(),
@@ -64,35 +57,25 @@ public ListResponse<IacResourceTypeResponse> listIacResourceTypes(ListIacResourc
6457
return response;
6558
}
6659

60+
@Override
61+
public void deployIacTemplate(String iacTemplateContent) {
62+
toscaOrchestrator.deployIacTemplate(iacTemplateContent);
63+
}
64+
6765
@Override
6866
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
6967
super.configure(name, params);
7068

7169
int nimbleServicePoolSize = NimbleService.NimbleServicePoolSize.value();
72-
logger.debug("Configuring NIMBLE's fixed thread pool with [{}] threads.", nimbleServicePoolSize);
73-
nimbleExecutorPool = Executors.newFixedThreadPool(nimbleServicePoolSize);
74-
toscaProfile = loadToscaProfile();
70+
toscaOrchestrator.configureExecutorPool(nimbleServicePoolSize);
71+
toscaOrchestrator.loadToscaProfile(iacResourceTypeDao.listAll());
7572
return true;
7673
}
7774

78-
protected Map<String, ToscaNodeType> loadToscaProfile() {
79-
logger.info("Loading NIMBLE's TOSCA profile.");
80-
List<IacResourceTypeVO> profileResourceTypes = iacResourceTypeDao.listAll();
81-
82-
return profileResourceTypes.stream()
83-
.map(resourceType -> toscaParser.parseNodeTypeDefinitionFile(resourceType.getContent()))
84-
.collect(Collectors.toMap(ToscaNodeType::getName, nodeType -> nodeType));
85-
}
86-
8775
@Override
8876
public boolean stop() {
8977
logger.info("Stopping NIMBLE's manager.");
90-
91-
if (nimbleExecutorPool != null) {
92-
logger.debug("Shutting down NIMBLE's fixed thread pool.");
93-
nimbleExecutorPool.shutdown();
94-
}
95-
78+
toscaOrchestrator.shutdownExecutorPool();
9679
return true;
9780
}
9881

plugins/iac/nimble/src/main/java/org/apache/cloudstack/service/NimbleService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ public interface NimbleService extends PluggableService, Configurable {
3535
true, NimbleServiceEnabled.key());
3636

3737
ListResponse<IacResourceTypeResponse> listIacResourceTypes(ListIacResourceTypesCmd cmd);
38+
void deployIacTemplate(String iacTemplateContent);
3839
}

plugins/iac/nimble/src/main/java/org/apache/cloudstack/tosca/ToscaManager.java

Lines changed: 0 additions & 20 deletions
This file was deleted.

plugins/iac/nimble/src/main/java/org/apache/cloudstack/tosca/ToscaManagerImpl.java

Lines changed: 0 additions & 23 deletions
This file was deleted.
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
package org.apache.cloudstack.tosca;
18+
19+
import com.cloud.exception.InvalidParameterValueException;
20+
import org.apache.cloudstack.persistence.iactemplatesprofile.IacResourceTypeVO;
21+
import org.apache.cloudstack.tosca.model.ToscaNodeTemplate;
22+
import org.apache.cloudstack.tosca.model.ToscaNodeType;
23+
import org.apache.cloudstack.tosca.model.ToscaServiceTemplate;
24+
import org.apache.cloudstack.tosca.parser.ToscaParser;
25+
import org.apache.logging.log4j.LogManager;
26+
import org.apache.logging.log4j.Logger;
27+
28+
import javax.inject.Inject;
29+
import java.util.ArrayList;
30+
import java.util.Collections;
31+
import java.util.HashMap;
32+
import java.util.HashSet;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Set;
36+
import java.util.concurrent.CompletableFuture;
37+
import java.util.concurrent.CompletionException;
38+
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
40+
import java.util.stream.Collectors;
41+
42+
public class ToscaOrchestrator {
43+
private final Logger logger = LogManager.getLogger(ToscaOrchestrator.class);
44+
45+
@Inject
46+
private ToscaParser toscaParser;
47+
48+
private Map<String, ToscaNodeType> toscaProfile;
49+
50+
private ExecutorService executorPool;
51+
52+
public void deployIacTemplate(String iacTemplateContent) {
53+
logger.debug("Parsing service template");
54+
ToscaServiceTemplate serviceTemplate = toscaParser.parseServiceTemplate(iacTemplateContent, toscaProfile, null);
55+
Map<ToscaNodeTemplate, CompletableFuture<String>> provisioningTasksFutures = createProvisioningTasksFutures(serviceTemplate);
56+
CompletableFuture<Void> serviceTemplateFeature = CompletableFuture.allOf(provisioningTasksFutures.values().toArray(new CompletableFuture[0]));
57+
serviceTemplateFeature.join();
58+
}
59+
60+
private Map<ToscaNodeTemplate, CompletableFuture<String>> createProvisioningTasksFutures(ToscaServiceTemplate serviceTemplate) {
61+
logger.debug("Creating provisioning tasks futures");
62+
Map<ToscaNodeTemplate, CompletableFuture<String>> futures = new HashMap<>();
63+
Map<String, Set<ToscaNodeTemplate>> dependencyGraph = serviceTemplate.getDependencyGraph();
64+
65+
List<String> nodesWithoutDependencies = dependencyGraph.keySet().stream().filter(node -> dependencyGraph.get(node).isEmpty()).collect(Collectors.toList());
66+
logger.debug("Nodes without dependencies: " + nodesWithoutDependencies);
67+
nodesWithoutDependencies.forEach(node -> {
68+
ToscaNodeTemplate nodeTemplate = serviceTemplate.getNodeTemplates().get(node);
69+
CompletableFuture<String> taskFuture = buildNodeProvisioningTask(nodeTemplate);
70+
futures.put(nodeTemplate, taskFuture);
71+
});
72+
logger.debug("Built all the features for the nodes without dependencies.");
73+
74+
logger.debug("Service template size: " + serviceTemplate.getNodeTemplates().size());
75+
while (futures.size() < serviceTemplate.getNodeTemplates().size()) {
76+
logger.debug("Futures size: " + futures.size());
77+
List<ToscaNodeTemplate> nodesWhoseDependenciesAlreadyHaveFutures = dependencyGraph.keySet().stream()
78+
.filter(node -> !dependencyGraph.get(node).isEmpty() && dependencyGraph.get(node).stream().allMatch(futures::containsKey))
79+
.map(node -> serviceTemplate.getNodeTemplates().get(node))
80+
.collect(Collectors.toList());
81+
82+
for (ToscaNodeTemplate node : nodesWhoseDependenciesAlreadyHaveFutures) {
83+
CompletableFuture<?>[] dependencies = dependencyGraph.get(node.getName()).stream().map(futures::get).toArray(CompletableFuture[]::new);
84+
CompletableFuture<String> taskFuture = CompletableFuture.allOf(dependencies).thenCompose(v -> {
85+
logger.debug("All dependencies of the node [" + node.getName() + "] are ready.");
86+
logger.debug("Here you'll be able to resolve the unresolved properties by get property and get attribute");
87+
return buildNodeProvisioningTask(node);
88+
});
89+
90+
futures.put(node, taskFuture);
91+
}
92+
}
93+
94+
return futures;
95+
}
96+
97+
// O(|V|+|E|)
98+
private List<String> getServiceTemplateTopologicalSort(ToscaServiceTemplate serviceTemplate) {
99+
Set<String> visitedNodes = new HashSet<>();
100+
Set<String> branchAncestors = new HashSet<>();
101+
List<String> topologicalSort = new ArrayList<>();
102+
for (ToscaNodeTemplate node : serviceTemplate.getNodeTemplates().values()) {
103+
if (!visitedNodes.contains(node.getName())) {
104+
depthFirstSearch(topologicalSort, node.getName(), serviceTemplate.getDependencyGraph(), branchAncestors, visitedNodes);
105+
}
106+
}
107+
return topologicalSort;
108+
}
109+
110+
private void depthFirstSearch(List<String> topologicalSort, String node, Map<String, Set<ToscaNodeTemplate>> graph, Set<String> branchAncestors, Set<String> visitedNodes) {
111+
visitedNodes.add(node);
112+
branchAncestors.add(node);
113+
for (ToscaNodeTemplate dependency : graph.getOrDefault(node, Collections.emptySet())) {
114+
if (branchAncestors.contains(dependency.getName())) {
115+
throw new InvalidParameterValueException("Cycle detected in node dependency graph - this should have been caught during template validation");
116+
}
117+
118+
if (!visitedNodes.contains(dependency.getName())) {
119+
depthFirstSearch(topologicalSort, dependency.getName(), graph, branchAncestors, visitedNodes);
120+
}
121+
}
122+
123+
branchAncestors.remove(node);
124+
topologicalSort.add(node);
125+
}
126+
127+
private CompletableFuture<String> buildNodeProvisioningTask(ToscaNodeTemplate nodeTemplate) {
128+
logger.debug("Inside build node provisioning task for node: " + nodeTemplate.getName());
129+
return provisionNode(nodeTemplate)
130+
// .orTimeout(5, TimeUnit.SECONDS)
131+
.thenApply(result -> {
132+
System.out.println("✔ SUCCESS " + nodeTemplate.getName());
133+
return result;
134+
}).exceptionally(ex -> {
135+
System.out.println("✖ FAILURE " + nodeTemplate.getName() + " → " + ex);
136+
throw new CompletionException(ex);
137+
});
138+
}
139+
140+
private CompletableFuture<String> provisionNode(ToscaNodeTemplate nodeTemplate) {
141+
return CompletableFuture.supplyAsync(() -> {
142+
143+
logger.debug("Submitting async job for " + nodeTemplate.getName() + " on " + Thread.currentThread().getName());
144+
145+
try {
146+
Thread.sleep(1500);
147+
} catch (InterruptedException e) {
148+
throw new RuntimeException(e);
149+
}
150+
151+
logger.debug("Here you'll be able to populate the attributes");
152+
return "res-" + nodeTemplate.getName();
153+
154+
}, executorPool);
155+
}
156+
157+
public void configureExecutorPool(int poolSize) {
158+
logger.info("Configuring TOSCA's fixed executor thread pool with [{}] threads.", poolSize);
159+
executorPool = Executors.newFixedThreadPool(poolSize);
160+
}
161+
162+
public void loadToscaProfile(List<IacResourceTypeVO> iacResourceTypes) {
163+
logger.info("Loading TOSCA's profile with the following resource types: {}", iacResourceTypes.stream().map(IacResourceTypeVO::getName).collect(Collectors.toList()));
164+
165+
toscaProfile = iacResourceTypes.stream()
166+
.map(resourceType -> toscaParser.parseNodeTypeDefinitionFile(resourceType.getContent()))
167+
.collect(Collectors.toMap(ToscaNodeType::getName, nodeType -> nodeType));
168+
}
169+
170+
public void shutdownExecutorPool() {
171+
logger.info("Shutting down TOSCA's fixed executor thread pool.");
172+
if (executorPool != null) {
173+
executorPool.shutdown();
174+
}
175+
}
176+
177+
protected Map<String, ToscaNodeType> getToscaProfile() {
178+
return Collections.unmodifiableMap(toscaProfile);
179+
}
180+
}

plugins/iac/nimble/src/main/resources/META-INF/cloudstack/nimble/spring-nimble-context.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@
2727
<bean id="toscaFieldParser" class="org.apache.cloudstack.tosca.parser.ToscaFieldParser" />
2828
<bean id="toscaNodeTypeParser" class="org.apache.cloudstack.tosca.parser.ToscaNodeTypeParser" />
2929
<bean id="toscaServiceTemplateParser" class="org.apache.cloudstack.tosca.parser.ToscaServiceTemplateParser" />
30+
<bean id="toscaOrchestrator" class="org.apache.cloudstack.tosca.ToscaOrchestrator" />
3031
</beans>

0 commit comments

Comments
 (0)