Skip to content

Commit 64bcb0d

Browse files
feat(server): add multi-node session hydration (C# SDK parity)
Adds two pieces for reconstructing a session-aware server on a node that did not handle the original initialize handshake: - sessionId option on WebStandardStreamableHTTPServerTransport: when set, the transport validates incoming mcp-session-id headers and rejects re-initialization without requiring a fresh handshake. Mirrors C# SDK's SessionId { get; init; } and python-sdk's mcp_session_id constructor parameter. - Server.restoreInitializeState(params): restores negotiated client capabilities and version from persisted InitializeRequest params, so capability-gated server-initiated features (sampling, elicitation, roots) work on hydrated instances. Mirrors C# SDK's HandleInitializeRequestAsync. Internal refactor: removes the private _initialized flag. Its checks are replaced by equivalent sessionId === undefined checks; observable behavior (error codes/messages) is unchanged. Closes #1658. Supersedes #1668.
1 parent 5516c1b commit 64bcb0d

File tree

4 files changed

+268
-11
lines changed

4 files changed

+268
-11
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
'@modelcontextprotocol/server': minor
3+
---
4+
5+
Add multi-node session hydration support, aligned with the C# SDK pattern:
6+
7+
- `WebStandardStreamableHTTPServerTransport` now accepts a `sessionId` constructor option. When set, the transport validates incoming `mcp-session-id` headers against that value and rejects re-initialization, without requiring a fresh initialize handshake on this node.
8+
- `Server.restoreInitializeState(params)` restores negotiated client capabilities and version from persisted `InitializeRequest` params, so capability-gated server-initiated features (sampling, elicitation, roots) work on hydrated instances.
9+
10+
Internal refactor: the private `_initialized` flag is removed. Its checks are replaced by equivalent `sessionId === undefined` checks, so observable behavior (error codes and messages) is unchanged.

packages/server/src/server/server.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,21 @@ export class Server extends Protocol<ServerContext> {
449449
return this._clientCapabilities;
450450
}
451451

452+
/**
453+
* Restores the initialize-handshake state (client capabilities and version) from
454+
* persisted `InitializeRequest` params. Use this in multi-node deployments when
455+
* reconstructing a server for a session that was initialized on a different node,
456+
* so that capability-gated server-initiated features (sampling, elicitation, roots)
457+
* work correctly on the hydrated instance.
458+
*
459+
* Pair with the `sessionId` option on {@link WebStandardStreamableHTTPServerTransport}
460+
* to restore transport-layer session validation alongside protocol-layer state.
461+
*/
462+
restoreInitializeState(params: InitializeRequest['params']): void {
463+
this._clientCapabilities = params.capabilities;
464+
this._clientVersion = params.clientInfo;
465+
}
466+
452467
/**
453468
* After initialization has completed, this will be populated with information about the client's name and version.
454469
*/

packages/server/src/server/streamableHttp.ts

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,24 @@ export interface WebStandardStreamableHTTPServerTransportOptions {
7575
* Function that generates a session ID for the transport.
7676
* The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash)
7777
*
78-
* If not provided, session management is disabled (stateless mode).
78+
* If neither this nor {@link sessionId} is provided, session management is disabled (stateless mode).
7979
*/
8080
sessionIdGenerator?: () => string;
8181

82+
/**
83+
* Pre-sets the session ID at construction time. Use this when reconstructing a
84+
* transport for an existing session, e.g. in multi-node deployments where a request
85+
* arrives at a node that did not handle the original initialize handshake.
86+
*
87+
* The transport will validate incoming `mcp-session-id` headers against this value
88+
* and reject re-initialization attempts.
89+
*
90+
* **Note:** This restores transport-layer session validation only. To restore
91+
* protocol-layer state (negotiated client capabilities), call
92+
* {@link Server.restoreInitializeState} with persisted `InitializeRequest` params.
93+
*/
94+
sessionId?: string;
95+
8296
/**
8397
* A callback for session initialization events
8498
* This is called when the server initializes a new session.
@@ -228,7 +242,6 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
228242
private _streamMapping: Map<string, StreamMapping> = new Map();
229243
private _requestToStreamMapping: Map<RequestId, string> = new Map();
230244
private _requestResponseMap: Map<RequestId, JSONRPCMessage> = new Map();
231-
private _initialized: boolean = false;
232245
private _enableJsonResponse: boolean = false;
233246
private _standaloneSseStreamId: string = '_GET_stream';
234247
private _eventStore?: EventStore;
@@ -247,6 +260,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
247260

248261
constructor(options: WebStandardStreamableHTTPServerTransportOptions = {}) {
249262
this.sessionIdGenerator = options.sessionIdGenerator;
263+
this.sessionId = options.sessionId;
250264
this._enableJsonResponse = options.enableJsonResponse ?? false;
251265
this._eventStore = options.eventStore;
252266
this._onsessioninitialized = options.onsessioninitialized;
@@ -667,9 +681,9 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
667681
// https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/lifecycle/
668682
const isInitializationRequest = messages.some(element => isInitializeRequest(element));
669683
if (isInitializationRequest) {
670-
// If it's a server with session management and the session ID is already set we should reject the request
671-
// to avoid re-initialization.
672-
if (this._initialized && this.sessionId !== undefined) {
684+
// If the session ID is already set (via prior initialize or constructor)
685+
// reject the request to avoid re-initialization.
686+
if (this.sessionId !== undefined) {
673687
this.onerror?.(new Error('Invalid Request: Server already initialized'));
674688
return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Server already initialized');
675689
}
@@ -678,7 +692,6 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
678692
return this.createJsonErrorResponse(400, -32_600, 'Invalid Request: Only one initialization request is allowed');
679693
}
680694
this.sessionId = this.sessionIdGenerator?.();
681-
this._initialized = true;
682695

683696
// If we have a session ID and an onsessioninitialized handler, call it immediately
684697
// This is needed in cases where the server needs to keep track of multiple sessions
@@ -847,13 +860,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
847860
* Returns `Response` error if invalid, `undefined` otherwise
848861
*/
849862
private validateSession(req: Request): Response | undefined {
850-
if (this.sessionIdGenerator === undefined) {
851-
// If the sessionIdGenerator ID is not set, the session management is disabled
852-
// and we don't need to validate the session ID
863+
if (this.sessionIdGenerator === undefined && this.sessionId === undefined) {
864+
// No generator and no pre-set session ID means session management is
865+
// disabled (stateless mode); skip validation.
853866
return undefined;
854867
}
855-
if (!this._initialized) {
856-
// If the server has not been initialized yet, reject all requests
868+
if (this.sessionId === undefined) {
869+
// Stateful mode but no session ID yet: initialize has not run on this
870+
// transport instance.
857871
this.onerror?.(new Error('Bad Request: Server not initialized'));
858872
return this.createJsonErrorResponse(400, -32_000, 'Bad Request: Server not initialized');
859873
}
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
import type { CallToolResult, JSONRPCErrorResponse, JSONRPCMessage } from '@modelcontextprotocol/core';
2+
import { McpServer, Server, WebStandardStreamableHTTPServerTransport } from '@modelcontextprotocol/server';
3+
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
4+
import * as z from 'zod/v4';
5+
6+
const PROTOCOL_VERSION = '2025-11-25';
7+
8+
const TEST_MESSAGES = {
9+
initialize: {
10+
jsonrpc: '2.0',
11+
method: 'initialize',
12+
params: {
13+
clientInfo: { name: 'test-client', version: '1.0' },
14+
protocolVersion: PROTOCOL_VERSION,
15+
capabilities: {}
16+
},
17+
id: 'init-1'
18+
} as JSONRPCMessage,
19+
toolsList: {
20+
jsonrpc: '2.0',
21+
method: 'tools/list',
22+
params: {},
23+
id: 'tools-1'
24+
} as JSONRPCMessage
25+
};
26+
27+
function createRequest(
28+
method: string,
29+
body?: JSONRPCMessage | JSONRPCMessage[],
30+
options?: { sessionId?: string; extraHeaders?: Record<string, string> }
31+
): Request {
32+
const headers: Record<string, string> = {};
33+
34+
if (method === 'POST') {
35+
headers.Accept = 'application/json, text/event-stream';
36+
} else if (method === 'GET') {
37+
headers.Accept = 'text/event-stream';
38+
}
39+
40+
if (body) {
41+
headers['Content-Type'] = 'application/json';
42+
}
43+
44+
if (options?.sessionId) {
45+
headers['mcp-session-id'] = options.sessionId;
46+
headers['mcp-protocol-version'] = PROTOCOL_VERSION;
47+
}
48+
49+
if (options?.extraHeaders) {
50+
Object.assign(headers, options.extraHeaders);
51+
}
52+
53+
return new Request('http://localhost/mcp', {
54+
method,
55+
headers,
56+
body: body ? JSON.stringify(body) : undefined
57+
});
58+
}
59+
60+
async function readSSEEvent(response: Response): Promise<string> {
61+
const reader = response.body?.getReader();
62+
const { value } = await reader!.read();
63+
return new TextDecoder().decode(value);
64+
}
65+
66+
function parseSSEData(text: string): unknown {
67+
const dataLine = text.split('\n').find(line => line.startsWith('data:'));
68+
if (!dataLine) throw new Error('No data line found in SSE event');
69+
return JSON.parse(dataLine.slice(5).trim());
70+
}
71+
72+
function expectErrorResponse(data: unknown, expectedCode: number, expectedMessagePattern: RegExp): void {
73+
expect(data).toMatchObject({
74+
jsonrpc: '2.0',
75+
error: expect.objectContaining({
76+
code: expectedCode,
77+
message: expect.stringMatching(expectedMessagePattern)
78+
})
79+
});
80+
}
81+
82+
describe('WebStandardStreamableHTTPServerTransport session hydration', () => {
83+
let transport: WebStandardStreamableHTTPServerTransport;
84+
let mcpServer: McpServer;
85+
86+
beforeEach(() => {
87+
mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } });
88+
89+
mcpServer.registerTool(
90+
'greet',
91+
{
92+
description: 'A simple greeting tool',
93+
inputSchema: z.object({ name: z.string().describe('Name to greet') })
94+
},
95+
async ({ name }): Promise<CallToolResult> => ({
96+
content: [{ type: 'text', text: `Hello, ${name}!` }]
97+
})
98+
);
99+
});
100+
101+
afterEach(async () => {
102+
await transport?.close();
103+
});
104+
105+
async function connectTransport(options?: ConstructorParameters<typeof WebStandardStreamableHTTPServerTransport>[0]) {
106+
transport = new WebStandardStreamableHTTPServerTransport(options);
107+
await mcpServer.connect(transport);
108+
}
109+
110+
describe('transport-layer hydration (sessionId option)', () => {
111+
it('processes requests without initialize when constructed with sessionId', async () => {
112+
const sessionId = 'persisted-session-id';
113+
await connectTransport({ sessionId });
114+
115+
const response = await transport.handleRequest(createRequest('POST', TEST_MESSAGES.toolsList, { sessionId }));
116+
117+
expect(response.status).toBe(200);
118+
expect(response.headers.get('mcp-session-id')).toBe(sessionId);
119+
120+
const eventData = parseSSEData(await readSSEEvent(response));
121+
expect(eventData).toMatchObject({
122+
jsonrpc: '2.0',
123+
result: expect.objectContaining({
124+
tools: expect.arrayContaining([expect.objectContaining({ name: 'greet' })])
125+
}),
126+
id: 'tools-1'
127+
});
128+
});
129+
130+
it('rejects requests with a mismatched session ID', async () => {
131+
await connectTransport({ sessionId: 'persisted-session-id' });
132+
133+
const response = await transport.handleRequest(
134+
createRequest('POST', TEST_MESSAGES.toolsList, { sessionId: 'wrong-session-id' })
135+
);
136+
137+
expect(response.status).toBe(404);
138+
expectErrorResponse(await response.json(), -32_001, /Session not found/);
139+
});
140+
141+
it('rejects requests without a session ID header', async () => {
142+
await connectTransport({ sessionId: 'persisted-session-id' });
143+
144+
const response = await transport.handleRequest(createRequest('POST', TEST_MESSAGES.toolsList));
145+
146+
expect(response.status).toBe(400);
147+
const errorData = (await response.json()) as JSONRPCErrorResponse;
148+
expectErrorResponse(errorData, -32_000, /Mcp-Session-Id header is required/);
149+
expect(errorData.id).toBeNull();
150+
});
151+
152+
it('rejects re-initialize on a hydrated transport', async () => {
153+
await connectTransport({ sessionId: 'persisted-session-id' });
154+
155+
const response = await transport.handleRequest(createRequest('POST', TEST_MESSAGES.initialize));
156+
157+
expect(response.status).toBe(400);
158+
expectErrorResponse(await response.json(), -32_600, /Server already initialized/);
159+
});
160+
161+
it('leaves default initialize flow unchanged when sessionId is not provided', async () => {
162+
await connectTransport({ sessionIdGenerator: () => 'generated-session-id' });
163+
164+
const initResponse = await transport.handleRequest(createRequest('POST', TEST_MESSAGES.initialize));
165+
166+
expect(initResponse.status).toBe(200);
167+
expect(initResponse.headers.get('mcp-session-id')).toBe('generated-session-id');
168+
169+
const toolsResponse = await transport.handleRequest(
170+
createRequest('POST', TEST_MESSAGES.toolsList, { sessionId: 'generated-session-id' })
171+
);
172+
173+
expect(toolsResponse.status).toBe(200);
174+
const eventData = parseSSEData(await readSSEEvent(toolsResponse));
175+
expect(eventData).toMatchObject({
176+
jsonrpc: '2.0',
177+
result: expect.objectContaining({
178+
tools: expect.arrayContaining([expect.objectContaining({ name: 'greet' })])
179+
}),
180+
id: 'tools-1'
181+
});
182+
});
183+
});
184+
185+
describe('Server.restoreInitializeState', () => {
186+
it('restores client capabilities without an initialize round-trip', () => {
187+
const server = new Server({ name: 'test', version: '1.0.0' }, { capabilities: {} });
188+
189+
expect(server.getClientCapabilities()).toBeUndefined();
190+
expect(server.getClientVersion()).toBeUndefined();
191+
192+
server.restoreInitializeState({
193+
protocolVersion: PROTOCOL_VERSION,
194+
capabilities: { sampling: {}, elicitation: { form: {} } },
195+
clientInfo: { name: 'persisted-client', version: '2.0.0' }
196+
});
197+
198+
expect(server.getClientCapabilities()).toEqual({ sampling: {}, elicitation: { form: {} } });
199+
expect(server.getClientVersion()).toEqual({ name: 'persisted-client', version: '2.0.0' });
200+
});
201+
202+
it('enables capability-gated methods after restoration', async () => {
203+
const server = new Server({ name: 'test', version: '1.0.0' }, { capabilities: {} });
204+
205+
// Before restoration, server thinks client has no sampling capability
206+
expect(server.getClientCapabilities()?.sampling).toBeUndefined();
207+
208+
server.restoreInitializeState({
209+
protocolVersion: PROTOCOL_VERSION,
210+
capabilities: { sampling: {} },
211+
clientInfo: { name: 'c', version: '1' }
212+
});
213+
214+
// After restoration, sampling capability is visible
215+
expect(server.getClientCapabilities()?.sampling).toEqual({});
216+
});
217+
});
218+
});

0 commit comments

Comments
 (0)