Skip to content

Commit 690b8aa

Browse files
author
qq_62395577
committed
修改all模式的注入机制
1 parent 1d93d7b commit 690b8aa

7 files changed

Lines changed: 111 additions & 85 deletions

File tree

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowConditionNode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, Flo
7979
@Override
8080
protected void subscribe(
8181
FitStream.Publisher<FlowData> from, FitStream.Subscriber<FlowData, FlowData> to, FlowEvent event) {
82+
to.setFromFlowDefinition(true);
8283
from.subscribe(event.getMetaId(), to, null, this.getWhether(from.getStreamId(), event));
8384
}
8485

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowNode.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,11 @@ public FitStream.Subscriber<FlowData, FlowData> getSubscriber(String streamId, F
228228
* @param event {@link FlowEvent}节点之间的连线
229229
*/
230230
public void subscribe(String streamId, FlowEnv flowEnv, FlowNode toNode, FlowEvent event) {
231+
FitStream.Subscriber<FlowData, FlowData> toSubscriber = getTo(streamId, flowEnv.getRepo(),
232+
flowEnv.getMessenger(), flowEnv.getLocks(), toNode);
233+
toSubscriber.setFromFlowDefinition(true);
231234
this.subscribe(getFrom(streamId, flowEnv.getRepo(), flowEnv.getMessenger(), flowEnv.getLocks()),
232-
getTo(streamId, flowEnv.getRepo(), flowEnv.getMessenger(), flowEnv.getLocks(), toNode), event);
235+
toSubscriber, event);
233236
}
234237

235238
/**

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowParallelNode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, Flo
8484
@Override
8585
protected void subscribe(FitStream.Publisher<FlowData> from, FitStream.Subscriber<FlowData, FlowData> to,
8686
FlowEvent event) {
87+
this.joiner.setFromFlowDefinition(true);
8788
this.joiner.subscribe(event.getMetaId(), to);
8889
}
8990

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/FitStream.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -427,13 +427,22 @@ public interface Subscriber<I, O> extends StreamIdentity, InterStream<O> {
427427
*/
428428
void onGlobalAfter(Processors.Just<Callback<FlowContext<O>>> handler);
429429

430-
/**
431-
* 设置节点优先级
432-
*
433-
* @param order 优先级
434-
*/
435-
void setOrder(int order);
430+
/**
431+
* 设置节点优先级
432+
*
433+
* @param order 优先级
434+
*/
435+
void setOrder(int order);
436+
437+
/**
438+
* 设置是否来自流程定义,用于控制fanInMode的自动设置
439+
*
440+
* @param fromFlowDefinition 是否来自流程定义
441+
*/
442+
default void setFromFlowDefinition(boolean fromFlowDefinition) {
443+
// 默认实现为空,子类可覆盖
436444
}
445+
}
437446

438447
/**
439448
* publisher与subscriber之间的连接器

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public class To<I, O> extends IdGenerator implements FitStream.Subscriber<I, O>
127127

128128
private Processors.Validator<I> validator = (i, all) -> true;
129129
private FanInMode fanInMode = FanInMode.ANY;
130+
private boolean fromFlowDefinition = false;
130131
private Processors.Map<FlowContext<I>, String> mergeKeyGenerator = this::defaultMergeKey;
131132
private Processors.Merger<I> merger;
132133

@@ -664,8 +665,19 @@ public ProcessMode getProcessMode() {
664665
@Override
665666
public void onSubscribe(FitStream.Subscription<?, I> subscription) {
666667
this.froms.add(subscription); // 将该节点的from的event加入
668+
if (this.fromFlowDefinition) {
667669
long fromCount = this.froms.stream().map(Identity::getId).distinct().count();
668670
this.fanInMode = fromCount > 1 ? FanInMode.ALL : FanInMode.ANY;
671+
}
672+
}
673+
674+
/**
675+
* 设置是否来自流程定义,用于控制fanInMode的自动设置
676+
*
677+
* @param fromFlowDefinition 是否来自流程定义
678+
*/
679+
public void setFromFlowDefinition(boolean fromFlowDefinition) {
680+
this.fromFlowDefinition = fromFlowDefinition;
669681
}
670682

