Skip to content

Commit 4a8a1b2

Browse files
CodeCasterXclaude
andcommitted
fix: 改进 Fork.join() null 数据处理逻辑
修复问题:之前的修复在检测到 null 数据且是最后一个分支时会返回已有数据, 导致聚合提前完成,丢失分支数据。 改进策略:当 inputData 为 null 时,不更新分支计数,直接返回 null, 等待正确的数据到来后正常完成聚合。 这样可以避免因竞态条件导致的数据丢失问题。 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 150abf1 commit 4a8a1b2

File tree

1 file changed

+3
-8
lines changed
  • framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states

1 file changed

+3
-8
lines changed

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Fork.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,19 +99,14 @@ public synchronized R process(FlowContext<O> input) {
9999

100100
// Issue #247: 智能处理并发场景下的 null 数据
101101
// 在某些竞态条件下,FlowContext.data 可能为 null
102+
// 此时不更新分支计数,等待正确的数据到来
102103
O inputData = input.getData();
103104
if (inputData == null) {
104-
LOG.warn("[Fork.join] Received null FlowContext.data. "
105+
LOG.warn("[Fork.join] Received null FlowContext.data, skipping. "
105106
+ "key={}, session={}, thread={}, branch={}/{}, acc={}",
106107
key, input.getSession().getId(), Thread.currentThread().getName(),
107108
acc.second() + 1, forkNumber.get(), acc.first());
108-
109-
// 跳过此分支,不更新累加器
110-
// 如果是最后一个分支,返回已有数据(避免整个流程失败)
111-
if (acc.second() + 1 == forkNumber.get()) {
112-
accs.remove(key);
113-
return acc.first();
114-
}
109+
// 返回 null 表示聚合未完成,等待有效数据
115110
return null;
116111
}
117112

0 commit comments

Comments
 (0)