Skip to content

Commit 1e174f5

Browse files
author
qq_62395577
committed
修改完毕,解决From=1但是pre>1的问题
1 parent 4569920 commit 1e174f5

3 files changed

Lines changed: 16 additions & 8 deletions

File tree

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowConditionNode.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import modelengine.fit.waterflow.flowsengine.domain.flows.definitions.nodes.events.FlowEvent;
2222
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.FitStream;
2323
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.Processors;
24-
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.To;
2524
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.ConditionsNode;
2625
import modelengine.fit.waterflow.flowsengine.domain.flows.utils.FlowExecuteInfoUtil;
2726
import modelengine.fit.waterflow.flowsengine.utils.OhScriptExecutor;
@@ -81,7 +80,7 @@ public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, Flo
8180
protected void subscribe(
8281
FitStream.Publisher<FlowData> from, FitStream.Subscriber<FlowData, FlowData> to, FlowEvent event) {
8382
from.subscribe(event.getMetaId(), to, null, this.getWhether(from.getStreamId(), event));
84-
to.setFanInMode(To.FanInMode.ALL);
83+
to.setAllMode();
8584
}
8685

8786
private Processors.Whether<FlowData> getWhether(String streamId, FlowEvent event) {

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/definitions/nodes/FlowNode.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import modelengine.fit.waterflow.flowsengine.domain.flows.enums.FlowNodeType;
5050
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.FitStream;
5151
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.Processors;
52-
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.To;
5352
import modelengine.fit.waterflow.spi.FlowExceptionService;
5453
import modelengine.fitframework.broker.client.BrokerClient;
5554
import modelengine.fitframework.broker.client.filter.route.FitableIdFilter;
@@ -229,9 +228,8 @@ public FitStream.Subscriber<FlowData, FlowData> getSubscriber(String streamId, F
229228
* @param event {@link FlowEvent}节点之间的连线
230229
*/
231230
public void subscribe(String streamId, FlowEnv flowEnv, FlowNode toNode, FlowEvent event) {
232-
FitStream.Subscriber<FlowData, FlowData> toSubscriber = getTo(streamId, flowEnv.getRepo(),
233-
flowEnv.getMessenger(), flowEnv.getLocks(), toNode);
234-
toSubscriber.setFanInMode(To.FanInMode.ALL);
231+
FitStream.Subscriber<FlowData, FlowData> toSubscriber = getTo(streamId, flowEnv.getRepo(), flowEnv.getMessenger(), flowEnv.getLocks(), toNode);
232+
toSubscriber.setAllMode();
235233
this.subscribe(getFrom(streamId, flowEnv.getRepo(), flowEnv.getMessenger(), flowEnv.getLocks()),
236234
toSubscriber, event);
237235
}

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -579,14 +579,25 @@ public void setValidator(Processors.Validator<I> validator) {
579579
}
580580

581581
public void setFanInMode(To.FanInMode fanInMode) {
582-
this.fanInMode = Optional.ofNullable(fanInMode).orElse(To.FanInMode.ANY);
582+
fanInMode = Optional.ofNullable(fanInMode).orElse(To.FanInMode.ANY);
583+
// 当设置为ALL模式时,如果froms数量为1,则自动改为ANY模式
584+
if (To.FanInMode.ALL.equals(fanInMode) && this.froms.size() == 1) {
585+
this.fanInMode = To.FanInMode.ANY;
586+
} else {
587+
this.fanInMode = fanInMode;
588+
}
583589
}
584590

585591
/**
586592
* 设置为ALL模式,强制等待所有输入数据到齐后再处理
587593
*/
588594
public void setAllMode() {
589-
this.fanInMode = To.FanInMode.ALL;
595+
// 当设置为ALL模式时,如果froms数量为1,则自动改为ANY模式
596+
if (this.froms.size() == 1) {
597+
this.fanInMode = To.FanInMode.ANY;
598+
} else {
599+
this.fanInMode = To.FanInMode.ALL;
600+
}
590601
}
591602

592603
/**

0 commit comments

Comments
 (0)