Skip to content

Commit 9f56628

Browse files
authored
[Fix serverlessworkflow#1107] Adding workflow status change event (serverlessworkflow#1108)
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 600dee1 commit 9f56628

7 files changed

Lines changed: 143 additions & 22 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class WorkflowApplication implements AutoCloseable {
7070
private final EventConsumer<?, ?> eventConsumer;
7171
private final Collection<EventPublisher> eventPublishers;
7272
private final boolean lifeCycleCEPublishingEnabled;
73+
private final boolean lifeCycleStatusChangeEnabled;
7374
private final WorkflowModelFactory modelFactory;
7475
private final WorkflowModelFactory contextFactory;
7576
private final WorkflowScheduler scheduler;
@@ -95,6 +96,7 @@ private WorkflowApplication(Builder builder) {
9596
this.eventConsumer = builder.eventConsumer;
9697
this.eventPublishers = builder.eventPublishers;
9798
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
99+
this.lifeCycleStatusChangeEnabled = builder.lifeCycleStatusChangeEnabled;
98100
this.modelFactory = builder.modelFactory;
99101
this.contextFactory = builder.contextFactory;
100102
this.scheduler = builder.scheduler;
@@ -179,6 +181,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
179181
private RuntimeDescriptorFactory descriptorFactory =
180182
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
181183
private boolean lifeCycleCEPublishingEnabled = true;
184+
private boolean lifeCycleStatusChangeEnabled = true;
182185
private WorkflowModelFactory modelFactory;
183186
private WorkflowModelFactory contextFactory;
184187
private Map<String, WorkflowAdditionalObject<?>> additionalObjects = new HashMap<>();
@@ -224,6 +227,11 @@ public Builder disableLifeCycleCEPublishing() {
224227
return this;
225228
}
226229

230+
public Builder disableStatusChangePublishing() {
231+
this.lifeCycleStatusChangeEnabled = false;
232+
return this;
233+
}
234+
227235
public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) {
228236
this.executorFactory = executorFactory;
229237
return this;
@@ -421,6 +429,10 @@ public boolean isLifeCycleCEPublishingEnabled() {
421429
return lifeCycleCEPublishingEnabled;
422430
}
423431

432+
public boolean isStatusChangePublishingEnabled() {
433+
return lifeCycleStatusChangeEnabled;
434+
}
435+
424436
public WorkflowScheduler scheduler() {
425437
return scheduler;
426438
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
2424
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
2525
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
26+
import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent;
2627
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
2728
import java.time.Instant;
2829
import java.util.Map;
@@ -38,7 +39,7 @@
3839

3940
public class WorkflowMutableInstance implements WorkflowInstance {
4041

41-
protected final AtomicReference<WorkflowStatus> status;
42+
private final AtomicReference<WorkflowStatus> status;
4243
protected final String id;
4344
protected final WorkflowModel input;
4445

@@ -75,7 +76,7 @@ protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnabl
7576
if (future != null) {
7677
return future;
7778
}
78-
status.set(WorkflowStatus.RUNNING);
79+
status(WorkflowStatus.RUNNING);
7980
runnable.run();
8081
future =
8182
TaskExecutorHelper.processTaskList(
@@ -106,7 +107,7 @@ private void whenCompleted(WorkflowModel result, Throwable ex) {
106107

107108
private void handleException(Throwable ex) {
108109
if (!(ex instanceof CancellationException)) {
109-
status.set(WorkflowStatus.FAULTED);
110+
status(WorkflowStatus.FAULTED);
110111
publishEvent(
111112
workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex)));
112113
}
@@ -120,7 +121,7 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
120121
.map(f -> f.apply(workflowContext, null, node))
121122
.orElse(node);
122123
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
123-
status.set(WorkflowStatus.COMPLETED);
124+
status(WorkflowStatus.COMPLETED);
124125
publishEvent(
125126
workflowContext,
126127
l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output)));
@@ -177,7 +178,14 @@ public <T> T outputAs(Class<T> clazz) {
177178
}
178179

179180
public void status(WorkflowStatus state) {
180-
this.status.set(state);
181+
WorkflowStatus prevState = this.status.getAndSet(state);
182+
if (prevState != state) {
183+
publishEvent(
184+
workflowContext,
185+
l ->
186+
l.onWorkflowStatusChanged(
187+
new WorkflowStatusEvent(workflowContext, prevState, state)));
188+
}
181189
}
182190

183191
@Override
@@ -213,7 +221,7 @@ public boolean suspend() {
213221

214222
protected final void internalSuspend() {
215223
suspended = new ConcurrentHashMap<>();
216-
status.set(WorkflowStatus.SUSPENDED);
224+
status(WorkflowStatus.SUSPENDED);
217225
}
218226

219227
@Override
@@ -259,7 +267,7 @@ public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
259267
suspended.put(suspendedTask, t);
260268
return suspendedTask;
261269
} else if (TaskExecutorHelper.isActive(status.get())) {
262-
status.set(WorkflowStatus.RUNNING);
270+
status(WorkflowStatus.RUNNING);
263271
}
264272
} finally {
265273
statusLock.unlock();
@@ -272,7 +280,7 @@ public boolean cancel() {
272280
try {
273281
statusLock.lock();
274282
if (TaskExecutorHelper.isActive(status.get())) {
275-
status.set(WorkflowStatus.CANCELLED);
283+
status(WorkflowStatus.CANCELLED);
276284
publishEvent(
277285
workflowContext,
278286
l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ default void onTaskResumed(TaskResumedEvent ev) {}
4343

4444
default void onTaskRetried(TaskRetriedEvent ev) {}
4545

46+
default void onWorkflowStatusChanged(WorkflowStatusEvent ev) {}
47+
4648
@Override
4749
default void close() {}
4850
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.lifecycle;
17+
18+
import io.serverlessworkflow.impl.WorkflowContextData;
19+
import io.serverlessworkflow.impl.WorkflowStatus;
20+
21+
public class WorkflowStatusEvent extends WorkflowEvent {
22+
23+
private final WorkflowStatus status;
24+
private final WorkflowStatus prevStatus;
25+
26+
public WorkflowStatusEvent(
27+
WorkflowContextData workflow, WorkflowStatus prevStatus, WorkflowStatus status) {
28+
super(workflow);
29+
this.status = status;
30+
this.prevStatus = prevStatus;
31+
}
32+
33+
public WorkflowStatus status() {
34+
return status;
35+
}
36+
37+
public WorkflowStatus previousStatus() {
38+
return prevStatus;
39+
}
40+
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
4242
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
4343
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
44+
import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent;
4445
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
4546
import java.time.OffsetDateTime;
4647
import java.util.Collection;
@@ -63,6 +64,8 @@ public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionLis
6364
private static final String WORKFLOW_RESUMED = "io.serverlessworkflow.workflow.resumed.v1";
6465
private static final String WORKFLOW_FAULTED = "io.serverlessworkflow.workflow.faulted.v1";
6566
private static final String WORKFLOW_CANCELLED = "io.serverlessworkflow.workflow.cancelled.v1";
67+
private static final String WORKFLOW_STATUS_CHANGED =
68+
"io.serverlessworkflow.workflow.status-changed.v1";
6669

6770
public static Collection<String> getLifeCycleTypes() {
6871
return Set.of(
@@ -78,7 +81,8 @@ public static Collection<String> getLifeCycleTypes() {
7881
WORKFLOW_SUSPENDED,
7982
WORKFLOW_RESUMED,
8083
WORKFLOW_FAULTED,
81-
WORKFLOW_CANCELLED);
84+
WORKFLOW_CANCELLED,
85+
WORKFLOW_STATUS_CHANGED);
8286
}
8387

8488
@Override
@@ -263,6 +267,23 @@ public void onWorkflowFailed(WorkflowFailedEvent event) {
263267
.build());
264268
}
265269

270+
@Override
271+
public void onWorkflowStatusChanged(WorkflowStatusEvent event) {
272+
if (appl(event).isStatusChangePublishingEnabled()) {
273+
publish(
274+
event,
275+
ev ->
276+
builder()
277+
.withData(
278+
cloudEventData(
279+
new WorkflowStatusCEDataEvent(
280+
id(ev), ref(ev), ev.eventDate(), ev.status().toString()),
281+
this::convert))
282+
.withType(WORKFLOW_STATUS_CHANGED)
283+
.build());
284+
}
285+
}
286+
266287
protected byte[] convert(WorkflowStartedCEData data) {
267288
return convertToBytes(data);
268289
}
@@ -315,10 +336,14 @@ protected byte[] convert(TaskResumedCEData data) {
315336
return convertToBytes(data);
316337
}
317338

339+
protected byte[] convert(WorkflowStatusCEDataEvent data) {
340+
return convertToBytes(data);
341+
}
342+
318343
protected abstract <T> byte[] convertToBytes(T data);
319344

320345
protected <T extends WorkflowEvent> void publish(T ev, Function<T, CloudEvent> ceFunction) {
321-
WorkflowApplication appl = ev.workflowContext().definition().application();
346+
WorkflowApplication appl = appl(ev);
322347
if (appl.isLifeCycleCEPublishingEnabled()) {
323348
publish(appl, ceFunction.apply(ev));
324349
}
@@ -342,6 +367,10 @@ private static CloudEventBuilder builder() {
342367
.withTime(OffsetDateTime.now());
343368
}
344369

370+
private static WorkflowApplication appl(WorkflowEvent ev) {
371+
return ev.workflowContext().definition().application();
372+
}
373+
345374
private static String id(WorkflowEvent ev) {
346375
return ev.workflowContext().instanceData().id();
347376
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.lifecycle.ce;
17+
18+
import java.time.OffsetDateTime;
19+
20+
public record WorkflowStatusCEDataEvent(
21+
String name, WorkflowDefinitionCEData definition, OffsetDateTime updatetAt, String status) {}

0 commit comments

Comments
 (0)