diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 0a5bb4d..c18adc3 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -41,7 +41,7 @@ services: - ./wait_for_db.sh:/usr/local/tomcat/bin/wait_for_db.sh environment: PROPERTIES_FILENAME: sal - CLUSTER_TYPE: "k8s" # (defult) or "k3s" + CLUSTER_TYPE: "k8s" # (defult) or "k3s" #Set up connection to ProActive server (PWS) PWS_URL: PWS_USERNAME: @@ -68,4 +68,4 @@ networks: db-tier: {} #Comment this part if you do not want to include volume for peristant data storage volumes: - mariadb_data: {} + mariadb_data: {} \ No newline at end of file diff --git a/sal-service/src/main/java/org/ow2/proactive/sal/service/rest/JobRest.java b/sal-service/src/main/java/org/ow2/proactive/sal/service/rest/JobRest.java index 3466e8f..0c5b2c5 100644 --- a/sal-service/src/main/java/org/ow2/proactive/sal/service/rest/JobRest.java +++ b/sal-service/src/main/java/org/ow2/proactive/sal/service/rest/JobRest.java @@ -92,7 +92,7 @@ public ResponseEntity submitJob(@ApiParam(value = "Proactive authenticatio final String sessionId, @ApiParam(value = "A job identifier", required = true) @PathVariable final String jobId) throws NotConnectedException { - return ResponseEntity.ok(jobService.submitJob(sessionId, jobId)); + return ResponseEntity.ok(jobService.submitJob(sessionId, jobId, null)); } @RequestMapping(value = "/{jobId}/status", method = RequestMethod.GET) diff --git a/sal-service/src/main/java/org/ow2/proactive/sal/service/service/ClusterService.java b/sal-service/src/main/java/org/ow2/proactive/sal/service/service/ClusterService.java index bc0c21e..325e7d3 100644 --- a/sal-service/src/main/java/org/ow2/proactive/sal/service/service/ClusterService.java +++ b/sal-service/src/main/java/org/ow2/proactive/sal/service/service/ClusterService.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.*; +import java.util.Map; import org.apache.commons.lang3.Validate; import org.ow2.proactive.sal.model.*; @@ -18,12 +19,17 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + import lombok.extern.log4j.Log4j2; @Log4j2 @Service("ClusterService") public class ClusterService { + public static final String CONTAINERIZATION_FLAVOR_ENV = "export CONTAINERIZATION_FLAVOR="; + @Autowired private PAGatewayService paGatewayService; @@ -39,6 +45,8 @@ public class ClusterService { @Autowired private EdgeService edgeService; + private static final ObjectMapper objectMapper = new ObjectMapper(); + private boolean isValidClusterName(String name) { return name != null && !name.isEmpty() && name.matches("^[a-z0-9-]+$"); } @@ -56,6 +64,62 @@ private void validateNode(ClusterNodeDefinition node) { } } + public static String stripQuotes(String value) { + if (value == null || value.isEmpty()) + return value; + + int start = 0; + int end = value.length(); + + if (value.charAt(0) == '"' || value.charAt(0) == '\'') { + start++; + } + if (value.length() > 1 && + (value.charAt(value.length() - 1) == '"' || value.charAt(value.length() - 1) == '\'')) { + end--; + } + + return value.substring(start, end); + } + + public static String getContainerizationFlavor(String envVarsScript) { + if (envVarsScript == null || envVarsScript.isEmpty()) { + return null; + } + + try { + String[] lines = envVarsScript.split("\\r?\\n"); + + for (String line : lines) { + line = line.trim(); + + if (line.contains(CONTAINERIZATION_FLAVOR_ENV)) { + // Extract only the export part before '>>' + int exportIndex = line.indexOf(CONTAINERIZATION_FLAVOR_ENV); + String exportPart = line.substring(exportIndex).split(">>")[0].trim(); + + // Split and get the value part + String[] keyValue = exportPart.replace("export ", "").split("=", 2); + if (keyValue.length == 2) { + String rawValue = keyValue[1].trim(); + + // Remove surrounding quotes and convert to lower case + String cleanValue = stripQuotes(rawValue.trim()).toLowerCase(); + + return ClusterUtils.CLUSTER_TYPE_K3S.equals(cleanValue) ? ClusterUtils.CLUSTER_TYPE_K3S + : ClusterUtils.CLUSTER_TYPE_K8S; + } + } + } + + return null; + } catch (Exception e) { + LOGGER.error("Failed to parse containerization flavor from envVarsScript: {}", e.getMessage(), e); + return null; + } + + } + public boolean defineCluster(String sessionId, ClusterDefinition clusterDefinition) throws NotConnectedException, IOException { if (!paGatewayService.isConnectionActive(sessionId)) { @@ -95,7 +159,9 @@ public boolean defineCluster(String sessionId, ClusterDefinition clusterDefiniti } cluster.setStatus(ClusterStatus.DEFINED); + cluster.setEnvVars(ClusterUtils.createEnvVarsScript(clusterDefinition.getEnvVars())); + nodes.forEach(repositoryService::saveClusterNodeDefinition); cluster.setNodes(nodes); repositoryService.saveCluster(cluster); @@ -210,7 +276,8 @@ private void submitClusterNode(String sessionId, Cluster cluster, String nodeNam repositoryService.saveDeployment(currentDeployment); repositoryService.flush(); // submit job - jobService.submitJob(sessionId, jobId); + String containerizationFlavor = getContainerizationFlavor(cluster.getEnvVars()); + jobService.submitJob(sessionId, jobId, containerizationFlavor); LOGGER.info("Node {} is submitted for deployment", nodeName); } else { LOGGER.error("The node {} was not found in the cluster {} definition", nodeName, cluster.getName()); @@ -343,13 +410,15 @@ public Long labelNodes(String sessionId, String clusterName, List task.getDeployments() != null && !task.getDeployments().isEmpty()) .forEach(task -> { - List scriptTasks = taskBuilder.buildPATask(task, jobToSubmit); + List scriptTasks = taskBuilder.buildPATask(task, + jobToSubmit, + containerizationFlavor); addAllScriptTasksToPAJob(paJob, task, scriptTasks); repositoryService.saveTask(task); diff --git a/sal-service/src/main/java/org/ow2/proactive/sal/service/service/TaskBuilder.java b/sal-service/src/main/java/org/ow2/proactive/sal/service/service/TaskBuilder.java index 5e32679..bc5f264 100644 --- a/sal-service/src/main/java/org/ow2/proactive/sal/service/service/TaskBuilder.java +++ b/sal-service/src/main/java/org/ow2/proactive/sal/service/service/TaskBuilder.java @@ -747,7 +747,7 @@ public List buildReconfigurationPATask(Task task, Job job, } else if (addedTaskNames.contains(task.getName())) { // When the scaled task is a parent of the task to be built LOGGER.info("Building task [{}] as a new added task ", task.getTaskId()); - scriptTasks.addAll(buildPATask(task, job)); + scriptTasks.addAll(buildPATask(task, job, null)); } else { LOGGER.warn("Task [{}] is neither unchanged nor added. This should not figure ine job!", task.getTaskId()); } @@ -800,7 +800,7 @@ private List setFirstAndLastSubmittedTaskNamesFromScriptTasks(Task t * @param job The related job skeleton * @return A list of ProActive tasks */ - public List buildPATask(Task task, Job job) { + public List buildPATask(Task task, Job job, String containerizationFlavor) { List scriptTasks = new LinkedList<>(); LOGGER.debug("Building PA task for: {}", task.getTaskId()); if (task.getDeployments() == null || task.getDeployments().isEmpty()) { @@ -816,7 +816,8 @@ public List buildPATask(Task task, Job job) { String suffix = "_" + deployment.getNumber(); scriptTasks.add(createInfraTask(task, deployment, suffix, token)); if (deployment.getWorker() != null && deployment.getWorker()) { - ScriptTask waitForMasterTask = createWaitForMasterTask(deployment.getMasterToken()); + ScriptTask waitForMasterTask = createWaitForMasterTask(deployment.getMasterToken(), + containerizationFlavor); waitForMasterTask.addDependence(scriptTasks.get(scriptTasks.size() - 1)); scriptTasks.add(waitForMasterTask); } @@ -901,8 +902,9 @@ private Map createVariablesMapForSynchronizationChannels(J return (variablesMap); } - private ScriptTask createWaitForMasterTask(String masterNodeToken) { - String clusterType = System.getenv(ClusterUtils.CLUSTER_TYPE_ENV); + private ScriptTask createWaitForMasterTask(String masterNodeToken, String containerizationFlavor) { + String clusterType = (containerizationFlavor != null) ? containerizationFlavor + : System.getenv(ClusterUtils.CLUSTER_TYPE_ENV); String waitForMasterScript; if (ClusterUtils.CLUSTER_TYPE_K3S.equalsIgnoreCase(clusterType)) { diff --git a/sal-service/src/main/java/org/ow2/proactive/sal/service/util/ClusterUtils.java b/sal-service/src/main/java/org/ow2/proactive/sal/service/util/ClusterUtils.java index 6f80b03..ea50f0c 100644 --- a/sal-service/src/main/java/org/ow2/proactive/sal/service/util/ClusterUtils.java +++ b/sal-service/src/main/java/org/ow2/proactive/sal/service/util/ClusterUtils.java @@ -186,8 +186,10 @@ private static String getBashFilesContent(String fileName) throws IOException { } - public static String createLabelNodesScript(List> nodeLabels, String clusterName) { - String clusterType = System.getenv(CLUSTER_TYPE_ENV); + public static String createLabelNodesScript(List> nodeLabels, String clusterName, + String containerizationFlavor) { + String clusterType = (containerizationFlavor != null) ? containerizationFlavor + : System.getenv(CLUSTER_TYPE_ENV); if (CLUSTER_TYPE_K3S.equalsIgnoreCase(clusterType)) { return createK3sLabelNodesScript(nodeLabels, clusterName); @@ -229,8 +231,10 @@ public static String createK8sLabelNodesScript(List> nodeLab return script.toString(); } - public static String createDeployApplicationScript(ClusterApplication application) throws IOException { - String clusterType = System.getenv(CLUSTER_TYPE_ENV); // Get cluster type from env variable + public static String createDeployApplication(ClusterApplication application, String containerizationFlavor) + throws IOException { + String clusterType = (containerizationFlavor != null) ? containerizationFlavor + : System.getenv(CLUSTER_TYPE_ENV); return createDeployApplicationScript(application, clusterType); }