Skip to content

Commit 00337d9

Browse files
CodeCasterXclaude
andcommitted
refactor: 清理代码并优化测试结构
1. 清理代码: - Fork.java: 移除注释和日志,保留简洁的 null 检查 - Tip.java: 移除注释,保留防御性 null 检查 2. 优化测试结构: - shouldOkWhenAiFlowWithExampleSelector: 恢复为单次测试 - shouldStableWhenRunnableParallelUnderConcurrency: 新增专门的并发稳定性测试(1000次重复) 详细的问题分析和修复说明已同步到 Issue #247 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 4a8a1b2 commit 00337d9

3 files changed

Lines changed: 19 additions & 16 deletions

File tree

  • framework
    • fel/java
      • fel-core/src/main/java/modelengine/fel/core/util
      • fel-flow/src/test/java/modelengine/fel/engine/operators
    • waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states

framework/fel/java/fel-core/src/main/java/modelengine/fel/core/util/Tip.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ public Tip addAll(Map<String, Content> args) {
118118
* @return 表示当前的 {@link Tip}。
119119
*/
120120
public Tip merge(Tip other) {
121-
// Issue #247: 防御性处理,在并发场景下 Fork.join() 可能传入 null
122121
if (other == null) {
123122
return this;
124123
}

framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/operators/PatternTest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ void shouldOkWhenAiFlowWithNormalRunnableParallel() {
8181
assertThat(answer1.toString()).isEqualTo("answer question1 from context with my history");
8282
}
8383

84-
@RepeatedTest(1000)
85-
@DisplayName("测试 ExampleSelector - 重复运行以复现 NPE")
84+
@Test
85+
@DisplayName("测试 ExampleSelector")
8686
void shouldOkWhenAiFlowWithExampleSelector() {
8787
Example[] examples = {new DefaultExample("2+2", "4"), new DefaultExample("2+3", "5")};
8888
Conversation<String, Prompt> converse = AiFlows.<String>create()
@@ -98,6 +98,23 @@ void shouldOkWhenAiFlowWithExampleSelector() {
9898
assertThat(converse.offer("1+2").await().text()).isEqualTo("2+2=4\n2+3=5\n1+2=");
9999
}
100100

101+
@RepeatedTest(1000)
102+
@DisplayName("测试 RunnableParallel 并发稳定性")
103+
void shouldStableWhenRunnableParallelUnderConcurrency() {
104+
Example[] examples = {new DefaultExample("2+2", "4"), new DefaultExample("2+3", "5")};
105+
Conversation<String, Prompt> converse = AiFlows.<String>create()
106+
.runnableParallel(question(),
107+
fewShot(ExampleSelector.builder()
108+
.template("{{q}}={{a}}", "q", "a")
109+
.delimiter("\n")
110+
.example(examples)
111+
.build()))
112+
.prompt(Prompts.human("{{examples}}\n{{question}}="))
113+
.close()
114+
.converse();
115+
assertThat(converse.offer("1+2").await().text()).isEqualTo("2+2=4\n2+3=5\n1+2=");
116+
}
117+
101118
@Test
102119
@DisplayName("测试 Retriever")
103120
void shouldOkWhenAiFlowWithRetriever() {

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Fork.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import modelengine.fit.waterflow.domain.stream.reactive.Processor;
1313
import modelengine.fit.waterflow.domain.stream.reactive.Publisher;
1414
import modelengine.fit.waterflow.domain.utils.Tuple;
15-
import modelengine.fitframework.log.Logger;
1615
import modelengine.fitframework.util.ObjectUtils;
1716

1817
import java.util.ArrayList;
@@ -34,8 +33,6 @@
3433
* @since 1.0
3534
*/
3635
public class Fork<O, D, I, F extends Flow<D>> extends Activity<D, F> {
37-
private static final Logger LOG = Logger.get(Fork.class);
38-
3936
private final State<I, D, I, F> node;
4037

4138
private final List<State<O, D, ?, F>> forks = new ArrayList<>();
@@ -97,21 +94,11 @@ public synchronized R process(FlowContext<O> input) {
9794
}
9895
}
9996

100-
// Issue #247: 智能处理并发场景下的 null 数据
101-
// 在某些竞态条件下,FlowContext.data 可能为 null
102-
// 此时不更新分支计数,等待正确的数据到来
10397
O inputData = input.getData();
10498
if (inputData == null) {
105-
LOG.warn("[Fork.join] Received null FlowContext.data, skipping. "
106-
+ "key={}, session={}, thread={}, branch={}/{}, acc={}",
107-
key, input.getSession().getId(), Thread.currentThread().getName(),
108-
acc.second() + 1, forkNumber.get(), acc.first());
109-
// 返回 null 表示聚合未完成,等待有效数据
11099
return null;
111100
}
112-
113101
R processedResult = processor.process(acc.first(), inputData);
114-
115102
acc = Tuple.from(processedResult, acc.second() + 1);
116103
accs.put(key, acc);
117104

0 commit comments

Comments
 (0)