From 6adba9eed180c2551c346722afbf24b8f1cb0067 Mon Sep 17 00:00:00 2001 From: super1207 Date: Sat, 13 Dec 2025 14:43:22 +0800 Subject: [PATCH] fix:support milky sse --- src/milky/network/http.ts | 71 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 5 deletions(-) diff --git a/src/milky/network/http.ts b/src/milky/network/http.ts index 2d62d3851..28187348e 100644 --- a/src/milky/network/http.ts +++ b/src/milky/network/http.ts @@ -1,7 +1,7 @@ import { MilkyHttpConfig } from '@/common/types' import type { MilkyAdapter } from '@/milky/adapter' import { Failed } from '@/milky/common/api' -import express, { Express } from 'express' +import express, { Express, Response } from 'express' import { WebSocketServer, WebSocket } from 'ws' import http from 'http' import cors from 'cors' @@ -10,6 +10,8 @@ import { Context } from 'cordis' class MilkyHttpHandler { readonly app: Express readonly eventPushClients = new Set() + readonly sseClients = new Set() + private httpServer: http.Server | undefined private wsServer: WebSocketServer | undefined @@ -46,6 +48,48 @@ class MilkyHttpHandler { }) } + // SSE + this.app.get(`${config.prefix}/event`, (req, res) => { + // 1. 鉴权逻辑 (与 WS/API 保持一致) + if (this.config.accessToken) { + let inputToken = '' + const authHeader = req.headers['authorization'] + if (authHeader && authHeader.startsWith('Bearer ')) { + inputToken = authHeader.split('Bearer ').pop()! + } else if (req.query.access_token) { + // 兼容 query 参数鉴权 + inputToken = String(req.query.access_token) + } + + if (inputToken !== this.config.accessToken) { + this.ctx.logger.warn('MilkyHttp', `${req.ip} -> /event [SSE] (Credentials invalid)`) + return res.status(401).json(Failed(-401, 'Unauthorized')) + } + } + + // 2. 设置 SSE 响应头 + res.setHeader('Content-Type', 'text/event-stream; charset=utf-8') + res.setHeader('Cache-Control', 'no-cache') + res.setHeader('Connection', 'keep-alive') + res.setHeader('X-Accel-Buffering', 'no') // 防止 Nginx 缓存 + res.flushHeaders() + + // 3. 加入客户端列表 + this.sseClients.add(res) + this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Connected)`) + + // 4. 处理断开连接 + req.on('close', () => { + this.sseClients.delete(res) + this.ctx.logger.info('MilkyHttp', `${req.ip} -> /event [SSE] (Disconnected)`) + }) + + req.on('error', (err) => { + this.ctx.logger.warn('MilkyHttp', `SSE error: ${err}`) + this.sseClients.delete(res) + }) + }) + // API endpoint this.app.post(`${config.prefix}/api/:endpoint`, async (req, res) => { const endpoint = req.params.endpoint @@ -100,18 +144,18 @@ class MilkyHttpHandler { } if (!inputToken || inputToken !== this.config.accessToken) { - this.ctx.logger.warn('MilkyHttp', `${req.socket.remoteAddress} -> /event (Credentials invalid)`) + this.ctx.logger.warn('MilkyHttp', `${req.socket.remoteAddress} -> /event [WS] (Credentials invalid)`) ws.close(1008, 'Unauthorized') return } } this.eventPushClients.add(ws) - this.ctx.logger.info('MilkyHttp', `${req.socket.remoteAddress} -> /event (Connected)`) + this.ctx.logger.info('MilkyHttp', `${req.socket.remoteAddress} -> /event [WS] (Connected)`) ws.on('close', () => { this.eventPushClients.delete(ws) - this.ctx.logger.info('MilkyHttp', `${req.socket.remoteAddress} -> /event (Disconnected)`) + this.ctx.logger.info('MilkyHttp', `${req.socket.remoteAddress} -> /event [WS] (Disconnected)`) }) ws.on('error', (error) => { @@ -122,18 +166,35 @@ class MilkyHttpHandler { } stop() { + this.sseClients.forEach(res => res.end()) + this.sseClients.clear() this.wsServer?.close() this.httpServer?.close() } broadcast(msg: string) { + // 1. 推送给 WebSocket 客户端 for (const ws of this.eventPushClients) { try { if (ws.readyState === WebSocket.OPEN) { ws.send(msg) } } catch (e) { - this.ctx.logger.warn('MilkyHttp', `Failed to send message: ${e}`) + this.ctx.logger.warn('MilkyHttp', `Failed to send WS message: ${e}`) + } + } + + // 2. 推送给 SSE 客户端 + if (this.sseClients.size > 0) { + const sseData = `data: ${msg}\n\n` + for (const res of this.sseClients) { + try { + if (!res.closed) { // 检查连接是否关闭 + res.write(sseData) + } + } catch (e) { + this.ctx.logger.warn('MilkyHttp', `Failed to send SSE message: ${e}`) + } } } }