Skip to content

Commit a1006c4

Browse files
fix: Add until predicates to any listeners (#1377)
* fix: Add until predicates to any listeners Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> * Incorporate co-pilot's review Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> --------- Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com>
1 parent 0a9a29e commit a1006c4

5 files changed

Lines changed: 342 additions & 0 deletions

File tree

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.serverlessworkflow.api.types.ListenTo;
2121
import io.serverlessworkflow.api.types.OneEventConsumptionStrategy;
2222
import io.serverlessworkflow.api.types.Until;
23+
import io.serverlessworkflow.api.types.func.ContextPredicate;
24+
import io.serverlessworkflow.api.types.func.FilterPredicate;
2325
import io.serverlessworkflow.api.types.func.UntilPredicate;
2426
import io.serverlessworkflow.fluent.spec.AbstractEventConsumptionStrategyBuilder;
2527
import java.util.function.Predicate;
@@ -66,4 +68,14 @@ public <T> FuncListenToBuilder until(Predicate<T> predicate, Class<T> predClass)
6668
this.setUntil(new UntilPredicate().withPredicate(predicate, predClass));
6769
return this;
6870
}
71+
72+
public <T> FuncListenToBuilder until(ContextPredicate<T> predicate, Class<T> predClass) {
73+
this.setUntil(new UntilPredicate().withPredicate(predicate, predClass));
74+
return this;
75+
}
76+
77+
public <T> FuncListenToBuilder until(FilterPredicate<T> predicate, Class<T> predClass) {
78+
this.setUntil(new UntilPredicate().withPredicate(predicate, predClass));
79+
return this;
80+
}
6981
}

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.serverlessworkflow.fluent.func.dsl;
1717

18+
import io.serverlessworkflow.api.types.func.ContextPredicate;
19+
import io.serverlessworkflow.api.types.func.FilterPredicate;
1820
import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder;
1921
import io.serverlessworkflow.fluent.func.FuncListenToBuilder;
2022
import io.serverlessworkflow.fluent.spec.dsl.BaseListenSpec;
@@ -46,4 +48,16 @@ public <T> SELF until(Predicate<T> predicate, Class<T> predClass) {
4648
this.setUntilStep(u -> u.until(predicate, predClass));
4749
return self();
4850
}
51+
52+
public <T> SELF until(ContextPredicate<T> predicate, Class<T> predClass) {
53+
Objects.requireNonNull(predicate, "predicate");
54+
this.setUntilStep(u -> u.until(predicate, predClass));
55+
return self();
56+
}
57+
58+
public <T> SELF until(FilterPredicate<T> predicate, Class<T> predClass) {
59+
Objects.requireNonNull(predicate, "predicate");
60+
this.setUntilStep(u -> u.until(predicate, predClass));
61+
return self();
62+
}
4963
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverless.workflow.impl.executors.func;
17+
18+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen;
19+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toAny;
20+
import static org.awaitility.Awaitility.await;
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertNotNull;
23+
24+
import com.fasterxml.jackson.databind.ObjectMapper;
25+
import io.cloudevents.CloudEvent;
26+
import io.cloudevents.core.builder.CloudEventBuilder;
27+
import io.serverlessworkflow.api.types.Workflow;
28+
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
29+
import io.serverlessworkflow.impl.WorkflowApplication;
30+
import io.serverlessworkflow.impl.WorkflowDefinition;
31+
import io.serverlessworkflow.impl.WorkflowInstance;
32+
import io.serverlessworkflow.impl.WorkflowModel;
33+
import io.serverlessworkflow.impl.WorkflowModelCollection;
34+
import io.serverlessworkflow.impl.WorkflowStatus;
35+
import io.serverlessworkflow.impl.events.EventPublisher;
36+
import java.net.URI;
37+
import java.time.Duration;
38+
import java.util.concurrent.CompletableFuture;
39+
import org.junit.jupiter.api.Test;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
public class ListenUntilCurrentTest {
44+
45+
private static final Logger log = LoggerFactory.getLogger(ListenUntilCurrentTest.class);
46+
private static final ObjectMapper MAPPER = new ObjectMapper();
47+
48+
private CloudEvent createOrderEvent(String instanceId, int orderNum) {
49+
Order order = new Order("order-" + orderNum, "PENDING", 100.0 * orderNum);
50+
try {
51+
return CloudEventBuilder.v1()
52+
.withId("event-" + orderNum)
53+
.withSource(URI.create("test:/orders"))
54+
.withType("order.created")
55+
.withExtension("instanceid", instanceId)
56+
.withData("application/json", MAPPER.writeValueAsBytes(order))
57+
.build();
58+
} catch (Exception e) {
59+
throw new RuntimeException("Failed to create order event", e);
60+
}
61+
}
62+
63+
@Test
64+
public void testCurrentToAnyWithUntilExpression() throws Exception {
65+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
66+
Workflow workflow =
67+
FuncWorkflowBuilder.workflow("test-toany-until")
68+
.tasks(
69+
listen(
70+
"waitOrders",
71+
toAny("order.created")
72+
.until(
73+
(WorkflowModelCollection events) -> {
74+
log.info("Predicate called!");
75+
log.info(" Param type: {}", events.getClass().getName());
76+
log.info(" Param value: {}", events);
77+
log.info(" Event count: {}", (long) events.size());
78+
boolean result = (long) events.size() >= 3;
79+
log.info(" Returning: {}", result);
80+
return result;
81+
},
82+
WorkflowModelCollection.class)))
83+
.build();
84+
85+
WorkflowDefinition definition = app.workflowDefinition(workflow);
86+
WorkflowInstance instance = definition.instance(new Object());
87+
CompletableFuture<WorkflowModel> future = instance.start();
88+
89+
// Wait for WAITING status
90+
await()
91+
.atMost(Duration.ofSeconds(5))
92+
.until(() -> instance.status() == WorkflowStatus.WAITING);
93+
94+
EventPublisher publisher = app.eventPublishers().iterator().next();
95+
96+
// Emit 3 order events
97+
log.info("Publishing event 1...");
98+
publisher.publish(createOrderEvent(instance.id(), 1)).toCompletableFuture().join();
99+
100+
log.info("Publishing event 2...");
101+
publisher.publish(createOrderEvent(instance.id(), 2)).toCompletableFuture().join();
102+
103+
log.info("Publishing event 3...");
104+
publisher.publish(createOrderEvent(instance.id(), 3)).toCompletableFuture().join();
105+
106+
// Workflow should complete after 3 events
107+
await()
108+
.atMost(Duration.ofSeconds(5))
109+
.until(() -> instance.status() == WorkflowStatus.COMPLETED);
110+
111+
WorkflowModel result = future.join();
112+
long count = ((WorkflowModelCollection) result).size();
113+
log.info("Workflow completed with {} items", count);
114+
assertEquals(3, count);
115+
}
116+
}
117+
118+
@Test
119+
public void testToAnyWithUntilContextPredicate() {
120+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
121+
Workflow workflow =
122+
FuncWorkflowBuilder.workflow("test-toany-until-context")
123+
.tasks(
124+
listen(
125+
"waitOrders",
126+
toAny("order.created")
127+
.until(
128+
(events, context) -> {
129+
log.info("ContextPredicate called!");
130+
log.info(" Events count: {}", (long) events.size());
131+
assertNotNull(context);
132+
log.info(" Context instance id: {}", context.instanceData().id());
133+
// Stop after 2 events
134+
return (long) events.size() >= 2;
135+
},
136+
WorkflowModelCollection.class)))
137+
.build();
138+
139+
WorkflowDefinition definition = app.workflowDefinition(workflow);
140+
WorkflowInstance instance = definition.instance(new Object());
141+
CompletableFuture<WorkflowModel> future = instance.start();
142+
143+
await()
144+
.atMost(Duration.ofSeconds(5))
145+
.until(() -> instance.status() == WorkflowStatus.WAITING);
146+
147+
EventPublisher publisher = app.eventPublishers().iterator().next();
148+
149+
log.info("Publishing event 1...");
150+
publisher.publish(createOrderEvent(instance.id(), 1)).toCompletableFuture().join();
151+
152+
log.info("Publishing event 2...");
153+
publisher.publish(createOrderEvent(instance.id(), 2)).toCompletableFuture().join();
154+
155+
// Should complete after 2 events
156+
await()
157+
.atMost(Duration.ofSeconds(5))
158+
.until(() -> instance.status() == WorkflowStatus.COMPLETED);
159+
160+
WorkflowModel result = future.join();
161+
long count = ((WorkflowModelCollection) result).size();
162+
log.info("Workflow completed with {} items", count);
163+
assertEquals(2, count);
164+
}
165+
}
166+
167+
@Test
168+
public void testToAnyWithUntilFilterPredicate() {
169+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
170+
Workflow workflow =
171+
FuncWorkflowBuilder.workflow("test-toany-until-filter")
172+
.tasks(
173+
listen(
174+
"waitOrders",
175+
toAny("order.created")
176+
.until(
177+
(events, workflowCtx, taskCtx) -> {
178+
log.info("FilterPredicate called!");
179+
log.info(" Events count: {}", (long) events.size());
180+
assertNotNull(workflowCtx);
181+
assertNotNull(taskCtx);
182+
log.info(" Task position: {}", taskCtx.position());
183+
return (long) events.size() >= 3;
184+
},
185+
WorkflowModelCollection.class)))
186+
.build();
187+
188+
WorkflowDefinition definition = app.workflowDefinition(workflow);
189+
WorkflowInstance instance = definition.instance(new Object());
190+
CompletableFuture<WorkflowModel> future = instance.start();
191+
192+
await()
193+
.atMost(Duration.ofSeconds(5))
194+
.until(() -> instance.status() == WorkflowStatus.WAITING);
195+
196+
EventPublisher publisher = app.eventPublishers().iterator().next();
197+
198+
log.info("Publishing event 1...");
199+
publisher.publish(createOrderEvent(instance.id(), 1)).toCompletableFuture().join();
200+
201+
log.info("Publishing event 2...");
202+
publisher.publish(createOrderEvent(instance.id(), 2)).toCompletableFuture().join();
203+
204+
log.info("Publishing event 3...");
205+
publisher.publish(createOrderEvent(instance.id(), 3)).toCompletableFuture().join();
206+
207+
await()
208+
.atMost(Duration.ofSeconds(5))
209+
.until(() -> instance.status() == WorkflowStatus.COMPLETED);
210+
211+
WorkflowModel result = future.join();
212+
long count = ((WorkflowModelCollection) result).size();
213+
log.info("Workflow completed with {} items", count);
214+
assertEquals(3, count);
215+
}
216+
}
217+
218+
public record Order(String id, String status, double amount) {}
219+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverless.workflow.impl.executors.func;
17+
18+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;
19+
import static org.junit.jupiter.api.Assertions.*;
20+
21+
import io.serverlessworkflow.api.types.Workflow;
22+
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
23+
import io.serverlessworkflow.impl.WorkflowApplication;
24+
import org.junit.jupiter.api.Test;
25+
26+
public class ListenUntilValidationTest {
27+
28+
@Test
29+
public void testUntilWithAllThrowsException() {
30+
IllegalArgumentException exception =
31+
assertThrows(
32+
IllegalArgumentException.class,
33+
() -> {
34+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
35+
Workflow workflow =
36+
FuncWorkflowBuilder.workflow("test-all-until-invalid")
37+
.tasks(
38+
listen(
39+
"waitOrders",
40+
toAll("order.created")
41+
.until(
42+
(io.serverlessworkflow.impl.WorkflowModelCollection
43+
events) -> events.stream().count() >= 3,
44+
io.serverlessworkflow.impl.WorkflowModelCollection.class)))
45+
.build();
46+
47+
app.workflowDefinition(workflow);
48+
}
49+
});
50+
51+
assertTrue(exception.getMessage().contains("until() is only supported with any()"));
52+
assertTrue(exception.getMessage().contains("ALL"));
53+
}
54+
55+
@Test
56+
public void testUntilWithOneThrowsException() {
57+
IllegalArgumentException exception =
58+
assertThrows(
59+
IllegalArgumentException.class,
60+
() -> {
61+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
62+
Workflow workflow =
63+
FuncWorkflowBuilder.workflow("test-one-until-invalid")
64+
.tasks(
65+
listen(
66+
"waitOrders",
67+
toOne("order.created")
68+
.until(
69+
(io.serverlessworkflow.impl.WorkflowModelCollection
70+
events) -> events.stream().count() >= 3,
71+
io.serverlessworkflow.impl.WorkflowModelCollection.class)))
72+
.build();
73+
74+
app.workflowDefinition(workflow);
75+
}
76+
});
77+
78+
assertTrue(exception.getMessage().contains("until() is only supported with any()"));
79+
assertTrue(exception.getMessage().contains("ONE"));
80+
}
81+
}

0 commit comments

Comments
 (0)