Skip to content

Commit 77143c1

Browse files
committed
feat: Implement correlation on event filter
- Add CorrelationPredicate for evaluating correlation expressions - Add correlate support in AbstractEventFilterBuilder and AbstractEventFilterSpec - Update TypeEventRegistration and TypeEventRegistrationBuilder with correlation predicates - Implement correlation matching in AbstractTypeConsumer - Add CorrelationTest and listen-correlate.yaml - Add correlate tests in WorkflowBuilderTest and DSLTest Signed-off-by: Matheus André <matheusandr2@gmail.com>
1 parent fcd26da commit 77143c1

10 files changed

Lines changed: 387 additions & 20 deletions

File tree

fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@ public SELF with(Consumer<P> c) {
3737
}
3838

3939
public SELF correlate(String key, Consumer<ListenTaskBuilder.CorrelatePropertyBuilder> c) {
40-
throw new UnsupportedOperationException(
41-
"correlate is not supported in the engine level: https://github.com/serverlessworkflow/sdk-java/issues/1206");
40+
ListenTaskBuilder.CorrelatePropertyBuilder cb =
41+
new ListenTaskBuilder.CorrelatePropertyBuilder();
42+
c.accept(cb);
43+
correlate.setAdditionalProperty(key, cb.build());
44+
return self();
4245
}
4346

4447
public EventFilter build() {

fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder;
1919
import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder;
20+
import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder;
2021
import java.util.ArrayList;
2122
import java.util.List;
2223
import java.util.function.Consumer;
@@ -41,13 +42,11 @@ protected List<Consumer<EVENT_FILTER>> getFilterSteps() {
4142
return filterSteps;
4243
}
4344

44-
// TODO: "correlate is not supported in the engine level:
45-
// https://github.com/serverlessworkflow/sdk-java/issues/1206". Keeping the code for a future
46-
// reference.
47-
// public SELF correlate(String key, Consumer<ListenTaskBuilder.CorrelatePropertyBuilder> c) {
48-
// filterSteps.add(f -> f.correlate(key, c));
49-
// return self();
50-
// }
45+
public SELF correlate(
46+
String key, Consumer<AbstractListenTaskBuilder.CorrelatePropertyBuilder> c) {
47+
addFilterStep(f -> f.correlate(key, c));
48+
return self();
49+
}
5150

5251
@Override
5352
public void accept(EVENT_FILTER filterBuilder) {

fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.serverlessworkflow.api.types.AuthenticationPolicyUnion;
3838
import io.serverlessworkflow.api.types.CallHTTP;
3939
import io.serverlessworkflow.api.types.CatchErrors;
40+
import io.serverlessworkflow.api.types.CorrelateProperty;
4041
import io.serverlessworkflow.api.types.Document;
4142
import io.serverlessworkflow.api.types.EmitEventDefinition;
4243
import io.serverlessworkflow.api.types.EmitTask;
@@ -310,8 +311,12 @@ void testDoTaskListenOne() {
310311
to ->
311312
to.one(
312313
f ->
313-
f.with(
314-
p -> p.type("com.fake.pet").source("mySource"))))))
314+
f.with(p -> p.type("com.fake.pet").source("mySource"))
315+
.correlate(
316+
"orderId",
317+
c ->
318+
c.from("$.data.orderId")
319+
.expect("$.input.orderId"))))))
315320
.build();
316321

317322
List<TaskItem> items = wf.getDo();
@@ -327,6 +332,10 @@ void testDoTaskListenOne() {
327332
EventFilter filter = one.getOne();
328333
assertNotNull(filter, "EventFilter should be present");
329334
assertEquals("com.fake.pet", filter.getWith().getType(), "Filter type should match");
335+
CorrelateProperty correlate = filter.getCorrelate().getAdditionalProperties().get("orderId");
336+
assertNotNull(correlate, "Correlate property should be present");
337+
assertEquals("$.data.orderId", correlate.getFrom(), "Correlate from should match");
338+
assertEquals("$.input.orderId", correlate.getExpect(), "Correlate expect should match");
330339
}
331340

332341
@Test

fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static io.serverlessworkflow.fluent.spec.dsl.DSL.workflow;
3131
import static org.assertj.core.api.Assertions.assertThat;
3232

