Skip to content

Commit 0a50001

Browse files
committed
[waterflow] support configuring node concurrency
1 parent 5527c80 commit 0a50001

3 files changed

Lines changed: 37 additions & 1 deletion

File tree

  • framework
    • fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities
    • waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,18 @@ public AiState<O, D, I, RF, F> id(String id) {
8282
return this;
8383
}
8484

85+
/**
86+
* Sets the maximum concurrency level for this state's processing pipeline.
87+
*
88+
* @param concurrency The maximum number of concurrent operations allowed (must be positive).
89+
* @return The current state instance for method chaining.
90+
* @throws IllegalArgumentException If the concurrency value is zero or negative.
91+
*/
92+
public AiState<O, D, I, RF, F> concurrency(int concurrency) {
93+
this.state.concurrency(concurrency);
94+
return this;
95+
}
96+
8597
/**
8698
* 获取当前节点的数据订阅者。
8799
*

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,18 @@ public State<O, D, I, F> id(String id) {
104104
return ObjectUtils.cast(super.setId(id));
105105
}
106106

107+
/**
108+
* Sets the maximum concurrency level for this state's processing pipeline.
109+
*
110+
* @param concurrency The maximum number of concurrent operations allowed (must be positive).
111+
* @return The current state instance for method chaining.
112+
* @throws IllegalArgumentException If the concurrency value is zero or negative.
113+
*/
114+
public State<O, D, I, F> concurrency(int concurrency) {
115+
ObjectUtils.<Node<I, O>>cast(this.processor).setMaxConcurrency(concurrency);
116+
return this;
117+
}
118+
107119
/**
108120
* 跳转到指定节点,使用节点的唯一标识来标识一个节点。
109121
* <p>

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ public class To<I, O> extends IdGenerator implements Subscriber<I, O> {
100100
@Getter
101101
private final FlowLocks locks;
102102

103+
private int maxConcurrency = MAX_CONCURRENCY;
104+
103105
// 默认自动流转过滤器是按batchID批次过滤contexts
104106
private final Operators.Filter<I> defaultAutoFilter = (contexts) -> {
105107
if (CollectionUtils.isEmpty(contexts)) {
@@ -606,7 +608,7 @@ private synchronized void updateConcurrency(int newConcurrency) {
606608
* @return true-已经满负载, false-未满负载
607609
*/
608610
public boolean isOverLimit() {
609-
return this.curConcurrency >= MAX_CONCURRENCY;
611+
return this.curConcurrency >= this.maxConcurrency;
610612
}
611613

612614
/**
@@ -736,6 +738,16 @@ public void emit(O data, FlowSession session) {
736738
this.listeners.values().forEach(listener -> listener.handle(data, session));
737739
}
738740

741+
/**
742+
* Sets the maximum concurrency level for this state's processing pipeline.
743+
*
744+
* @param concurrency The maximum number of concurrent operations allowed (must be positive).
745+
* @throws IllegalArgumentException If the concurrency value is zero or negative.
746+
*/
747+
public void setMaxConcurrency(int concurrency) {
748+
this.maxConcurrency = Validation.greaterThan(concurrency, 0, "The concurrency should greater than 0.");
749+
}
750+
739751
private FlowSession getNextSession(FlowSession session) {
740752
return FlowSessionRepo.getNextToSession(this.streamId, session);
741753
}

0 commit comments

Comments
 (0)