Skip to content

Commit 8c39ac3

Browse files
fjtiradomatheusandre1
authored andcommitted
Review comments
Signed-off-by: Matheus André <matheusandr2@gmail.com>
1 parent 4ffacf0 commit 8c39ac3

7 files changed

Lines changed: 71 additions & 39 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -355,10 +355,6 @@ public <T> T addMetadataIfAbsent(String key, Supplier<T> supplier) {
355355
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
356356
}
357357

358-
public Object computeCorrelationValue(String key, Object value) {
359-
return additionalObjects.computeIfAbsent(key, k -> value);
360-
}
361-
362358
@Override
363359
public <T> Optional<T> findMetadata(String key, Class<T> objectClass) {
364360
Object value = additionalObjects.get(key);

impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.serverlessworkflow.impl.WorkflowApplication;
2525
import io.serverlessworkflow.impl.WorkflowContext;
2626
import io.serverlessworkflow.impl.WorkflowModel;
27-
import io.serverlessworkflow.impl.WorkflowModelFactory;
2827
import java.util.AbstractCollection;
2928
import java.util.ArrayList;
3029
import java.util.Collection;
@@ -42,6 +41,8 @@ public abstract class AbstractTypeConsumer
4241

4342
private static final Logger logger = LoggerFactory.getLogger(AbstractTypeConsumer.class);
4443

44+
private static final CloudEventPredicate ALWAYS_TRUE = (ce, wf, t) -> true;
45+
4546
protected abstract void registerToAll(Consumer<CloudEvent> consumer);
4647

4748
protected abstract void unregisterFromAll();
@@ -61,8 +62,7 @@ public TypeEventRegistrationBuilder listen(
6162
application.cloudEventPredicateFactory().build(application, properties);
6263
Collection<CloudEventPredicate> correlationPredicates =
6364
buildCorrelationPredicates(register.getCorrelate(), application);
64-
return new TypeEventRegistrationBuilder(
65-
type, cePredicate, correlationPredicates, application.modelFactory());
65+
return new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);
6666
}
6767

