Skip to content

fix:support milky sse#657

Closed
super1207 wants to merge 3 commits into
LLOneBot:mainfrom
super1207:main
Closed

fix:support milky sse#657
super1207 wants to merge 3 commits into
LLOneBot:mainfrom
super1207:main

Conversation

@super1207
Copy link
Copy Markdown
Contributor

@super1207 super1207 commented Dec 13, 2025

Summary by Sourcery

在现有 WebSocket 事件流的基础上新增 Server-Sent Events (SSE) 支持,并确保对所有事件客户端进行正确的清理和日志记录。

新功能:

  • 暴露一个 HTTP SSE /event 端点,使用现有的访问令牌机制向已认证客户端推送事件流。

增强点:

  • 追踪 SSE 客户端,用于事件广播和服务器关闭,与现有的 WebSocket 客户端管理方式保持一致。
  • 改进事件端点日志,以区分 WebSocket 和 SSE 连接及其相关错误。
Original summary in English

Summary by Sourcery

Add Server-Sent Events (SSE) support alongside existing WebSocket event streaming and ensure proper cleanup and logging for all event clients.

New Features:

  • Expose an HTTP SSE /event endpoint that streams events to authenticated clients using the existing access token mechanism.

Enhancements:

  • Track SSE clients for event broadcasting and server shutdown, mirroring existing WebSocket client management.
  • Improve event endpoint logs to distinguish between WebSocket and SSE connections and related errors.

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Dec 13, 2025

Reviewer's Guide

在现有的 MilkyHttpHandler WebSocket 事件流基础上新增 Server-Sent Events (SSE) 支持,包括鉴权、连接生命周期管理、优雅关闭,以及对 WS 和 SSE 客户端的统一广播,并改进日志的清晰度。

SSE /event 连接与生命周期的时序图

sequenceDiagram
  actor Client
  participant ExpressApp
  participant MilkyHttpHandler
  participant sseClients as SSEClientsSet

  Client->>ExpressApp: GET /prefix/event
  ExpressApp->>MilkyHttpHandler: handleSSERequest(req, res)

  alt accessToken configured
    MilkyHttpHandler->>MilkyHttpHandler: extract inputToken from Authorization or query
    alt inputToken invalid
      MilkyHttpHandler-->>Client: 401 Unauthorized (JSON Failed(-401))
      Note over MilkyHttpHandler: return
    end
  end

  MilkyHttpHandler->>Client: set SSE headers and flush
  MilkyHttpHandler->>sseClients: add(res)

  Client-->>MilkyHttpHandler: close event
  MilkyHttpHandler->>sseClients: delete(res)

  Client-->>MilkyHttpHandler: error event
  MilkyHttpHandler->>sseClients: delete(res)
Loading

面向 WS 和 SSE 客户端的统一广播时序图

sequenceDiagram
  participant Producer as EventProducer
  participant MilkyHttpHandler
  participant wsClients as WebSocketClientsSet
  participant sseClients as SSEClientsSet
  participant WS as WebSocket
  participant SSE as SSEResponse

  EventProducer->>MilkyHttpHandler: broadcast(msg)

  loop for each ws in eventPushClients
    MilkyHttpHandler->>WS: check readyState
    alt WebSocket OPEN
      MilkyHttpHandler->>WS: send(msg)
    else not OPEN
      MilkyHttpHandler->>MilkyHttpHandler: skip
    end
  end

  alt sseClients not empty
    MilkyHttpHandler->>MilkyHttpHandler: build sseData = "data: msg\\n\\n"
    loop for each res in sseClients
      MilkyHttpHandler->>SSE: check not closed
      alt connection open
        MilkyHttpHandler->>SSE: write(sseData)
      else connection closed
        MilkyHttpHandler->>MilkyHttpHandler: skip
      end
    end
  end
Loading

带 SSE 支持的 MilkyHttpHandler 更新后类图

classDiagram
  class MilkyHttpHandler {
    +Express app
    +Set~WebSocket~ eventPushClients
    +Set~Response~ sseClients
    -http.Server httpServer
    -WebSocketServer wsServer
    +start(config)
    +stop()
    +broadcast(msg string)
  }

  class WebSocket {
    +number readyState
    +send(data string)
    +close(code number, reason string)
  }

  class Response {
    +setHeader(name string, value string)
    +flushHeaders()
    +write(chunk string)
    +end()
    +boolean closed
  }

  class WebSocketServer {
    +close()
  }

  class HttpServer {
    +close()
  }

  MilkyHttpHandler "*" o-- WebSocket : eventPushClients
  MilkyHttpHandler "*" o-- Response : sseClients
  MilkyHttpHandler o-- WebSocketServer : wsServer
  MilkyHttpHandler o-- HttpServer : httpServer
Loading

文件级变更

Change Details Files
并行于现有 WebSocket 事件流,引入 SSE 端点和客户端追踪。
  • 在 MilkyHttpHandler 上新增 sseClients 集合,用于追踪活跃的 SSE Response 对象。
  • 注册带令牌鉴权的 GET /event SSE 路由,与 WS/API 保持一致,支持 Authorization 头和 access_token 查询参数。
  • 配置合适的 SSE 头(event-stream、no-cache、keep-alive、X-Accel-Buffering),并在开始流式传输前刷新它们。
  • 在客户端断开或请求出错时,从 sseClients 中移除对应的 Response,并记录连接/断开/错误日志。
src/milky/network/http.ts
确保在服务器停止时关闭 SSE 连接,并将其纳入广播流程。
  • 在 stop() 中遍历 sseClients,对每个响应调用 end 并在关闭 WS 和 HTTP 服务器之前清空集合。
  • 扩展 broadcast(msg),向所有已连接的 SSE 客户端发送消息,格式为 "data: <msg>\n\n"。
  • 在写入 SSE 前检查连接状态并捕获写入错误,使用 SSE 特定的日志消息记录写入失败。
src/milky/network/http.ts
明确 WebSocket 事件日志以便与 SSE 区分。
  • 更新 WebSocket /event 的鉴权与生命周期日志,在消息中加入 [WS] 标签。
  • 调整当 WS 发送失败时的警告消息,明确说明为 "Failed to send WS message"。
src/milky/network/http.ts

Tips and commands

与 Sourcery 交互

  • 触发新评审: 在 Pull Request 中评论 @sourcery-ai review
  • 继续讨论: 直接回复 Sourcery 的评审评论。
  • 从评审评论生成 GitHub issue: 在某条评审评论下回复,请 Sourcery 从该评论创建 issue。你也可以在评论中回复 @sourcery-ai issue 来从该评论创建 issue。
  • 生成 Pull Request 标题: 在 Pull Request 标题中的任意位置写上 @sourcery-ai,即可随时生成标题。你也可以在 Pull Request 中评论 @sourcery-ai title 来(重新)生成标题。
  • 生成 Pull Request 摘要: 在 Pull Request 正文的任意位置写上 @sourcery-ai summary,即可在指定位置生成 PR 摘要。你也可以在 Pull Request 中评论 @sourcery-ai summary 来(重新)生成摘要。
  • 生成 Reviewer's Guide: 在 Pull Request 中评论 @sourcery-ai guide,即可随时(重新)生成 reviewer's guide。
  • 一次性解决所有 Sourcery 评论: 在 Pull Request 中评论 @sourcery-ai resolve,即可批量标记所有 Sourcery 评论为已解决。如果你已经处理完所有评论且不想再看到它们,这会很有用。
  • 一次性关闭所有 Sourcery 评审: 在 Pull Request 中评论 @sourcery-ai dismiss,即可关闭所有现有的 Sourcery 评审。特别适用于你希望从一个新的评审开始的情况——别忘了随后评论 @sourcery-ai review 来触发新的评审!

自定义你的使用体验

打开你的 dashboard 以:

  • 启用或禁用评审功能,例如 Sourcery 自动生成的 PR 摘要、reviewer's guide 等。
  • 更改评审语言。
  • 添加、删除或编辑自定义评审指令。
  • 调整其他评审设置。

获取帮助

Original review guide in English

Reviewer's Guide

Adds Server-Sent Events (SSE) support alongside existing WebSocket event streaming in MilkyHttpHandler, including authentication, connection lifecycle management, graceful shutdown, and unified broadcast to both WS and SSE clients with improved logging clarity.

Sequence diagram for SSE /event connection and lifecycle

