Skip to content

Commit 4569920

Browse files
author
qq_62395577
committed
尝试修改try1
1 parent b06d21c commit 4569920

5 files changed

Lines changed: 58 additions & 42 deletions

File tree

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowConditionNode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import modelengine.fit.waterflow.flowsengine.domain.flows.definitions.nodes.events.FlowEvent;
2222
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.FitStream;
2323
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.Processors;
24+
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.To;
2425
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.ConditionsNode;
2526
import modelengine.fit.waterflow.flowsengine.domain.flows.utils.FlowExecuteInfoUtil;
2627
import modelengine.fit.waterflow.flowsengine.utils.OhScriptExecutor;
@@ -79,8 +80,8 @@ public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, Flo
7980
@Override
8081
protected void subscribe(
8182
FitStream.Publisher<FlowData> from, FitStream.Subscriber<FlowData, FlowData> to, FlowEvent event) {
82-
to.setFromFlowDefinition(true);
8383
from.subscribe(event.getMetaId(), to, null, this.getWhether(from.getStreamId(), event));
84+
to.setFanInMode(To.FanInMode.ALL);
8485
}
8586

8687
private Processors.Whether<FlowData> getWhether(String streamId, FlowEvent event) {

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowNode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import modelengine.fit.waterflow.flowsengine.domain.flows.enums.FlowNodeType;
5050
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.FitStream;
5151
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.Processors;
52+
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.To;
5253
import modelengine.fit.waterflow.spi.FlowExceptionService;
5354
import modelengine.fitframework.broker.client.BrokerClient;
5455
import modelengine.fitframework.broker.client.filter.route.FitableIdFilter;
@@ -230,7 +231,7 @@ public FitStream.Subscriber<FlowData, FlowData> getSubscriber(String streamId, F
230231
public void subscribe(String streamId, FlowEnv flowEnv, FlowNode toNode, FlowEvent event) {
231232
FitStream.Subscriber<FlowData, FlowData> toSubscriber = getTo(streamId, flowEnv.getRepo(),
232233
flowEnv.getMessenger(), flowEnv.getLocks(), toNode);
233-
toSubscriber.setFromFlowDefinition(true);
234+
toSubscriber.setFanInMode(To.FanInMode.ALL);
234235
this.subscribe(getFrom(streamId, flowEnv.getRepo(), flowEnv.getMessenger(), flowEnv.getLocks()),
235236
toSubscriber, event);
236237
}

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowParallelNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, Flo
8484
@Override
8585
protected void subscribe(FitStream.Publisher<FlowData> from, FitStream.Subscriber<FlowData, FlowData> to,
8686
FlowEvent event) {
87-
this.joiner.setFromFlowDefinition(true);
87+
this.joiner.setAllMode();
8888
this.joiner.subscribe(event.getMetaId(), to);
8989
}
9090

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/FitStream.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -427,22 +427,32 @@ public interface Subscriber<I, O> extends StreamIdentity, InterStream<O> {
427427
*/
428428
void onGlobalAfter(Processors.Just<Callback<FlowContext<O>>> handler);
429429

430-
/**
431-
* 设置节点优先级
432-
*
433-
* @param order 优先级
434-
*/
435-
void setOrder(int order);
430+
/**
431+
* 设置节点优先级
432+
*
433+
* @param order 优先级
434+
*/
435+
void setOrder(int order);
436436

437-
/**
438-
* 设置是否来自流程定义,用于控制fanInMode的自动设置
439-
*
440-
* @param fromFlowDefinition 是否来自流程定义
441-
*/
442-
default void setFromFlowDefinition(boolean fromFlowDefinition) {
443-
// 默认实现为空,子类可覆盖
437+
/**
438+
* 设置FanIn模式
439+
*
440+
* @param fanInMode FanIn模式
441+
*/
442+
void setFanInMode(To.FanInMode fanInMode);
443+
444+
/**
445+
* 设置为ALL模式,强制等待所有输入数据到齐后再处理
446+
*/
447+
default void setAllMode() {
448+
}
449+
450+
/**
451+
* 设置为ANY模式,有数据到达即处理
452+
*/
453+
default void setAnyMode() {
454+
}
444455
}
445-
}
446456

447457
/**
448458
* publisher与subscriber之间的连接器

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@
5353
* @since 2023/08/14
5454
*/
5555
public class To<I, O> extends IdGenerator implements FitStream.Subscriber<I, O> {
56+
/**
57+
* FanIn模式枚举
58+
*/
59+
public enum FanInMode {
60+
ANY,
61+
ALL
62+
}
63+
5664
/**
5765
* 最大流量,也就是该节点可以处理的最大数据量
5866
*/
@@ -126,8 +134,7 @@ public class To<I, O> extends IdGenerator implements FitStream.Subscriber<I, O>
126134
private Boolean isAsyncJob = false;
127135

128136
private Processors.Validator<I> validator = (i, all) -> true;
129-
private FanInMode fanInMode = FanInMode.ANY;
130-
private boolean fromFlowDefinition = false;
137+
private To.FanInMode fanInMode = To.FanInMode.ANY;
131138
private Processors.Map<FlowContext<I>, String> mergeKeyGenerator = this::defaultMergeKey;
132139
private Processors.Merger<I> merger;
133140

@@ -544,7 +551,7 @@ public Processors.Filter<I> postFilter() {
544551
}
545552

546553
private Processors.Filter<I> requestFilter(Processors.Filter<I> fallbackFilter) {
547-
if (!FanInMode.ALL.equals(this.fanInMode)) {
554+
if (!To.FanInMode.ALL.equals(this.fanInMode)) {
548555
return fallbackFilter;
549556
}
550557
return this::selectReadyMergeGroup;
@@ -571,8 +578,22 @@ public void setValidator(Processors.Validator<I> validator) {
571578
}
572579
}
573580

574-
public void setFanInMode(FanInMode fanInMode) {
575-
this.fanInMode = Optional.ofNullable(fanInMode).orElse(FanInMode.ANY);
581+
public void setFanInMode(To.FanInMode fanInMode) {
582+
this.fanInMode = Optional.ofNullable(fanInMode).orElse(To.FanInMode.ANY);
583+
}
584+
585+
/**
586+
* 设置为ALL模式,强制等待所有输入数据到齐后再处理
587+
*/
588+
public void setAllMode() {
589+
this.fanInMode = To.FanInMode.ALL;
590+
}
591+
592+
/**
593+
* 设置为ANY模式,有数据到达即处理
594+
*/
595+
public void setAnyMode() {
596+
this.fanInMode = To.FanInMode.ANY;
576597
}
577598

578599
public void setMergeKeyGenerator(Processors.Map<FlowContext<I>, String> mergeKeyGenerator) {
@@ -606,7 +627,7 @@ private <T1> List<FlowContext<T1>> filterReadyByFanIn(List<FlowContext<T1>> cand
606627
if (CollectionUtils.isEmpty(candidates)) {
607628
return Collections.emptyList();
608629
}
609-
if (FanInMode.ANY.equals(this.fanInMode)) {
630+
if (To.FanInMode.ANY.equals(this.fanInMode)) {
610631
return candidates;
611632
}
612633

@@ -664,20 +685,7 @@ public ProcessMode getProcessMode() {
664685

665686
@Override
666687
public void onSubscribe(FitStream.Subscription<?, I> subscription) {
667-
this.froms.add(subscription); // 将该节点的from的event加入
668-
if (this.fromFlowDefinition) {
669-
long fromCount = this.froms.stream().map(Identity::getId).distinct().count();
670-
this.fanInMode = fromCount > 1 ? FanInMode.ALL : FanInMode.ANY;
671-
}
672-
}
673-
674-
/**
675-
* 设置是否来自流程定义,用于控制fanInMode的自动设置
676-
*
677-
* @param fromFlowDefinition 是否来自流程定义
678-
*/
679-
public void setFromFlowDefinition(boolean fromFlowDefinition) {
680-
this.fromFlowDefinition = fromFlowDefinition;
688+
this.froms.add(subscription);
681689
}
682690

683691
@Override
@@ -737,7 +745,7 @@ public void setFailed(List<FlowContext<I>> pre, Exception ex) {
737745
}
738746

739747
private List<FlowContext<I>> mergeProcessInputs(List<FlowContext<I>> pre) {
740-
if (!FanInMode.ALL.equals(this.fanInMode) || pre.size() <= 1) {
748+
if (!To.FanInMode.ALL.equals(this.fanInMode) || pre.size() <= 1) {
741749
return pre;
742750
}
743751
if (!(ProcessMode.MAPPING.equals(this.processMode)
@@ -1233,8 +1241,4 @@ private <T1, R1> void handleProcessConcurrentConflict(To<T1, R1> to) {
12331241
/*
12341242
多个数据到达后采用的处理方式,ANY表示即到即用,ALL表示所有数据到来才能使用
12351243
* */
1236-
public enum FanInMode {
1237-
ANY,
1238-
ALL
1239-
}
12401244
}

0 commit comments

Comments
 (0)