Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ services:
- ./wait_for_db.sh:/usr/local/tomcat/bin/wait_for_db.sh
environment:
PROPERTIES_FILENAME: sal
CLUSTER_TYPE: "k8s" # (defult) or "k3s"
CLUSTER_TYPE: "k8s" # (defult) or "k3s"
#Set up connection to ProActive server (PWS)
PWS_URL: <CHANGE_ME>
PWS_USERNAME: <CHANGE_ME>
Expand All @@ -68,4 +68,4 @@ networks:
db-tier: {}
#Comment this part if you do not want to include volume for peristant data storage
volumes:
mariadb_data: {}
mariadb_data: {}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public ResponseEntity<Long> submitJob(@ApiParam(value = "Proactive authenticatio
final String sessionId, @ApiParam(value = "A job identifier", required = true)
@PathVariable
final String jobId) throws NotConnectedException {
return ResponseEntity.ok(jobService.submitJob(sessionId, jobId));
return ResponseEntity.ok(jobService.submitJob(sessionId, jobId, null));
}

@RequestMapping(value = "/{jobId}/status", method = RequestMethod.GET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.Map;

import org.apache.commons.lang3.Validate;
import org.ow2.proactive.sal.model.*;
Expand All @@ -18,12 +19,17 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.extern.log4j.Log4j2;


@Log4j2
@Service("ClusterService")
public class ClusterService {
public static final String CONTAINERIZATION_FLAVOR_ENV = "export CONTAINERIZATION_FLAVOR=";

@Autowired
private PAGatewayService paGatewayService;

Expand All @@ -39,6 +45,8 @@ public class ClusterService {
@Autowired
private EdgeService edgeService;

private static final ObjectMapper objectMapper = new ObjectMapper();

private boolean isValidClusterName(String name) {
return name != null && !name.isEmpty() && name.matches("^[a-z0-9-]+$");
}
Expand All @@ -56,6 +64,62 @@ private void validateNode(ClusterNodeDefinition node) {
}
}

public static String stripQuotes(String value) {
if (value == null || value.isEmpty())
return value;

int start = 0;
int end = value.length();

if (value.charAt(0) == '"' || value.charAt(0) == '\'') {
start++;
}
if (value.length() > 1 &&
(value.charAt(value.length() - 1) == '"' || value.charAt(value.length() - 1) == '\'')) {
end--;
}

return value.substring(start, end);
}

public static String getContainerizationFlavor(String envVarsScript) {
if (envVarsScript == null || envVarsScript.isEmpty()) {
return null;
}

try {
String[] lines = envVarsScript.split("\\r?\\n");

for (String line : lines) {
line = line.trim();

if (line.contains(CONTAINERIZATION_FLAVOR_ENV)) {
// Extract only the export part before '>>'
int exportIndex = line.indexOf(CONTAINERIZATION_FLAVOR_ENV);
String exportPart = line.substring(exportIndex).split(">>")[0].trim();

// Split and get the value part
String[] keyValue = exportPart.replace("export ", "").split("=", 2);
if (keyValue.length == 2) {
String rawValue = keyValue[1].trim();

// Remove surrounding quotes and convert to lower case
String cleanValue = stripQuotes(rawValue.trim()).toLowerCase();

return ClusterUtils.CLUSTER_TYPE_K3S.equals(cleanValue) ? ClusterUtils.CLUSTER_TYPE_K3S
: ClusterUtils.CLUSTER_TYPE_K8S;
}
}
}

return null;
} catch (Exception e) {
LOGGER.error("Failed to parse containerization flavor from envVarsScript: {}", e.getMessage(), e);
return null;
}

}

public boolean defineCluster(String sessionId, ClusterDefinition clusterDefinition)
throws NotConnectedException, IOException {
if (!paGatewayService.isConnectionActive(sessionId)) {
Expand Down Expand Up @@ -95,7 +159,9 @@ public boolean defineCluster(String sessionId, ClusterDefinition clusterDefiniti
}

cluster.setStatus(ClusterStatus.DEFINED);

cluster.setEnvVars(ClusterUtils.createEnvVarsScript(clusterDefinition.getEnvVars()));

nodes.forEach(repositoryService::saveClusterNodeDefinition);
cluster.setNodes(nodes);
repositoryService.saveCluster(cluster);
Expand Down Expand Up @@ -210,7 +276,8 @@ private void submitClusterNode(String sessionId, Cluster cluster, String nodeNam
repositoryService.saveDeployment(currentDeployment);
repositoryService.flush();
// submit job
jobService.submitJob(sessionId, jobId);
String containerizationFlavor = getContainerizationFlavor(cluster.getEnvVars());
jobService.submitJob(sessionId, jobId, containerizationFlavor);
LOGGER.info("Node {} is submitted for deployment", nodeName);
} else {
LOGGER.error("The node {} was not found in the cluster {} definition", nodeName, cluster.getName());
Expand Down Expand Up @@ -343,13 +410,15 @@ public Long labelNodes(String sessionId, String clusterName, List<Map<String, St
LOGGER.info("labelNodes endpoint is called to label nodes in the cluster: " + clusterName);
String masterNodeToken = "";
Cluster cluster = ClusterUtils.getClusterByName(clusterName, repositoryService.listCluster());

if (cluster != null) {
masterNodeToken = cluster.getMasterNode() + "_" + clusterName;
} else {
LOGGER.error("The cluster with the name {} was not found!", clusterName);
return -1L;
}
String script = ClusterUtils.createLabelNodesScript(nodeLabels, clusterName);
String containerizationFlavor = getContainerizationFlavor(cluster.getEnvVars());
String script = ClusterUtils.createLabelNodesScript(nodeLabels, clusterName, containerizationFlavor);

try {
String paJobName = "label_nodes_" + clusterName;
Expand Down Expand Up @@ -377,7 +446,8 @@ public Long deployApplication(String sessionId, String clusterName, ClusterAppli
}
String script = "";
try {
script = ClusterUtils.createDeployApplicationScript(application);
String containerizationFlavor = getContainerizationFlavor(cluster.getEnvVars());
script = ClusterUtils.createDeployApplication(application, containerizationFlavor);

} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public String getGraphInDotFormat(String sessionId, String jobId) throws NotConn
* @return The submitted job id
*/
@Transactional
public Long submitJob(String sessionId, String jobId) throws NotConnectedException {
public Long submitJob(String sessionId, String jobId, String containerizationFlavor) throws NotConnectedException {
if (!paGatewayService.isConnectionActive(sessionId)) {
throw new NotConnectedException();
}
Expand All @@ -319,7 +319,9 @@ public Long submitJob(String sessionId, String jobId) throws NotConnectedExcepti
.stream()
.filter(task -> task.getDeployments() != null && !task.getDeployments().isEmpty())
.forEach(task -> {
List<ScriptTask> scriptTasks = taskBuilder.buildPATask(task, jobToSubmit);
List<ScriptTask> scriptTasks = taskBuilder.buildPATask(task,
jobToSubmit,
containerizationFlavor);

addAllScriptTasksToPAJob(paJob, task, scriptTasks);
repositoryService.saveTask(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ public List<ScriptTask> buildReconfigurationPATask(Task task, Job job,
} else if (addedTaskNames.contains(task.getName())) {
// When the scaled task is a parent of the task to be built
LOGGER.info("Building task [{}] as a new added task ", task.getTaskId());
scriptTasks.addAll(buildPATask(task, job));
scriptTasks.addAll(buildPATask(task, job, null));
} else {
LOGGER.warn("Task [{}] is neither unchanged nor added. This should not figure ine job!", task.getTaskId());
}
Expand Down Expand Up @@ -800,7 +800,7 @@ private List<ScriptTask> setFirstAndLastSubmittedTaskNamesFromScriptTasks(Task t
* @param job The related job skeleton
* @return A list of ProActive tasks
*/
public List<ScriptTask> buildPATask(Task task, Job job) {
public List<ScriptTask> buildPATask(Task task, Job job, String containerizationFlavor) {
List<ScriptTask> scriptTasks = new LinkedList<>();
LOGGER.debug("Building PA task for: {}", task.getTaskId());
if (task.getDeployments() == null || task.getDeployments().isEmpty()) {
Expand All @@ -816,7 +816,8 @@ public List<ScriptTask> buildPATask(Task task, Job job) {
String suffix = "_" + deployment.getNumber();
scriptTasks.add(createInfraTask(task, deployment, suffix, token));
if (deployment.getWorker() != null && deployment.getWorker()) {
ScriptTask waitForMasterTask = createWaitForMasterTask(deployment.getMasterToken());
ScriptTask waitForMasterTask = createWaitForMasterTask(deployment.getMasterToken(),
containerizationFlavor);
waitForMasterTask.addDependence(scriptTasks.get(scriptTasks.size() - 1));
scriptTasks.add(waitForMasterTask);
}
Expand Down Expand Up @@ -901,8 +902,9 @@ private Map<String, TaskVariable> createVariablesMapForSynchronizationChannels(J
return (variablesMap);
}

private ScriptTask createWaitForMasterTask(String masterNodeToken) {
String clusterType = System.getenv(ClusterUtils.CLUSTER_TYPE_ENV);
private ScriptTask createWaitForMasterTask(String masterNodeToken, String containerizationFlavor) {
String clusterType = (containerizationFlavor != null) ? containerizationFlavor
: System.getenv(ClusterUtils.CLUSTER_TYPE_ENV);
String waitForMasterScript;

if (ClusterUtils.CLUSTER_TYPE_K3S.equalsIgnoreCase(clusterType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,10 @@ private static String getBashFilesContent(String fileName) throws IOException {

}

public static String createLabelNodesScript(List<Map<String, String>> nodeLabels, String clusterName) {
String clusterType = System.getenv(CLUSTER_TYPE_ENV);
public static String createLabelNodesScript(List<Map<String, String>> nodeLabels, String clusterName,
String containerizationFlavor) {
String clusterType = (containerizationFlavor != null) ? containerizationFlavor
: System.getenv(CLUSTER_TYPE_ENV);

if (CLUSTER_TYPE_K3S.equalsIgnoreCase(clusterType)) {
return createK3sLabelNodesScript(nodeLabels, clusterName);
Expand Down Expand Up @@ -229,8 +231,10 @@ public static String createK8sLabelNodesScript(List<Map<String, String>> nodeLab
return script.toString();
}

public static String createDeployApplicationScript(ClusterApplication application) throws IOException {
String clusterType = System.getenv(CLUSTER_TYPE_ENV); // Get cluster type from env variable
public static String createDeployApplication(ClusterApplication application, String containerizationFlavor)
throws IOException {
String clusterType = (containerizationFlavor != null) ? containerizationFlavor
: System.getenv(CLUSTER_TYPE_ENV);
return createDeployApplicationScript(application, clusterType);
}

Expand Down