671683
@Override

app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/domain/flows/WaterFlowsTest.java

Lines changed: 51 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -326,57 +326,57 @@ void testFitStreamWithForkJoinEither() {
326326
assertEquals(0, output.get(0).s);
327327
}
328328

329-
// @Test
330-
// @DisplayName("流程实例condition节点以及match节点以及others节点流转逻辑")
331-
// void testFitStreamWithCondition() {
332-
// List<TestData> output = new ArrayList<>();
333-
// // test conditions and others for just
334-
// Flows.ProcessFlow<TestData> flow = Flows.<TestData>create(repo, messenger, locks)
335-
// .conditions()
336-
// .match(i -> i.getData().f > 10)
337-
// .just(i -> i.f++)
338-
// .match(i -> i.getData().s > 10)
339-
// .map(i -> {
340-
// i.s++;
341-
// return i;
342-
// })
343-
// .others(i -> {
344-
// i.t++;
345-
// return i;
346-
// })
347-
// .close(r -> output.add(r.get().getData()));
348-
// TestData input = new TestData();
349-
// flow.offer(input.first(11).second(0).third(0));
350-
// FlowsTestUtil.waitSingle(() -> output);
351-
// assertTestData(new TestData(12, 0, 0), output);
352-
// output.clear();
353-
//
354-
// flow.offer(input.first(0).second(11).third(0));
355-
// FlowsTestUtil.waitSingle(() -> output);
356-
// assertTestData(new TestData(0, 12, 0), output);
357-
// output.clear();
358-
//
359-
// flow.offer(input.first(0).second(0).third(11));
360-
// FlowsTestUtil.waitSingle(() -> output);
361-
// assertTestData(new TestData(0, 0, 12), output);
362-
//
363-
// // test when and others for map
364-
// output.clear();
365-
// flow = Flows.<TestData>create(repo, messenger, locks).when(i -> {
366-
// i.f++;
367-
// return i;
368-
// }).map(i -> {
369-
// i.f--;
370-
// return i;
371-
// }).conditions(i -> i.t++).match(i -> i.getData().f > 10).just(i -> i.f++).others(i -> {
372-
// i.t++;
373-
// output.add(i);
374-
// return i;
375-
// }).close();
376-
// flow.offer(input.first(0).second(0).third(11));
377-
// FlowsTestUtil.waitSingle(() -> output);
378-
// assertTestData(new TestData(0, 0, 13), output);
379-
// }
329+
@Test
330+
@DisplayName("流程实例condition节点以及match节点以及others节点流转逻辑")
331+
void testFitStreamWithCondition() {
332+
List<TestData> output = new ArrayList<>();
333+
// test conditions and others for just
334+
Flows.ProcessFlow<TestData> flow = Flows.<TestData>create(repo, messenger, locks)
335+
.conditions()
336+
.match(i -> i.getData().f > 10)
337+
.just(i -> i.f++)
338+
.match(i -> i.getData().s > 10)
339+
.map(i -> {
340+
i.s++;
341+
return i;
342+
})
343+
.others(i -> {
344+
i.t++;
345+
return i;
346+
})
347+
.close(r -> output.add(r.get().getData()));
348+
TestData input = new TestData();
349+
flow.offer(input.first(11).second(0).third(0));
350+
FlowsTestUtil.waitSingle(() -> output);
351+
assertTestData(new TestData(12, 0, 0), output);
352+
output.clear();
353+
354+
flow.offer(input.first(0).second(11).third(0));
355+
FlowsTestUtil.waitSingle(() -> output);
356+
assertTestData(new TestData(0, 12, 0), output);
357+
output.clear();
358+
359+
flow.offer(input.first(0).second(0).third(11));
360+
FlowsTestUtil.waitSingle(() -> output);
361+
assertTestData(new TestData(0, 0, 12), output);
362+
363+
// test when and others for map
364+
output.clear();
365+
flow = Flows.<TestData>create(repo, messenger, locks).when(i -> {
366+
i.f++;
367+
return i;
368+
}).map(i -> {
369+
i.f--;
370+
return i;
371+
}).conditions(i -> i.t++).match(i -> i.getData().f > 10).just(i -> i.f++).others(i -> {
372+
i.t++;
373+
output.add(i);
374+
return i;
375+
}).close();
376+
flow.offer(input.first(0).second(0).third(11));
377+
FlowsTestUtil.waitSingle(() -> output);
378+
assertTestData(new TestData(0, 0, 13), output);
379+
}
380380

