Skip to content

Commit 6d604c6

Browse files
committed
Make MCP HTTP stateless for hosted routing
1 parent 1101e40 commit 6d604c6

4 files changed

Lines changed: 155 additions & 125 deletions

File tree

.changeset/stateless-mcp-http.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@transloadit/mcp-server': patch
3+
---
4+
5+
Serve Streamable HTTP MCP requests statelessly so hosted deployments keep working behind non-sticky load balancing while preserving isolated transport instances per request.

packages/mcp-server/src/express.ts

Lines changed: 15 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import { randomUUID } from 'node:crypto'
21
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
3-
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'
42
import express from 'express'
53
import type { TransloaditMcpHttpOptions } from './http.ts'
64
import { isBasicAuthorized } from './http-helpers.ts'
@@ -13,11 +11,6 @@ export type TransloaditMcpExpressOptions = TransloaditMcpHttpOptions & {
1311
}
1412

1513
export function createTransloaditMcpExpressRouter(options: TransloaditMcpExpressOptions = {}) {
16-
const sessionIdGenerator = options.sessionIdGenerator ?? (() => randomUUID())
17-
18-
// Per-session transport map: each MCP client gets its own transport + server pair.
19-
const transports = new Map<string, StreamableHTTPServerTransport>()
20-
2114
const router = express.Router()
2215
const routePath = options.path ?? '/mcp'
2316
const metricsPath =
@@ -64,58 +57,27 @@ export function createTransloaditMcpExpressRouter(options: TransloaditMcpExpress
6457
})
6558

6659
router.all(routePath, async (req: express.Request, res: express.Response) => {
67-
const sessionId = req.headers['mcp-session-id'] as string | undefined
68-
let transport: StreamableHTTPServerTransport | undefined
69-
70-
if (sessionId) {
71-
transport = transports.get(sessionId)
72-
if (!transport) {
73-
res.status(404).json({
74-
jsonrpc: '2.0',
75-
error: { code: -32000, message: 'Session not found' },
76-
id: null,
77-
})
78-
return
79-
}
80-
} else if (req.method === 'POST' && isInitializeRequest(req.body)) {
81-
// New initialization request — create a new transport + server pair.
82-
const newTransport = new StreamableHTTPServerTransport({
83-
sessionIdGenerator,
84-
allowedOrigins: options.allowedOrigins,
85-
allowedHosts: options.allowedHosts,
86-
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
87-
onsessioninitialized: (sid) => {
88-
transports.set(sid, newTransport)
89-
},
90-
})
91-
92-
newTransport.onclose = () => {
93-
const sid = newTransport.sessionId
94-
if (sid) {
95-
transports.delete(sid)
96-
}
97-
}
98-
99-
const server = createTransloaditMcpServer(options)
100-
await server.connect(newTransport)
101-
transport = newTransport
102-
} else if (req.method === 'POST') {
103-
res.status(400).json({
60+
if (req.method !== 'POST') {
61+
res.status(405).json({
10462
jsonrpc: '2.0',
105-
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
63+
error: { code: -32000, message: 'Method not allowed.' },
10664
id: null,
10765
})
10866
return
10967
}
11068

111-
if (!transport) {
112-
res.status(400).json({
113-
jsonrpc: '2.0',
114-
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
115-
id: null,
116-
})
117-
return
118-
}
69+
const transport = new StreamableHTTPServerTransport({
70+
sessionIdGenerator: undefined,
71+
allowedOrigins: options.allowedOrigins,
72+
allowedHosts: options.allowedHosts,
73+
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
74+
})
75+
const server = createTransloaditMcpServer(options)
76+
res.on('close', () => {
77+
void transport.close()
78+
void server.close()
79+
})
80+
await server.connect(transport)
11981

12082
await transport.handleRequest(req, res, req.body)
12183
})

packages/mcp-server/src/http.ts

Lines changed: 20 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import { randomUUID } from 'node:crypto'
21
import type { IncomingMessage, ServerResponse } from 'node:http'
32
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
4-
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'
53
import type { SevLogger } from '@transloadit/sev-logger'
64
import {
75
applyCorsHeaders,
@@ -23,6 +21,7 @@ export type TransloaditMcpHttpOptions = TransloaditMcpServerOptions & {
2321
path?: string
2422
metricsPath?: string | false
2523
metricsAuth?: { username: string; password: string }
24+
// Ignored on purpose: the hosted HTTP server is stateless and does not mint session IDs.
2625
sessionIdGenerator?: (() => string) | undefined
2726
logger?: SevLogger
2827
}
@@ -36,7 +35,7 @@ export type TransloaditMcpHttpHandler = ((
3635

3736
const defaultPath = '/mcp'
3837

39-
/** Read the full request body and JSON-parse it so `isInitializeRequest` can inspect the payload. */
38+
/** Read the full request body and JSON-parse it before handing it to the MCP transport. */
4039
function readJsonBody(req: IncomingMessage): Promise<unknown> {
4140
return new Promise((resolve, reject) => {
4241
const chunks: Buffer[] = []
@@ -64,10 +63,6 @@ export function createTransloaditMcpHttpHandler(
6463
const metricsPath =
6564
options.metricsPath === false ? undefined : normalizePath(options.metricsPath ?? '/metrics')
6665
const metricsAuth = options.metricsAuth
67-
const sessionIdGenerator = options.sessionIdGenerator ?? (() => randomUUID())
68-
69-
// Per-session transport map: each MCP client gets its own transport + server pair.
70-
const transports = new Map<string, StreamableHTTPServerTransport>()
7166

7267
const serverCardJson = JSON.stringify(
7368
buildServerCard(expectedPath, { authKey: options.authKey, authSecret: options.authSecret }),
@@ -162,78 +157,33 @@ export function createTransloaditMcpHttpHandler(
162157
return
163158
}
164159

165-
// Route request to the correct per-session transport.
166-
const sessionId = req.headers['mcp-session-id'] as string | undefined
167-
let transport: StreamableHTTPServerTransport | undefined
168-
169-
if (sessionId) {
170-
transport = transports.get(sessionId)
171-
if (!transport) {
172-
res.statusCode = 404
173-
res.setHeader('Content-Type', 'application/json')
174-
res.end(
175-
JSON.stringify({
176-
jsonrpc: '2.0',
177-
error: { code: -32000, message: 'Session not found' },
178-
id: null,
179-
}),
180-
)
181-
return
182-
}
183-
}
184-
185-
// For POST requests without a session, read the body to check for initialization.
186-
let parsedBody: unknown
187-
if (req.method === 'POST' && !transport) {
188-
parsedBody = await readJsonBody(req)
189-
if (isInitializeRequest(parsedBody)) {
190-
const newTransport = new StreamableHTTPServerTransport({
191-
sessionIdGenerator,
192-
allowedOrigins: options.allowedOrigins,
193-
allowedHosts: options.allowedHosts,
194-
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
195-
onsessioninitialized: (sid) => {
196-
transports.set(sid, newTransport)
197-
},
198-
})
199-
200-
newTransport.onclose = () => {
201-
const sid = newTransport.sessionId
202-
if (sid) {
203-
transports.delete(sid)
204-
}
205-
}
206-
207-
const server = createTransloaditMcpServer(options)
208-
await server.connect(newTransport)
209-
transport = newTransport
210-
} else {
211-
res.statusCode = 400
212-
res.setHeader('Content-Type', 'application/json')
213-
res.end(
214-
JSON.stringify({
215-
jsonrpc: '2.0',
216-
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
217-
id: null,
218-
}),
219-
)
220-
return
221-
}
222-
}
223-
224-
if (!transport) {
225-
res.statusCode = 400
160+
if (req.method !== 'POST') {
161+
res.statusCode = 405
226162
res.setHeader('Content-Type', 'application/json')
227163
res.end(
228164
JSON.stringify({
229165
jsonrpc: '2.0',
230-
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
166+
error: { code: -32000, message: 'Method not allowed.' },
231167
id: null,
232168
}),
233169
)
234170
return
235171
}
236172

173+
const parsedBody = await readJsonBody(req)
174+
const transport = new StreamableHTTPServerTransport({
175+
sessionIdGenerator: undefined,
176+
allowedOrigins: options.allowedOrigins,
177+
allowedHosts: options.allowedHosts,
178+
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
179+
})
180+
const server = createTransloaditMcpServer(options)
181+
res.on('close', () => {
182+
void transport.close()
183+
void server.close()
184+
})
185+
await server.connect(transport)
186+
237187
try {
238188
await transport.handleRequest(req, res, parsedBody)
239189
} catch {
@@ -245,9 +195,7 @@ export function createTransloaditMcpHttpHandler(
245195
}) as TransloaditMcpHttpHandler
246196

247197
handler.close = async () => {
248-
const closePromises = [...transports.values()].map((t) => t.close())
249-
await Promise.all(closePromises)
250-
transports.clear()
198+
return Promise.resolve()
251199
}
252200

253201
return handler
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import { createServer, request as httpRequest } from 'node:http'
2+
import type { AddressInfo, Socket } from 'node:net'
3+
import { expect, test } from 'vitest'
4+
import { createHttpClient, startHttpServer } from './http-server.ts'
5+
import { parseToolPayload } from './mcp-client.ts'
6+
7+
type RoutedRequest = {
8+
method: string
9+
path: string
10+
headers: Record<string, string | string[] | undefined>
11+
body: Buffer
12+
}
13+
14+
function readRequest(req: import('node:http').IncomingMessage): Promise<RoutedRequest> {
15+
return new Promise((resolve, reject) => {
16+
const chunks: Buffer[] = []
17+
req.on('data', (chunk: Buffer) => chunks.push(chunk))
18+
req.on('end', () => {
19+
resolve({
20+
method: req.method ?? 'GET',
21+
path: req.url ?? '/',
22+
headers: req.headers,
23+
body: Buffer.concat(chunks),
24+
})
25+
})
26+
req.on('error', reject)
27+
})
28+
}
29+
30+
function proxyToBackend(
31+
targetPort: number,
32+
routedRequest: RoutedRequest,
33+
res: import('node:http').ServerResponse,
34+
): Promise<void> {
35+
return new Promise((resolve, reject) => {
36+
const upstream = httpRequest(
37+
{
38+
host: '127.0.0.1',
39+
port: targetPort,
40+
method: routedRequest.method,
41+
path: routedRequest.path,
42+
headers: routedRequest.headers,
43+
},
44+
(upstreamRes) => {
45+
res.writeHead(upstreamRes.statusCode ?? 500, upstreamRes.headers)
46+
upstreamRes.pipe(res)
47+
upstreamRes.on('end', resolve)
48+
upstreamRes.on('error', reject)
49+
},
50+
)
51+
52+
upstream.on('error', reject)
53+
upstream.end(routedRequest.body)
54+
})
55+
}
56+
57+
async function startAlternatingProxy(
58+
ports: [number, number],
59+
): Promise<{ url: URL; close: () => Promise<void> }> {
60+
let requestCount = 0
61+
const server = createServer(async (req, res) => {
62+
try {
63+
const routedRequest = await readRequest(req)
64+
const targetPort = ports[requestCount % ports.length]!
65+
requestCount += 1
66+
await proxyToBackend(targetPort, routedRequest, res)
67+
} catch (error) {
68+
res.statusCode = 500
69+
res.end(error instanceof Error ? error.message : String(error))
70+
}
71+
})
72+
73+
await new Promise<void>((resolve) => {
74+
server.listen(0, '127.0.0.1', resolve)
75+
})
76+
77+
const { port } = server.address() as AddressInfo
78+
return {
79+
url: new URL(`http://127.0.0.1:${port}/mcp`),
80+
close: async () => {
81+
await new Promise<void>((resolve, reject) => {
82+
server.close((error) => (error ? reject(error) : resolve()))
83+
})
84+
},
85+
}
86+
}
87+
88+
test('streamable http: survives non-sticky routing across hosted MCP backends', async () => {
89+
const backendA = await startHttpServer()
90+
const backendB = await startHttpServer()
91+
const proxy = await startAlternatingProxy([
92+
backendA.url.port ? Number(backendA.url.port) : 0,
93+
backendB.url.port ? Number(backendB.url.port) : 0,
94+
])
95+
96+
try {
97+
const { client, transport } = await createHttpClient(proxy.url)
98+
99+
try {
100+
const robots = await client.callTool({
101+
name: 'transloadit_list_robots',
102+
arguments: { limit: 1 },
103+
})
104+
105+
expect(parseToolPayload(robots).status).toBe('ok')
106+
} finally {
107+
await transport.close()
108+
await client.close()
109+
}
110+
} finally {
111+
await proxy.close()
112+
await backendA.close()
113+
await backendB.close()
114+
}
115+
})

0 commit comments

Comments
 (0)