Skip to content

Commit 18848bf

Browse files
authored
refactor: optimize streaming response terminal event handling (#161)
- 引入 CHAT_TERMINAL_EVENTS 和 WORKFLOW_TERMINAL_EVENTS 常量集合 - 修改 processStreamLine 方法参数以支持动态终止事件判断 - 为聊天、补全、工作流等不同场景配置相应的终止事件类型 - 重构事件处理逻辑,统一通过枚举集合判断流式读取终止条件 - 移除硬编码的特定事件判断,提高代码可维护性 - 优化不同 Dify 应用类型的流式响应处理策略
1 parent d6d7adf commit 18848bf

1 file changed

Lines changed: 12 additions & 8 deletions

File tree

src/main/java/io/github/imfangs/dify/client/impl/DefaultDifyClient.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import java.io.*;
2020
import java.nio.charset.StandardCharsets;
21+
import java.util.EnumSet;
2122
import java.util.HashMap;
2223
import java.util.Map;
2324
import java.util.Optional;
25+
import java.util.Set;
2426
import java.util.function.Consumer;
2527

2628
/**
@@ -33,6 +35,8 @@ public class DefaultDifyClient extends DifyBaseClientImpl implements DifyClient
3335
// 流式响应相关常量
3436
private static final String DATA_PREFIX = "data:";
3537
private static final String PING_EVENT = "event: ping";
38+
private static final Set<EventType> CHAT_TERMINAL_EVENTS = EnumSet.of(EventType.MESSAGE_END, EventType.ERROR);
39+
private static final Set<EventType> WORKFLOW_TERMINAL_EVENTS = EnumSet.of(EventType.WORKFLOW_FINISHED, EventType.ERROR);
3640

3741
// API 路径常量
3842
// 对话型应用相关路径
@@ -108,7 +112,7 @@ public void sendChatMessageStream(ChatMessage message, ChatStreamCallback callba
108112
message.setResponseMode(ResponseMode.STREAMING);
109113

110114
// 执行流式请求
111-
executeStreamRequest(CHAT_MESSAGES_PATH, message, (line) -> processStreamLine(line, callback, (data, eventType) -> {
115+
executeStreamRequest(CHAT_MESSAGES_PATH, message, (line) -> processStreamLine(line, callback, CHAT_TERMINAL_EVENTS, (data, eventType) -> {
112116
StreamEventDispatcher.dispatchChatEvent(callback, data, eventType);
113117
}), callback::onException);
114118
}
@@ -120,7 +124,7 @@ public void sendChatMessageStream(ChatMessage message, ChatflowStreamCallback ca
120124
message.setResponseMode(ResponseMode.STREAMING);
121125

122126
// 执行流式请求
123-
executeStreamRequest(CHAT_MESSAGES_PATH, message, (line) -> processStreamLine(line, callback, (data, eventType) -> {
127+
executeStreamRequest(CHAT_MESSAGES_PATH, message, (line) -> processStreamLine(line, callback, WORKFLOW_TERMINAL_EVENTS, (data, eventType) -> {
124128
StreamEventDispatcher.dispatchChatFlowEvent(callback, data, eventType);
125129
}), callback::onException);
126130
}
@@ -287,7 +291,7 @@ public void sendCompletionMessageStream(CompletionRequest request, CompletionStr
287291
request.setResponseMode(ResponseMode.STREAMING);
288292

289293
// 执行流式请求
290-
executeStreamRequest(COMPLETION_MESSAGES_PATH, request, (line) -> processStreamLine(line, callback, (data, eventType) -> {
294+
executeStreamRequest(COMPLETION_MESSAGES_PATH, request, (line) -> processStreamLine(line, callback, CHAT_TERMINAL_EVENTS, (data, eventType) -> {
291295
// 分发事件
292296
StreamEventDispatcher.dispatchCompletionEvent(callback, data);
293297
}), callback::onException);
@@ -323,7 +327,7 @@ public void runWorkflowStream(WorkflowRunRequest request, WorkflowStreamCallback
323327
request.setResponseMode(ResponseMode.STREAMING);
324328

325329
// 执行流式请求
326-
executeStreamRequest(WORKFLOWS_RUN_PATH, request, (line) -> processStreamLine(line, callback, (data, eventType) -> {
330+
executeStreamRequest(WORKFLOWS_RUN_PATH, request, (line) -> processStreamLine(line, callback, WORKFLOW_TERMINAL_EVENTS, (data, eventType) -> {
327331
// 分发事件
328332
StreamEventDispatcher.dispatchWorkflowEvent(callback, data);
329333
}), callback::onException);
@@ -467,10 +471,11 @@ private interface LineProcessor {
467471
*
468472
* @param line 数据行
469473
* @param callback 回调接口
474+
* @param terminalEvents 流式读取终止事件
470475
* @param eventProcessor 事件处理器
471476
* @return 是否继续处理
472477
*/
473-
private boolean processStreamLine(String line, BaseStreamCallback callback, EventProcessor eventProcessor) {
478+
private boolean processStreamLine(String line, BaseStreamCallback callback, Set<EventType> terminalEvents, EventProcessor eventProcessor) {
474479
if(line == null || line.trim().isEmpty()){
475480
return true;
476481
}
@@ -487,11 +492,10 @@ private boolean processStreamLine(String line, BaseStreamCallback callback, Even
487492

488493
// 处理事件
489494
eventProcessor.process(data, baseEvent.getEvent());
490-
// 如果是结束类事件,则停止继续读取,主动关闭连接
495+
// 不同 Dify 应用类型的最终事件不同,例如 Chatflow 的 message_end 后仍会继续发送 workflow_finished。
491496
String eventTypeStr = baseEvent.getEvent();
492497
EventType eventType = eventTypeStr != null ? EventType.fromValue(eventTypeStr) : null;
493-
if (eventType == EventType.MESSAGE_END
494-
|| eventType == EventType.ERROR) {
498+
if (eventType != null && terminalEvents.contains(eventType)) {
495499
return false;
496500
}
497501
} catch (Exception e) {

0 commit comments

Comments
 (0)