33+
import io.serverlessworkflow.api.types.CorrelateProperty;
3334
import io.serverlessworkflow.api.types.HTTPArguments;
3435
import io.serverlessworkflow.api.types.ListenTaskConfiguration;
3536
import io.serverlessworkflow.api.types.RunTaskConfiguration;
@@ -166,7 +167,15 @@ public void when_listen_any_with_until() {
166167
public void when_listen_one() {
167168
Workflow wf =
168169
WorkflowBuilder.workflow("f", "ns", "1")
169-
.tasks(t -> t.listen(to().one(event().type("only-once"))))
170+
.tasks(
171+
t ->
172+
t.listen(
173+
to().one(
174+
event()
175+
.type("only-once")
176+
.correlate(
177+
"workflowInstanceId",
178+
c -> c.from("$.metadata.instanceId")))))
170179
.build();
171180

172181
var to = wf.getDo().get(0).getTask().getListenTask().getListen().getTo();
@@ -178,6 +187,10 @@ public void when_listen_one() {
178187
var one = to.getOneEventConsumptionStrategy().getOne();
179188
assertThat(one.getWith()).isNotNull();
180189
assertThat(one.getWith().getType()).isEqualTo("only-once");
190+
CorrelateProperty correlate =
191+
one.getCorrelate().getAdditionalProperties().get("workflowInstanceId");
192+
assertThat(correlate).isNotNull();
193+
assertThat(correlate.getFrom()).isEqualTo("$.metadata.instanceId");
181194
}
182195

183196
@Test

impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.api.types.CorrelateProperty;
1920
import io.serverlessworkflow.api.types.EventFilter;
21+
import io.serverlessworkflow.api.types.EventFilterCorrelate;
2022
import io.serverlessworkflow.api.types.EventProperties;
2123
import io.serverlessworkflow.impl.TaskContext;
2224
import io.serverlessworkflow.impl.WorkflowApplication;
2325
import io.serverlessworkflow.impl.WorkflowContext;
2426
import java.util.AbstractCollection;
27+
import java.util.ArrayList;
2528
import java.util.Collection;
2629
import java.util.Iterator;
2730
import java.util.List;
@@ -52,8 +55,26 @@ public TypeEventRegistrationBuilder listen(
5255
EventFilter register, WorkflowApplication application) {
5356
EventProperties properties = register.getWith();
5457
String type = properties.getType();
55-
return new TypeEventRegistrationBuilder(
56-
type, application.cloudEventPredicateFactory().build(application, properties));
58+
CloudEventPredicate cePredicate =
59+
application.cloudEventPredicateFactory().build(application, properties);
60+
Collection<CorrelationPredicate> correlationPredicates =
61+
buildCorrelationPredicates(register.getCorrelate(), application);
62+
return correlationPredicates.isEmpty()
63+
? new TypeEventRegistrationBuilder(type, cePredicate)
64+
: new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);
65+
}
66+
67+
private Collection<CorrelationPredicate> buildCorrelationPredicates(
68+
EventFilterCorrelate correlate, WorkflowApplication application) {
69+
if (correlate == null || correlate.getAdditionalProperties().isEmpty()) {
70+
return List.of();
71+
}
72+
Collection<CorrelationPredicate> predicates = new ArrayList<>();
73+
for (Map.Entry<String, CorrelateProperty> entry :
74+
correlate.getAdditionalProperties().entrySet()) {
75+
predicates.add(CorrelationPredicate.from(entry.getKey(), entry.getValue(), application));
76+
}
77+
return predicates;
5778
}
5879

5980
@Override
@@ -67,14 +88,27 @@ private static class CloudEventConsumer extends AbstractCollection<TypeEventRegi
6788

6889
@Override
6990
public void accept(CloudEvent ce) {
70-
logger.debug("Received cloud event {}", ce);
7191
for (TypeEventRegistration registration : registrations) {
72-
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
92+
if (registration.predicate().test(ce, registration.workflow(), registration.task())
93+
&& testCorrelation(ce, registration)) {
7394
registration.consumer().accept(ce);
7495
}
7596
}
7697
}
7798

99+
private boolean testCorrelation(CloudEvent ce, TypeEventRegistration registration) {
100+
Collection<CorrelationPredicate> predicates = registration.correlationPredicates();
101+
if (predicates.isEmpty()) {
102+
return true;
103+
}
104+
for (CorrelationPredicate pred : predicates) {
105+
if (!pred.test(ce, registration.workflow(), registration.task())) {
106+
return false;
107+
}
108+
}
109+
return true;
110+
}
111+
78112
@Override
79113
public boolean add(TypeEventRegistration registration) {
80114
return registrations.add(registration);
@@ -107,7 +141,13 @@ public TypeEventRegistration register(
107141
return new TypeEventRegistration(null, ce, null, workflow, task);
108142
} else {
109143
TypeEventRegistration registration =
110-
new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, task);
144+
new TypeEventRegistration(
145+
builder.type(),
146+
ce,
147+
builder.cePredicate(),
148+
builder.correlationPredicates(),
149+
workflow,
150+
task);
111151
registrations
112152
.computeIfAbsent(
113153
registration.type(),
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.events;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.TaskContext;
20+
import io.serverlessworkflow.impl.WorkflowApplication;
21+
import io.serverlessworkflow.impl.WorkflowContext;
22+
import io.serverlessworkflow.impl.WorkflowModel;
23+
import io.serverlessworkflow.impl.WorkflowModelFactory;
24+
import io.serverlessworkflow.impl.WorkflowValueResolver;
25+
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
26+
import java.util.Objects;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public class CorrelationPredicate {
31+
32+
private static final Logger logger = LoggerFactory.getLogger(CorrelationPredicate.class);
33+
34+
private final String key;
35+
private final WorkflowModelFactory modelFactory;
36+
private final WorkflowValueResolver<String> fromResolver;
37+
private final WorkflowValueResolver<String> expectResolver;
38+
39+
private CorrelationPredicate(
40+
String key,
41+
WorkflowModelFactory modelFactory,
42+
WorkflowValueResolver<String> fromResolver,
43+
WorkflowValueResolver<String> expectResolver) {
44+
this.key = key;
45+
this.modelFactory = modelFactory;
46+
this.fromResolver = fromResolver;
47+
this.expectResolver = expectResolver;
48+
}
49+
50+
public static CorrelationPredicate from(
51+
String key, io.serverlessworkflow.api.types.CorrelateProperty prop, WorkflowApplication app) {
52+
WorkflowValueResolver<String> fromResolver =
53+
app.expressionFactory().resolveString(ExpressionDescriptor.from(prop.getFrom()));
54+
WorkflowValueResolver<String> expectResolver =
55+
prop.getExpect() != null
56+
? app.expressionFactory().resolveString(ExpressionDescriptor.from(prop.getExpect()))
57+
: null;
58+
return new CorrelationPredicate(key, app.modelFactory(), fromResolver, expectResolver);
59+
}
60+
61+
public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task) {
62+
String eventValue = extractFromEvent(event, workflow, task);
63+
if (eventValue == null) {
64+
logger.debug("Correlation key '{}': from expression returned null for event {}", key, event);
65+
return false;
66+
}
67+
68+
if (expectResolver == null) {
69+
logger.debug(
70+
"Correlation key '{}': no expect expression, accepting event value '{}'",
71+
key,
72+
eventValue);
73+
return true;
74+
}
75+
76+
String expectedValue = expectResolver.apply(workflow, task, task.input());
77+
boolean result = Objects.equals(eventValue, expectedValue);
78+
logger.debug(
79+
"Correlation key '{}': eventValue='{}', expectedValue='{}', match={}",
80+
key,
81+
eventValue,
82+
expectedValue,
83+
result);
84+
return result;
85+
}
86+
87+
private String extractFromEvent(CloudEvent event, WorkflowContext workflow, TaskContext task) {
88+
WorkflowModel eventModel = modelFactory.from(event);
89+
return fromResolver.apply(workflow, task, eventModel);
90+
}
91+
92+
public String key() {
93+
return key;
94+
}
95+
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,25 @@
1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.impl.TaskContext;
2020
import io.serverlessworkflow.impl.WorkflowContext;
21+
import java.util.Collection;
22+
import java.util.Collections;
2123
import java.util.function.Consumer;
2224

2325
public record TypeEventRegistration(
2426
String type,
2527
Consumer<CloudEvent> consumer,
2628
CloudEventPredicate predicate,
29+
Collection<CorrelationPredicate> correlationPredicates,
2730
WorkflowContext workflow,
2831
TaskContext task)
29-
implements EventRegistration {}
32+
implements EventRegistration {
33+
34+
public TypeEventRegistration(
35+
String type,
36+
Consumer<CloudEvent> consumer,
37+
CloudEventPredicate predicate,
38+
WorkflowContext workflow,
39+
TaskContext task) {
40+
this(type, consumer, predicate, Collections.emptyList(), workflow, task);
41+
}
42+
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,16 @@
1515
*/
1616
package io.serverlessworkflow.impl.events;
1717

18-
public record TypeEventRegistrationBuilder(String type, CloudEventPredicate cePredicate)
19-
implements EventRegistrationBuilder {}
18+
import java.util.Collection;
19+
import java.util.Collections;
20+
21+
public record TypeEventRegistrationBuilder(
22+
String type,
23+
CloudEventPredicate cePredicate,
24+
Collection<CorrelationPredicate> correlationPredicates)
25+
implements EventRegistrationBuilder {
26+
27+
public TypeEventRegistrationBuilder(String type, CloudEventPredicate cePredicate) {
28+
this(type, cePredicate, Collections.emptyList());
29+
}
30+
}

0 commit comments

Comments
 (0)