Skip to content

Commit 9db218d

Browse files
committed
feat(server): add replayInitialization for stateless serverless session adoption
In stateless serverless deployments (Vercel, Lambda, Workers), each HTTP request creates a fresh Server + Transport. The initialize handshake sets state (_clientCapabilities, _clientVersion, sessionId, _initialized) that is lost between requests, forcing developers to use Reflect.set hacks. Add a `replayInitialization` callback to transport options that restores session state on non-initialize requests. The transport reads the session ID from the request header, calls the callback to fetch cached state, and flows it to the server via a new `oninitialized` callback on the Transport interface. - Transport: `replayInitialization` option, `_tryReplayInitialization` helper called once in `handleRequest` before method dispatch - Transport: fix `validateSession` to recognize replayed sessions - Core: `oninitialized` callback on Transport interface (same pattern as onmessage/onclose/onerror) - Server: override `connect()` to hook oninitialized, seeding _clientCapabilities and _clientVersion via ??= - Node middleware: delegate oninitialized getter/setter to inner transport - Callback returns undefined → 404 per spec (client re-initializes) - Callback throws → 500 internal error - Race condition guard via _replayInProgress flag Closes #1658, #1882
1 parent 9ed62fe commit 9db218d

7 files changed

Lines changed: 535 additions & 9 deletions

File tree

packages/core/src/shared/transport.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { JSONRPCMessage, MessageExtraInfo, RequestId } from '../types/index.js';
1+
import type { ClientCapabilities, Implementation, JSONRPCMessage, MessageExtraInfo, RequestId } from '../types/index.js';
22

33
export type FetchLike = (url: string | URL, init?: RequestInit) => Promise<Response>;
44

@@ -116,6 +116,20 @@ export interface Transport {
116116
*/
117117
onmessage?: (<T extends JSONRPCMessage>(message: T, extra?: MessageExtraInfo) => void) | undefined;
118118

119+
/**
120+
* Callback invoked when session initialization state is restored by the transport.
121+
*
122+
* For transports that support stateless session replay (e.g.,
123+
* {@linkcode @modelcontextprotocol/server!server/streamableHttp.WebStandardStreamableHTTPServerTransport | WebStandardStreamableHTTPServerTransport}),
124+
* this is called when a non-initialize request triggers session restoration
125+
* via the transport's `replayInitialization` callback.
126+
*
127+
* The {@linkcode @modelcontextprotocol/server!server/server.Server | Server} hooks this during
128+
* {@linkcode @modelcontextprotocol/server!server/server.Server.connect | connect()} to
129+
* seed client capabilities and version info.
130+
*/
131+
oninitialized?: ((data: { clientCapabilities: ClientCapabilities; clientVersion: Implementation }) => void) | undefined;
132+
119133
/**
120134
* The session ID generated for this connection.
121135
*/

packages/core/src/util/inMemory.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { SdkError, SdkErrorCode } from '../errors/sdkErrors.js';
22
import type { Transport } from '../shared/transport.js';
3-
import type { AuthInfo, JSONRPCMessage, RequestId } from '../types/index.js';
3+
import type { AuthInfo, ClientCapabilities, Implementation, JSONRPCMessage, RequestId } from '../types/index.js';
44

55
interface QueuedMessage {
66
message: JSONRPCMessage;
@@ -18,6 +18,7 @@ export class InMemoryTransport implements Transport {
1818
onclose?: () => void;
1919
onerror?: (error: Error) => void;
2020
onmessage?: (message: JSONRPCMessage, extra?: { authInfo?: AuthInfo }) => void;
21+
oninitialized?: (data: { clientCapabilities: ClientCapabilities; clientVersion: Implementation }) => void;
2122
sessionId?: string;
2223

2324
/**

packages/middleware/node/src/streamableHttp.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,15 @@
1010
import type { IncomingMessage, ServerResponse } from 'node:http';
1111

1212
import { getRequestListener } from '@hono/node-server';
13-
import type { AuthInfo, JSONRPCMessage, MessageExtraInfo, RequestId, Transport } from '@modelcontextprotocol/core';
13+
import type {
14+
AuthInfo,
15+
ClientCapabilities,
16+
Implementation,
17+
JSONRPCMessage,
18+
MessageExtraInfo,
19+
RequestId,
20+
Transport
21+
} from '@modelcontextprotocol/core';
1422
import type { WebStandardStreamableHTTPServerTransportOptions } from '@modelcontextprotocol/server';
1523
import { WebStandardStreamableHTTPServerTransport } from '@modelcontextprotocol/server';
1624

@@ -130,6 +138,17 @@ export class NodeStreamableHTTPServerTransport implements Transport {
130138
return this._webStandardTransport.onmessage;
131139
}
132140

141+
/**
142+
* Sets callback for session initialization replay.
143+
*/
144+
set oninitialized(handler: ((data: { clientCapabilities: ClientCapabilities; clientVersion: Implementation }) => void) | undefined) {
145+
this._webStandardTransport.oninitialized = handler;
146+
}
147+
148+
get oninitialized(): ((data: { clientCapabilities: ClientCapabilities; clientVersion: Implementation }) => void) | undefined {
149+
return this._webStandardTransport.oninitialized;
150+
}
151+
133152
/**
134153
* Starts the transport. This is required by the {@linkcode Transport} interface but is a no-op
135154
* for the Streamable HTTP transport as connections are managed per-request.

packages/server/src/server/server.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ import type {
3131
ServerResult,
3232
TaskManagerOptions,
3333
ToolResultContent,
34-
ToolUseContent
34+
ToolUseContent,
35+
Transport
3536
} from '@modelcontextprotocol/core';
3637
import {
3738
assertClientRequestTaskCapability,
@@ -140,6 +141,21 @@ export class Server extends Protocol<ServerContext> {
140141
}
141142
}
142143

144+
/**
145+
* Attaches to the given transport, hooking the `oninitialized` callback
146+
* to seed client capabilities and version info from replayed sessions.
147+
*/
148+
override async connect(transport: Transport): Promise<void> {
149+
const _oninitialized = transport.oninitialized;
150+
transport.oninitialized = data => {
151+
_oninitialized?.(data);
152+
this._clientCapabilities ??= data.clientCapabilities;
153+
this._clientVersion ??= data.clientVersion;
154+
};
155+
156+
await super.connect(transport);
157+
}
158+
143159
private _registerLoggingHandler(): void {
144160
this.setRequestHandler('logging/setLevel', async (request, ctx) => {
145161
const transportSessionId: string | undefined =

packages/server/src/server/streamableHttp.ts

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,15 @@
77
* For Node.js Express/HTTP compatibility, use {@linkcode @modelcontextprotocol/node!NodeStreamableHTTPServerTransport | NodeStreamableHTTPServerTransport} which wraps this transport.
88
*/
99

10-
import type { AuthInfo, JSONRPCMessage, MessageExtraInfo, RequestId, Transport } from '@modelcontextprotocol/core';
10+
import type {
11+
AuthInfo,
12+
ClientCapabilities,
13+
Implementation,
14+
JSONRPCMessage,
15+
MessageExtraInfo,
16+
RequestId,
17+
Transport
18+
} from '@modelcontextprotocol/core';
1119
import {
1220
DEFAULT_NEGOTIATED_PROTOCOL_VERSION,
1321
isInitializeRequest,
@@ -152,6 +160,30 @@ export interface WebStandardStreamableHTTPServerTransportOptions {
152160
* @default {@linkcode SUPPORTED_PROTOCOL_VERSIONS}
153161
*/
154162
supportedProtocolVersions?: string[];
163+
164+
/**
165+
* Callback to restore session state for stateless deployments.
166+
*
167+
* Called when a non-initialize request arrives and the transport is not yet
168+
* initialized. The transport reads the session ID from the `mcp-session-id`
169+
* request header and passes it to this callback.
170+
*
171+
* Return cached client capabilities and version info to adopt the session,
172+
* or `undefined` to reject the request.
173+
*
174+
* This callback is never called for `initialize` requests — those follow
175+
* the normal handshake path. Note that {@linkcode WebStandardStreamableHTTPServerTransportOptions.onsessioninitialized | onsessioninitialized}
176+
* is NOT called during replay — it only fires for new session creation.
177+
*
178+
* @param sessionId - The session ID from the `mcp-session-id` request header.
179+
* @returns Cached client state to restore, or `undefined` if the session is unknown.
180+
*/
181+
replayInitialization?: (
182+
sessionId: string
183+
) =>
184+
| { clientCapabilities: ClientCapabilities; clientVersion: Implementation }
185+
| undefined
186+
| Promise<{ clientCapabilities: ClientCapabilities; clientVersion: Implementation } | undefined>;
155187
}
156188

157189
/**
@@ -240,11 +272,14 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
240272
private _enableDnsRebindingProtection: boolean;
241273
private _retryInterval?: number;
242274
private _supportedProtocolVersions: string[];
275+
private _replayInitialization?: WebStandardStreamableHTTPServerTransportOptions['replayInitialization'];
276+
private _replayInProgress = false;
243277

244278
sessionId?: string;
245279
onclose?: () => void;
246280
onerror?: (error: Error) => void;
247281
onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void;
282+
oninitialized?: (data: { clientCapabilities: ClientCapabilities; clientVersion: Implementation }) => void;
248283

249284
constructor(options: WebStandardStreamableHTTPServerTransportOptions = {}) {
250285
this.sessionIdGenerator = options.sessionIdGenerator;
@@ -257,6 +292,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
257292
this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false;
258293
this._retryInterval = options.retryInterval;
259294
this._supportedProtocolVersions = options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS;
295+
this._replayInitialization = options.replayInitialization;
260296
}
261297

262298
/**
@@ -351,6 +387,17 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
351387
return validationError;
352388
}
353389

390+
// Attempt stateless session replay before dispatching to method handlers.
391+
let replayError: Response | undefined;
392+
try {
393+
replayError = await this._tryReplayInitialization(req);
394+
} catch {
395+
return this.createJsonErrorResponse(500, -32_603, 'Internal error: session replay failed');
396+
}
397+
if (replayError) {
398+
return replayError;
399+
}
400+
354401
switch (req.method) {
355402
case 'POST': {
356403
return this.handlePostRequest(req, options);
@@ -840,14 +887,45 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
840887
return new Response(null, { status: 200 });
841888
}
842889

890+
/**
891+
* Attempts to restore session state via the `replayInitialization` callback.
892+
* Called once in `handleRequest()` before method dispatch.
893+
*
894+
* No-op when already initialized, no callback provided, or no session ID header.
895+
* On success, sets `sessionId` and `_initialized`, then invokes `oninitialized`
896+
* so the server can seed client capabilities and version info.
897+
*/
898+
private async _tryReplayInitialization(req: Request): Promise<Response | undefined> {
899+
if (this._initialized || this._replayInProgress || !this._replayInitialization) return undefined;
900+
901+
const sessionId = req.headers.get('mcp-session-id');
902+
if (!sessionId) return undefined;
903+
904+
this._replayInProgress = true;
905+
try {
906+
const result = await this._replayInitialization(sessionId);
907+
if (!result) {
908+
// Session unknown/expired — 404 tells the client to re-initialize per spec
909+
this.onerror?.(new Error('Session not found'));
910+
return this.createJsonErrorResponse(404, -32_001, 'Session not found');
911+
}
912+
913+
this.sessionId = sessionId;
914+
this._initialized = true;
915+
this.oninitialized?.(result);
916+
return undefined;
917+
} finally {
918+
this._replayInProgress = false;
919+
}
920+
}
921+
843922
/**
844923
* Validates session ID for non-initialization requests.
845924
* Returns `Response` error if invalid, `undefined` otherwise
846925
*/
847926
private validateSession(req: Request): Response | undefined {
848-
if (this.sessionIdGenerator === undefined) {
849-
// If the sessionIdGenerator ID is not set, the session management is disabled
850-
// and we don't need to validate the session ID
927+
if (this.sessionIdGenerator === undefined && this.sessionId === undefined && !this._replayInitialization) {
928+
// Session management is fully disabled (no generator, no adopted/replayed session, no replay callback)
851929
return undefined;
852930
}
853931
if (!this._initialized) {

packages/server/test/server/server.test.ts

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { JSONRPCMessage } from '@modelcontextprotocol/core';
1+
import type { ClientCapabilities, Implementation, JSONRPCMessage } from '@modelcontextprotocol/core';
22
import { InMemoryTransport, LATEST_PROTOCOL_VERSION } from '@modelcontextprotocol/core';
33
import { Server } from '../../src/server/server.js';
44

@@ -39,4 +39,101 @@ describe('Server', () => {
3939
await server.close();
4040
});
4141
});
42+
43+
describe('connect — oninitialized hook', () => {
44+
const testCapabilities: ClientCapabilities = { sampling: {} };
45+
const testVersion: Implementation = { name: 'test-client', version: '2.0.0' };
46+
47+
it('should seed getClientCapabilities() when transport.oninitialized is called', async () => {
48+
const server = new Server({ name: 'test', version: '1.0.0' }, { capabilities: {} });
49+
50+
const [, serverTransport] = InMemoryTransport.createLinkedPair();
51+
await server.connect(serverTransport);
52+
53+
// Simulate transport calling oninitialized (as _tryReplayInitialization would)
54+
serverTransport.oninitialized?.({
55+
clientCapabilities: testCapabilities,
56+
clientVersion: testVersion
57+
});
58+
59+
expect(server.getClientCapabilities()).toEqual(testCapabilities);
60+
expect(server.getClientVersion()).toEqual(testVersion);
61+
62+
await server.close();
63+
});
64+
65+
it('should return undefined for getClientCapabilities() when oninitialized is not called', async () => {
66+
const server = new Server({ name: 'test', version: '1.0.0' }, { capabilities: {} });
67+
68+
const [, serverTransport] = InMemoryTransport.createLinkedPair();
69+
await server.connect(serverTransport);
70+
71+
expect(server.getClientCapabilities()).toBeUndefined();
72+
expect(server.getClientVersion()).toBeUndefined();
73+
74+
await server.close();
75+
});
76+
77+
it('should be overwritten by a real initialize handshake', async () => {
78+
const server = new Server({ name: 'test', version: '1.0.0' }, { capabilities: {} });
79+
80+
const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair();
81+
await server.connect(serverTransport);
82+
83+
// First: seed via oninitialized
84+
serverTransport.oninitialized?.({
85+
clientCapabilities: testCapabilities,
86+
clientVersion: testVersion
87+
});
88+
89+
expect(server.getClientCapabilities()).toEqual(testCapabilities);
90+
91+
// Then: real initialize overwrites
92+
const responsePromise = new Promise<JSONRPCMessage>(resolve => {
93+
clientTransport.onmessage = msg => resolve(msg);
94+
});
95+
await clientTransport.start();
96+
97+
const realCapabilities: ClientCapabilities = { elicitation: { form: {} } };
98+
const realVersion: Implementation = { name: 'real-client', version: '3.0.0' };
99+
100+
await clientTransport.send({
101+
jsonrpc: '2.0',
102+
id: 1,
103+
method: 'initialize',
104+
params: {
105+
protocolVersion: LATEST_PROTOCOL_VERSION,
106+
capabilities: realCapabilities,
107+
clientInfo: realVersion
108+
}
109+
} as JSONRPCMessage);
110+
111+
await responsePromise;
112+
113+
expect(server.getClientCapabilities()).toEqual(realCapabilities);
114+
expect(server.getClientVersion()).toEqual(realVersion);
115+
116+
await server.close();
117+
});
118+
119+
it('should chain with an existing transport.oninitialized callback', async () => {
120+
const server = new Server({ name: 'test', version: '1.0.0' }, { capabilities: {} });
121+
122+
const [, serverTransport] = InMemoryTransport.createLinkedPair();
123+
124+
const existingCallback = vi.fn();
125+
serverTransport.oninitialized = existingCallback;
126+
127+
await server.connect(serverTransport);
128+
129+
const data = { clientCapabilities: testCapabilities, clientVersion: testVersion };
130+
serverTransport.oninitialized?.(data);
131+
132+
// Both the existing callback and the server's hook should have fired
133+
expect(existingCallback).toHaveBeenCalledWith(data);
134+
expect(server.getClientCapabilities()).toEqual(testCapabilities);
135+
136+
await server.close();
137+
});
138+
});
42139
});

0 commit comments

Comments
 (0)