Skip to content

Commit c0085b0

Browse files
committed
[waterflow] support branch syntax for 'when' and 'then' clauses
1 parent 2bda813 commit c0085b0

10 files changed

Lines changed: 224 additions & 2 deletions

File tree

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,18 @@ default <I> void send(ProcessType type, Subscriber<I, ?> subscriber, List<FlowCo
5454
* @param <I> 流程实例执行时的入参数据类型,用于泛型推倒
5555
*/
5656
<I> void sendCallback(List<FlowContext<I>> contexts);
57+
58+
/**
59+
* Directly processes a list of flow contexts through the specified subscriber.
60+
* This method serves as a default implementation for immediate processing without
61+
* any intermediate transformations or routing.
62+
*
63+
* @param <I> The type of input data contained in the flow contexts.
64+
* @param type The type of processing to be performed.
65+
* @param subscriber The subscriber that will handle the processing.
66+
* @param context List of flow contexts to be processed.
67+
*/
68+
default <I> void directProcess(ProcessType type, Subscriber<I, ?> subscriber, List<FlowContext<I>> context) {
69+
subscriber.process(type, context);
70+
}
5771
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Conditions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,21 @@ public <O> MatchHappen<O, D, I, F> match(Operators.Whether<I> whether,
4949
return new MatchHappen<>(whether, processor, this);
5050
}
5151

52+
/**
53+
* Creates a conditional branch that executes when the specified condition is met.
54+
* This establishes a processing path that will only be followed if the predicate
55+
* evaluates to true for the input data.
56+
*
57+
* @param <O> The output type of the branch processor.
58+
* @param whether The condition predicate that determines branch activation.
59+
* @param processor The transformation to apply when the condition is met.
60+
* @return A {@link WhenHappen} instance representing the conditional relationship,
61+
* allowing for further chaining of operations.
62+
*/
63+
public <O> WhenHappen<O, D, I, F> when(Operators.Whether<I> whether, Operators.Then<I, O> processor) {
64+
return new WhenHappen<>(whether, processor, this);
65+
}
66+
5267
/**
5368
* 在满足条件时跳转到指定节点。
5469
*
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*--------------------------------------------------------------------------------------------*/
6+
7+
package modelengine.fit.waterflow.domain.states;
8+
9+
import modelengine.fit.waterflow.domain.enums.SpecialDisplayNode;
10+
import modelengine.fit.waterflow.domain.flow.Flow;
11+
import modelengine.fit.waterflow.domain.stream.operators.Operators;
12+
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
16+
/**
17+
* Represent conditional branches (when clause) in a processing flow.
18+
*
19+
* @param <O> The output data type of this node.
20+
* @param <D> The initial data type of the containing flow.
21+
* @param <F> The flow type used for generic type inference.
22+
*
23+
* @author 宋永坦
24+
* @since 2025-06-12
25+
*/
26+
public class WhenHappen<O, D, I, F extends Flow<D>> {
27+
private final State<I, D, I, F> node;
28+
private final List<State<O, D, ?, F>> branches = new ArrayList<>();
29+
30+
WhenHappen(Operators.Whether<I> whether, Operators.Then<I, O> processor, Conditions<D, I, F> conditions) {
31+
this.node = conditions.node;
32+
this.when(whether, processor);
33+
}
34+
35+
/**
36+
* Creates a conditional branch that executes when the specified condition is met.
37+
*
38+
* @param whether The condition predicate.
39+
* @param processor The transformation logic.
40+
* @return The current WhenHappen instance for method chaining.
41+
*/
42+
public WhenHappen<O, D, I, F> when(Operators.Whether<I> whether, Operators.Then<I, O> processor) {
43+
State<O, D, I, F> branch = new State<>(this.node.publisher()
44+
.map(input -> processor.process(input.getData()), whether)
45+
.displayAs(SpecialDisplayNode.BRANCH.name()), this.node.getFlow());
46+
this.branches.add(branch);
47+
return this;
48+
}
49+
50+
/**
51+
* Provides a default processing logic and terminates the conditional node.
52+
*
53+
* @param processor The handler to process unmatched inputs.
54+
* @return An {@link State} representing the join node of the conditional flow.
55+
* @throws IllegalArgumentException if processor is null
56+
*/
57+
public State<O, D, O, F> others(Operators.Then<I, O> processor) {
58+
this.when(null, processor);
59+
State<O, D, O, F> joinState = this.branches.get(0).just(any -> {});
60+
joinState.processor.displayAs(SpecialDisplayNode.OTHERS.name());
61+
this.branches.stream().skip(1).forEach(branch -> branch.publisher().subscribe(joinState.subscriber()));
62+
return joinState;
63+
}
64+
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import modelengine.fit.waterflow.domain.context.repo.flowlock.FlowLocks;
1313
import modelengine.fit.waterflow.domain.enums.FlowNodeType;
1414
import modelengine.fit.waterflow.domain.utils.UUIDUtil;
15+
import modelengine.fitframework.util.CollectionUtils;
1516

1617
import java.util.List;
1718
import java.util.stream.Collectors;
@@ -82,7 +83,12 @@ public void offer(List<FlowContext<I>> contexts) {
8283
.filter(context -> subscription.getWhether().is(context.getData()))
8384
.collect(Collectors.toList());
8485
matched.forEach(contexts::remove);
85-
subscription.cache(matched);
86+
// For order-sensitive data, directly synchronously executes the next conditional branch node.
87+
if (CollectionUtils.isNotEmpty(matched) && matched.get(0).getSession().preserved()) {
88+
subscription.process(matched);
89+
} else {
90+
subscription.cache(matched);
91+
}
8692
});
8793
}
8894

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public class To<I, O> extends IdGenerator implements Subscriber<I, O> {
140140
@Getter
141141
private ProcessMode processMode;
142142

143-
private Map<String, Integer> processingSessions = new ConcurrentHashMap<>();
143+
private final Map<String, Integer> processingSessions = new ConcurrentHashMap<>();
144144

145145
private Operators.Validator<I> validator = (repo, to) -> repo.requestMappingContext(to.streamId,
146146
to.froms.stream().map(Identity::getId).collect(Collectors.toList()),
@@ -291,6 +291,38 @@ public synchronized void accept(ProcessType type, List<FlowContext<I>> contexts)
291291
this.triggerNodeProcessor(type);
292292
}
293293

294+
@Override
295+
public void process(ProcessType type, List<FlowContext<I>> contexts) {
296+
Validation.isTrue(ProcessType.PROCESS.equals(type),
297+
"Direct processing requires PROCESS type, but received: " + type);
298+
this.directProcess(contexts);
299+
}
300+
301+
private void directProcess(List<FlowContext<I>> preList) {
302+
try {
303+
if (CollectionUtils.isEmpty(preList)) {
304+
return;
305+
}
306+
if (preList.size() == 1 && preList.get(0).getData() == null) {
307+
this.afterProcess(preList, new ArrayList<>());
308+
return;
309+
}
310+
List<FlowContext<O>> afterList = this.getProcessMode().process(this, preList);
311+
this.afterProcess(preList, afterList);
312+
if (CollectionUtils.isNotEmpty(afterList)) {
313+
feedback(afterList);
314+
this.onNext(afterList.get(0).getBatchId());
315+
}
316+
afterList.forEach(context -> this.emit(context.getData(), context.getSession()));
317+
} catch (Exception ex) {
318+
LOG.error("Node direct process exception. [streamId={}, nodeId={}, positionId={}, traceId={}, causedBy={}]",
319+
this.streamId, this.id, preList.get(0).getPosition(), preList.get(0).getTraceId(),
320+
ex.getClass().getName());
321+
LOG.debug("Node process exception details: ", ex);
322+
this.fail(ex, preList);
323+
}
324+
}
325+
294326
private synchronized void triggerNodeProcessor(ProcessType type) {
295327
if (type == ProcessType.PRE_PROCESS && (preProcessT == null || !preProcessRunning)) {
296328
preProcessRunning = true;

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,5 +292,24 @@ public interface Validator<T> {
292292
*/
293293
List<FlowContext<T>> validate(FlowContextRepo repo, To<T, ?> to);
294294
}
295+
296+
/**
297+
* Represents a conditional node's transformation from raw material to product.
298+
* This functional interface defines the processing operation that converts an
299+
* input of one type to an output of potentially different type.
300+
*
301+
* @param <T> The type of raw material (input) to be processed
302+
* @param <R> The type of product (output) to be produced
303+
*/
304+
@FunctionalInterface
305+
public interface Then<T, R> {
306+
/**
307+
* Transforms the input raw material into a processed product.
308+
*
309+
* @param input The raw material to be processed.
310+
* @return The transformed product result.
311+
*/
312+
R process(T input);
313+
}
295314
}
296315

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@ public interface Subscriber<I, O> extends StreamIdentity, Emitter<O, FlowSession
3232
*/
3333
void accept(ProcessType type, List<FlowContext<I>> contexts);
3434

35+
/**
36+
* Processes a batch of flow contexts according to the specified processing type.
37+
* This method handles the core execution logic for the workflow engine, applying
38+
* the appropriate operations to each context in the batch.
39+
*
40+
* @param type The type of processing to perform.
41+
* @param contexts The list of flow contexts to process.
42+
*/
43+
void process(ProcessType type, List<FlowContext<I>> contexts);
44+
3545
/**
3646
* 设置节点block
3747
*

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscription.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ public interface Subscription<I> extends StreamIdentity {
2727
*/
2828
void cache(List<FlowContext<I>> contexts);
2929

30+
/**
31+
* Immediately sends data to subscriber for processing the given flow contexts.
32+
* This method executes synchronously and blocks until all contexts have been processed.
33+
*
34+
* @param contexts The list of flow contexts to process and send to subscribers。
35+
*/
36+
void process(List<FlowContext<I>> contexts);
37+
3038
/**
3139
* getWhether
3240
*

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/When.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,18 @@ public void cache(List<FlowContext<I>> contexts) {
105105
public String getStreamId() {
106106
return this.streamId;
107107
}
108+
109+
@Override
110+
public void process(List<FlowContext<I>> contexts) {
111+
if (CollectionUtils.isEmpty(contexts)) {
112+
return;
113+
}
114+
List<FlowContext<I>> converted = contexts.stream()
115+
.map(context -> context.convertData(context.getData(), context.getId())
116+
.setPosition(this.to.getId())
117+
.setStatus(FlowNodeStatus.READY))
118+
.collect(Collectors.toList());
119+
repo.updateStatus(converted, FlowNodeStatus.READY.toString(), this.to.getId());
120+
messenger.directProcess(this.to.isAuto() ? PROCESS : PRE_PROCESS, this.to, converted);
121+
}
108122
}

framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,5 +1137,45 @@ void try_a_complicated_map_reduce() throws InterruptedException {
11371137
assertEquals("150unit", result.get(0));
11381138
assertEquals("240unit", result.get(1));
11391139
}
1140+
1141+
@Test
1142+
void shouldExecuteRightBranchWhenUseWhenThenBranchGivenPreserved() {
1143+
List<Integer> result = new ArrayList<>();
1144+
ProcessFlow<Integer> flow = Flows.<Integer>create()
1145+
.conditions()
1146+
.when(i -> i >= 5, i -> i * 10)
1147+
.others(i -> i)
1148+
.reduce(Integer::sum)
1149+
.just(i -> result.add(i))
1150+
.close();
1151+
FlowSession session = new FlowSession(true);
1152+
Window window = session.begin();
1153+
flow.offer(new Integer[] {1, 2, 3, 4, 5, 6}, session);
1154+
window.complete();
1155+
1156+
FlowsTestUtil.waitUntil(() -> !result.isEmpty(), 1000);
1157+
assertEquals(1, result.size());
1158+
assertEquals(120, result.get(0));
1159+
}
1160+
1161+
@Test
1162+
void shouldExecuteRightBranchWhenUseWhenThenBranchGivenNotPreserved() {
1163+
List<Integer> result = new ArrayList<>();
1164+
ProcessFlow<Integer> flow = Flows.<Integer>create()
1165+
.conditions()
1166+
.when(i -> i >= 5, i -> i * 10)
1167+
.others(i -> i)
1168+
.reduce(Integer::sum)
1169+
.just(i -> result.add(i))
1170+
.close();
1171+
FlowSession session = new FlowSession(false);
1172+
Window window = session.begin();
1173+
flow.offer(new Integer[] {1, 2, 3, 4, 5, 6}, session);
1174+
window.complete();
1175+
1176+
FlowsTestUtil.waitUntil(() -> !result.isEmpty(), 1000);
1177+
assertEquals(1, result.size());
1178+
assertEquals(120, result.get(0));
1179+
}
11401180
}
11411181
}

0 commit comments

Comments
 (0)