Skip to content

Commit c6e2c03

Browse files
fix: Exception on loop after agent call (#1289)
* fix: Exception on loop after agent call Signed-off-by: Matheus Andre <matheusandr2@gmail> Signed-off-by: Matheus Andre <matheusandr2@gmail.com> * fix: normalize optional fields in ForTaskFunction and enhance serialization tests Signed-off-by: Matheus Andre <matheusandr2@gmail.com> * ajust JavaForExecutorBuilder and ForTaskFunction Signed-off-by: Matheus Andre <matheusandr2@gmail.com> * fix: collectionFilterObject Signed-off-by: Matheus Andre <matheusandr2@gmail.com> --------- Signed-off-by: Matheus Andre <matheusandr2@gmail> Signed-off-by: Matheus Andre <matheusandr2@gmail.com>
1 parent c8e2ac9 commit c6e2c03

3 files changed

Lines changed: 154 additions & 7 deletions

File tree

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,14 @@ protected WorkflowValueResolver<Collection<?>> buildCollectionFilter() {
6969
}
7070

7171
private Object collectionFilterObject(ForTaskFunction taskFunctions) {
72-
return taskFunctions.getForClass().isPresent()
73-
? new TypedFunction(
74-
taskFunctions.getCollection(), taskFunctions.getForClass().orElseThrow())
75-
: taskFunctions.getCollection();
72+
return taskFunctions
73+
.getForClass()
74+
.<Object>map(forClass -> typedCollectionFunction(taskFunctions, forClass))
75+
.orElse(taskFunctions.getCollection());
76+
}
77+
78+
@SuppressWarnings({"rawtypes", "unchecked"})
79+
private Object typedCollectionFunction(ForTaskFunction taskFunctions, Class<?> forClass) {
80+
return new TypedFunction(taskFunctions.getCollection(), forClass);
7681
}
7782
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverless.workflow.impl.executors.func;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import io.serverlessworkflow.api.types.Document;
21+
import io.serverlessworkflow.api.types.ForTaskConfiguration;
22+
import io.serverlessworkflow.api.types.Task;
23+
import io.serverlessworkflow.api.types.TaskItem;
24+
import io.serverlessworkflow.api.types.Workflow;
25+
import io.serverlessworkflow.api.types.func.CallJava;
26+
import io.serverlessworkflow.api.types.func.CallTaskJava;
27+
import io.serverlessworkflow.api.types.func.ForTaskFunction;
28+
import io.serverlessworkflow.impl.WorkflowApplication;
29+
import java.io.ByteArrayInputStream;
30+
import java.io.ByteArrayOutputStream;
31+
import java.io.ObjectInputStream;
32+
import java.io.ObjectOutputStream;
33+
import java.lang.reflect.Field;
34+
import java.util.Collection;
35+
import java.util.List;
36+
import java.util.concurrent.ExecutionException;
37+
import org.junit.jupiter.api.Test;
38+
39+
class ForTaskFunctionRegressionTest {
40+
41+
@Test
42+
void initializesOptionalFieldsAsEmpty() {
43+
ForTaskFunction taskFunction = new ForTaskFunction();
44+
45+
assertThat(taskFunction.getWhileClass()).isNotNull().isEmpty();
46+
assertThat(taskFunction.getItemClass()).isNotNull().isEmpty();
47+
assertThat(taskFunction.getForClass()).isNotNull().isEmpty();
48+
}
49+
50+
@Test
51+
void optionalFieldsSurviveJavaSerializationRoundTrip() throws Exception {
52+
ForTaskFunction taskFunction = new ForTaskFunction();
53+
clearField(taskFunction, "whileClass");
54+
clearField(taskFunction, "itemClass");
55+
clearField(taskFunction, "forClass");
56+
clearField(taskFunction, "collection");
57+
58+
ForTaskFunction copy = roundTrip(taskFunction);
59+
60+
assertThat(copy.getWhileClass()).isNotNull().isEmpty();
61+
assertThat(copy.getItemClass()).isNotNull().isEmpty();
62+
assertThat(copy.getForClass()).isNotNull().isEmpty();
63+
}
64+
65+
@Test
66+
void forLoopWithExplicitCollectionClassExecutesSuccessfully()
67+
throws InterruptedException, ExecutionException {
68+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
69+
ForTaskConfiguration forConfig = new ForTaskConfiguration();
70+
Workflow workflow =
71+
new Workflow()
72+
.withDocument(
73+
new Document()
74+
.withNamespace("test")
75+
.withName("loop-with-collection-class")
76+
.withVersion("1.0"))
77+
.withDo(
78+
List.of(
79+
new TaskItem(
80+
"forLoop",
81+
new Task()
82+
.withForTask(
83+
new ForTaskFunction()
84+
.withWhile(CallTest::isEven)
85+
.withCollection(v -> v, Collection.class)
86+
.withFor(forConfig)
87+
.withDo(
88+
List.of(
89+
new TaskItem(
90+
"javaCall",
91+
new Task()
92+
.withCallTask(
93+
new CallTaskJava(
94+
CallJava.loopFunction(
95+
CallTest::sum,
96+
forConfig.getEach()))))))))));
97+
98+
var result = app.workflowDefinition(workflow).instance(List.of(2, 4, 6)).start().get();
99+
100+
assertThat(result.asNumber().orElseThrow()).isEqualTo(12);
101+
}
102+
}
103+
104+
private static ForTaskFunction roundTrip(ForTaskFunction taskFunction) throws Exception {
105+
ByteArrayOutputStream output = new ByteArrayOutputStream();
106+
try (ObjectOutputStream oos = new ObjectOutputStream(output)) {
107+
oos.writeObject(taskFunction);
108+
}
109+
110+
try (ObjectInputStream ois =
111+
new ObjectInputStream(new ByteArrayInputStream(output.toByteArray()))) {
112+
return (ForTaskFunction) ois.readObject();
113+
}
114+
}
115+
116+
private static void clearField(Object target, String fieldName)
117+
throws ReflectiveOperationException {
118+
Field field = target.getClass().getDeclaredField(fieldName);
119+
field.setAccessible(true);
120+
field.set(target, null);
121+
}
122+
}

experimental/types/src/main/java/io/serverlessworkflow/api/types/func/ForTaskFunction.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package io.serverlessworkflow.api.types.func;
1717

1818
import io.serverlessworkflow.api.types.ForTask;
19+
import java.io.IOException;
20+
import java.io.ObjectInputStream;
1921
import java.util.Collection;
2022
import java.util.Optional;
2123
import java.util.function.Function;
@@ -24,9 +26,9 @@ public class ForTaskFunction extends ForTask {
2426

2527
private static final long serialVersionUID = 1L;
2628
private LoopPredicateIndexFilter<?, ?> whilePredicate;
27-
private Optional<Class<?>> whileClass;
28-
private Optional<Class<?>> itemClass;
29-
private Optional<Class<?>> forClass;
29+
private Optional<Class<?>> whileClass = Optional.empty();
30+
private Optional<Class<?>> itemClass = Optional.empty();
31+
private Optional<Class<?>> forClass = Optional.empty();
3032
private Function<?, Collection<?>> collection;
3133

3234
public <T, V> ForTaskFunction withWhile(LoopPredicate<T, V> whilePredicate) {
@@ -147,4 +149,22 @@ public Optional<Class<?>> getItemClass() {
147149
public Function<?, Collection<?>> getCollection() {
148150
return collection;
149151
}
152+
153+
private void normalizeOptionalFields() {
154+
if (whileClass == null) {
155+
whileClass = Optional.empty();
156+
}
157+
if (itemClass == null) {
158+
itemClass = Optional.empty();
159+
}
160+
if (forClass == null) {
161+
forClass = Optional.empty();
162+
}
163+
}
164+
165+
private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
166+
input.defaultReadObject();
167+
// Preserve compatibility with older serialized instances that may have null optionals.
168+
normalizeOptionalFields();
169+
}
150170
}

0 commit comments

Comments
 (0)