Skip to content

Commit 37b3f39

Browse files
authored
[Fix #1339] Add SerializableFunction support to ForEach on DSL (#1340)
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 1d7838d commit 37b3f39

4 files changed

Lines changed: 91 additions & 11 deletions

File tree

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForTaskBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,17 @@ public <T, V> FuncForTaskBuilder whileC(LoopPredicateIndex<T, V> predicate) {
6464
return this;
6565
}
6666

67-
public <T> FuncForTaskBuilder collection(Function<T, Collection<?>> collectionF) {
67+
public <T, V> FuncForTaskBuilder collection(Function<T, Collection<V>> collectionF) {
6868
this.forTaskFunction.withCollection(collectionF);
6969
return this;
7070
}
7171

72+
public <T, V> FuncForTaskBuilder collection(
73+
Function<T, Collection<V>> collectionF, Class<T> clazz) {
74+
this.forTaskFunction.withCollection(collectionF, clazz);
75+
return this;
76+
}
77+
7278
public <T, V, R> FuncForTaskBuilder tasks(String name, LoopFunction<T, V, R> function) {
7379
if (name == null || name.isBlank()) {
7480
name = "for-task-" + this.items.size();

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.serverlessworkflow.api.types.OAuth2AuthenticationData;
2727
import io.serverlessworkflow.api.types.func.ContextFunction;
2828
import io.serverlessworkflow.api.types.func.FilterFunction;
29+
import io.serverlessworkflow.api.types.func.LoopFunction;
2930
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
3031
import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder;
3132
import io.serverlessworkflow.fluent.func.FuncRaiseTaskBuilder;
@@ -1064,9 +1065,25 @@ public static FuncTaskConfigurer switchWhenOrElse(
10641065
* @param <T> input type for the collection function
10651066
* @return list configurer
10661067
*/
1067-
public static <T> FuncTaskConfigurer forEach(
1068-
Function<T, Collection<?>> collection, Consumer<FuncTaskItemListBuilder> body) {
1069-
return list -> list.forEach(j -> j.collection(collection).tasks(body));
1068+
public static <T, V> FuncTaskConfigurer forEach(
1069+
SerializableFunction<T, Collection<V>> collection, Consumer<FuncTaskItemListBuilder> body) {
1070+
return list ->
1071+
list.forEach(
1072+
j -> j.collection(collection, ReflectionUtils.inferInputType(collection)).tasks(body));
1073+
}
1074+
1075+
public static <T, V> FuncTaskConfigurer forEach(
1076+
SerializableFunction<T, Collection<V>> collection, LoopFunction<T, V, ?> function) {
1077+
return list ->
1078+
list.forEach(
1079+
j ->
1080+
j.collection(collection, ReflectionUtils.inferInputType(collection))
1081+
.tasks(function));
1082+
}
1083+
1084+
public static <T, V> FuncTaskConfigurer forEachItem(
1085+
SerializableFunction<T, Collection<V>> collection, Function<V, ?> function) {
1086+
return forEach(collection, ((t, v) -> function.apply((V) v)));
10701087
}
10711088

10721089
/**
@@ -1077,9 +1094,9 @@ public static <T> FuncTaskConfigurer forEach(
10771094
* @param <T> ignored (kept for signature consistency)
10781095
* @return list configurer
10791096
*/
1080-
public static <T> FuncTaskConfigurer forEach(
1081-
Collection<?> collection, Consumer<FuncTaskItemListBuilder> body) {
1082-
Function<T, Collection<?>> f = ctx -> (Collection<?>) collection;
1097+
public static <T, V> FuncTaskConfigurer forEach(
1098+
Collection<V> collection, Consumer<FuncTaskItemListBuilder> body) {
1099+
Function<T, Collection<V>> f = ctx -> collection;
10831100
return list -> list.forEach(j -> j.collection(f).tasks(body));
10841101
}
10851102

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.test;
17+
18+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import io.serverlessworkflow.api.types.Workflow;
22+
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
23+
import io.serverlessworkflow.impl.WorkflowApplication;
24+
import io.serverlessworkflow.impl.WorkflowModel;
25+
import java.util.List;
26+
import org.junit.jupiter.api.Test;
27+
28+
public class ForEachFuncTest {
29+
30+
private static record Order(String id) {}
31+
32+
private static record EnhancedOrder(String id, int salary) {}
33+
34+
private static record OrdersPayload(List<Order> orders) {}
35+
36+
@Test
37+
void testForEachIteration() throws Exception {
38+
39+
Workflow workflow =
40+
FuncWorkflowBuilder.workflow("foreach-workflow")
41+
.tasks(forEachItem(OrdersPayload::orders, ForEachFuncTest::enhace))
42+
.build();
43+
44+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
45+
OrdersPayload input =
46+
new OrdersPayload(
47+
List.of(new Order("ORD-001"), new Order("ORD-002"), new Order("ORD-003")));
48+
WorkflowModel result = app.workflowDefinition(workflow).instance(input).start().join();
49+
assertThat(result.as(EnhancedOrder.class).orElseThrow().id())
50+
.isEqualTo(input.orders().get(input.orders.size() - 1).id());
51+
}
52+
}
53+
54+
private static EnhancedOrder enhace(Order order) {
55+
return new EnhancedOrder(order.id(), 1000);
56+
}
57+
}

experimental/types/src/main/java/io/serverlessworkflow/api/types/func/ForTaskFunction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class ForTaskFunction extends ForTask {
2929
private Optional<Class<?>> whileClass = Optional.empty();
3030
private Optional<Class<?>> itemClass = Optional.empty();
3131
private Optional<Class<?>> forClass = Optional.empty();
32-
private Function<?, Collection<?>> collection;
32+
private Function collection;
3333

3434
public <T, V> ForTaskFunction withWhile(LoopPredicate<T, V> whilePredicate) {
3535
return withWhile(toPredicate(whilePredicate));
@@ -119,12 +119,12 @@ private <T, V> ForTaskFunction withWhile(
119119
return this;
120120
}
121121

122-
public <T> ForTaskFunction withCollection(Function<T, Collection<?>> collection) {
122+
public <T, V> ForTaskFunction withCollection(Function<T, Collection<V>> collection) {
123123
return withCollection(collection, null);
124124
}
125125

126-
public <T> ForTaskFunction withCollection(
127-
Function<T, Collection<?>> collection, Class<T> colArgClass) {
126+
public <T, V> ForTaskFunction withCollection(
127+
Function<T, Collection<V>> collection, Class<T> colArgClass) {
128128
this.collection = collection;
129129
this.forClass = Optional.ofNullable(colArgClass);
130130
return this;

0 commit comments

Comments
 (0)