diff --git a/.travis.yml b/.travis.yml
index b179f30..7de3efb 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,3 +1,8 @@
language: java
jdk:
- openjdk8
+install: true
+script: mvn package -DskipTests=false -Dmaven.javadoc.skip=true -B -V
+cache:
+ directories:
+ - $HOME/.m2
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 639c488..0c84709 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,6 @@
FROM openjdk:8u151-jre-alpine
-ENV KRONOS_VERSION 3.0.0
+ENV KRONOS_VERSION 3.1.0
ENV KRONOS_HOME /home/kronos-${KRONOS_VERSION}
ENV MODE all
diff --git a/api/pom.xml b/api/pom.xml
index aaf7c00..17bc615 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -20,7 +20,7 @@
kronos
com.cognitree
- 3.0.0
+ 3.1.0
4.0.0
diff --git a/api/src/main/java/com/cognitree/kronos/api/JobResource.java b/api/src/main/java/com/cognitree/kronos/api/JobResource.java
index 8f3e067..7b25e49 100644
--- a/api/src/main/java/com/cognitree/kronos/api/JobResource.java
+++ b/api/src/main/java/com/cognitree/kronos/api/JobResource.java
@@ -66,7 +66,7 @@ public Response getAllJobs(@ApiParam(value = "job status", allowMultiple = true)
@DefaultValue(DEFAULT_DAYS) @QueryParam("date_range") int numberOfDays,
@HeaderParam("namespace") String namespace) throws ServiceException, ValidationException {
logger.info("Received request to get all jobs under namespace {} with param status in {}, date range {}, " +
- "from {}, to {}", namespace, statuses, numberOfDays, createdAfter, createdBefore, namespace);
+ "from {}, to {}", namespace, statuses, numberOfDays, createdAfter, createdBefore);
if (namespace == null || namespace.isEmpty()) {
return Response.status(BAD_REQUEST).entity("missing namespace header").build();
}
diff --git a/api/src/main/java/com/cognitree/kronos/api/WorkflowJobResource.java b/api/src/main/java/com/cognitree/kronos/api/WorkflowJobResource.java
index dc07738..6b4a6df 100644
--- a/api/src/main/java/com/cognitree/kronos/api/WorkflowJobResource.java
+++ b/api/src/main/java/com/cognitree/kronos/api/WorkflowJobResource.java
@@ -43,6 +43,7 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -133,7 +134,19 @@ public Response getJob(@ApiParam(value = "workflow name", required = true)
logger.error("No job exists with id {}", jobId);
return Response.status(NOT_FOUND).build();
}
- final List tasks = JobService.getService().getTasks(job);
+ final List tasks = JobService.getService().getTasks(job).stream()
+ .sorted((task1, task2) -> {
+ int diff = task1.getStatus().getOrder() - task2.getStatus().getOrder();
+ if (diff == 0){
+ if (task1.getCompletedAt() != null && task2.getCompletedAt() != null){
+ return task1.getCompletedAt().compareTo(task2.getCompletedAt());
+ }else {
+ return task1.getCreatedAt().compareTo(task2.getCreatedAt());
+ }
+ }else {
+ return diff;
+ }
+ }).collect(Collectors.toList());
return Response.status(OK).entity(JobResponse.create(job, tasks)).build();
}
diff --git a/api/src/main/java/com/cognitree/kronos/api/WorkflowTriggerResource.java b/api/src/main/java/com/cognitree/kronos/api/WorkflowTriggerResource.java
index 2b12fc4..877310f 100644
--- a/api/src/main/java/com/cognitree/kronos/api/WorkflowTriggerResource.java
+++ b/api/src/main/java/com/cognitree/kronos/api/WorkflowTriggerResource.java
@@ -167,7 +167,7 @@ public Response updateWorkflowTrigger(@ApiParam(value = "workflow name", require
@HeaderParam("namespace") String namespace)
throws ServiceException, ValidationException, SchedulerException {
logger.info("Received request to update workflow trigger {} for workflow {} under namespace {} set enable to {}",
- triggerName, workflowName, namespace);
+ triggerName, workflowName, namespace, enable);
if (namespace == null || namespace.isEmpty()) {
return Response.status(BAD_REQUEST).entity("missing namespace header").build();
}
diff --git a/app/pom.xml b/app/pom.xml
index 5a09696..4052483 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -22,7 +22,7 @@
kronos
com.cognitree
- 3.0.0
+ 3.1.0
4.0.0
diff --git a/app/src/main/conf/log4j.properties b/app/src/main/conf/log4j.properties
deleted file mode 100755
index 721c81a..0000000
--- a/app/src/main/conf/log4j.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-log4j.rootLogger=INFO, STDOUT
-log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
-log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
-log4j.appender.STDOUT.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %5p [%t] (%F:%L) - %m%n
diff --git a/app/src/main/conf/log4j2.properties b/app/src/main/conf/log4j2.properties
new file mode 100644
index 0000000..55be63c
--- /dev/null
+++ b/app/src/main/conf/log4j2.properties
@@ -0,0 +1,16 @@
+name = Log4j2Config
+#The level of internal Log4j events that should be logged to the console
+status = error
+#The minimum amount of time, in seconds, that must elapse before the file configuration is checked for changes.
+#Any changes to the configuration file during runtime will come into effect.
+monitorInterval = 10
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %5p [%t] (%F:%L) - %m%n
+appender.console.immediateFlush=true
+
+rootLogger.level = INFO
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
\ No newline at end of file
diff --git a/app/src/test/java/com/cognitree/kronos/scheduler/JobServiceTest.java b/app/src/test/java/com/cognitree/kronos/scheduler/JobServiceTest.java
index d8cbcc5..59ff563 100644
--- a/app/src/test/java/com/cognitree/kronos/scheduler/JobServiceTest.java
+++ b/app/src/test/java/com/cognitree/kronos/scheduler/JobServiceTest.java
@@ -253,6 +253,103 @@ public void testGetJobTasksFailedDueToHandler() throws Exception {
}
}
+ @Test
+ public void testConditionSuccessJob() throws Exception {
+ final WorkflowTrigger workflowTrigger = scheduleWorkflow(CONDITION_WORKFLOW_TEMPLATE_SUCCESS_YAML);
+
+ waitForJobsToTriggerAndComplete(workflowTrigger);
+
+ JobService jobService = JobService.getService();
+ final List workflowOneJobs = jobService.get(workflowTrigger.getNamespace(), workflowTrigger.getWorkflow(),
+ workflowTrigger.getName(), 0, System.currentTimeMillis());
+ Assert.assertEquals(1, workflowOneJobs.size());
+
+ final Job job = workflowOneJobs.get(0);
+ final List tasks = jobService.getTasks(job);
+ Assert.assertEquals(3, tasks.size());
+ for (Task task : tasks) {
+ switch (task.getName()) {
+ case "taskOne":
+ case "taskTwo":
+ case "taskThree":
+ Assert.assertEquals(Task.Status.SUCCESSFUL, task.getStatus());
+ Assert.assertTrue(MockSuccessTaskHandler.isHandled(task.getIdentity()));
+ Assert.assertEquals(0, task.getRetryCount());
+ break;
+ default:
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testLastConditionFailsJob() throws Exception {
+ final WorkflowTrigger workflowTrigger = scheduleWorkflow(CONDITION_WORKFLOW_TEMPLATE_FAILURE_LASTCONDITION_YAML);
+
+ waitForJobsToTriggerAndComplete(workflowTrigger);
+
+ JobService jobService = JobService.getService();
+ final List workflowOneJobs = jobService.get(workflowTrigger.getNamespace(), workflowTrigger.getWorkflow(),
+ workflowTrigger.getName(), 0, System.currentTimeMillis());
+ Assert.assertEquals(1, workflowOneJobs.size());
+
+ final Job job = workflowOneJobs.get(0);
+ final List tasks = jobService.getTasks(job);
+ Assert.assertEquals(3, tasks.size());
+ for (Task task : tasks) {
+ switch (task.getName()) {
+ case "taskOne":
+ case "taskTwo":
+ Assert.assertEquals(Task.Status.SUCCESSFUL, task.getStatus());
+ Assert.assertTrue(MockSuccessTaskHandler.isHandled(task.getIdentity()));
+ break;
+ case "taskThree":
+ Assert.assertEquals(SKIPPED, task.getStatus());
+ Assert.assertEquals(Messages.TASK_SKIPPED_CONDITION_FAILS, task.getStatusMessage());
+ Assert.assertEquals(0, task.getRetryCount());
+ break;
+ default:
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testSecondConditionFails() throws Exception {
+ final WorkflowTrigger workflowTrigger = scheduleWorkflow(CONDITION_WORKFLOW_TEMPLATE_FAILURE_SECONDCONDITION_YAML);
+
+ waitForJobsToTriggerAndComplete(workflowTrigger);
+
+ JobService jobService = JobService.getService();
+ final List workflowOneJobs = jobService.get(workflowTrigger.getNamespace(), workflowTrigger.getWorkflow(),
+ workflowTrigger.getName(), 0, System.currentTimeMillis());
+ Assert.assertEquals(1, workflowOneJobs.size());
+
+ final Job job = workflowOneJobs.get(0);
+ final List tasks = jobService.getTasks(job);
+ Assert.assertEquals(3, tasks.size());
+ for (Task task : tasks) {
+ switch (task.getName()) {
+ case "taskOne":
+ Assert.assertEquals(Task.Status.SUCCESSFUL, task.getStatus());
+ Assert.assertTrue(MockSuccessTaskHandler.isHandled(task.getIdentity()));
+ break;
+ case "taskTwo":
+ Assert.assertEquals(SKIPPED, task.getStatus());
+ Assert.assertEquals(Messages.TASK_SKIPPED_CONDITION_FAILS, task.getStatusMessage());
+ Assert.assertEquals(0, task.getRetryCount());
+ break;
+ case "taskThree":
+ Assert.assertEquals(SKIPPED, task.getStatus());
+ Assert.assertEquals(Messages.SKIPPED_DEPENDEE_TASK_MESSAGE, task.getStatusMessage());
+ Assert.assertEquals(0, task.getRetryCount());
+ break;
+ default:
+ Assert.fail();
+ }
+ }
+ }
+
@Test(expected = ValidationException.class)
public void testAbortJobNotFound() throws Exception {
final WorkflowTrigger workflowTrigger = scheduleWorkflow(WORKFLOW_TEMPLATE_YAML);
diff --git a/app/src/test/java/com/cognitree/kronos/scheduler/ServiceTest.java b/app/src/test/java/com/cognitree/kronos/scheduler/ServiceTest.java
index 70cb84e..d90faf1 100644
--- a/app/src/test/java/com/cognitree/kronos/scheduler/ServiceTest.java
+++ b/app/src/test/java/com/cognitree/kronos/scheduler/ServiceTest.java
@@ -48,6 +48,12 @@ public class ServiceTest {
protected static final String WORKFLOW_TEMPLATE_WITH_TASK_CONTEXT_YAML = "workflows/workflow-template-with-task-context.yaml";
protected static final String WORKFLOW_TEMPLATE_WITH_PROPERTIES_YAML = "workflows/workflow-template-with-properties.yaml";
protected static final String WORKFLOW_TEMPLATE_WITH_DUPLICATE_POLICY_YAML = "workflows/workflow-template-with-duplicate-policy.yaml";
+ protected static final String WORKFLOW_TEMPLATE_FAILED_PARALLEL_BRANCH_YAML = "workflows/workflow-template-failed-parallel-branch.yaml";
+
+ protected static final String CONDITION_WORKFLOW_TEMPLATE_SUCCESS_YAML = "workflows/condition-workflow-template-success.yaml";
+ protected static final String CONDITION_WORKFLOW_TEMPLATE_FAILURE_LASTCONDITION_YAML = "workflows/condition-workflow-template-failure-last-task.yaml";
+ protected static final String CONDITION_WORKFLOW_TEMPLATE_FAILURE_SECONDCONDITION_YAML = "workflows/condition-workflow-template-failure-second-task";
+ protected static final String CONDITION_WORKFLOW_TEMPLATE_FAILED_PARALLEL_BRANCH_YAML = "workflows/condition-workflow-template-failed-parallel-branch.yaml";
private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory());
private static final List EXISTING_NAMESPACE = new ArrayList<>();
diff --git a/app/src/test/java/com/cognitree/kronos/scheduler/TaskServiceTest.java b/app/src/test/java/com/cognitree/kronos/scheduler/TaskServiceTest.java
index b6aecfe..070e309 100644
--- a/app/src/test/java/com/cognitree/kronos/scheduler/TaskServiceTest.java
+++ b/app/src/test/java/com/cognitree/kronos/scheduler/TaskServiceTest.java
@@ -33,10 +33,7 @@
import java.util.List;
import java.util.UUID;
-import static com.cognitree.kronos.TestUtil.scheduleWorkflow;
-import static com.cognitree.kronos.TestUtil.waitForJobsToTriggerAndComplete;
-import static com.cognitree.kronos.TestUtil.waitForTaskToBeRunning;
-import static com.cognitree.kronos.TestUtil.waitForTriggerToComplete;
+import static com.cognitree.kronos.TestUtil.*;
public class TaskServiceTest extends ServiceTest {
@@ -155,6 +152,38 @@ public void testAbortTasks() throws Exception {
}
}
+ @Test
+ public void testFailedBranchTasks() throws Exception {
+ final WorkflowTrigger workflowTrigger = scheduleWorkflow(WORKFLOW_TEMPLATE_FAILED_PARALLEL_BRANCH_YAML);
+
+ waitForJobsToTriggerAndComplete(workflowTrigger);
+ TaskService taskService = TaskService.getService();
+ List tasks = taskService.get(workflowTrigger.getNamespace());
+ for (Task task : tasks) {
+ if (task.getName().equals("B") || task.getName().equals("C")) {
+ Assert.assertNotEquals(Task.Status.SUCCESSFUL, task.getStatus());
+ } else {
+ Assert.assertEquals(Task.Status.SUCCESSFUL, task.getStatus());
+ }
+ }
+ }
+
+ @Test
+ public void testConditionFailedBranchTasks() throws Exception {
+ final WorkflowTrigger workflowTrigger = scheduleWorkflow(CONDITION_WORKFLOW_TEMPLATE_FAILED_PARALLEL_BRANCH_YAML);
+
+ waitForJobsToTriggerAndComplete(workflowTrigger);
+ TaskService taskService = TaskService.getService();
+ List tasks = taskService.get(workflowTrigger.getNamespace());
+ for (Task task : tasks) {
+ if (task.getName().equals("B") || task.getName().equals("C")) {
+ Assert.assertNotEquals(Task.Status.SUCCESSFUL, task.getStatus());
+ } else {
+ Assert.assertEquals(Task.Status.SUCCESSFUL, task.getStatus());
+ }
+ }
+ }
+
@Test
public void testDeleteTask() throws Exception {
final WorkflowTrigger workflowTrigger = scheduleWorkflow(WORKFLOW_TEMPLATE_YAML);
diff --git a/app/src/test/resources/workflows/condition-workflow-template-failed-parallel-branch.yaml b/app/src/test/resources/workflows/condition-workflow-template-failed-parallel-branch.yaml
new file mode 100644
index 0000000..1fdc35e
--- /dev/null
+++ b/app/src/test/resources/workflows/condition-workflow-template-failed-parallel-branch.yaml
@@ -0,0 +1,35 @@
+tasks:
+ - name: A
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ - name: B
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ condition: 2==3
+ dependsOn:
+ - A
+ - name: C
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ dependsOn:
+ - B
+ - name: D
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ dependsOn:
+ - A
+ - name: E
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ dependsOn:
+ - D
\ No newline at end of file
diff --git a/app/src/test/resources/workflows/condition-workflow-template-failure-last-task.yaml b/app/src/test/resources/workflows/condition-workflow-template-failure-last-task.yaml
new file mode 100644
index 0000000..5a7df41
--- /dev/null
+++ b/app/src/test/resources/workflows/condition-workflow-template-failure-last-task.yaml
@@ -0,0 +1,25 @@
+# name and namespace is set as part of test case
+description: Workflow with condition only last conditions fails
+tasks:
+ - name: taskOne
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ - name: taskTwo
+ type: typeSuccess
+ dependsOn:
+ - taskOne
+ condition: taskOne.get("valOne") == 1234 && taskOne.get("valTwo") == 'abcd'
+ properties:
+ keyA: valA
+ keyB: valB
+ - name: taskThree
+ type: typeSuccess
+ dependsOn:
+ - taskTwo
+ - taskOne
+ condition: taskOne.get("valOne") + taskTwo.get("valOne") == 0 || taskOne.get("valTwo") != taskTwo.get("valTwo")
+ properties:
+ keyA: valA
+ keyB: valB
diff --git a/app/src/test/resources/workflows/condition-workflow-template-failure-second-task b/app/src/test/resources/workflows/condition-workflow-template-failure-second-task
new file mode 100644
index 0000000..38b454a
--- /dev/null
+++ b/app/src/test/resources/workflows/condition-workflow-template-failure-second-task
@@ -0,0 +1,25 @@
+# name and namespace is set as part of test case
+description: Workflow with conditions with three tasks out of which only second condition fails.
+tasks:
+ - name: taskOne
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ - name: taskTwo
+ type: typeSuccess
+ dependsOn:
+ - taskOne
+ condition: taskOne.get("valOne") == 1234 && taskOne.get("valTwo") != 'abcd'
+ properties:
+ keyA: valA
+ keyB: valB
+ - name: taskThree
+ type: typeSuccess
+ dependsOn:
+ - taskTwo
+ - taskOne
+ condition: taskOne.get("valOne") + taskTwo.get("valOne") == 2468 || taskOne.get("valTwo") == taskTwo.get("valTwo")
+ properties:
+ keyA: valA
+ keyB: valB
diff --git a/app/src/test/resources/workflows/condition-workflow-template-success.yaml b/app/src/test/resources/workflows/condition-workflow-template-success.yaml
new file mode 100644
index 0000000..f181537
--- /dev/null
+++ b/app/src/test/resources/workflows/condition-workflow-template-success.yaml
@@ -0,0 +1,25 @@
+# name and namespace is set as part of test case
+description: workflow with all successful conditions
+tasks:
+ - name: taskOne
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ - name: taskTwo
+ type: typeSuccess
+ dependsOn:
+ - taskOne
+ condition: taskOne.get("valOne") == 1234 && taskOne.get("valTwo") == 'abcd'
+ properties:
+ keyA: valA
+ keyB: valB
+ - name: taskThree
+ type: typeSuccess
+ dependsOn:
+ - taskTwo
+ - taskOne
+ condition: taskOne.get("valOne")-taskTwo.get("valOne") == 0 && taskOne.get("valTwo") == taskTwo.get("valTwo")
+ properties:
+ keyA: valA
+ keyB: valB
diff --git a/app/src/test/resources/workflows/workflow-template-failed-parallel-branch.yaml b/app/src/test/resources/workflows/workflow-template-failed-parallel-branch.yaml
new file mode 100644
index 0000000..ba7ea42
--- /dev/null
+++ b/app/src/test/resources/workflows/workflow-template-failed-parallel-branch.yaml
@@ -0,0 +1,34 @@
+tasks:
+ - name: A
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ - name: B
+ type: typeFailure
+ properties:
+ keyA: valA
+ keyB: valB
+ dependsOn:
+ - A
+ - name: C
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ dependsOn:
+ - B
+ - name: D
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ dependsOn:
+ - A
+ - name: E
+ type: typeSuccess
+ properties:
+ keyA: valA
+ keyB: valB
+ dependsOn:
+ - D
\ No newline at end of file
diff --git a/common/pom.xml b/common/pom.xml
index 809471e..0e10388 100755
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -24,7 +24,7 @@
kronos
com.cognitree
- 3.0.0
+ 3.1.0
com.cognitree.kronos
diff --git a/common/src/main/java/com/cognitree/kronos/model/Messages.java b/common/src/main/java/com/cognitree/kronos/model/Messages.java
index 7dbf396..ecbe56b 100755
--- a/common/src/main/java/com/cognitree/kronos/model/Messages.java
+++ b/common/src/main/java/com/cognitree/kronos/model/Messages.java
@@ -26,4 +26,5 @@ public interface Messages {
String TASK_SCHEDULING_FAILED_MESSAGE = "error scheduling task for execution";
String TASK_ABORTED_MESSAGE = "task has been aborted";
String MISSING_TASK_HANDLER_MESSAGE = "failed to resolve handler for the task";
+ String TASK_SKIPPED_CONDITION_FAILS = "Task Condition failed";
}
diff --git a/common/src/main/java/com/cognitree/kronos/model/Task.java b/common/src/main/java/com/cognitree/kronos/model/Task.java
index 833660f..8b7b607 100755
--- a/common/src/main/java/com/cognitree/kronos/model/Task.java
+++ b/common/src/main/java/com/cognitree/kronos/model/Task.java
@@ -48,6 +48,8 @@ public class Task extends TaskId {
private Long submittedAt;
private Long completedAt;
private int retryCount = 0;
+ private String condition;
+
public String getType() {
return type;
@@ -145,6 +147,14 @@ public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
+ public String getCondition() {
+ return condition;
+ }
+
+ public void setCondition(String condition) {
+ this.condition = condition;
+ }
+
@JsonIgnore
@BsonIgnore
public TaskId getIdentity() {
@@ -176,27 +186,33 @@ public String toString() {
", submittedAt=" + submittedAt +
", completedAt=" + completedAt +
", retryCount=" + retryCount +
- "} " + super.toString();
+ ", condition='" + condition + '\'' +
+ "}" + super.toString();
}
public enum Status {
- CREATED(false),
- WAITING(false),
- UP_FOR_RETRY(false),
- SCHEDULED(false),
- RUNNING(false),
- SUCCESSFUL(true),
- SKIPPED(true), // a task is marked as skipped it the task it depends on fails.
- FAILED(true),
- TIMED_OUT(true),
- ABORTED(true);
+ CREATED(false, 9),
+ WAITING(false, 8),
+ UP_FOR_RETRY(false, 2),
+ SCHEDULED(false, 7),
+ RUNNING(false, 6),
+ SUCCESSFUL(true, 0),
+ SKIPPED(true, 4), // a task is marked as skipped it the task it depends on fails.
+ FAILED(true, 1),
+ TIMED_OUT(true, 3),
+ ABORTED(true, 5);
private final boolean isFinal;
+ private final int order;
- Status(boolean isFinal) {
+ Status(boolean isFinal, int order) {
this.isFinal = isFinal;
+ this.order = order;
}
+ public int getOrder(){
+ return this.order;
+ }
public boolean isFinal() {
return this.isFinal;
}
diff --git a/executor/pom.xml b/executor/pom.xml
index 1bfe05c..1ec40c8 100755
--- a/executor/pom.xml
+++ b/executor/pom.xml
@@ -22,7 +22,7 @@
kronos
com.cognitree
- 3.0.0
+ 3.1.0
4.0.0
diff --git a/pom.xml b/pom.xml
old mode 100755
new mode 100644
index ac31796..22410ff
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
kronos
kronos
pom
- 3.0.0
+ 3.1.0
1.8
@@ -34,8 +34,9 @@
UTF-8
1.7.12
- 2.9.9
+ 2.10.1
4.12
+ 2.11.1
@@ -65,18 +66,18 @@
${slf4j.version}
compile
-
- org.slf4j
- slf4j-log4j12
- ${slf4j.version}
- runtime
-
junit
junit
${junit.version}
test
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ ${log4j2.version}
+ runtime
+
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index a80a3b2..8bc09c8 100755
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -22,7 +22,7 @@
kronos
com.cognitree
- 3.0.0
+ 3.1.0
4.0.0
@@ -34,6 +34,7 @@
23.5-jre
5.0.3
1.7
+ 19.2.0.1
@@ -62,6 +63,11 @@
velocity
${velocity.version}
+
+ org.graalvm.js
+ js
+ ${graalvm.version}
+
diff --git a/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskProvider.java b/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskProvider.java
index 55557f8..52866d2 100755
--- a/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskProvider.java
+++ b/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskProvider.java
@@ -25,22 +25,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static com.cognitree.kronos.model.Task.Status.FAILED;
-import static com.cognitree.kronos.model.Task.Status.RUNNING;
-import static com.cognitree.kronos.model.Task.Status.SUCCESSFUL;
-import static com.cognitree.kronos.model.Task.Status.UP_FOR_RETRY;
-import static com.cognitree.kronos.model.Task.Status.WAITING;
+import static com.cognitree.kronos.model.Task.Status.*;
/**
* Task provider manages/ resolves task dependencies and exposes APIs to add, remove, retrieve tasks in active and
diff --git a/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskSchedulerService.java b/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskSchedulerService.java
index 29e4a4c..d096187 100755
--- a/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskSchedulerService.java
+++ b/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskSchedulerService.java
@@ -20,51 +20,30 @@
import com.cognitree.kronos.Service;
import com.cognitree.kronos.ServiceException;
import com.cognitree.kronos.ServiceProvider;
-import com.cognitree.kronos.model.ControlMessage;
-import com.cognitree.kronos.model.Messages;
-import com.cognitree.kronos.model.Task;
+import com.cognitree.kronos.model.*;
import com.cognitree.kronos.model.Task.Action;
import com.cognitree.kronos.model.Task.Status;
-import com.cognitree.kronos.model.TaskId;
-import com.cognitree.kronos.model.TaskStatusUpdate;
import com.cognitree.kronos.queue.QueueService;
import com.cognitree.kronos.queue.producer.Producer;
import com.cognitree.kronos.scheduler.model.Namespace;
+import com.cognitree.kronos.scheduler.model.Workflow;
+import com.cognitree.kronos.scheduler.model.WorkflowId;
+import org.graalvm.polyglot.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import static com.cognitree.kronos.model.Messages.ABORTED_DEPENDEE_TASK_MESSAGE;
-import static com.cognitree.kronos.model.Messages.FAILED_DEPENDEE_TASK_MESSAGE;
-import static com.cognitree.kronos.model.Messages.FAILED_TO_RESOLVE_DEPENDENCY_MESSAGE;
-import static com.cognitree.kronos.model.Messages.SKIPPED_DEPENDEE_TASK_MESSAGE;
-import static com.cognitree.kronos.model.Messages.TASK_SCHEDULING_FAILED_MESSAGE;
-import static com.cognitree.kronos.model.Messages.TIMED_OUT_EXECUTING_TASK_MESSAGE;
-import static com.cognitree.kronos.model.Task.Status.ABORTED;
-import static com.cognitree.kronos.model.Task.Status.CREATED;
-import static com.cognitree.kronos.model.Task.Status.FAILED;
-import static com.cognitree.kronos.model.Task.Status.SCHEDULED;
-import static com.cognitree.kronos.model.Task.Status.SKIPPED;
-import static com.cognitree.kronos.model.Task.Status.TIMED_OUT;
-import static com.cognitree.kronos.model.Task.Status.UP_FOR_RETRY;
-import static com.cognitree.kronos.model.Task.Status.WAITING;
+import static com.cognitree.kronos.model.Messages.*;
+import static com.cognitree.kronos.model.Task.Status.*;
import static com.cognitree.kronos.queue.QueueService.SCHEDULER_QUEUE;
import static com.cognitree.kronos.scheduler.model.Constants.DYNAMIC_VAR_PREFIX;
import static com.cognitree.kronos.scheduler.model.Constants.DYNAMIC_VAR_SUFFFIX;
import static java.util.Comparator.comparing;
-import static java.util.concurrent.TimeUnit.HOURS;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.concurrent.TimeUnit.*;
/**
* A task scheduler service resolves dependency via {@link TaskProvider} for each submitted task and
@@ -82,6 +61,8 @@ final class TaskSchedulerService implements Service {
// task purge interval in hour
private static final int TASK_PURGE_INTERVAL = 1;
private static final List NON_FINAL_TASK_STATUS_LIST = new ArrayList<>();
+ private static final String CONDITION_LANG_ID = "js";
+ private static final String WORKFLOW = "workflow";
static {
for (Status status : Status.values()) {
@@ -249,6 +230,48 @@ private void resolve(Task task) {
}
}
+ /**
+ * Evaluates the condition of the task if there exists a condition and dependsOn
+ *
+ * @return True if condtion is satisfied else false
+ */
+ private boolean evaluateCondition(Task task) {
+ final List dependsOn = task.getDependsOn();
+ String namespace = task.getNamespace();
+ String workflowName = task.getWorkflow();
+ Map workflowProperties = new HashMap<>();
+ try {
+ final Workflow workflow = WorkflowService.getService().get(WorkflowId.build(namespace, workflowName));
+ workflowProperties = workflow.getProperties();
+ } catch (ServiceException | ValidationException e) {
+ logger.error("Error while getting {} from WorkflowService", workflowName, e);
+ }
+ if (dependsOn != null && task.getCondition() != null) {
+ try (Context context = Context.newBuilder()
+ .allowAllAccess(true)
+ .build()) {
+ context.getBindings(CONDITION_LANG_ID).putMember(WORKFLOW, workflowProperties);
+ for (String dependentTaskName : dependsOn) {
+ TaskId dependentTaskId = TaskId.build(task.getNamespace(), dependentTaskName, task.getJob(),
+ task.getWorkflow());
+ Task dependentTask = taskProvider.getTask(dependentTaskId);
+ if (dependentTask != null) {
+ Map dependentTaskContext = dependentTask.getContext();
+ context.getBindings(CONDITION_LANG_ID).putMember(dependentTaskName, dependentTaskContext);
+ }
+ }
+ boolean valid = context.eval(CONDITION_LANG_ID, task.getCondition()).asBoolean();
+ if (!valid) {
+ return false;
+ }
+ } catch (Exception e) {
+ logger.error("Failed to evaluate the condition", e);
+ return false;
+ }
+ }
+ return true;
+ }
+
private void updateStatus(TaskId taskId, Status status, String statusMessage) {
updateStatus(taskId, status, statusMessage, null);
}
@@ -276,7 +299,6 @@ private void updateStatus(TaskId taskId, Status status, String statusMessage,
private void handleTaskStatusChange(Task task) {
switch (task.getStatus()) {
case CREATED:
- break;
case SCHEDULED:
break;
case RUNNING:
@@ -330,13 +352,17 @@ private synchronized void scheduleReadyTasks() {
for (Task task : readyTasks) {
logger.info("Scheduling task {} for execution", task);
try {
- // update dynamic task properties from the tasks it depends on before scheduling
- // only if the task is not being retried
- if (task.getStatus() != UP_FOR_RETRY) {
- updateTaskProperties(task);
+ if (evaluateCondition(task)) {
+ // update dynamic task properties from the tasks it depends on before scheduling
+ // only if the task is not being retried
+ if (task.getStatus() != UP_FOR_RETRY) {
+ updateTaskProperties(task);
+ }
+ QueueService.getService(SCHEDULER_QUEUE).send(task);
+ updateStatus(task.getIdentity(), SCHEDULED, null);
+ } else {
+ updateStatus(task.getIdentity(), SKIPPED, TASK_SKIPPED_CONDITION_FAILS);
}
- QueueService.getService(SCHEDULER_QUEUE).send(task);
- updateStatus(task.getIdentity(), SCHEDULED, null);
} catch (ServiceException e) {
logger.error("Error scheduling task {} for execution", task.getIdentity(), e);
updateStatus(task.getIdentity(), FAILED, TASK_SCHEDULING_FAILED_MESSAGE);
diff --git a/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskService.java b/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskService.java
index ac2ebfa..710f755 100644
--- a/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskService.java
+++ b/scheduler/src/main/java/com/cognitree/kronos/scheduler/TaskService.java
@@ -123,15 +123,15 @@ Task create(String namespace, WorkflowTask workflowTask, String jobId, String wo
workflowTask, jobId, workflowName, namespace);
validateJob(namespace, jobId, workflowName);
Task task = new Task();
- task.setName(UUID.randomUUID().toString());
task.setJob(jobId);
task.setWorkflow(workflowName);
- task.setName(workflowTask.getName());
+ task.setName(workflowTask.getName() != null ? workflowTask.getName() : UUID.randomUUID().toString());
task.setNamespace(namespace);
task.setType(workflowTask.getType());
task.setPolicies(workflowTask.getPolicies());
task.setMaxExecutionTimeInMs(workflowTask.getMaxExecutionTimeInMs());
task.setDependsOn(workflowTask.getDependsOn());
+ task.setCondition(workflowTask.getCondition());
final Map taskProperties = modifyAndGetTaskProperties(workflowTask.getProperties(), workflowProperties);
task.setProperties(taskProperties);
task.setCreatedAt(System.currentTimeMillis());
diff --git a/scheduler/src/main/java/com/cognitree/kronos/scheduler/model/Workflow.java b/scheduler/src/main/java/com/cognitree/kronos/scheduler/model/Workflow.java
index 213cebd..9c27b5e 100755
--- a/scheduler/src/main/java/com/cognitree/kronos/scheduler/model/Workflow.java
+++ b/scheduler/src/main/java/com/cognitree/kronos/scheduler/model/Workflow.java
@@ -24,11 +24,7 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.bson.codecs.pojo.annotations.BsonIgnore;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
@JsonSerialize(as = Workflow.class)
@JsonDeserialize(as = Workflow.class)
@@ -120,6 +116,7 @@ public static class WorkflowTask {
private String name;
private String type;
+ private String condition;
private List dependsOn = new ArrayList<>();
private Map properties = new HashMap<>();
private List policies = new ArrayList<>();
@@ -143,6 +140,14 @@ public void setType(String type) {
this.type = type;
}
+ public String getCondition() {
+ return condition;
+ }
+
+ public void setCondition(String condition) {
+ this.condition = condition;
+ }
+
public List getDependsOn() {
return dependsOn;
}
@@ -186,12 +191,13 @@ public void setEnabled(boolean enabled) {
@Override
public boolean equals(Object o) {
if (this == o) return true;
- if (!(o instanceof WorkflowTask)) return false;
+ if (o == null || getClass() != o.getClass()) return false;
WorkflowTask that = (WorkflowTask) o;
return maxExecutionTimeInMs == that.maxExecutionTimeInMs &&
enabled == that.enabled &&
Objects.equals(name, that.name) &&
Objects.equals(type, that.type) &&
+ Objects.equals(condition, that.condition) &&
Objects.equals(dependsOn, that.dependsOn) &&
Objects.equals(properties, that.properties) &&
Objects.equals(policies, that.policies);
@@ -199,7 +205,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return Objects.hash(name, type, dependsOn, properties, policies, maxExecutionTimeInMs, enabled);
+ return Objects.hash(name, type, condition, dependsOn, properties, policies, maxExecutionTimeInMs, enabled);
}
@Override
@@ -207,6 +213,7 @@ public String toString() {
return "WorkflowTask{" +
"name='" + name + '\'' +
", type='" + type + '\'' +
+ ", condition='" + condition + '\'' +
", dependsOn=" + dependsOn +
", properties=" + properties +
", policies=" + policies +
diff --git a/scheduler/src/test/resources/log4j.properties b/scheduler/src/test/resources/log4j.properties
deleted file mode 100755
index 9140ad7..0000000
--- a/scheduler/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-log4j.rootLogger=INFO, STDOUT
-log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
-log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
-log4j.appender.STDOUT.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %5p [%t] (%F:%L) - %m%n
\ No newline at end of file
diff --git a/scheduler/src/test/resources/log4j2.properties b/scheduler/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..cf38e8a
--- /dev/null
+++ b/scheduler/src/test/resources/log4j2.properties
@@ -0,0 +1,13 @@
+name = Log4j2Config
+status = error
+monitorInterval = 10
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %5p [%t] (%F:%L) - %m%n
+appender.console.immediateFlush=true
+
+rootLogger.level = INFO
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
\ No newline at end of file
diff --git a/scheduler/src/test/resources/workflow.yaml b/scheduler/src/test/resources/workflow.yaml
index a793e08..654713f 100644
--- a/scheduler/src/test/resources/workflow.yaml
+++ b/scheduler/src/test/resources/workflow.yaml
@@ -13,6 +13,7 @@ tasks:
keyB: valB
dependsOn:
- taskOne
+ condition: 2 == 2
- name: taskThree
type: typeSuccess
properties: