Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 66 additions & 5 deletions src/milky/network/http.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { MilkyHttpConfig } from '@/common/types'
import type { MilkyAdapter } from '@/milky/adapter'
import { Failed } from '@/milky/common/api'
import express, { Express } from 'express'
import express, { Express, Response } from 'express'
import { WebSocketServer, WebSocket } from 'ws'
import http from 'http'
import cors from 'cors'
Expand All @@ -10,6 +10,8 @@ import { Context } from 'cordis'
class MilkyHttpHandler {
readonly app: Express
readonly eventPushClients = new Set<WebSocket>()
readonly sseClients = new Set<Response>()

private httpServer: http.Server | undefined
private wsServer: WebSocketServer | undefined

Expand Down Expand Up @@ -46,6 +48,48 @@ class MilkyHttpHandler {
})
}

// SSE
this.app.get(`${config.prefix}/event`, (req, res) => {
// 1. 鉴权逻辑 (与 WS/API 保持一致)
if (this.config.accessToken) {
let inputToken = ''
const authHeader = req.headers['authorization']
if (authHeader && authHeader.startsWith('Bearer ')) {
inputToken = authHeader.split('Bearer ').pop()!
} else if (req.query.access_token) {
// 兼容 query 参数鉴权
inputToken = String(req.query.access_token)
}

if (inputToken !== this.config.accessToken) {
this.ctx.logger.warn('MilkyHttp', `${req.ip} -> /event [SSE] (Credentials invalid)`)
return res.status(401).json(Failed(-401, 'Unauthorized'))
}
}

// 2. 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no') // 防止 Nginx 缓存
res.flushHeaders()

// 3. 加入客户端列表
this.sseClients.add(res)
this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Connected)`)

// 4. 处理断开连接
req.on('close', () => {
this.sseClients.delete(res)
this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Disconnected)`)
})

req.on('error', (err) => {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): 建议在响应/连接上监听错误,而不是在请求上监听。

对于 SSE,长连接资源是响应/底层 socket,而不是请求体。由于典型的 SSE 请求通常没有请求体,req.on('error') 可能永远不会触发,因此错误也就不会触发从 sseClients 中清理客户端。为了让这个逻辑更健壮,请将错误处理程序挂在 resres.socket 上(例如 res.on('error', ...)),以便在连接失败时能够可靠地移除客户端。

建议实现:

      res.on('error', (err) => {
        this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
        this.sseClients.delete(res)
      })

为了进一步增强健壮性,你还可以:

  1. res.socket 也挂载一个错误处理程序(如 res.socket?.on('error', ...)),以捕获底层 socket 错误。
  2. 确保 res 的类型定义(大概率是 ServerResponse)在 TypeScript 中与 .on('error', ...) 兼容(Node 的 ServerResponse 继承自 EventEmitter,通常已经可以这样使用)。
Original comment in English

suggestion (bug_risk): Consider listening for errors on the response/connection instead of the request.

With SSE the long‑lived resource is the response/underlying socket, not the request body. Since typical SSE requests have no body, req.on('error') may never fire, so errors wouldn’t trigger cleanup from sseClients. To make this robust, attach the error handler to res or res.socket (for example, res.on('error', ...)) so you reliably remove clients when the connection fails.

Suggested implementation:

      res.on('error', (err) => {
        this.ctx.logger.warn('MilkyHttp', `SSE connection error: ${err}`)
        this.sseClients.delete(res)
      })

To make this even more robust, you may also want to:

  1. Attach an error handler to res.socket (e.g. res.socket?.on('error', ...)) to catch low-level socket errors.
  2. Ensure that wherever res is typed (likely as ServerResponse), its type is compatible with .on('error', ...) in your TypeScript definitions (Node's ServerResponse extends EventEmitter, so this should generally already work).

this.ctx.logger.warn('MilkyHttp', `SSE error: ${err}`)
this.sseClients.delete(res)
})
})

