Skip to content

Commit a138648

Browse files
committed
feat: Implement correlation on event filter
1 parent ad1eb92 commit a138648

14 files changed

Lines changed: 1487 additions & 28 deletions

File tree

fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@ public SELF with(Consumer<P> c) {
3737
}
3838

3939
public SELF correlate(String key, Consumer<ListenTaskBuilder.CorrelatePropertyBuilder> c) {
40-
throw new UnsupportedOperationException(
41-
"correlate is not supported in the engine level: https://github.com/serverlessworkflow/sdk-java/issues/1206");
40+
ListenTaskBuilder.CorrelatePropertyBuilder cb =
41+
new ListenTaskBuilder.CorrelatePropertyBuilder();
42+
c.accept(cb);
43+
correlate.setAdditionalProperty(key, cb.build());
44+
return self();
4245
}
4346

4447
public EventFilter build() {

fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/UseBuilder.java

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,17 @@
1515
*/
1616
package io.serverlessworkflow.fluent.spec;
1717

18+
import io.serverlessworkflow.api.types.Error;
19+
import io.serverlessworkflow.api.types.ErrorDetails;
20+
import io.serverlessworkflow.api.types.ErrorTitle;
21+
import io.serverlessworkflow.api.types.ErrorType;
22+
import io.serverlessworkflow.api.types.UriTemplate;
1823
import io.serverlessworkflow.api.types.Use;
1924
import io.serverlessworkflow.api.types.UseAuthentications;
25+
import io.serverlessworkflow.api.types.UseErrors;
26+
import io.serverlessworkflow.api.types.UseRetries;
27+
import java.net.URI;
28+
import java.net.URISyntaxException;
2029
import java.util.List;
2130
import java.util.function.Consumer;
2231

@@ -47,9 +56,99 @@ public UseBuilder authentications(Consumer<UseAuthenticationsBuilder> authentica
4756
return this;
4857
}
4958

50-
// TODO: implement the remaining `use` attributes
59+
public UseBuilder errors(Consumer<UseErrorsBuilder> errorsConsumer) {
60+
final UseErrorsBuilder builder = new UseErrorsBuilder();
61+
errorsConsumer.accept(builder);
62+
this.use.setErrors(builder.build());
63+
return this;
64+
}
65+
66+
public UseBuilder retries(Consumer<UseRetriesBuilder> retriesConsumer) {
67+
final UseRetriesBuilder builder = new UseRetriesBuilder();
68+
retriesConsumer.accept(builder);
69+
this.use.setRetries(builder.build());
70+
return this;
71+
}
5172

5273
public Use build() {
5374
return use;
5475
}
76+
77+
public static final class UseErrorsBuilder {
78+
private final UseErrors useErrors;
79+
80+
UseErrorsBuilder() {
81+
this.useErrors = new UseErrors();
82+
}
83+
84+
public UseErrorsBuilder error(String name, Consumer<UseErrorBuilder> errorConsumer) {
85+
final UseErrorBuilder builder = new UseErrorBuilder();
86+
errorConsumer.accept(builder);
87+
this.useErrors.setAdditionalProperty(name, builder.build());
88+
return this;
89+
}
90+
91+
public UseErrors build() {
92+
return useErrors;
93+
}
94+
}
95+
96+
public static final class UseErrorBuilder {
97+
private final Error error;
98+
99+
UseErrorBuilder() {
100+
this.error = new Error();
101+
}
102+
103+
public UseErrorBuilder type(String expression) {
104+
ErrorType errorType = new ErrorType();
105+
try {
106+
errorType.withLiteralErrorType(new UriTemplate().withLiteralUri(new URI(expression)));
107+
} catch (URISyntaxException ex) {
108+
errorType.withExpressionErrorType(expression);
109+
}
110+
this.error.setType(errorType);
111+
return this;
112+
}
113+
114+
public UseErrorBuilder status(int status) {
115+
this.error.setStatus(status);
116+
return this;
117+
}
118+
119+
public UseErrorBuilder title(String expression) {
120+
this.error.setTitle(new ErrorTitle().withExpressionErrorTitle(expression));
121+
return this;
122+
}
123+
124+
public UseErrorBuilder detail(String expression) {
125+
this.error.setDetail(new ErrorDetails().withExpressionErrorDetails(expression));
126+
return this;
127+
}
128+
129+
public Error build() {
130+
return error;
131+
}
132+
}
133+
134+
public static final class UseRetriesBuilder {
135+
private final UseRetries useRetries;
136+
137+
UseRetriesBuilder() {
138+
this.useRetries = new UseRetries();
139+
}
140+
141+
public UseRetriesBuilder retry(
142+
String name, Consumer<BaseTryTaskBuilder.RetryPolicyBuilder> retryConsumer) {
143+
final BaseTryTaskBuilder.RetryPolicyBuilder builder =
144+
new BaseTryTaskBuilder.RetryPolicyBuilder();
145+
retryConsumer.accept(builder);
146+
this.useRetries.setAdditionalProperty(name, builder.build());
147+
return this;
148+
}
149+
150+
public UseRetries build() {
151+
return useRetries;
152+
}
153+
}
55154
}

fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder;
1919
import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder;
20+
import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder;
2021
import java.util.ArrayList;
2122
import java.util.List;
2223
import java.util.function.Consumer;
@@ -41,13 +42,11 @@ protected List<Consumer<EVENT_FILTER>> getFilterSteps() {
4142
return filterSteps;
4243
}
4344

44-
// TODO: "correlate is not supported in the engine level:
45-
// https://github.com/serverlessworkflow/sdk-java/issues/1206". Keeping the code for a future
46-
// reference.
47-
// public SELF correlate(String key, Consumer<ListenTaskBuilder.CorrelatePropertyBuilder> c) {
48-
// filterSteps.add(f -> f.correlate(key, c));
49-
// return self();
50-
// }
45+
public SELF correlate(
46+
String key, Consumer<AbstractListenTaskBuilder.CorrelatePropertyBuilder> c) {
47+
addFilterStep(f -> f.correlate(key, c));
48+
return self();
49+
}
5150

5251
@Override
5352
public void accept(EVENT_FILTER filterBuilder) {

fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.serverlessworkflow.fluent.spec.TaskItemListBuilder;
2626
import io.serverlessworkflow.fluent.spec.TimeoutBuilder;
2727
import io.serverlessworkflow.fluent.spec.TryTaskBuilder;
28+
import io.serverlessworkflow.fluent.spec.UseBuilder;
2829
import io.serverlessworkflow.fluent.spec.configurers.AuthenticationConfigurer;
2930
import io.serverlessworkflow.fluent.spec.configurers.CallGrpcConfigurer;
3031
import io.serverlessworkflow.fluent.spec.configurers.CallHttpConfigurer;
@@ -1227,4 +1228,14 @@ public static Consumer<ScheduleBuilder> on(
12271228
Consumer<EventFilterBuilder>... filters) {
12281229
return b -> ((AbstractEventConsumptionStrategyBuilder) b).any(filters);
12291230
}
1231+
1232+
public static Consumer<UseBuilder.UseErrorsBuilder> errors(
1233+
Consumer<UseBuilder.UseErrorsBuilder> errorsConsumer) {
1234+
return errorsConsumer;
1235+
}
1236+
1237+
public static Consumer<UseBuilder.UseRetriesBuilder> retries(
1238+
Consumer<UseBuilder.UseRetriesBuilder> retriesConsumer) {
1239+
return retriesConsumer;
1240+
}
12301241
}

fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.serverlessworkflow.api.types.AuthenticationPolicyUnion;
3838
import io.serverlessworkflow.api.types.CallHTTP;
3939
import io.serverlessworkflow.api.types.CatchErrors;
40+
import io.serverlessworkflow.api.types.CorrelateProperty;
4041
import io.serverlessworkflow.api.types.Document;
4142
import io.serverlessworkflow.api.types.EmitEventDefinition;
4243
import io.serverlessworkflow.api.types.EmitTask;
@@ -310,8 +311,12 @@ void testDoTaskListenOne() {
310311
to ->
311312
to.one(
312313
f ->
313-
f.with(
314-
p -> p.type("com.fake.pet").source("mySource"))))))
314+
f.with(p -> p.type("com.fake.pet").source("mySource"))
315+
.correlate(
316+
"orderId",
317+
c ->
318+
c.from("$.data.orderId")
319+
.expect("$.input.orderId"))))))
315320
.build();
316321

317322
List<TaskItem> items = wf.getDo();
@@ -327,6 +332,10 @@ void testDoTaskListenOne() {
327332
EventFilter filter = one.getOne();
328333
assertNotNull(filter, "EventFilter should be present");
329334
assertEquals("com.fake.pet", filter.getWith().getType(), "Filter type should match");
335+
CorrelateProperty correlate = filter.getCorrelate().getAdditionalProperties().get("orderId");
336+
assertNotNull(correlate, "Correlate property should be present");
337+
assertEquals("$.data.orderId", correlate.getFrom(), "Correlate from should match");
338+
assertEquals("$.input.orderId", correlate.getExpect(), "Correlate expect should match");
330339
}
331340

332341
@Test

fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static io.serverlessworkflow.fluent.spec.dsl.DSL.workflow;
3131
import static org.assertj.core.api.Assertions.assertThat;
3232

33+
import io.serverlessworkflow.api.types.CorrelateProperty;
3334
import io.serverlessworkflow.api.types.HTTPArguments;
3435
import io.serverlessworkflow.api.types.ListenTaskConfiguration;
3536
import io.serverlessworkflow.api.types.RunTaskConfiguration;
@@ -166,7 +167,15 @@ public void when_listen_any_with_until() {
166167
public void when_listen_one() {
167168
Workflow wf =
168169
WorkflowBuilder.workflow("f", "ns", "1")
169-
.tasks(t -> t.listen(to().one(event().type("only-once"))))
170+
.tasks(
171+
t ->
172+
t.listen(
173+
to().one(
174+
event()
175+
.type("only-once")
176+
.correlate(
177+
"workflowInstanceId",
178+
c -> c.from("$.metadata.instanceId")))))
170179
.build();
171180

172181
var to = wf.getDo().get(0).getTask().getListenTask().getListen().getTo();
@@ -178,6 +187,10 @@ public void when_listen_one() {
178187
var one = to.getOneEventConsumptionStrategy().getOne();
179188
assertThat(one.getWith()).isNotNull();
180189
assertThat(one.getWith().getType()).isEqualTo("only-once");
190+
CorrelateProperty correlate =
191+
one.getCorrelate().getAdditionalProperties().get("workflowInstanceId");
192+
assertThat(correlate).isNotNull();
193+
assertThat(correlate.getFrom()).isEqualTo("$.metadata.instanceId");
181194
}
182195

183196
@Test

impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.api.types.CorrelateProperty;
1920
import io.serverlessworkflow.api.types.EventFilter;
21+
import io.serverlessworkflow.api.types.EventFilterCorrelate;
2022
import io.serverlessworkflow.api.types.EventProperties;
2123
import io.serverlessworkflow.impl.TaskContext;
2224
import io.serverlessworkflow.impl.WorkflowApplication;
2325
import io.serverlessworkflow.impl.WorkflowContext;
2426
import java.util.AbstractCollection;
27+
import java.util.ArrayList;
2528
import java.util.Collection;
2629
import java.util.Iterator;
2730
import java.util.List;
@@ -52,8 +55,31 @@ public TypeEventRegistrationBuilder listen(
5255
EventFilter register, WorkflowApplication application) {
5356
EventProperties properties = register.getWith();
5457
String type = properties.getType();
55-
return new TypeEventRegistrationBuilder(
56-
type, application.cloudEventPredicateFactory().build(application, properties));
58+
logger.info(
59+
"listen: type={}, correlate={}, with.additionalProperties={}",
60+
type,
61+
register.getCorrelate(),
62+
properties.getAdditionalProperties());
63+
CloudEventPredicate cePredicate =
64+
application.cloudEventPredicateFactory().build(application, properties);
65+
Collection<CorrelationPredicate> correlationPredicates =
66+
buildCorrelationPredicates(register.getCorrelate(), application);
67+
return correlationPredicates.isEmpty()
68+
? new TypeEventRegistrationBuilder(type, cePredicate)
69+
: new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);
70+
}
71+
72+
private Collection<CorrelationPredicate> buildCorrelationPredicates(
73+
EventFilterCorrelate correlate, WorkflowApplication application) {
74+
if (correlate == null || correlate.getAdditionalProperties().isEmpty()) {
75+
return List.of();
76+
}
77+
Collection<CorrelationPredicate> predicates = new ArrayList<>();
78+
for (Map.Entry<String, CorrelateProperty> entry :
79+
correlate.getAdditionalProperties().entrySet()) {
80+
predicates.add(CorrelationPredicate.from(entry.getKey(), entry.getValue(), application));
81+
}
82+
return predicates;
5783
}
5884

5985
@Override
@@ -67,12 +93,35 @@ private static class CloudEventConsumer extends AbstractCollection<TypeEventRegi
6793

6894
@Override
6995
public void accept(CloudEvent ce) {
70-
logger.debug("Received cloud event {}", ce);
7196
for (TypeEventRegistration registration : registrations) {
72-
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
73-
registration.consumer().accept(ce);
97+
try {
98+
boolean predResult =
99+
registration.predicate().test(ce, registration.workflow(), registration.task());
100+
logger.info(
101+
"Predicate result for '{}': {}, correlation={}",
102+
registration.type(),
103+
predResult,
104+
registration.correlationPredicates().size());
105+
if (predResult && testCorrelation(ce, registration)) {
106+
registration.consumer().accept(ce);
107+
}
108+
} catch (Exception e) {
109+
logger.error("Error processing event for registration type='{}'", registration.type(), e);
110+
}
111+
}
112+
}
113+
114+
private boolean testCorrelation(CloudEvent ce, TypeEventRegistration registration) {
115+
Collection<CorrelationPredicate> predicates = registration.correlationPredicates();
116+
if (predicates.isEmpty()) {
117+
return true;
118+
}
119+
for (CorrelationPredicate pred : predicates) {
120+
if (!pred.test(ce, registration.workflow(), registration.task())) {
121+
return false;
74122
}
75123
}
124+
return true;
76125
}
77126

78127
@Override
@@ -107,7 +156,13 @@ public TypeEventRegistration register(
107156
return new TypeEventRegistration(null, ce, null, workflow, task);
108157
} else {
109158
TypeEventRegistration registration =
110-
new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, task);
159+
new TypeEventRegistration(
160+
builder.type(),
161+
ce,
162+
builder.cePredicate(),
163+
builder.correlationPredicates(),
164+
workflow,
165+
task);
111166
registrations
112167
.computeIfAbsent(
113168
registration.type(),

0 commit comments

Comments
 (0)