|
| 1 | +/** |
| 2 | + * Test-only legacy HTTP+SSE host bridge. |
| 3 | + * |
| 4 | + * v2 removed the server-side SSE transport, but the client-side |
| 5 | + * SSEClientTransport is still shipped for talking to legacy servers. This |
| 6 | + * bridge stands in for the removed server half so the e2e matrix can exercise |
| 7 | + * the real client transport end to end: it speaks the legacy wire protocol |
| 8 | + * (an `endpoint` SSE event carrying the POST URL with the sessionId, |
| 9 | + * `message` SSE events for server→client JSON-RPC, plain POSTs for |
| 10 | + * client→server JSON-RPC) over a real loopback listener and bridges to a real |
| 11 | + * v2 server through the Transport interface. |
| 12 | + */ |
| 13 | + |
| 14 | +import { randomUUID } from 'node:crypto'; |
| 15 | +import type { IncomingMessage, ServerResponse } from 'node:http'; |
| 16 | +import { createServer } from 'node:http'; |
| 17 | + |
| 18 | +import { JSONRPCMessageSchema } from '@modelcontextprotocol/core'; |
| 19 | +import type { JSONRPCMessage, McpServer, Server, Transport } from '@modelcontextprotocol/server'; |
| 20 | + |
| 21 | +const SSE_PATH = '/sse'; |
| 22 | +const POST_PATH = '/messages'; |
| 23 | + |
| 24 | +type AnyServer = McpServer | Server; |
| 25 | + |
| 26 | +function toError(value: unknown): Error { |
| 27 | + return value instanceof Error ? value : new Error(String(value)); |
| 28 | +} |
| 29 | + |
| 30 | +/** Test-only server half of the legacy HTTP+SSE transport (v2 ships only the client half). */ |
| 31 | +export class LegacySseServerTransport implements Transport { |
| 32 | + private _response?: ServerResponse; |
| 33 | + readonly sessionId: string = randomUUID(); |
| 34 | + |
| 35 | + onclose?: () => void; |
| 36 | + onerror?: (error: Error) => void; |
| 37 | + onmessage?: (message: JSONRPCMessage) => void; |
| 38 | + |
| 39 | + constructor(private readonly _res: ServerResponse) {} |
| 40 | + |
| 41 | + async start(): Promise<void> { |
| 42 | + if (this._response) throw new Error('LegacySseServerTransport already started'); |
| 43 | + this._res.writeHead(200, { |
| 44 | + 'content-type': 'text/event-stream', |
| 45 | + 'cache-control': 'no-cache, no-transform', |
| 46 | + connection: 'keep-alive' |
| 47 | + }); |
| 48 | + // The legacy protocol's first event tells the client where to POST its messages. |
| 49 | + this._res.write(`event: endpoint\ndata: ${POST_PATH}?sessionId=${this.sessionId}\n\n`); |
| 50 | + this._response = this._res; |
| 51 | + this._res.on('close', () => { |
| 52 | + this._response = undefined; |
| 53 | + this.onclose?.(); |
| 54 | + }); |
| 55 | + } |
| 56 | + |
| 57 | + async send(message: JSONRPCMessage): Promise<void> { |
| 58 | + if (!this._response) throw new Error('SSE stream not established'); |
| 59 | + this._response.write(`event: message\ndata: ${JSON.stringify(message)}\n\n`); |
| 60 | + } |
| 61 | + |
| 62 | + async close(): Promise<void> { |
| 63 | + this._response?.end(); |
| 64 | + this._response = undefined; |
| 65 | + this.onclose?.(); |
| 66 | + } |
| 67 | + |
| 68 | + /** Delivers a client→server POST to the server side and answers the HTTP request (202 on success). */ |
| 69 | + async handlePostMessage(req: IncomingMessage, res: ServerResponse): Promise<void> { |
| 70 | + if (!this._response) { |
| 71 | + res.writeHead(500).end('SSE connection not established'); |
| 72 | + return; |
| 73 | + } |
| 74 | + const contentTypeHeader = req.headers['content-type'] ?? ''; |
| 75 | + if (!contentTypeHeader.includes('application/json')) { |
| 76 | + res.writeHead(400).end(`Unsupported content-type: ${contentTypeHeader}`); |
| 77 | + return; |
| 78 | + } |
| 79 | + const chunks: Buffer[] = []; |
| 80 | + for await (const chunk of req) { |
| 81 | + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk))); |
| 82 | + } |
| 83 | + const raw = Buffer.concat(chunks).toString('utf8'); |
| 84 | + |
| 85 | + let message: JSONRPCMessage; |
| 86 | + try { |
| 87 | + message = JSONRPCMessageSchema.parse(JSON.parse(raw)); |
| 88 | + } catch (error) { |
| 89 | + res.writeHead(400).end(`Invalid message: ${raw}`); |
| 90 | + this.onerror?.(toError(error)); |
| 91 | + return; |
| 92 | + } |
| 93 | + res.writeHead(202).end('Accepted'); |
| 94 | + this.onmessage?.(message); |
| 95 | + } |
| 96 | +} |
| 97 | + |
| 98 | +export interface LegacySseHost { |
| 99 | + /** URL of the SSE endpoint; GET it to open a stream. */ |
| 100 | + readonly url: URL; |
| 101 | + close(): Promise<void>; |
| 102 | +} |
| 103 | + |
| 104 | +/** |
| 105 | + * Runs a loopback legacy-SSE host: every GET on the SSE path gets its own |
| 106 | + * server instance from the factory (mirroring hostPerSession), POSTs are |
| 107 | + * routed to the owning session, unknown sessions get 404 and handler failures |
| 108 | + * become 500. |
| 109 | + */ |
| 110 | +export async function startLegacySseHost(makeServer: () => AnyServer): Promise<LegacySseHost> { |
| 111 | + const sessions = new Map<string, { tx: LegacySseServerTransport; server: AnyServer }>(); |
| 112 | + |
| 113 | + const handle = async (req: IncomingMessage, res: ServerResponse): Promise<void> => { |
| 114 | + const requestUrl = new URL(req.url ?? '/', 'http://127.0.0.1'); |
| 115 | + if (req.method === 'GET' && requestUrl.pathname === SSE_PATH) { |
| 116 | + const tx = new LegacySseServerTransport(res); |
| 117 | + const server = makeServer(); |
| 118 | + sessions.set(tx.sessionId, { tx, server }); |
| 119 | + // connect() starts the transport, which writes the SSE headers and the endpoint event. |
| 120 | + await server.connect(tx); |
| 121 | + return; |
| 122 | + } |
| 123 | + if (req.method === 'POST' && requestUrl.pathname === POST_PATH) { |
| 124 | + const session = sessions.get(requestUrl.searchParams.get('sessionId') ?? ''); |
| 125 | + if (!session) { |
| 126 | + res.writeHead(404, { 'content-type': 'text/plain' }).end('Session not found'); |
| 127 | + return; |
| 128 | + } |
| 129 | + await session.tx.handlePostMessage(req, res); |
| 130 | + return; |
| 131 | + } |
| 132 | + res.writeHead(404).end(); |
| 133 | + }; |
| 134 | + |
| 135 | + const httpServer = createServer((req, res) => { |
| 136 | + handle(req, res).catch((error: unknown) => { |
| 137 | + // Handler failures become a 500 rather than an unhandled rejection. |
| 138 | + if (!res.headersSent) res.writeHead(500).end(toError(error).message); |
| 139 | + }); |
| 140 | + }); |
| 141 | + |
| 142 | + await new Promise<void>(resolve => httpServer.listen(0, '127.0.0.1', resolve)); |
| 143 | + const address = httpServer.address(); |
| 144 | + if (address === null || typeof address === 'string') throw new Error('expected the SSE host to listen on a TCP port'); |
| 145 | + const url = new URL(`http://127.0.0.1:${address.port}${SSE_PATH}`); |
| 146 | + |
| 147 | + return { |
| 148 | + url, |
| 149 | + close: async () => { |
| 150 | + for (const { tx, server } of sessions.values()) { |
| 151 | + await server.close(); |
| 152 | + await tx.close(); |
| 153 | + } |
| 154 | + sessions.clear(); |
| 155 | + httpServer.closeAllConnections(); |
| 156 | + await new Promise<void>(resolve => httpServer.close(() => resolve())); |
| 157 | + } |
| 158 | + }; |
| 159 | +} |
0 commit comments