6868
private Collection<CloudEventPredicate> buildCorrelationPredicates(
@@ -83,29 +83,24 @@ private Collection<CloudEventPredicate> buildCorrelationPredicates(
8383

8484
@Override
8585
public Collection<TypeEventRegistrationBuilder> listenToAll(WorkflowApplication application) {
86-
return List.of(
87-
new TypeEventRegistrationBuilder(null, null, List.of(), application.modelFactory()));
86+
return List.of(new TypeEventRegistrationBuilder(null, ALWAYS_TRUE, List.of()));
8887
}
8988

9089
private static class CloudEventConsumer extends AbstractCollection<TypeEventRegistration>
9190
implements Consumer<CloudEvent> {
92-
private final WorkflowModelFactory modelFactory;
9391
private Collection<TypeEventRegistration> registrations = new CopyOnWriteArrayList<>();
9492

95-
CloudEventConsumer(WorkflowModelFactory modelFactory) {
96-
this.modelFactory = modelFactory;
97-
}
98-
9993
@Override
10094
public void accept(CloudEvent ce) {
10195
logger.debug("Received cloud event {}", ce);
10296
for (TypeEventRegistration registration : registrations) {
103-
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
104-
if (!testCorrelation(ce, registration)) {
105-
continue;
106-
}
107-
registration.consumer().accept(ce);
97+
if (!registration.predicate().test(ce, registration.workflow(), registration.task())) {
98+
continue;
99+
}
100+
if (!testCorrelation(ce, registration)) {
101+
continue;
108102
}
103+
registration.consumer().accept(ce);
109104
}
110105
}
111106

@@ -114,12 +109,14 @@ private boolean testCorrelation(CloudEvent ce, TypeEventRegistration registratio
114109
if (predicates.isEmpty()) {
115110
return true;
116111
}
117-
WorkflowModel eventModel = null;
112+
boolean hasModelAware =
113+
predicates.stream().anyMatch(ModelAwareCloudEventPredicate.class::isInstance);
114+
WorkflowModel eventModel =
115+
hasModelAware
116+
? registration.workflow().definition().application().modelFactory().from(ce)
117+
: null;
118118
for (CloudEventPredicate pred : predicates) {
119119
if (pred instanceof ModelAwareCloudEventPredicate ma) {
120-
if (eventModel == null) {
121-
eventModel = modelFactory.from(ce);
122-
}
123120
if (!ma.test(eventModel, registration.workflow(), registration.task())) {
124121
return false;
125122
}
@@ -161,7 +158,7 @@ public TypeEventRegistration register(
161158
TaskContext task) {
162159
if (builder.type() == null) {
163160
registerToAll(ce);
164-
return new TypeEventRegistration(null, ce, null, workflow, task);
161+
return new TypeEventRegistration(null, ce, ALWAYS_TRUE, workflow, task);
165162
} else {
166163
TypeEventRegistration registration =
167164
new TypeEventRegistration(
@@ -175,7 +172,7 @@ public TypeEventRegistration register(
175172
.computeIfAbsent(
176173
registration.type(),
177174
k -> {
178-
CloudEventConsumer consumer = new CloudEventConsumer(builder.modelFactory());
175+
CloudEventConsumer consumer = new CloudEventConsumer();
179176
register(k, consumer);
180177
return consumer;
181178
})

impl/core/src/main/java/io/serverlessworkflow/impl/events/CorrelationPredicate.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ public static CorrelationPredicate from(
5656
}
5757

5858
private String correlationStateKey(TaskContext task) {
59-
return "correlation:" + task.position().jsonPointer() + ":" + correlationKey;
59+
return "correlation:"
60+
+ task.position().jsonPointer()
61+
+ ":"
62+
+ task.iteration()
63+
+ ":"
64+
+ correlationKey;
6065
}
6166

6267
@Override
@@ -75,7 +80,7 @@ public boolean test(WorkflowModel eventModel, WorkflowContext workflow, TaskCont
7580

7681
if (expectResolver == null) {
7782
String stateKey = correlationStateKey(task);
78-
Object firstValue = workflow.instance().computeCorrelationValue(stateKey, eventValue);
83+
Object firstValue = workflow.instance().addMetadataIfAbsent(stateKey, () -> eventValue);
7984
boolean result = Objects.equals(eventValue, firstValue);
8085
logger.debug(
8186
"Correlation no expect, eventValue='{}', firstValue='{}', match={}",

impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,16 @@
1515
*/
1616
package io.serverlessworkflow.impl.events;
1717

18-
import io.serverlessworkflow.impl.WorkflowModelFactory;
1918
import java.util.Collection;
2019
import java.util.Collections;
2120

2221
public record TypeEventRegistrationBuilder(
2322
String type,
2423
CloudEventPredicate cePredicate,
25-
Collection<CloudEventPredicate> correlationPredicates,
26-
WorkflowModelFactory modelFactory)
24+
Collection<CloudEventPredicate> correlationPredicates)
2725
implements EventRegistrationBuilder {
2826

2927
public TypeEventRegistrationBuilder(String type, CloudEventPredicate cePredicate) {
30-
this(type, cePredicate, Collections.emptyList(), null);
28+
this(type, cePredicate, Collections.emptyList());
3129
}
3230
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,11 @@ void testCorrelateMatch(String sourceName, Workflow workflow) throws Exception {
8383
Map.of("patientId", "P123", "name", "John"))));
8484

8585
WorkflowModel result = future.get(2, TimeUnit.SECONDS);
86-
List<Object> output = (List<Object>) JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
86+
Object outputValue = JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
87+
assertThat(outputValue).isInstanceOf(List.class);
88+
List<?> output = (List<?>) outputValue;
8789
assertThat(output).hasSize(1);
90+
assertThat(output.get(0)).isInstanceOf(Map.class);
8891
Map<String, Object> eventData = (Map<String, Object>) output.get(0);
8992
assertThat(eventData).containsEntry("patientId", "P123");
9093
assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED);
@@ -192,9 +195,16 @@ private static Workflow listenCorrelateNoExpectWorkflow() {
192195
.build();
193196
}
194197

195-
@Test
196-
void testCorrelateNoExpectMatch() throws Exception {
197-
Workflow workflow = listenCorrelateNoExpectWorkflow();
198+
private static Stream<Arguments> correlateNoExpectWorkflowSources() throws IOException {
199+
return Stream.of(
200+
readWorkflowFromClasspath("workflows-samples/listen-correlate-no-expect.yaml"),
201+
listenCorrelateNoExpectWorkflow())
202+
.map(wf -> Arguments.of(wf.getDocument().getName(), wf));
203+
}
204+
205+
@ParameterizedTest(name = "{0}")
206+
@MethodSource("correlateNoExpectWorkflowSources")
207+
void testCorrelateNoExpectMatch(String sourceName, Workflow workflow) throws Exception {
198208
WorkflowDefinition def = appl.workflowDefinition(workflow);
199209
WorkflowInstance instance = def.instance();
200210
CompletableFuture<WorkflowModel> future = instance.start();
@@ -213,16 +223,23 @@ void testCorrelateNoExpectMatch() throws Exception {
213223
Map.of("patientId", "P123", "name", "John"))));
214224

215225
WorkflowModel result = future.get(2, TimeUnit.SECONDS);
216-
List<Object> output = (List<Object>) JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
226+
Object outputValue = JsonUtils.toJavaValue(JsonUtils.modelToJson(result));
227+
assertThat(outputValue).isInstanceOf(List.class);
228+
List<?> output = (List<?>) outputValue;
217229
assertThat(output).hasSize(1);
230+
assertThat(output.get(0)).isInstanceOf(Map.class);
218231
Map<String, Object> eventData = (Map<String, Object>) output.get(0);
219232
assertThat(eventData).containsEntry("patientId", "P123");
220233
assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED);
221234
}
222235

223-
@Test
224-
void testCorrelateNoExpectNoMatch() throws Exception {
225-
Workflow workflow = listenCorrelateNoExpectWorkflow();
236+
@ParameterizedTest(name = "{0}")
237+
@MethodSource("correlateNoExpectWorkflowSources")
238+
void testCorrelateNoExpectNoMatch(String sourceName, Workflow workflow) throws Exception {
239+
assertCorrelateNoExpectNoMatch(workflow);
240+
}
241+
242+
private void assertCorrelateNoExpectNoMatch(Workflow workflow) throws Exception {
226243
WorkflowDefinition def = appl.workflowDefinition(workflow);
227244
WorkflowInstance instance = def.instance();
228245
CompletableFuture<WorkflowModel> future = instance.start();
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: listen-correlate-no-expect
5+
version: '0.1.0'
6+
do:
7+
- waitForPatient:
8+
listen:
9+
to:
10+
one:
11+
with:
12+
type: com.example.hospital.patient.admitted
13+
correlate:
14+
patientId:
15+
from: .data.patientId

impl/test/src/test/resources/workflows-samples/listen-correlate.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ document:
44
name: listen-correlate
55
version: '0.1.0'
66
input:
7+
# Raw input shape: { "patientId": "P123" }
8+
# After transform: { "patientId": "P123" } — same key, but explicitly shaped by input.from
79
from: '{ patientId: .patientId }'
810
do:
911
- waitForPatient:
@@ -15,4 +17,6 @@ do:
1517
correlate:
1618
patientId:
1719
from: .data.patientId
20+
# expect is evaluated against the task input (post input.from transform),
21+
# so .patientId resolves to the patientId field in the shaped input
1822
expect: .patientId

0 commit comments

Comments
 (0)