sequenceDiagram
  actor Client
  participant ExpressApp
  participant MilkyHttpHandler
  participant sseClients as SSEClientsSet

  Client->>ExpressApp: GET /prefix/event
  ExpressApp->>MilkyHttpHandler: handleSSERequest(req, res)

  alt accessToken configured
    MilkyHttpHandler->>MilkyHttpHandler: extract inputToken from Authorization or query
    alt inputToken invalid
      MilkyHttpHandler-->>Client: 401 Unauthorized (JSON Failed(-401))
      Note over MilkyHttpHandler: return
    end
  end

  MilkyHttpHandler->>Client: set SSE headers and flush
  MilkyHttpHandler->>sseClients: add(res)

  Client-->>MilkyHttpHandler: close event
  MilkyHttpHandler->>sseClients: delete(res)

  Client-->>MilkyHttpHandler: error event
  MilkyHttpHandler->>sseClients: delete(res)
Loading

Sequence diagram for unified broadcast to WS and SSE clients

sequenceDiagram
  participant Producer as EventProducer
  participant MilkyHttpHandler
  participant wsClients as WebSocketClientsSet
  participant sseClients as SSEClientsSet
  participant WS as WebSocket
  participant SSE as SSEResponse

  EventProducer->>MilkyHttpHandler: broadcast(msg)

  loop for each ws in eventPushClients
    MilkyHttpHandler->>WS: check readyState
    alt WebSocket OPEN
      MilkyHttpHandler->>WS: send(msg)
    else not OPEN
      MilkyHttpHandler->>MilkyHttpHandler: skip
    end
  end

  alt sseClients not empty
    MilkyHttpHandler->>MilkyHttpHandler: build sseData = "data: msg\\n\\n"
    loop for each res in sseClients
      MilkyHttpHandler->>SSE: check not closed
      alt connection open
        MilkyHttpHandler->>SSE: write(sseData)
      else connection closed
        MilkyHttpHandler->>MilkyHttpHandler: skip
      end
    end
  end
Loading

Updated class diagram for MilkyHttpHandler with SSE support

classDiagram
  class MilkyHttpHandler {
    +Express app
    +Set~WebSocket~ eventPushClients
    +Set~Response~ sseClients
    -http.Server httpServer
    -WebSocketServer wsServer
    +start(config)
    +stop()
    +broadcast(msg string)
  }

  class WebSocket {
    +number readyState
    +send(data string)
    +close(code number, reason string)
  }

  class Response {
    +setHeader(name string, value string)
    +flushHeaders()
    +write(chunk string)
    +end()
    +boolean closed
  }

  class WebSocketServer {
    +close()
  }

  class HttpServer {
    +close()
  }

  MilkyHttpHandler "*" o-- WebSocket : eventPushClients
  MilkyHttpHandler "*" o-- Response : sseClients
  MilkyHttpHandler o-- WebSocketServer : wsServer
  MilkyHttpHandler o-- HttpServer : httpServer
Loading

File-Level Changes

Change Details Files
Introduce SSE endpoint and client tracking parallel to existing WebSocket event stream.
  • Add sseClients set on MilkyHttpHandler to track active SSE Response objects.
  • Register GET /event SSE route with token-based auth consistent with WS/API, including Authorization header and access_token query support.
  • Configure appropriate SSE headers (event-stream, no-cache, keep-alive, X-Accel-Buffering) and flush them before streaming.
  • On client disconnect or request error, remove Response from sseClients and log connect/disconnect/error events.
src/milky/network/http.ts
Ensure SSE connections are closed on server stop and included in broadcast flow.
  • On stop(), iterate sseClients to end each response and clear the set before shutting down WS and HTTP servers.
  • Extend broadcast(msg) to send messages to all connected SSE clients formatted as "data: \n\n".
  • Guard SSE writes by checking connection state and catching write errors, logging failures with an SSE-specific message.
src/milky/network/http.ts
Clarify WebSocket event logging to distinguish from SSE.
  • Update WebSocket /event authentication and lifecycle logs to include a [WS] tag in messages.
  • Adjust warning message when WS send fails to explicitly say "Failed to send WS message".
src/milky/network/http.ts

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你好,我已经查看了你的改动,这里有一些反馈:

  • SSE 的认证/Token 提取逻辑和现有的 WS/API 认证逻辑非常相似;可以考虑抽取一个小的共享辅助函数,以避免未来改动时出现差异。
  • broadcast 中,使用 res.closed 来检测失效的 SSE 连接对于 Node/Express 的响应来说并不标准;建议改为检查 res.writableEnded,或者在写入错误时将客户端从 sseClients 中移除,以避免反复向已断开的连接写入。