381381
private void assertTestData(TestData expected, List<TestData> output) {
382382
assertEquals(expected.f, output.get(0).f);

app-builder/waterflow/java/waterflow-service/src/test/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/FlowContextPersistTest.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -238,33 +238,33 @@ void testFlowContextPersistWithBlockSuccess() {
238238
Assertions.assertEquals(4, result.size());
239239
}
240240

241-
// @Test
242-
// @DisplayName("测试带有condition节点流程实例持久化成功")
243-
// void testFlowContextPersistWithCondition() {
244-
// List<FlowData> result = new ArrayList<>();
245-
// FlowData data = genFlowData("url", "www.123.com");
246-
// FlowData data1 = genFlowData("applyService", "fitable");
247-
//
248-
// Flows.ProcessFlow<FlowData> flow = Flows.<FlowData>create(REPO, MEMO_MESSENGER, LOCKS)
249-
// .conditions()
250-
// .match(i -> i.getData().getBusinessData().equals(data.getBusinessData()))
251-
// .just(i -> i.getBusinessData().put("url", "success"))
252-
// .match(i -> i.getData().getBusinessData().equals(data1.getBusinessData()))
253-
// .just(i -> i.getBusinessData().put("applyService", "success"))
254-
// .others(input -> input)
255-
// .close(r -> result.add(r.get().getData()));
256-
//
257-
// flow.offer(data);
258-
// FlowsTestUtil.waitSingle(() -> result);
259-
// FlowData data2 = genFlowData("url", "success");
260-
// assertEquals(data2.getBusinessData(), result.get(0).getBusinessData());
261-
//
262-
// result.clear();
263-
// flow.offer(data1);
264-
// FlowsTestUtil.waitSingle(() -> result);
265-
// FlowData data3 = genFlowData("applyService", "success");
266-
// assertEquals(data3.getBusinessData(), result.get(0).getBusinessData());
267-
// }
241+
@Test
242+
@DisplayName("测试带有condition节点流程实例持久化成功")
243+
void testFlowContextPersistWithCondition() {
244+
List<FlowData> result = new ArrayList<>();
245+
FlowData data = genFlowData("url", "www.123.com");
246+
FlowData data1 = genFlowData("applyService", "fitable");
247+
248+
Flows.ProcessFlow<FlowData> flow = Flows.<FlowData>create(REPO, MEMO_MESSENGER, LOCKS)
249+
.conditions()
250+
.match(i -> i.getData().getBusinessData().equals(data.getBusinessData()))
251+
.just(i -> i.getBusinessData().put("url", "success"))
252+
.match(i -> i.getData().getBusinessData().equals(data1.getBusinessData()))
253+
.just(i -> i.getBusinessData().put("applyService", "success"))
254+
.others(input -> input)
255+
.close(r -> result.add(r.get().getData()));
256+
257+
flow.offer(data);
258+
FlowsTestUtil.waitSingle(() -> result);
259+
FlowData data2 = genFlowData("url", "success");
260+
assertEquals(data2.getBusinessData(), result.get(0).getBusinessData());
261+
262+
result.clear();
263+
flow.offer(data1);
264+
FlowsTestUtil.waitSingle(() -> result);
265+
FlowData data3 = genFlowData("applyService", "success");
266+
assertEquals(data3.getBusinessData(), result.get(0).getBusinessData());
267+
}
268268

269269
@Test
270270
@DisplayName("测试一个节点不同实例context查找某一个实例context成功")

0 commit comments

Comments
 (0)