// API endpoint
this.app.post(`${config.prefix}/api/:endpoint`, async (req, res) => {
const endpoint = req.params.endpoint
Expand Down Expand Up @@ -100,18 +144,18 @@ class MilkyHttpHandler {
}

if (!inputToken || inputToken !== this.config.accessToken) {
this.ctx.logger.warn('MilkyHttp', `${req.socket.remoteAddress} -> /event (Credentials invalid)`)
this.ctx.logger.warn('MilkyHttp', `${req.socket.remoteAddress} -> /event [WS] (Credentials invalid)`)
ws.close(1008, 'Unauthorized')
return
}
}

this.eventPushClients.add(ws)
this.ctx.logger.info('MilkyHttp', `${req.socket.remoteAddress} -> /event (Connected)`)
this.ctx.logger.info('MilkyHttp', `${req.socket.remoteAddress} -> /event [WS] (Connected)`)

ws.on('close', () => {
this.eventPushClients.delete(ws)
this.ctx.logger.info('MilkyHttp', `${req.socket.remoteAddress} -> /event (Disconnected)`)
this.ctx.logger.info('MilkyHttp', `${req.socket.remoteAddress} -> /event [WS] (Disconnected)`)
})

ws.on('error', (error) => {
Expand All @@ -122,18 +166,35 @@ class MilkyHttpHandler {
}

stop() {
this.sseClients.forEach(res => res.end())
this.sseClients.clear()
this.wsServer?.close()
this.httpServer?.close()
}

broadcast(msg: string) {
// 1. 推送给 WebSocket 客户端
for (const ws of this.eventPushClients) {
try {
if (ws.readyState === WebSocket.OPEN) {
ws.send(msg)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send message: ${e}`)
this.ctx.logger.warn('MilkyHttp', `Failed to send WS message: ${e}`)
}
}

// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): 使用标准的 ServerResponse 状态标志,而不是 res.closed

Express/Node 的 ServerResponse 并不保证会暴露 closed 属性;常用的标志是 writableEnded/destroyed(或者检查 res.socket?.writable)。使用 res.closed 很可能不可靠,或者总是 undefined,这样仍有可能会向已经关闭的连接写入。请改为在写入前使用类似 if (!res.writableEnded && !res.destroyed) 的检查(或其他等价方式)。

Original comment in English

issue (bug_risk): Use standard ServerResponse state flags instead of res.closed.

Express/Node ServerResponse doesn’t reliably expose a closed property; common flags are writableEnded/destroyed (or checking res.socket?.writable). Using res.closed is likely unreliable or always undefined, so you may still write to closed connections. Please switch to a check like if (!res.writableEnded && !res.destroyed) (or similar) before writing.

res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
}
}
Comment on lines +187 to 198

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: 当写入抛出异常时,从集合中移除失败的 SSE 客户端。

如果 res.write 抛出异常,这个连接实际上已经失效。继续将它保留在 sseClients 中,会导致每次广播都触发相同的错误,并制造多余的日志噪音。在 catch 代码块中,请将该 ressseClients 中移除(就像你在 close 事件中做的那样),这样集合中只会保留仍然活跃的客户端。

Suggested change
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
}
}
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
// 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错
this.sseClients.delete(res)
}
}
Original comment in English

suggestion: Remove failed SSE clients from the set when a write throws.

If res.write throws, that connection is effectively dead. Keeping it in sseClients means every future broadcast will hit the same error and add log noise. In the catch block, remove that res from sseClients (as you do on close) so the set only contains active clients.

Suggested change
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
}
}
// 2. 推送给 SSE 客户端
if (this.sseClients.size > 0) {
const sseData = `data: ${msg}\n\n`
for (const res of this.sseClients) {
try {
if (!res.closed) { // 检查连接是否关闭
res.write(sseData)
}
} catch (e) {
this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`)
// 写入失败的 SSE 客户端视为失效,移出集合,避免之后重复报错
this.sseClients.delete(res)
}
}

}
}
Expand Down
Loading