面向 AI Agent 的提示
Please address the comments from this code review:

## Overall Comments
- SSE 的认证/Token 提取逻辑和现有的 WS/API 认证逻辑非常相似;可以考虑抽取一个小的共享辅助函数,以避免未来改动时出现差异。
-`broadcast` 中,使用 `res.closed` 来检测失效的 SSE 连接对于 Node/Express 的响应来说并不标准;建议改为检查 `res.writableEnded`,或者在写入错误时将客户端从 `sseClients` 中移除,以避免反复向已断开的连接写入。

## Individual Comments

### Comment 1
<location> `src/milky/network/http.ts:192` </location>
<code_context>
+      const sseData = `data: ${msg}\n\n`
+      for (const res of this.sseClients) {
+        try {
+          if (!res.closed) { // 检查连接是否关闭
+            res.write(sseData)
+          }
</code_context>

<issue_to_address>
**issue (bug_risk):** 使用标准的 `ServerResponse` 状态标志,而不是 `res.closed``Express`/Node 的 `ServerResponse` 并不保证会暴露 `closed` 属性;常用的标志是 `writableEnded`/`destroyed`(或者检查 `res.socket?.writable`)。使用 `res.closed` 很可能不可靠,或者总是 `undefined`,这样仍有可能会向已经关闭的连接写入。请改为在写入前使用类似 `if (!res.writableEnded && !res.destroyed)` 的检查(或其他等价方式)。
</issue_to_address>

### Comment 2
<location> `src/milky/network/http.ts:87` </location>
<code_context>
+        this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Disconnected)`)
+      })
+
+      req.on('error', (err) => {
+        this.ctx.logger.warn('MilkyHttp', `SSE error: ${err}`)
+        this.sseClients.delete(res)
</code_context>

<issue_to_address>
**suggestion (bug_risk):** 建议在响应/连接上监听错误,而不是在请求上监听。

对于 SSE,长连接资源是响应/底层 socket,而不是请求体。由于典型的 SSE 请求通常没有请求体,`req.on('error')` 可能永远不会触发,因此错误也就不会触发从 `sseClients` 中清理客户端。为了让这个逻辑更健壮,请将错误处理程序挂在 `res``res.socket` 上(例如 `res.on('error', ...)`),以便在连接失败时能够可靠地移除客户端。

建议实现:

```typescript
      res.on('error', (err) => {
        this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
        this.sseClients.delete(res)
      })

```

为了进一步增强健壮性,你还可以:
1.`res.socket` 也挂载一个错误处理程序(如 `res.socket?.on('error', ...)`),以捕获底层 socket 错误。
2. 确保 `res` 的类型定义(大概率是 `ServerResponse`)在 TypeScript 中与 `.on('error', ...)` 兼容(Node 的 `ServerResponse` 继承自 `EventEmitter`,通常已经可以这样使用)。
</issue_to_address>

### Comment 3
<location> `src/milky/network/http.ts:187-198` </location>
<code_context>
         if (ws.readyState === WebSocket.OPEN) {
           ws.send(msg)
         }
       } catch (e) {
-        this.ctx.logger.warn('MilkyHttp', `Failed to send message: ${e}`)
+        this.ctx.logger.warn('MilkyHttp', `Failed to send WS message: ${e}`)
+      }
+    }
+
+    // 2. 推送给 SSE 客户端
+    if (this.sseClients.size > 0) {
+      const sseData = `data: ${msg}\n\n`
+      for (const res of this.sseClients) {
+        try {
+          if (!res.closed) { // 检查连接是否关闭
+            res.write(sseData)
+          }
+        } catch (e) {
+          this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
+        }
       }
</code_context>

<issue_to_address>
**suggestion:** 当写入抛出异常时,从集合中移除失败的 SSE 客户端。

如果 `res.write` 抛出异常,这个连接实际上已经失效。继续将它保留在 `sseClients` 中,会导致每次广播都触发相同的错误,并制造多余的日志噪音。在 `catch` 代码块中,请将该 `res``sseClients` 中移除(就像你在 `close` 事件中做的那样),这样集合中只会保留仍然活跃的客户端。

```suggestion
    // 2. 推送给 SSE 客户端
    if (this.sseClients.size > 0) {
      const sseData = `data: ${msg}\n\n`
      for (const res of this.sseClients) {
        try {
          if (!res.closed) { // 检查连接是否关闭
            res.write(sseData)
          }
        } catch (e) {
          this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
          // 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错
          this.sseClients.delete(res)
        }
      }
```
</issue_to_address>

Sourcery 对开源项目免费使用——如果你觉得这些评审有帮助,欢迎分享 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进之后的评审。
Original comment in English

Hey there - I've reviewed your changes - here's some feedback:

  • The SSE auth/token extraction logic is very similar to the existing WS/API auth; consider extracting a small shared helper to avoid divergence in future changes.
  • In broadcast, using res.closed to detect dead SSE connections is non-standard for Node/Express responses; prefer checking res.writableEnded or removing the client from sseClients on write errors to avoid repeatedly writing to broken connections.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The SSE auth/token extraction logic is very similar to the existing WS/API auth; consider extracting a small shared helper to avoid divergence in future changes.
- In `broadcast`, using `res.closed` to detect dead SSE connections is non-standard for Node/Express responses; prefer checking `res.writableEnded` or removing the client from `sseClients` on write errors to avoid repeatedly writing to broken connections.

## Individual Comments

### Comment 1
<location> `src/milky/network/http.ts:192` </location>
<code_context>
+      const sseData = `data: ${msg}\n\n`
+      for (const res of this.sseClients) {
+        try {
+          if (!res.closed) { // 检查连接是否关闭
+            res.write(sseData)
+          }
</code_context>

<issue_to_address>
**issue (bug_risk):** Use standard `ServerResponse` state flags instead of `res.closed`.

`Express`/Node `ServerResponse` doesn’t reliably expose a `closed` property; common flags are `writableEnded`/`destroyed` (or checking `res.socket?.writable`). Using `res.closed` is likely unreliable or always `undefined`, so you may still write to closed connections. Please switch to a check like `if (!res.writableEnded && !res.destroyed)` (or similar) before writing.
</issue_to_address>

### Comment 2
<location> `src/milky/network/http.ts:87` </location>
<code_context>
+        this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Disconnected)`)
+      })
+
+      req.on('error', (err) => {
+        this.ctx.logger.warn('MilkyHttp', `SSE error: ${err}`)
+        this.sseClients.delete(res)
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Consider listening for errors on the response/connection instead of the request.

With SSE the long‑lived resource is the response/underlying socket, not the request body. Since typical SSE requests have no body, `req.on('error')` may never fire, so errors wouldn’t trigger cleanup from `sseClients`. To make this robust, attach the error handler to `res` or `res.socket` (for example, `res.on('error', ...)`) so you reliably remove clients when the connection fails.

Suggested implementation:

```typescript
      res.on('error', (err) => {
        this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
        this.sseClients.delete(res)
      })

```

To make this even more robust, you may also want to:
1. Attach an error handler to `res.socket` (e.g. `res.socket?.on('error', ...)`) to catch low-level socket errors.
2. Ensure that wherever `res` is typed (likely as `ServerResponse`), its type is compatible with `.on('error', ...)` in your TypeScript definitions (Node's `ServerResponse` extends `EventEmitter`, so this should generally already work).
</issue_to_address>

### Comment 3
<location> `src/milky/network/http.ts:187-198` </location>
<code_context>
         if (ws.readyState === WebSocket.OPEN) {
           ws.send(msg)
         }
       } catch (e) {
-        this.ctx.logger.warn('MilkyHttp', `Failed to send message: ${e}`)
+        this.ctx.logger.warn('MilkyHttp', `Failed to send WS message: ${e}`)
+      }
+    }
+
+    // 2. 推送给 SSE 客户端
+    if (this.sseClients.size > 0) {
+      const sseData = `data: ${msg}\n\n`
+      for (const res of this.sseClients) {
+        try {
+          if (!res.closed) { // 检查连接是否关闭
+            res.write(sseData)
+          }
+        } catch (e) {
+          this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
+        }
       }
</code_context>

<issue_to_address>
**suggestion:** Remove failed SSE clients from the set when a write throws.

If `res.write` throws, that connection is effectively dead. Keeping it in `sseClients` means every future broadcast will hit the same error and add log noise. In the catch block, remove that `res` from `sseClients` (as you do on `close`) so the set only contains active clients.

```suggestion
    // 2. 推送给 SSE 客户端
    if (this.sseClients.size > 0) {
      const sseData = `data: ${msg}\n\n`
      for (const res of this.sseClients) {
        try {
          if (!res.closed) { // 检查连接是否关闭
            res.write(sseData)
          }
        } catch (e) {
          this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
          // 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错
          this.sseClients.delete(res)
        }
      }
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread src/milky/network/http.ts
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): 使用标准的 ServerResponse 状态标志,而不是 res.closed

Express/Node 的 ServerResponse 并不保证会暴露 closed 属性;常用的标志是 writableEnded/destroyed(或者检查 res.socket?.writable)。使用 res.closed 很可能不可靠,或者总是 undefined,这样仍有可能会向已经关闭的连接写入。请改为在写入前使用类似 if (!res.writableEnded && !res.destroyed) 的检查(或其他等价方式)。

Original comment in English

issue (bug_risk): Use standard ServerResponse state flags instead of res.closed.

Express/Node ServerResponse doesn’t reliably expose a closed property; common flags are writableEnded/destroyed (or checking res.socket?.writable). Using res.closed is likely unreliable or always undefined, so you may still write to closed connections. Please switch to a check like if (!res.writableEnded && !res.destroyed) (or similar) before writing.

Comment thread src/milky/network/http.ts
this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Disconnected)`)
})

