fix:support milky sse#657
Conversation
Reviewer's Guide在现有的 MilkyHttpHandler WebSocket 事件流基础上新增 Server-Sent Events (SSE) 支持,包括鉴权、连接生命周期管理、优雅关闭,以及对 WS 和 SSE 客户端的统一广播,并改进日志的清晰度。 SSE /event 连接与生命周期的时序图sequenceDiagram
actor Client
participant ExpressApp
participant MilkyHttpHandler
participant sseClients as SSEClientsSet
Client->>ExpressApp: GET /prefix/event
ExpressApp->>MilkyHttpHandler: handleSSERequest(req, res)
alt accessToken configured
MilkyHttpHandler->>MilkyHttpHandler: extract inputToken from Authorization or query
alt inputToken invalid
MilkyHttpHandler-->>Client: 401 Unauthorized (JSON Failed(-401))
Note over MilkyHttpHandler: return
end
end
MilkyHttpHandler->>Client: set SSE headers and flush
MilkyHttpHandler->>sseClients: add(res)
Client-->>MilkyHttpHandler: close event
MilkyHttpHandler->>sseClients: delete(res)
Client-->>MilkyHttpHandler: error event
MilkyHttpHandler->>sseClients: delete(res)
面向 WS 和 SSE 客户端的统一广播时序图sequenceDiagram
participant Producer as EventProducer
participant MilkyHttpHandler
participant wsClients as WebSocketClientsSet
participant sseClients as SSEClientsSet
participant WS as WebSocket
participant SSE as SSEResponse
EventProducer->>MilkyHttpHandler: broadcast(msg)
loop for each ws in eventPushClients
MilkyHttpHandler->>WS: check readyState
alt WebSocket OPEN
MilkyHttpHandler->>WS: send(msg)
else not OPEN
MilkyHttpHandler->>MilkyHttpHandler: skip
end
end
alt sseClients not empty
MilkyHttpHandler->>MilkyHttpHandler: build sseData = "data: msg\\n\\n"
loop for each res in sseClients
MilkyHttpHandler->>SSE: check not closed
alt connection open
MilkyHttpHandler->>SSE: write(sseData)
else connection closed
MilkyHttpHandler->>MilkyHttpHandler: skip
end
end
end
带 SSE 支持的 MilkyHttpHandler 更新后类图classDiagram
class MilkyHttpHandler {
+Express app
+Set~WebSocket~ eventPushClients
+Set~Response~ sseClients
-http.Server httpServer
-WebSocketServer wsServer
+start(config)
+stop()
+broadcast(msg string)
}
class WebSocket {
+number readyState
+send(data string)
+close(code number, reason string)
}
class Response {
+setHeader(name string, value string)
+flushHeaders()
+write(chunk string)
+end()
+boolean closed
}
class WebSocketServer {
+close()
}
class HttpServer {
+close()
}
MilkyHttpHandler "*" o-- WebSocket : eventPushClients
MilkyHttpHandler "*" o-- Response : sseClients
MilkyHttpHandler o-- WebSocketServer : wsServer
MilkyHttpHandler o-- HttpServer : httpServer
文件级变更
Tips and commands与 Sourcery 交互
自定义你的使用体验打开你的 dashboard 以:
获取帮助Original review guide in EnglishReviewer's GuideAdds Server-Sent Events (SSE) support alongside existing WebSocket event streaming in MilkyHttpHandler, including authentication, connection lifecycle management, graceful shutdown, and unified broadcast to both WS and SSE clients with improved logging clarity. Sequence diagram for SSE /event connection and lifecyclesequenceDiagram
actor Client
participant ExpressApp
participant MilkyHttpHandler
participant sseClients as SSEClientsSet
Client->>ExpressApp: GET /prefix/event
ExpressApp->>MilkyHttpHandler: handleSSERequest(req, res)
alt accessToken configured
MilkyHttpHandler->>MilkyHttpHandler: extract inputToken from Authorization or query
alt inputToken invalid
MilkyHttpHandler-->>Client: 401 Unauthorized (JSON Failed(-401))
Note over MilkyHttpHandler: return
end
end
MilkyHttpHandler->>Client: set SSE headers and flush
MilkyHttpHandler->>sseClients: add(res)
Client-->>MilkyHttpHandler: close event
MilkyHttpHandler->>sseClients: delete(res)
Client-->>MilkyHttpHandler: error event
MilkyHttpHandler->>sseClients: delete(res)
Sequence diagram for unified broadcast to WS and SSE clientssequenceDiagram
participant Producer as EventProducer
participant MilkyHttpHandler
participant wsClients as WebSocketClientsSet
participant sseClients as SSEClientsSet
participant WS as WebSocket
participant SSE as SSEResponse
EventProducer->>MilkyHttpHandler: broadcast(msg)
loop for each ws in eventPushClients
MilkyHttpHandler->>WS: check readyState
alt WebSocket OPEN
MilkyHttpHandler->>WS: send(msg)
else not OPEN
MilkyHttpHandler->>MilkyHttpHandler: skip
end
end
alt sseClients not empty
MilkyHttpHandler->>MilkyHttpHandler: build sseData = "data: msg\\n\\n"
loop for each res in sseClients
MilkyHttpHandler->>SSE: check not closed
alt connection open
MilkyHttpHandler->>SSE: write(sseData)
else connection closed
MilkyHttpHandler->>MilkyHttpHandler: skip
end
end
end
Updated class diagram for MilkyHttpHandler with SSE supportclassDiagram
class MilkyHttpHandler {
+Express app
+Set~WebSocket~ eventPushClients
+Set~Response~ sseClients
-http.Server httpServer
-WebSocketServer wsServer
+start(config)
+stop()
+broadcast(msg string)
}
class WebSocket {
+number readyState
+send(data string)
+close(code number, reason string)
}
class Response {
+setHeader(name string, value string)
+flushHeaders()
+write(chunk string)
+end()
+boolean closed
}
class WebSocketServer {
+close()
}
class HttpServer {
+close()
}
MilkyHttpHandler "*" o-- WebSocket : eventPushClients
MilkyHttpHandler "*" o-- Response : sseClients
MilkyHttpHandler o-- WebSocketServer : wsServer
MilkyHttpHandler o-- HttpServer : httpServer
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
你好,我已经查看了你的改动,这里有一些反馈:
- SSE 的认证/Token 提取逻辑和现有的 WS/API 认证逻辑非常相似;可以考虑抽取一个小的共享辅助函数,以避免未来改动时出现差异。
- 在
broadcast中,使用res.closed来检测失效的 SSE 连接对于 Node/Express 的响应来说并不标准;建议改为检查res.writableEnded,或者在写入错误时将客户端从sseClients中移除,以避免反复向已断开的连接写入。
面向 AI Agent 的提示
Please address the comments from this code review:
## Overall Comments
- SSE 的认证/Token 提取逻辑和现有的 WS/API 认证逻辑非常相似;可以考虑抽取一个小的共享辅助函数,以避免未来改动时出现差异。
- 在 `broadcast` 中,使用 `res.closed` 来检测失效的 SSE 连接对于 Node/Express 的响应来说并不标准;建议改为检查 `res.writableEnded`,或者在写入错误时将客户端从 `sseClients` 中移除,以避免反复向已断开的连接写入。
## Individual Comments
### Comment 1
<location> `src/milky/network/http.ts:192` </location>
<code_context>
+ const sseData = `data: ${msg}\n\n`
+ for (const res of this.sseClients) {
+ try {
+ if (!res.closed) { // 检查连接是否关闭
+ res.write(sseData)
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** 使用标准的 `ServerResponse` 状态标志,而不是 `res.closed`。
`Express`/Node 的 `ServerResponse` 并不保证会暴露 `closed` 属性;常用的标志是 `writableEnded`/`destroyed`(或者检查 `res.socket?.writable`)。使用 `res.closed` 很可能不可靠,或者总是 `undefined`,这样仍有可能会向已经关闭的连接写入。请改为在写入前使用类似 `if (!res.writableEnded && !res.destroyed)` 的检查(或其他等价方式)。
</issue_to_address>
### Comment 2
<location> `src/milky/network/http.ts:87` </location>
<code_context>
+ this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Disconnected)`)
+ })
+
+ req.on('error', (err) => {
+ this.ctx.logger.warn('MilkyHttp', `SSE error: ${err}`)
+ this.sseClients.delete(res)
</code_context>
<issue_to_address>
**suggestion (bug_risk):** 建议在响应/连接上监听错误,而不是在请求上监听。
对于 SSE,长连接资源是响应/底层 socket,而不是请求体。由于典型的 SSE 请求通常没有请求体,`req.on('error')` 可能永远不会触发,因此错误也就不会触发从 `sseClients` 中清理客户端。为了让这个逻辑更健壮,请将错误处理程序挂在 `res` 或 `res.socket` 上(例如 `res.on('error', ...)`),以便在连接失败时能够可靠地移除客户端。
建议实现:
```typescript
res.on('error', (err) => {
this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
this.sseClients.delete(res)
})
```
为了进一步增强健壮性,你还可以:
1. 给 `res.socket` 也挂载一个错误处理程序(如 `res.socket?.on('error', ...)`),以捕获底层 socket 错误。
2. 确保 `res` 的类型定义(大概率是 `ServerResponse`)在 TypeScript 中与 `.on('error', ...)` 兼容(Node 的 `ServerResponse` 继承自 `EventEmitter`,通常已经可以这样使用)。
</issue_to_address>
### Comment 3
<location> `src/milky/network/http.ts:187-198` </location>
<code_context>
if (ws.readyState === WebSocket.OPEN) {
ws.send(msg)
}
} catch (e) {
- this.ctx.logger.warn('MilkyHttp', `Failed to send message: ${e}`)
+ this.ctx.logger.warn('MilkyHttp', `Failed to send WS message: ${e}`)
+ }
+ }
+
+ // 2. 推送给 SSE 客户端
+ if (this.sseClients.size > 0) {
+ const sseData = `data: ${msg}\n\n`
+ for (const res of this.sseClients) {
+ try {
+ if (!res.closed) { // 检查连接是否关闭
+ res.write(sseData)
+ }
+ } catch (e) {
+ this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
+ }
}
</code_context>
<issue_to_address>
**suggestion:** 当写入抛出异常时,从集合中移除失败的 SSE 客户端。
如果 `res.write` 抛出异常,这个连接实际上已经失效。继续将它保留在 `sseClients` 中,会导致每次广播都触发相同的错误,并制造多余的日志噪音。在 `catch` 代码块中,请将该 `res` 从 `sseClients` 中移除(就像你在 `close` 事件中做的那样),这样集合中只会保留仍然活跃的客户端。
```suggestion
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
// 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错
this.sseClients.delete(res)
}
}
```
</issue_to_address>帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进之后的评审。
Original comment in English
Hey there - I've reviewed your changes - here's some feedback:
- The SSE auth/token extraction logic is very similar to the existing WS/API auth; consider extracting a small shared helper to avoid divergence in future changes.
- In
broadcast, usingres.closedto detect dead SSE connections is non-standard for Node/Express responses; prefer checkingres.writableEndedor removing the client fromsseClientson write errors to avoid repeatedly writing to broken connections.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The SSE auth/token extraction logic is very similar to the existing WS/API auth; consider extracting a small shared helper to avoid divergence in future changes.
- In `broadcast`, using `res.closed` to detect dead SSE connections is non-standard for Node/Express responses; prefer checking `res.writableEnded` or removing the client from `sseClients` on write errors to avoid repeatedly writing to broken connections.
## Individual Comments
### Comment 1
<location> `src/milky/network/http.ts:192` </location>
<code_context>
+ const sseData = `data: ${msg}\n\n`
+ for (const res of this.sseClients) {
+ try {
+ if (!res.closed) { // 检查连接是否关闭
+ res.write(sseData)
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** Use standard `ServerResponse` state flags instead of `res.closed`.
`Express`/Node `ServerResponse` doesn’t reliably expose a `closed` property; common flags are `writableEnded`/`destroyed` (or checking `res.socket?.writable`). Using `res.closed` is likely unreliable or always `undefined`, so you may still write to closed connections. Please switch to a check like `if (!res.writableEnded && !res.destroyed)` (or similar) before writing.
</issue_to_address>
### Comment 2
<location> `src/milky/network/http.ts:87` </location>
<code_context>
+ this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Disconnected)`)
+ })
+
+ req.on('error', (err) => {
+ this.ctx.logger.warn('MilkyHttp', `SSE error: ${err}`)
+ this.sseClients.delete(res)
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider listening for errors on the response/connection instead of the request.
With SSE the long‑lived resource is the response/underlying socket, not the request body. Since typical SSE requests have no body, `req.on('error')` may never fire, so errors wouldn’t trigger cleanup from `sseClients`. To make this robust, attach the error handler to `res` or `res.socket` (for example, `res.on('error', ...)`) so you reliably remove clients when the connection fails.
Suggested implementation:
```typescript
res.on('error', (err) => {
this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
this.sseClients.delete(res)
})
```
To make this even more robust, you may also want to:
1. Attach an error handler to `res.socket` (e.g. `res.socket?.on('error', ...)`) to catch low-level socket errors.
2. Ensure that wherever `res` is typed (likely as `ServerResponse`), its type is compatible with `.on('error', ...)` in your TypeScript definitions (Node's `ServerResponse` extends `EventEmitter`, so this should generally already work).
</issue_to_address>
### Comment 3
<location> `src/milky/network/http.ts:187-198` </location>
<code_context>
if (ws.readyState === WebSocket.OPEN) {
ws.send(msg)
}
} catch (e) {
- this.ctx.logger.warn('MilkyHttp', `Failed to send message: ${e}`)
+ this.ctx.logger.warn('MilkyHttp', `Failed to send WS message: ${e}`)
+ }
+ }
+
+ // 2. 推送给 SSE 客户端
+ if (this.sseClients.size > 0) {
+ const sseData = `data: ${msg}\n\n`
+ for (const res of this.sseClients) {
+ try {
+ if (!res.closed) { // 检查连接是否关闭
+ res.write(sseData)
+ }
+ } catch (e) {
+ this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
+ }
}
</code_context>
<issue_to_address>
**suggestion:** Remove failed SSE clients from the set when a write throws.
If `res.write` throws, that connection is effectively dead. Keeping it in `sseClients` means every future broadcast will hit the same error and add log noise. In the catch block, remove that `res` from `sseClients` (as you do on `close`) so the set only contains active clients.
```suggestion
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
// 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错
this.sseClients.delete(res)
}
}
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| const sseData = `data: ${msg}\n\n` | ||
| for (const res of this.sseClients) { | ||
| try { | ||
| if (!res.closed) { // 检查连接是否关闭 |
There was a problem hiding this comment.
issue (bug_risk): 使用标准的 ServerResponse 状态标志,而不是 res.closed。
Express/Node 的 ServerResponse 并不保证会暴露 closed 属性;常用的标志是 writableEnded/destroyed(或者检查 res.socket?.writable)。使用 res.closed 很可能不可靠,或者总是 undefined,这样仍有可能会向已经关闭的连接写入。请改为在写入前使用类似 if (!res.writableEnded && !res.destroyed) 的检查(或其他等价方式)。
Original comment in English
issue (bug_risk): Use standard ServerResponse state flags instead of res.closed.
Express/Node ServerResponse doesn’t reliably expose a closed property; common flags are writableEnded/destroyed (or checking res.socket?.writable). Using res.closed is likely unreliable or always undefined, so you may still write to closed connections. Please switch to a check like if (!res.writableEnded && !res.destroyed) (or similar) before writing.
| this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Disconnected)`) | ||
| }) | ||
|
|
||
| req.on('error', (err) => { |
There was a problem hiding this comment.
suggestion (bug_risk): 建议在响应/连接上监听错误,而不是在请求上监听。
对于 SSE,长连接资源是响应/底层 socket,而不是请求体。由于典型的 SSE 请求通常没有请求体,req.on('error') 可能永远不会触发,因此错误也就不会触发从 sseClients 中清理客户端。为了让这个逻辑更健壮,请将错误处理程序挂在 res 或 res.socket 上(例如 res.on('error', ...)),以便在连接失败时能够可靠地移除客户端。
建议实现:
res.on('error', (err) => {
this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
this.sseClients.delete(res)
})为了进一步增强健壮性,你还可以:
- 给
res.socket也挂载一个错误处理程序(如res.socket?.on('error', ...)),以捕获底层 socket 错误。 - 确保
res的类型定义(大概率是ServerResponse)在 TypeScript 中与.on('error', ...)兼容(Node 的ServerResponse继承自EventEmitter,通常已经可以这样使用)。
Original comment in English
suggestion (bug_risk): Consider listening for errors on the response/connection instead of the request.
With SSE the long‑lived resource is the response/underlying socket, not the request body. Since typical SSE requests have no body, req.on('error') may never fire, so errors wouldn’t trigger cleanup from sseClients. To make this robust, attach the error handler to res or res.socket (for example, res.on('error', ...)) so you reliably remove clients when the connection fails.
Suggested implementation:
res.on('error', (err) => {
this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
this.sseClients.delete(res)
})To make this even more robust, you may also want to:
- Attach an error handler to
res.socket(e.g.res.socket?.on('error', ...)) to catch low-level socket errors. - Ensure that wherever
resis typed (likely asServerResponse), its type is compatible with.on('error', ...)in your TypeScript definitions (Node'sServerResponseextendsEventEmitter, so this should generally already work).
| // 2. 推送给 SSE 客户端 | ||
| if (this.sseClients.size > 0) { | ||
| const sseData = `data: ${msg}\n\n` | ||
| for (const res of this.sseClients) { | ||
| try { | ||
| if (!res.closed) { // 检查连接是否关闭 | ||
| res.write(sseData) | ||
| } | ||
| } catch (e) { | ||
| this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`) | ||
| } | ||
| } |
There was a problem hiding this comment.
suggestion: 当写入抛出异常时,从集合中移除失败的 SSE 客户端。
如果 res.write 抛出异常,这个连接实际上已经失效。继续将它保留在 sseClients 中,会导致每次广播都触发相同的错误,并制造多余的日志噪音。在 catch 代码块中,请将该 res 从 sseClients 中移除(就像你在 close 事件中做的那样),这样集合中只会保留仍然活跃的客户端。
| // 2. 推送给 SSE 客户端 | |
| if (this.sseClients.size > 0) { | |
| const sseData = `data: ${msg}\n\n` | |
| for (const res of this.sseClients) { | |
| try { | |
| if (!res.closed) { // 检查连接是否关闭 | |
| res.write(sseData) | |
| } | |
| } catch (e) { | |
| this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`) | |
| } | |
| } | |
| // 2. 推送给 SSE 客户端 | |
| if (this.sseClients.size > 0) { | |
| const sseData = `data: ${msg}\n\n` | |
| for (const res of this.sseClients) { | |
| try { | |
| if (!res.closed) { // 检查连接是否关闭 | |
| res.write(sseData) | |
| } | |
| } catch (e) { | |
| this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`) | |
| // 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错 | |
| this.sseClients.delete(res) | |
| } | |
| } |
Original comment in English
suggestion: Remove failed SSE clients from the set when a write throws.
If res.write throws, that connection is effectively dead. Keeping it in sseClients means every future broadcast will hit the same error and add log noise. In the catch block, remove that res from sseClients (as you do on close) so the set only contains active clients.
| // 2. 推送给 SSE 客户端 | |
| if (this.sseClients.size > 0) { | |
| const sseData = `data: ${msg}\n\n` | |
| for (const res of this.sseClients) { | |
| try { | |
| if (!res.closed) { // 检查连接是否关闭 | |
| res.write(sseData) | |
| } | |
| } catch (e) { | |
| this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`) | |
| } | |
| } | |
| // 2. 推送给 SSE 客户端 | |
| if (this.sseClients.size > 0) { | |
| const sseData = `data: ${msg}\n\n` | |
| for (const res of this.sseClients) { | |
| try { | |
| if (!res.closed) { // 检查连接是否关闭 | |
| res.write(sseData) | |
| } | |
| } catch (e) { | |
| this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`) | |
| // 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错 | |
| this.sseClients.delete(res) | |
| } | |
| } |
|
感谢 pr,但是在 pr 之前 dev 分支就已经实现了 |
Summary by Sourcery
在现有 WebSocket 事件流的基础上新增 Server-Sent Events (SSE) 支持,并确保对所有事件客户端进行正确的清理和日志记录。
新功能:
/event端点,使用现有的访问令牌机制向已认证客户端推送事件流。增强点:
Original summary in English
Summary by Sourcery
Add Server-Sent Events (SSE) support alongside existing WebSocket event streaming and ensure proper cleanup and logging for all event clients.
New Features:
Enhancements: