Skip to content

Commit ba1b278

Browse files
fjtiradomatheusandre1
authored andcommitted
Review comments
Signed-off-by: Matheus André <matheusandr2@gmail.com>
1 parent 1b295be commit ba1b278

9 files changed

Lines changed: 142 additions & 52 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,6 @@ public interface WorkflowInstance extends WorkflowInstanceData {
5252
boolean resume();
5353

5454
<T> T addMetadataIfAbsent(String key, Supplier<T> supplier);
55+
56+
void removeMetadata(String key);
5557
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,9 @@ public <T> T addMetadataIfAbsent(String key, Supplier<T> supplier) {
355355
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
356356
}
357357

358-
public Object computeCorrelationValue(String key, Object value) {
359-
return additionalObjects.computeIfAbsent(key, k -> value);
358+
@Override
359+
public void removeMetadata(String key) {
360+
additionalObjects.remove(key);
360361
}
361362

362363
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.serverlessworkflow.impl.WorkflowApplication;
2525
import io.serverlessworkflow.impl.WorkflowContext;
2626
import io.serverlessworkflow.impl.WorkflowModel;
27-
import io.serverlessworkflow.impl.WorkflowModelFactory;
2827
import java.util.AbstractCollection;
2928
import java.util.ArrayList;
3029
import java.util.Collection;
@@ -42,6 +41,8 @@ public abstract class AbstractTypeConsumer
4241

4342
private static final Logger logger = LoggerFactory.getLogger(AbstractTypeConsumer.class);
4443

44+
private static final CloudEventPredicate ALWAYS_TRUE = (ce, wf, t) -> true;
45+
4546
protected abstract void registerToAll(Consumer<CloudEvent> consumer);
4647

4748
protected abstract void unregisterFromAll();
@@ -61,8 +62,7 @@ public TypeEventRegistrationBuilder listen(
6162
application.cloudEventPredicateFactory().build(application, properties);
6263
Collection<CloudEventPredicate> correlationPredicates =
6364
buildCorrelationPredicates(register.getCorrelate(), application);
64-
return new TypeEventRegistrationBuilder(
65-
type, cePredicate, correlationPredicates, application.modelFactory());
65+
return new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);
6666
}
6767

6868
private Collection<CloudEventPredicate> buildCorrelationPredicates(
@@ -83,43 +83,40 @@ private Collection<CloudEventPredicate> buildCorrelationPredicates(
8383

8484
@Override
8585
public Collection<TypeEventRegistrationBuilder> listenToAll(WorkflowApplication application) {
86-
return List.of(
87-
new TypeEventRegistrationBuilder(null, null, List.of(), application.modelFactory()));
86+
return List.of(new TypeEventRegistrationBuilder(null, ALWAYS_TRUE, List.of()));
8887
}
8988

9089
private static class CloudEventConsumer extends AbstractCollection<TypeEventRegistration>
9190
implements Consumer<CloudEvent> {
92-
private final WorkflowModelFactory modelFactory;
9391
private Collection<TypeEventRegistration> registrations = new CopyOnWriteArrayList<>();
9492

95-
CloudEventConsumer(WorkflowModelFactory modelFactory) {
96-
this.modelFactory = modelFactory;
97-
}
98-
9993
@Override
10094
public void accept(CloudEvent ce) {
10195
logger.debug("Received cloud event {}", ce);
96+
WorkflowModel eventModel = null;
10297
for (TypeEventRegistration registration : registrations) {
103-
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
104-
if (!testCorrelation(ce, registration)) {
98+
if (!registration.predicate().test(ce, registration.workflow(), registration.task())) {
99+
continue;
100+
}
101+
Collection<CloudEventPredicate> correlationPredicates =
102+
registration.correlationPredicates();
103+
if (!correlationPredicates.isEmpty()) {
104+
if (eventModel == null && registration.hasModelAwareCorrelation()) {
105+
eventModel = registration.workflow().definition().application().modelFactory().from(ce);
106+
}
107+
if (!testCorrelation(ce, registration, eventModel)) {
105108
continue;
106109
}
107-
registration.consumer().accept(ce);
108110
}
111+
registration.consumer().accept(ce);
109112
}
110113
}
111114

112-
private boolean testCorrelation(CloudEvent ce, TypeEventRegistration registration) {
115+
private boolean testCorrelation(
116+
CloudEvent ce, TypeEventRegistration registration, WorkflowModel eventModel) {
113117
Collection<CloudEventPredicate> predicates = registration.correlationPredicates();
114-
if (predicates.isEmpty()) {
115-
return true;
116-
}
117-
WorkflowModel eventModel = null;
118118
for (CloudEventPredicate pred : predicates) {
119119
if (pred instanceof ModelAwareCloudEventPredicate ma) {
120-
if (eventModel == null) {
121-
eventModel = modelFactory.from(ce);
122-
}
123120
if (!ma.test(eventModel, registration.workflow(), registration.task())) {
124121
return false;
125122
}
@@ -161,7 +158,7 @@ public TypeEventRegistration register(
161158
TaskContext task) {
162159
if (builder.type() == null) {
163160
registerToAll(ce);
164-
return new TypeEventRegistration(null, ce, null, workflow, task);
161+
return new TypeEventRegistration(null, ce, ALWAYS_TRUE, workflow, task);
165162
} else {
166163
TypeEventRegistration registration =
167164
new TypeEventRegistration(
@@ -175,7 +172,7 @@ public TypeEventRegistration register(
175172
.computeIfAbsent(
176173
registration.type(),
177174
k -> {
178-
CloudEventConsumer consumer = new CloudEventConsumer(builder.modelFactory());
175+
CloudEventConsumer consumer = new CloudEventConsumer();
179176
register(k, consumer);
180177
return consumer;
181178
})
@@ -201,5 +198,15 @@ public void unregister(TypeEventRegistration registration) {
201198
}
202199
});
203200
}
201+
cleanupCorrelationState(registration);
202+
}
203+
204+
private void cleanupCorrelationState(TypeEventRegistration registration) {
205+
for (CloudEventPredicate pred : registration.correlationPredicates()) {
206+
if (pred instanceof CorrelationPredicate cp) {
207+
cp.stateKey(registration.task())
208+
.ifPresent(key -> registration.workflow().instance().removeMetadata(key));
209+
}
210+
}
204211
}
205212
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/CorrelationPredicate.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.serverlessworkflow.impl.WorkflowValueResolver;
2525
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
2626
import java.util.Objects;
27+
import java.util.Optional;
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
2930

@@ -56,7 +57,16 @@ public static CorrelationPredicate from(
5657
}
5758

5859
private String correlationStateKey(TaskContext task) {
59-
return "correlation:" + task.position().jsonPointer() + ":" + correlationKey;
60+
return "correlation:"
61+
+ task.position().jsonPointer()
62+
+ ":"
63+
+ task.iteration()
64+
+ ":"
65+
+ correlationKey;
66+
}
67+
68+
Optional<String> stateKey(TaskContext task) {
69+
return expectResolver == null ? Optional.of(correlationStateKey(task)) : Optional.empty();
6070
}
6171

6272
@Override
@@ -75,7 +85,7 @@ public boolean test(WorkflowModel eventModel, WorkflowContext workflow, TaskCont
7585

7686
if (expectResolver == null) {
7787
String stateKey = correlationStateKey(task);
78-
Object firstValue = workflow.instance().computeCorrelationValue(stateKey, eventValue);
88+
Object firstValue = workflow.instance().addMetadataIfAbsent(stateKey, () -> eventValue);
7989
boolean result = Objects.equals(eventValue, firstValue);
8090
logger.debug(
8191
"Correlation no expect, eventValue='{}', firstValue='{}', match={}",

impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,28 @@ public record TypeEventRegistration(
2727
Consumer<CloudEvent> consumer,
2828
CloudEventPredicate predicate,
2929
Collection<CloudEventPredicate> correlationPredicates,
30+
boolean hasModelAwareCorrelation,
3031
WorkflowContext workflow,
3132
TaskContext task)
3233
implements EventRegistration {
3334

35+
public TypeEventRegistration(
36+
String type,
37+
Consumer<CloudEvent> consumer,
38+
CloudEventPredicate predicate,
39+
Collection<CloudEventPredicate> correlationPredicates,
40+
WorkflowContext workflow,
41+
TaskContext task) {
42+
this(
43+
type,
44+
consumer,
45+
predicate,
46+
correlationPredicates,
47+
correlationPredicates.stream().anyMatch(ModelAwareCloudEventPredicate.class::isInstance),
48+
workflow,
49+
task);
50+
}
51+
3452
public TypeEventRegistration(
3553
String type,
3654
Consumer<CloudEvent> consumer,

impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,16 @@
1515
*/
1616
package io.serverlessworkflow.impl.events;
1717

18-
import io.serverlessworkflow.impl.WorkflowModelFactory;
1918
import java.util.Collection;
2019
import java.util.Collections;
2120

2221
public record TypeEventRegistrationBuilder(
2322
String type,
2423
CloudEventPredicate cePredicate,
25-
Collection<CloudEventPredicate> correlationPredicates,
26-
WorkflowModelFactory modelFactory)
24+
Collection<CloudEventPredicate> correlationPredicates)
2725
implements EventRegistrationBuilder {
2826

2927
public TypeEventRegistrationBuilder(String type, CloudEventPredicate cePredicate) {
30-
this(type, cePredicate, Collections.emptyList(), null);
28+
this(type, cePredicate, Collections.emptyList());
3129
}
3230
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.stream.Stream;
4343
import org.junit.jupiter.api.AfterAll;
4444
import org.junit.jupiter.api.BeforeAll;
45-
import org.junit.jupiter.api.Test;
4645
import org.junit.jupiter.params.ParameterizedTest;
4746
import org.junit.jupiter.params.provider.Arguments;
4847
import org.junit.jupiter.params.provider.MethodSource;
@@ -83,8 +82,11 @@ void testCorrelateMatch(String sourceName, Workflow workflow) throws Exception {
8382
Map.of("patientId", "P123", "name", "John"))));
8483

8584
WorkflowModel result = future.get(2, TimeUnit.SECONDS);
86-
List<Object> output = (List<Object>) JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
85+
Object outputValue = JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
86+
assertThat(outputValue).isInstanceOf(List.class);
87+
List<?> output = (List<?>) outputValue;
8788
assertThat(output).hasSize(1);
89+
assertThat(output.get(0)).isInstanceOf(Map.class);
8890
Map<String, Object> eventData = (Map<String, Object>) output.get(0);
8991
assertThat(eventData).containsEntry("patientId", "P123");
9092
assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED);
@@ -96,11 +98,6 @@ void testCorrelateNoMatch(String sourceName, Workflow workflow) throws Exception
9698
assertCorrelateNoMatch(workflow);
9799
}
98100

99-
@Test
100-
void testCorrelateNoMatchDsl() throws Exception {
101-
assertCorrelateNoMatch(listenCorrelateWorkflow());
102-
}
103-
104101
private static Stream<Arguments> correlateWorkflowSources() throws IOException {
105102
return Stream.of(
106103
readWorkflowFromClasspath("workflows-samples/listen-correlate.yaml"),
@@ -110,7 +107,7 @@ private static Stream<Arguments> correlateWorkflowSources() throws IOException {
110107

111108
private static Workflow listenCorrelateWorkflow() {
112109
return WorkflowBuilder.workflow("listen-correlate-java-dsl", "test", "0.1.0")
113-
.input(i -> i.from("{ patientId: .patientId }"))
110+
.input(i -> i.from("{ id: .patientId }"))
114111
.tasks(
115112
doTasks(
116113
listen(
@@ -127,9 +124,7 @@ private static Workflow listenCorrelateWorkflow() {
127124
"com.example.hospital.patient.admitted"))
128125
.correlate(
129126
"patientId",
130-
cp ->
131-
cp.from(".data.patientId")
132-
.expect(".patientId")))))))
127+
cp -> cp.from(".data.patientId").expect(".id")))))))
133128
.build();
134129
}
135130

@@ -192,9 +187,16 @@ private static Workflow listenCorrelateNoExpectWorkflow() {
192187
.build();
193188
}
194189

195-
@Test
196-
void testCorrelateNoExpectMatch() throws Exception {
197-
Workflow workflow = listenCorrelateNoExpectWorkflow();
190+
private static Stream<Arguments> correlateNoExpectWorkflowSources() throws IOException {
191+
return Stream.of(
192+
readWorkflowFromClasspath("workflows-samples/listen-correlate-no-expect.yaml"),
193+
listenCorrelateNoExpectWorkflow())
194+
.map(wf -> Arguments.of(wf.getDocument().getName(), wf));
195+
}
196+
197+
@ParameterizedTest(name = "{0}")
198+
@MethodSource("correlateNoExpectWorkflowSources")
199+
void testCorrelateNoExpectMatch(String sourceName, Workflow workflow) throws Exception {
198200
WorkflowDefinition def = appl.workflowDefinition(workflow);
199201
WorkflowInstance instance = def.instance();
200202
CompletableFuture<WorkflowModel> future = instance.start();
@@ -213,16 +215,20 @@ void testCorrelateNoExpectMatch() throws Exception {
213215
Map.of("patientId", "P123", "name", "John"))));
214216

215217
WorkflowModel result = future.get(2, TimeUnit.SECONDS);
216-
List<Object> output = (List<Object>) JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
218+
Object outputValue = JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
219+
assertThat(outputValue).isInstanceOf(List.class);
220+
List<?> output = (List<?>) outputValue;
217221
assertThat(output).hasSize(1);
222+
assertThat(output.get(0)).isInstanceOf(Map.class);
218223
Map<String, Object> eventData = (Map<String, Object>) output.get(0);
219224
assertThat(eventData).containsEntry("patientId", "P123");
220225
assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED);
221226
}
222227

223-
@Test
224-
void testCorrelateNoExpectNoMatch() throws Exception {
225-
Workflow workflow = listenCorrelateNoExpectWorkflow();
228+
@ParameterizedTest(name = "{0}")
229+
@MethodSource("correlateNoExpectWorkflowSources")
230+
void testCorrelateNoExpectMismatchThenMatch(String sourceName, Workflow workflow)
231+
throws Exception {
226232
WorkflowDefinition def = appl.workflowDefinition(workflow);
227233
WorkflowInstance instance = def.instance();
228234
CompletableFuture<WorkflowModel> future = instance.start();
@@ -239,6 +245,18 @@ void testCorrelateNoExpectNoMatch() throws Exception {
239245
buildCloudEvent(
240246
"com.example.hospital.patient.admitted", Map.of("name", "Jane"))));
241247

248+
await()
249+
.during(Duration.ofMillis(200))
250+
.atMost(Duration.ofSeconds(3))
251+
.untilAsserted(() -> assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING));
252+
253+
appl.eventPublishers()
254+
.forEach(
255+
p ->
256+
p.publish(
257+
buildCloudEvent(
258+
"com.example.hospital.patient.admitted", Map.of("name", "Alice"))));
259+
242260
await()
243261
.during(Duration.ofMillis(200))
244262
.atMost(Duration.ofSeconds(3))
@@ -247,6 +265,23 @@ void testCorrelateNoExpectNoMatch() throws Exception {
247265
assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING);
248266
assertThat(future.isDone()).isFalse();
249267
});
250-
instance.cancel();
268+
269+
appl.eventPublishers()
270+
.forEach(
271+
p ->
272+
p.publish(
273+
buildCloudEvent(
274+
"com.example.hospital.patient.admitted",
275+
Map.of("patientId", "P123", "name", "Bob"))));
276+
277+
WorkflowModel result = future.get(2, TimeUnit.SECONDS);
278+
Object outputValue = JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
279+
assertThat(outputValue).isInstanceOf(List.class);
280+
List<?> output = (List<?>) outputValue;
281+
assertThat(output).hasSize(1);
282+
assertThat(output.get(0)).isInstanceOf(Map.class);
283+
Map<String, Object> eventData = (Map<String, Object>) output.get(0);
284+
assertThat(eventData).containsEntry("patientId", "P123");
285+
assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED);
251286
}
252287
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: listen-correlate-no-expect
5+
version: '0.1.0'
6+
do:
7+
- waitForPatient:
8+
listen:
9+
to:
10+
one:
11+
with:
12+
type: com.example.hospital.patient.admitted
13+
correlate:
14+
patientId:
15+
from: .data.patientId

0 commit comments

Comments
 (0)