req.on('error', (err) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 建议在响应/连接上监听错误,而不是在请求上监听。

对于 SSE,长连接资源是响应/底层 socket,而不是请求体。由于典型的 SSE 请求通常没有请求体,req.on('error') 可能永远不会触发,因此错误也就不会触发从 sseClients 中清理客户端。为了让这个逻辑更健壮,请将错误处理程序挂在 resres.socket 上(例如 res.on('error', ...)),以便在连接失败时能够可靠地移除客户端。

建议实现:

      res.on('error', (err) => {
        this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
        this.sseClients.delete(res)
      })

为了进一步增强健壮性,你还可以:

  1. res.socket 也挂载一个错误处理程序(如 res.socket?.on('error', ...)),以捕获底层 socket 错误。
  2. 确保 res 的类型定义(大概率是 ServerResponse)在 TypeScript 中与 .on('error', ...) 兼容(Node 的 ServerResponse 继承自 EventEmitter,通常已经可以这样使用)。
Original comment in English

suggestion (bug_risk): Consider listening for errors on the response/connection instead of the request.

With SSE the long‑lived resource is the response/underlying socket, not the request body. Since typical SSE requests have no body, req.on('error') may never fire, so errors wouldn’t trigger cleanup from sseClients. To make this robust, attach the error handler to res or res.socket (for example, res.on('error', ...)) so you reliably remove clients when the connection fails.

Suggested implementation:

      res.on('error', (err) => {
        this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
        this.sseClients.delete(res)
      })

To make this even more robust, you may also want to:

  1. Attach an error handler to res.socket (e.g. res.socket?.on('error', ...)) to catch low-level socket errors.
  2. Ensure that wherever res is typed (likely as ServerResponse), its type is compatible with .on('error', ...) in your TypeScript definitions (Node's ServerResponse extends EventEmitter, so this should generally already work).

Comment thread src/milky/network/http.ts
Comment on lines +187 to 198
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: 当写入抛出异常时,从集合中移除失败的 SSE 客户端。

如果 res.write 抛出异常,这个连接实际上已经失效。继续将它保留在 sseClients 中,会导致每次广播都触发相同的错误,并制造多余的日志噪音。在 catch 代码块中,请将该 ressseClients 中移除(就像你在 close 事件中做的那样),这样集合中只会保留仍然活跃的客户端。

Suggested change
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
}
}
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
// 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错
this.sseClients.delete(res)
}
}
Original comment in English

suggestion: Remove failed SSE clients from the set when a write throws.

If res.write throws, that connection is effectively dead. Keeping it in sseClients means every future broadcast will hit the same error and add log noise. In the catch block, remove that res from sseClients (as you do on close) so the set only contains active clients.

Suggested change
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
}
}
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
// 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错
this.sseClients.delete(res)
}
}

@linyuchen
Copy link
Copy Markdown
Collaborator

感谢 pr,但是在 pr 之前 dev 分支就已经实现了

@linyuchen linyuchen closed this Dec 14, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants