Skip to content

Commit 9c5e51a

Browse files
authored
[FEL] 05-retrieval示例修复 (#391)
* 05示例修复 * 复原文件读取逻辑 * 添加cache释放逻辑 * 把MatchWindow放在FlowSessionCache统一管理 * 添加FlowSessionCache文档
1 parent 574bb64 commit 9c5e51a

File tree

6 files changed

+99
-13
lines changed

6 files changed

+99
-13
lines changed

examples/fel-example/05-retrieval/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ node0-->node1{{=}}
226226

227227
## 验证
228228

229+
- 在IDEA中运行`DemoApplication`
230+
229231
- 在浏览器栏输入:`http://localhost:8080/ai/example/chat?query=请介绍一下黑神话悟空`
230232

231233
```json

examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import modelengine.fitframework.annotation.Component;
4242
import modelengine.fitframework.annotation.Fit;
4343
import modelengine.fitframework.annotation.Value;
44+
import modelengine.fitframework.log.Logger;
4445
import modelengine.fitframework.serialization.ObjectSerializer;
4546
import modelengine.fitframework.util.FileUtils;
4647

@@ -57,6 +58,7 @@
5758
@Component
5859
@RequestMapping("/ai/example")
5960
public class RetrievalExampleController {
61+
private static final Logger log = Logger.get(RetrievalExampleController.class);
6062
private static final String REWRITE_PROMPT =
6163
"作为一个向量检索助手,你的任务是结合历史记录,为”原问题“生成”检索词“," + "生成的问题要求指向对象清晰明确,并与“原问题语言相同。\n\n"
6264
+ "历史记录:\n---\n" + DEFAULT_HISTORY_KEY + "---\n原问题:{{query}}\n检索词:";
@@ -85,22 +87,27 @@ public RetrievalExampleController(ChatModel chatModel, EmbedModel embedModel,
8587
.others(node -> node.map(tip -> tip.freeze().get("query").text()))
8688
.retrieve(new DefaultVectorRetriever(vectorStore, SearchOption.custom().topK(1).build()))
8789
.synthesize(docs -> Content.from(docs.stream().map(Document::text).collect(Collectors.joining("\n\n"))))
88-
.close();
90+
.close(__ -> log.info("Retrieve flow completed."));
8991

9092
AiProcessFlow<File, List<Document>> indexFlow = AiFlows.<File>create()
9193
.load(new JsonFileSource(serializer, StringTemplate.create("{{question}}: {{answer}}")))
9294
.index(vectorStore)
9395
.close();
9496
File file = FileUtils.file(this.getClass().getClassLoader().getResource("data.json"));
9597
notNull(file, "The data cannot be null.");
96-
indexFlow.converse().offer(file);
98+
indexFlow.converse()
99+
.doOnError(e -> log.info("Index build error. [error={}]", e.getMessage(), e))
100+
.doOnFinally(() -> log.info("Index build successfully."))
101+
.offer(file);
97102

98103
this.ragFlow = AiFlows.<String>create()
104+
.just(query -> log.info("RAG flow start. [query={}]", query))
99105
.map(query -> Tip.from("query", query))
100106
.runnableParallel(value("context", retrieveFlow), passThrough())
101107
.prompt(Prompts.history(), Prompts.human(CHAT_PROMPT))
108+
.just(__ -> log.info("LLM start generation."))
102109
.generate(chatFlowModel)
103-
.close();
110+
.close(__ -> log.info("RAG flow completed."));
104111
}
105112

106113
/**

framework/waterflow/java/waterflow-core/README.md

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,57 @@ Flows.<Integer>create()
271271

272272
向流中投递数据。这里需要注意,流的运行是异步的,offer返回的是这次运行的实例ID。
273273

274-
## 使用限制
274+
## 核心机制
275+
276+
### FlowSessionCache 会话缓存管理
277+
278+
FlowSessionCache 是 waterflow 的核心资源管理机制,负责统一管理流程执行过程中产生的所有会话相关资源,确保同一批数据的正确汇聚和资源的自动释放。
279+
280+
#### 缓存结构
281+
282+
FlowSessionCache 按照 `flowId -> sessionId -> FlowSessionCache` 的层级结构组织,每个流程会话维护独立的缓存实例,内部管理以下资源:
283+
284+
1. **Session 流转缓存(nextToSessions)**
285+
- 记录每个节点向下游节点流转数据时使用的 session
286+
- 以当前窗口的唯一标识(UUID)为索引
287+
- 确保同一批数据在节点间流转时使用相同的 session 进行汇聚
288+
289+
2. **Emitter 处理缓存(nextEmitterHandleSessions)**
290+
- 专门用于处理 emitter 操作的 session 缓存
291+
- 为发射器操作提供独立的会话上下文
292+
293+
3. **FlatMap 窗口缓存(flatMapSourceWindows)**
294+
- 记录 flatMap 节点产生的源窗口信息
295+
- 以窗口唯一标识为索引存储 `FlatMapSourceWindow` 实例
296+
- 用于将 flatMap 操作产生的多个输出数据与原始输入关联
297+
298+
4. **Match 窗口缓存(matchWindows)**
299+
- 记录条件匹配节点(`conditions`)产生的窗口信息
300+
- 以 MatchWindow 的唯一标识为索引
301+
- 用于将条件分支产生的数据进行汇聚
302+
303+
5. **累加器顺序缓存(accOrders)**
304+
- 记录每个节点的累加操作顺序编号
305+
- 以节点 ID 为索引,存储递增的序号
306+
307+
#### 资源管理机制
308+
309+
**自动创建与复用**
310+
- 首次访问某个流程会话时,FlowSessionCache 自动创建并初始化
311+
- 相同窗口/会话标识的资源会被复用,避免重复创建
312+
- 通过 `FlowSessionRepo` 提供的静态方法访问各类缓存资源
313+
314+
**会话隔离**
315+
- 不同流程(flowId)的缓存完全隔离
316+
- 同一流程的不同会话(sessionId)也拥有独立的缓存空间
317+
- 避免跨会话或跨流程的数据污染
318+
319+
**生命周期管理**
320+
- 会话结束时调用 `FlowSessionRepo.release(flowId, session)` 自动释放所有关联资源
321+
- 当某个流程的所有会话都释放后,自动清理该流程的缓存映射
322+
- 无需在各个窗口或会话类中实现清理逻辑,避免内存泄漏
323+
324+
## 使用限制
275325

276326
1. 在编排流程时需要保证节点流转上没有死循环,否则处于死循环的数据将一直在这些节点上循环流转。
277327
2. 数据流转的线程池最大是100个,每个节点最大同时处理16个批次的数据,每个批次的数据在每个节点上串行执行。超过限制的数据将排队等待执行。

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66

77
package modelengine.fit.waterflow.domain.context;
88

9+
import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo;
10+
911
import java.util.HashSet;
1012
import java.util.List;
1113
import java.util.Map;
1214
import java.util.Set;
1315
import java.util.UUID;
14-
import java.util.concurrent.ConcurrentHashMap;
1516
import java.util.stream.Collectors;
1617

1718
/**
@@ -21,8 +22,6 @@
2122
* @since 1.0
2223
*/
2324
public class MatchWindow extends Window {
24-
private static final Map<String, MatchWindow> all = new ConcurrentHashMap<>();
25-
2625
private final Set<MatchWindow> arms = new HashSet<>();
2726

2827
/**
@@ -41,22 +40,26 @@ public MatchWindow(Window source, UUID id, Object data) {
4140
/**
4241
* 创建一个MatchWindow
4342
*
43+
* @param flowId 流程ID
4444
* @param source 源窗口
4545
* @param id 窗口ID
4646
* @param data 窗口数据
4747
* @return 返回创建的MatchWindow对象
4848
*/
49-
public static synchronized MatchWindow from(Window source, UUID id, Object data) {
50-
MatchWindow window = all.get(id.toString());
49+
public static synchronized MatchWindow from(String flowId, Window source, UUID id, Object data) {
50+
// 从 FlowSessionRepo 获取缓存
51+
Map<UUID, MatchWindow> cache = FlowSessionRepo.getMatchWindowCache(flowId, source.getSession());
52+
53+
MatchWindow window = cache.get(id);
5154
if (window == null) {
5255
window = new MatchWindow(source, id, data);
5356
FlowSession session = new FlowSession(source.getSession());
5457
session.setWindow(window);
55-
all.put(id.toString(), window);
58+
cache.put(id, window);
5659
}
5760
WindowToken token = window.createToken();
5861
token.beginConsume();
59-
List<MatchWindow> arms = all.values().stream().filter(t -> t.from == source).collect(Collectors.toList());
62+
List<MatchWindow> arms = cache.values().stream().filter(t -> t.from == source).collect(Collectors.toList());
6063
for (MatchWindow a : arms) {
6164
a.setArms(arms);
6265
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import modelengine.fit.waterflow.domain.context.FlatMapSourceWindow;
1010
import modelengine.fit.waterflow.domain.context.FlowSession;
11+
import modelengine.fit.waterflow.domain.context.MatchWindow;
1112
import modelengine.fit.waterflow.domain.context.Window;
1213
import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo;
1314
import modelengine.fitframework.inspection.Validation;
@@ -90,6 +91,19 @@ public static FlatMapSourceWindow getFlatMapSource(String flowId, Window window,
9091
.getFlatMapSourceWindow(window, repo);
9192
}
9293

94+
/**
95+
* Gets the MatchWindow cache map of the given flow session, used to store and look up MatchWindow instances.
96+
*
97+
* @param flowId The unique identifier of the flow.
98+
* @param session The current session context.
99+
* @return The MatchWindow cache map.
100+
*/
101+
public static Map<UUID, MatchWindow> getMatchWindowCache(String flowId, FlowSession session) {
102+
Validation.notNull(flowId, "Flow id cannot be null.");
103+
Validation.notNull(session, "Session cannot be null.");
104+
return getFlowSessionCache(flowId, session).getMatchWindowCache();
105+
}
106+
93107
/**
94108
* Releases all resources associated with a specific flow session.
95109
*
@@ -137,6 +151,12 @@ private static class FlowSessionCache {
137151
*/
138152
private final Map<UUID, FlatMapSourceWindow> flatMapSourceWindows = new ConcurrentHashMap<>();
139153

154+
/**
155+
* 记录流程中条件匹配节点产生的窗口信息,用于将同一批数据汇聚。
156+
* 其中索引为 match window 的唯一标识。
157+
*/
158+
private final Map<UUID, MatchWindow> matchWindows = new ConcurrentHashMap<>();
159+
140160
private final Map<String, Integer> accOrders = new ConcurrentHashMap<>();
141161

142162
private FlowSession getNextToSession(FlowSession session) {
@@ -165,6 +185,10 @@ private FlatMapSourceWindow getFlatMapSourceWindow(Window window, FlowContextRep
165185
});
166186
}
167187

188+
private Map<UUID, MatchWindow> getMatchWindowCache() {
189+
return this.matchWindows;
190+
}
191+
168192
private int getNextAccOrder(String nodeId) {
169193
return this.accOrders.compute(nodeId, (key, value) -> {
170194
if (value == null) {

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public MatchHappen<O, D, I, F> match(Operators.Whether<I> whether,
5050
Operators.BranchProcessor<O, D, I, F> processor) {
5151
UUID id = UUID.randomUUID();
5252
State<I, D, I, F> branchStart = new State<>(this.node.publisher()
53-
.just(input -> input.setSession(
54-
MatchWindow.from(input.getWindow(), id, input.getData()).getSession()), whether)
53+
.just(input -> input.setSession(MatchWindow.from(this.node.processor.getStreamId(),
54+
input.getWindow(), id, input.getData()).getSession()), whether)
5555
.displayAs(SpecialDisplayNode.BRANCH.name()), this.node.getFlow());
5656
State<O, D, ?, F> branch = processor.process(branchStart);
5757
this.branches.add(branch);

0 commit comments

Comments
 (0)