Skip to content

Commit 10c6e48

Browse files
kvzclaude
andcommitted
fix(mcp-server): support concurrent sessions via per-session transports
The HTTP and Express handlers previously created a single StreamableHTTPServerTransport shared by all clients. After the first client initialized, subsequent clients received "Server already initialized" because the transport tracks initialization state per instance. This switches to the SDK-recommended pattern of creating a new transport + server pair for each session, keyed by session ID. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 92f1e4f commit 10c6e48

3 files changed

Lines changed: 243 additions & 39 deletions

File tree

packages/mcp-server/src/express.ts

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { randomUUID } from 'node:crypto'
22
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
3+
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'
34
import express from 'express'
45
import type { TransloaditMcpHttpOptions } from './http.ts'
56
import { isBasicAuthorized } from './http-helpers.ts'
6-
import { createMcpRequestHandler } from './http-request-handler.ts'
77
import { getMetrics, getMetricsContentType } from './metrics.ts'
88
import { createTransloaditMcpServer } from './server.ts'
99
import { buildServerCard, serverCardPath } from './server-card.ts'
@@ -15,28 +15,16 @@ export type TransloaditMcpExpressOptions = TransloaditMcpHttpOptions & {
1515
export const createTransloaditMcpExpressRouter = async (
1616
options: TransloaditMcpExpressOptions = {},
1717
) => {
18-
const server = createTransloaditMcpServer(options)
19-
const transport = new StreamableHTTPServerTransport({
20-
sessionIdGenerator: options.sessionIdGenerator ?? (() => randomUUID()),
21-
allowedOrigins: options.allowedOrigins,
22-
allowedHosts: options.allowedHosts,
23-
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
24-
})
18+
const sessionIdGenerator = options.sessionIdGenerator ?? (() => randomUUID())
2519

26-
await server.connect(transport)
20+
// Per-session transport map: each MCP client gets its own transport + server pair.
21+
const transports = new Map<string, StreamableHTTPServerTransport>()
2722

2823
const router = express.Router()
2924
const routePath = options.path ?? '/mcp'
3025
const metricsPath =
3126
options.metricsPath === false ? undefined : (options.metricsPath ?? '/metrics')
3227
const metricsAuth = options.metricsAuth
33-
const handler = createMcpRequestHandler(transport, {
34-
allowedOrigins: options.allowedOrigins,
35-
mcpToken: options.mcpToken,
36-
path: { expectedPath: routePath, allowRoot: true },
37-
logger: options.logger,
38-
redactSecrets: [options.mcpToken, options.authKey, options.authSecret],
39-
})
4028

4129
const serverCardJson = JSON.stringify(
4230
buildServerCard(routePath, { authKey: options.authKey, authSecret: options.authSecret }),
@@ -77,8 +65,60 @@ export const createTransloaditMcpExpressRouter = async (
7765
sendServerCard(res, false)
7866
})
7967

80-
router.all(routePath, (req, res) => {
81-
void handler(req, res)
68+
router.all(routePath, async (req: express.Request, res: express.Response) => {
69+
const sessionId = req.headers['mcp-session-id'] as string | undefined
70+
let transport: StreamableHTTPServerTransport | undefined
71+
72+
if (sessionId) {
73+
transport = transports.get(sessionId)
74+
if (!transport) {
75+
res.status(404).json({
76+
jsonrpc: '2.0',
77+
error: { code: -32000, message: 'Session not found' },
78+
id: null,
79+
})
80+
return
81+
}
82+
} else if (req.method === 'POST' && isInitializeRequest(req.body)) {
83+
// New initialization request — create a new transport + server pair.
84+
transport = new StreamableHTTPServerTransport({
85+
sessionIdGenerator,
86+
allowedOrigins: options.allowedOrigins,
87+
allowedHosts: options.allowedHosts,
88+
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
89+
onsessioninitialized: (sid) => {
90+
transports.set(sid, transport!)
91+
},
92+
})
93+
94+
transport.onclose = () => {
95+
const sid = transport!.sessionId
96+
if (sid) {
97+
transports.delete(sid)
98+
}
99+
}
100+
101+
const server = createTransloaditMcpServer(options)
102+
await server.connect(transport)
103+
} else if (req.method === 'POST') {
104+
res.status(400).json({
105+
jsonrpc: '2.0',
106+
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
107+
id: null,
108+
})
109+
return
110+
}
111+
112+
if (!transport) {
113+
res.status(400).json({
114+
jsonrpc: '2.0',
115+
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
116+
id: null,
117+
})
118+
return
119+
}
120+
121+
await transport.handleRequest(req, res, req.body)
82122
})
83123

84124
if (metricsPath) {

packages/mcp-server/src/http.ts

Lines changed: 148 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import { randomUUID } from 'node:crypto'
22
import type { IncomingMessage, ServerResponse } from 'node:http'
33
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
4+
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'
45
import type { SevLogger } from '@transloadit/sev-logger'
56
import {
67
applyCorsHeaders,
8+
isAuthorized,
79
isBasicAuthorized,
810
normalizePath,
911
parsePathname,
1012
} from './http-helpers.ts'
11-
import { createMcpRequestHandler } from './http-request-handler.ts'
1213
import { getMetrics, getMetricsContentType } from './metrics.ts'
1314
import type { TransloaditMcpServerOptions } from './server.ts'
1415
import { createTransloaditMcpServer } from './server.ts'
@@ -35,31 +36,38 @@ export type TransloaditMcpHttpHandler = ((
3536

3637
const defaultPath = '/mcp'
3738

39+
/** Read the full request body and JSON-parse it so `isInitializeRequest` can inspect the payload. */
40+
async function readJsonBody(req: IncomingMessage): Promise<unknown> {
41+
return new Promise((resolve, reject) => {
42+
const chunks: Buffer[] = []
43+
req.on('data', (chunk: Buffer) => chunks.push(chunk))
44+
req.on('end', () => {
45+
const raw = Buffer.concat(chunks).toString('utf8')
46+
if (!raw) {
47+
resolve(undefined)
48+
return
49+
}
50+
try {
51+
resolve(JSON.parse(raw))
52+
} catch {
53+
resolve(undefined)
54+
}
55+
})
56+
req.on('error', reject)
57+
})
58+
}
59+
3860
export const createTransloaditMcpHttpHandler = async (
3961
options: TransloaditMcpHttpOptions = {},
4062
): Promise<TransloaditMcpHttpHandler> => {
41-
const server = createTransloaditMcpServer(options)
42-
const transport = new StreamableHTTPServerTransport({
43-
sessionIdGenerator: options.sessionIdGenerator ?? (() => randomUUID()),
44-
allowedOrigins: options.allowedOrigins,
45-
allowedHosts: options.allowedHosts,
46-
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
47-
})
48-
49-
await server.connect(transport)
50-
5163
const expectedPath = options.path ?? defaultPath
5264
const metricsPath =
5365
options.metricsPath === false ? undefined : normalizePath(options.metricsPath ?? '/metrics')
5466
const metricsAuth = options.metricsAuth
67+
const sessionIdGenerator = options.sessionIdGenerator ?? (() => randomUUID())
5568

56-
const mcpHandler = createMcpRequestHandler(transport, {
57-
allowedOrigins: options.allowedOrigins,
58-
mcpToken: options.mcpToken,
59-
path: { expectedPath },
60-
logger: options.logger,
61-
redactSecrets: [options.mcpToken, options.authKey, options.authSecret],
62-
})
69+
// Per-session transport map: each MCP client gets its own transport + server pair.
70+
const transports = new Map<string, StreamableHTTPServerTransport>()
6371

6472
const serverCardJson = JSON.stringify(
6573
buildServerCard(expectedPath, { authKey: options.authKey, authSecret: options.authSecret }),
@@ -69,7 +77,6 @@ export const createTransloaditMcpHttpHandler = async (
6977
const pathname = normalizePath(parsePathname(req.url, expectedPath))
7078

7179
if (pathname === serverCardPath) {
72-
// Public discovery endpoint for registries; always allow CORS (optionally restricted by allowedOrigins).
7380
if (!applyCorsHeaders(req, res, options.allowedOrigins)) {
7481
return
7582
}
@@ -115,11 +122,131 @@ export const createTransloaditMcpHttpHandler = async (
115122
return
116123
}
117124

118-
await mcpHandler(req, res)
125+
if (pathname !== normalizePath(expectedPath)) {
126+
res.statusCode = 404
127+
res.end('Not Found')
128+
return
129+
}
130+
131+
if (!applyCorsHeaders(req, res, options.allowedOrigins)) {
132+
return
133+
}
134+
135+
if (req.method === 'OPTIONS') {
136+
res.statusCode = 204
137+
res.end()
138+
return
139+
}
140+
141+
if (options.mcpToken && !isAuthorized(req, options.mcpToken)) {
142+
res.statusCode = 401
143+
res.setHeader('WWW-Authenticate', 'Bearer')
144+
res.end('Unauthorized')
145+
return
146+
}
147+
148+
// Bare GETs without the SSE Accept header are not valid MCP requests (the
149+
// Streamable HTTP spec requires Accept: text/event-stream for GET). Return
150+
// a friendly JSON status so directory health-probes see a 200 instead of 406.
151+
const accept = req.headers.accept ?? ''
152+
if (req.method === 'GET' && !accept.includes('text/event-stream')) {
153+
res.statusCode = 200
154+
res.setHeader('Content-Type', 'application/json')
155+
res.end(
156+
JSON.stringify({
157+
name: 'Transloadit MCP Server',
158+
status: 'ok',
159+
docs: 'https://transloadit.com/docs/sdks/mcp-server/',
160+
}),
161+
)
162+
return
163+
}
164+
165+
// Route request to the correct per-session transport.
166+
const sessionId = req.headers['mcp-session-id'] as string | undefined
167+
let transport: StreamableHTTPServerTransport | undefined
168+
169+
if (sessionId) {
170+
transport = transports.get(sessionId)
171+
if (!transport) {
172+
res.statusCode = 404
173+
res.setHeader('Content-Type', 'application/json')
174+
res.end(
175+
JSON.stringify({
176+
jsonrpc: '2.0',
177+
error: { code: -32000, message: 'Session not found' },
178+
id: null,
179+
}),
180+
)
181+
return
182+
}
183+
}
184+
185+
// For POST requests without a session, read the body to check for initialization.
186+
let parsedBody: unknown
187+
if (req.method === 'POST' && !transport) {
188+
parsedBody = await readJsonBody(req)
189+
if (isInitializeRequest(parsedBody)) {
190+
transport = new StreamableHTTPServerTransport({
191+
sessionIdGenerator,
192+
allowedOrigins: options.allowedOrigins,
193+
allowedHosts: options.allowedHosts,
194+
enableDnsRebindingProtection: options.enableDnsRebindingProtection,
195+
onsessioninitialized: (sid) => {
196+
transports.set(sid, transport!)
197+
},
198+
})
199+
200+
transport.onclose = () => {
201+
const sid = transport!.sessionId
202+
if (sid) {
203+
transports.delete(sid)
204+
}
205+
}
206+
207+
const server = createTransloaditMcpServer(options)
208+
await server.connect(transport)
209+
} else {
210+
res.statusCode = 400
211+
res.setHeader('Content-Type', 'application/json')
212+
res.end(
213+
JSON.stringify({
214+
jsonrpc: '2.0',
215+
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
216+
id: null,
217+
}),
218+
)
219+
return
220+
}
221+
}
222+
223+
if (!transport) {
224+
res.statusCode = 400
225+
res.setHeader('Content-Type', 'application/json')
226+
res.end(
227+
JSON.stringify({
228+
jsonrpc: '2.0',
229+
error: { code: -32600, message: 'Bad Request: No valid session ID provided' },
230+
id: null,
231+
}),
232+
)
233+
return
234+
}
235+
236+
try {
237+
await transport.handleRequest(req, res, parsedBody)
238+
} catch (error) {
239+
if (!res.headersSent) {
240+
res.statusCode = 500
241+
res.end('Internal Server Error')
242+
}
243+
}
119244
}) as TransloaditMcpHttpHandler
120245

121246
handler.close = async () => {
122-
await transport.close()
247+
const closePromises = [...transports.values()].map((t) => t.close())
248+
await Promise.all(closePromises)
249+
transports.clear()
123250
}
124251

125252
return handler
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { expect, test } from 'vitest'
2+
import { createHttpClient, startHttpServer } from './http-server.ts'
3+
import { parseToolPayload } from './mcp-client.ts'
4+
5+
test('streamable http: supports concurrent sessions', async () => {
6+
const server = await startHttpServer()
7+
8+
try {
9+
// Create two independent MCP clients (each sends its own initialize request).
10+
const session1 = await createHttpClient(server.url)
11+
const session2 = await createHttpClient(server.url)
12+
13+
try {
14+
// Both sessions should be able to call tools independently.
15+
const [robots1, robots2] = await Promise.all([
16+
session1.client.callTool({
17+
name: 'transloadit_list_robots',
18+
arguments: { limit: 1 },
19+
}),
20+
session2.client.callTool({
21+
name: 'transloadit_list_robots',
22+
arguments: { limit: 1 },
23+
}),
24+
])
25+
26+
expect(parseToolPayload(robots1).status).toBe('ok')
27+
expect(parseToolPayload(robots2).status).toBe('ok')
28+
} finally {
29+
await session1.transport.close()
30+
await session1.client.close()
31+
await session2.transport.close()
32+
await session2.client.close()
33+
}
34+
} finally {
35+
await server.close()
36+
}
37+
})

0 commit comments

Comments
 (0)