Skip to content

Commit 5d6cdd3

Browse files
author
qq_62395577
committed
修改完毕,解决From=1但是pre>1的问题
1 parent 1e174f5 commit 5d6cdd3

1 file changed

Lines changed: 4 additions & 15 deletions

File tree

  • app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/streams/To.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ public Processors.Filter<I> postFilter() {
551551
}
552552

553553
private Processors.Filter<I> requestFilter(Processors.Filter<I> fallbackFilter) {
554-
if (!To.FanInMode.ALL.equals(this.fanInMode)) {
554+
if (!To.FanInMode.ALL.equals(this.fanInMode)||this.froms.size()==1) {
555555
return fallbackFilter;
556556
}
557557
return this::selectReadyMergeGroup;
@@ -579,25 +579,14 @@ public void setValidator(Processors.Validator<I> validator) {
579579
}
580580

581581
public void setFanInMode(To.FanInMode fanInMode) {
582-
fanInMode = Optional.ofNullable(fanInMode).orElse(To.FanInMode.ANY);
583-
// 当设置为ALL模式时,如果froms数量为1,则自动改为ANY模式
584-
if (To.FanInMode.ALL.equals(fanInMode) && this.froms.size() == 1) {
585-
this.fanInMode = To.FanInMode.ANY;
586-
} else {
587-
this.fanInMode = fanInMode;
588-
}
582+
this.fanInMode = Optional.ofNullable(fanInMode).orElse(To.FanInMode.ANY);
589583
}
590584

591585
/**
592586
* 设置为ALL模式,强制等待所有输入数据到齐后再处理
593587
*/
594588
public void setAllMode() {
595-
// 当设置为ALL模式时,如果froms数量为1,则自动改为ANY模式
596-
if (this.froms.size() == 1) {
597-
this.fanInMode = To.FanInMode.ANY;
598-
} else {
599-
this.fanInMode = To.FanInMode.ALL;
600-
}
589+
this.fanInMode = To.FanInMode.ALL;
601590
}
602591

603592
/**
@@ -756,7 +745,7 @@ public void setFailed(List<FlowContext<I>> pre, Exception ex) {
756745
}
757746

758747
private List<FlowContext<I>> mergeProcessInputs(List<FlowContext<I>> pre) {
759-
if (!To.FanInMode.ALL.equals(this.fanInMode) || pre.size() <= 1) {
748+
if (!To.FanInMode.ALL.equals(this.fanInMode)||froms.size()==1 || pre.size() <= 1) {
760749
return pre;
761750
}
762751
if (!(ProcessMode.MAPPING.equals(this.processMode)

0 commit comments

Comments
 (0)