Skip to content

Commit 7f1b460

Browse files
committed
fix(http-stream): prevent session destruction on transient errors
Two bugs in HttpStreamTransport caused 'Session not found' (-32001) errors for clients like Cline after a session was established and working. (1) The onerror callback destroyed sessions on transient SDK errors: the SDK fires onerror for non-fatal issues (parse errors on malformed requests, failed SSE writes during event replay), and the framework's handler unconditionally deleted the session from the transport map, making all subsequent requests from that client fail with -32001. Fix: onerror now logs but preserves the session; only onclose (explicit DELETE or server shutdown) removes sessions. (2) Re-initialization with a stale session ID was rejected: after a session was lost (server restart, error, etc.), if the client sent a new initialize request while still including the old session ID header, the framework rejected it with 'Session not found' instead of creating a new session. Fix: detect initialize requests with stale session IDs and create a new session instead of rejecting. Also fixed: broadcast send() failures no longer remove sessions from the transport map. Includes 6 regression tests covering all three scenarios.
1 parent edcbfe3 commit 7f1b460

2 files changed

Lines changed: 283 additions & 10 deletions

File tree

src/transports/http/server.ts

Lines changed: 22 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -135,14 +135,22 @@ export class HttpStreamTransport extends AbstractTransport {
135135
authData = (authResult as AuthResult).data as RequestContextData || {};
136136
}
137137

138+
// Allow re-initialization even when a stale session ID is provided.
139+
// Clients like Cline may keep sending the old session ID header after
140+
// a session is lost (server restart, transport error, etc.).
141+
const isReInitialize = sessionId && !this._transports[sessionId] && body && isInitializeRequest(body);
142+
138143
// Handle different request scenarios
139144
if (sessionId && this._transports[sessionId]) {
140145
// Existing session
141146
transport = this._transports[sessionId];
142147
logger.debug(`Reusing existing session: ${sessionId}`);
143-
} else if (isInitialize) {
144-
// New session initialization
145-
logger.info('Creating new session for initialization request');
148+
} else if (isInitialize || isReInitialize) {
149+
if (isReInitialize) {
150+
logger.info(`Stale session ID ${sessionId} — creating new session for re-initialization`);
151+
} else {
152+
logger.info('Creating new session for initialization request');
153+
}
146154

147155
transport = new StreamableHTTPServerTransport({
148156
sessionIdGenerator: () => randomUUID(),
@@ -161,10 +169,11 @@ export class HttpStreamTransport extends AbstractTransport {
161169
};
162170

163171
transport.onerror = (error) => {
164-
logger.error(`Transport error for session: ${error}`);
165-
if (transport.sessionId) {
166-
delete this._transports[transport.sessionId];
167-
}
172+
// Log the error but do NOT remove the session. The SDK fires onerror
173+
// for transient issues (parse errors, failed SSE writes) that don't
174+
// invalidate the session. Removing the session here causes "Session
175+
// not found" errors on subsequent requests from the same client.
176+
logger.error(`Transport error for session ${transport.sessionId}: ${error}`);
168177
};
169178

170179
transport.onmessage = async (message: JSONRPCMessage) => {
@@ -182,7 +191,7 @@ export class HttpStreamTransport extends AbstractTransport {
182191
this.sendError(res, 400, -32000, 'Bad Request: No valid session ID provided');
183192
return;
184193
} else {
185-
// Session ID provided but not found
194+
// Session ID provided but not found (and not an initialize request)
186195
this.sendError(res, 404, -32001, 'Session not found');
187196
return;
188197
}
@@ -268,8 +277,11 @@ export class HttpStreamTransport extends AbstractTransport {
268277
}
269278

270279
if (failedSessions.length > 0) {
271-
failedSessions.forEach((sessionId) => delete this._transports[sessionId]);
272-
logger.warn(`Failed to send message to ${failedSessions.length} sessions.`);
280+
// Log but don't remove sessions on transient send failures.
281+
// The SDK throws when no SSE stream is currently open for a request ID,
282+
// which is a normal condition (e.g. client momentarily between requests).
283+
// The session itself remains valid for future requests.
284+
logger.warn(`Failed to broadcast to ${failedSessions.length} session(s) — sessions preserved for future requests.`);
273285
}
274286
}
275287

Lines changed: 261 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,261 @@
1+
import { describe, it, expect, beforeEach, afterEach } from '@jest/globals';
2+
import { HttpStreamTransport } from '../../../src/transports/http/server.js';
3+
import http from 'node:http';
4+
5+
/**
6+
* Regression tests for session resilience in HttpStreamTransport.
7+
*
8+
* These cover two bugs that caused "Session not found" (-32001) errors
9+
* for clients like Cline after a session was established:
10+
*
11+
* 1. onerror callback destroyed sessions on transient SDK errors (parse errors,
12+
* failed SSE writes) — the session should survive these.
13+
* 2. Re-initialization with a stale session ID was rejected with 404 instead
14+
* of creating a new session.
15+
*/
16+
describe('HttpStreamTransport — Session Resilience', () => {
17+
let transport: HttpStreamTransport;
18+
let testPort: number;
19+
// Track open requests so we can clean them up before closing the transport
20+
let openRequests: http.ClientRequest[];
21+
22+
const initializeBody = {
23+
jsonrpc: '2.0',
24+
method: 'initialize',
25+
params: {
26+
protocolVersion: '2024-11-05',
27+
capabilities: {},
28+
clientInfo: { name: 'test-client', version: '1.0.0' },
29+
},
30+
id: 1,
31+
};
32+
33+
beforeEach(() => {
34+
testPort = 4000 + Math.floor(Math.random() * 1000);
35+
openRequests = [];
36+
transport = new HttpStreamTransport({
37+
port: testPort,
38+
endpoint: '/mcp',
39+
responseMode: 'stream',
40+
});
41+
});
42+
43+
afterEach(async () => {
44+
// Destroy any open HTTP connections so the server can shut down cleanly
45+
for (const req of openRequests) {
46+
req.destroy();
47+
}
48+
openRequests = [];
49+
50+
if (transport.isRunning()) {
51+
await transport.close();
52+
}
53+
});
54+
55+
/**
 * Returns the `_transports` map of the `HttpStreamTransport` under test so
 * the tests can inspect which sessions currently exist.
 *
 * The map is indexed by session ID. The cast to `any` is deliberate: the
 * property is read through the instance's internals rather than a public
 * accessor, and the tests also monkey-patch the entries it holds.
 *
 * @returns The live session map (not a copy), so later lookups reflect
 *          sessions being added or removed.
 */
function getTransports(): Record<string, any> {
  return (transport as any)._transports;
}
58+
59+
/**
 * Returns a promise that resolves (with no value) once a `setTimeout` of
 * `ms` milliseconds has fired. Used to pace the polling helpers.
 *
 * @param ms Delay in milliseconds, passed straight to `setTimeout`.
 */
function wait(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
62+
63+
/**
 * Polls the transport's session map until it contains at least one session
 * and resolves with the first session ID found.
 *
 * The map is always inspected at least once, and once more after every
 * sleep, so a session that appears during the final sleep interval is still
 * picked up instead of being reported as a timeout.
 *
 * @param timeoutMs Maximum time to keep polling, in milliseconds.
 * @returns The first key of the session map.
 * @throws Error if no session exists by the time the deadline has passed.
 */
async function waitForSession(timeoutMs = 2000): Promise<string> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const [firstId] = Object.keys(getTransports());
    if (firstId !== undefined) return firstId;
    // Only give up after a check that happened at or past the deadline.
    if (Date.now() >= deadline) break;
    await wait(20);
  }
  throw new Error('Timed out waiting for session to be created');
}
76+
77+
/**
 * Fire-and-forget JSON POST to the transport's `/mcp` endpoint.
 *
 * The SSE response may never complete (no MCP server is attached), so the
 * response is not awaited. The request is recorded in `openRequests` so the
 * `afterEach` hook can destroy it before the transport is closed.
 *
 * @param body      JSON-serialisable request payload.
 * @param sessionId Optional value for the `Mcp-Session-Id` header; the
 *                  header is omitted when this is empty or undefined.
 */
function firePost(body: any, sessionId?: string): void {
  const payload = JSON.stringify(body);
  const headers: http.OutgoingHttpHeaders = {
    'Content-Type': 'application/json',
    'Accept': 'application/json, text/event-stream',
  };
  if (sessionId) {
    headers['Mcp-Session-Id'] = sessionId;
  }
  headers['Content-Length'] = Buffer.byteLength(payload);

  const req = http.request({
    hostname: 'localhost',
    port: testPort,
    path: '/mcp',
    method: 'POST',
    headers,
  });
  // Deliberately ignored: this helper is best-effort, and request errors
  // (e.g. when the request is destroyed during cleanup) must not surface
  // as unhandled 'error' events.
  req.on('error', () => {});
  req.end(payload);

  openRequests.push(req);
}
104+
105+
/**
 * Sends a JSON POST to the transport's `/mcp` endpoint and resolves with the
 * full response once the server has ended it. Intended for non-streaming
 * responses (errors, 404s, etc.).
 *
 * The response body is collected as raw buffers and decoded once at the end,
 * so a multi-byte UTF-8 character split across two chunks is not corrupted.
 * The request is recorded in `openRequests` for cleanup.
 *
 * @param body      JSON-serialisable request payload.
 * @param sessionId Optional value for the `Mcp-Session-Id` header; the
 *                  header is omitted when this is empty or undefined.
 * @returns Status code, response headers and the decoded response body.
 *          Rejects if either the request or the response stream errors.
 */
function makeRequest(
  body: any,
  sessionId?: string,
): Promise<{ statusCode: number; headers: http.IncomingHttpHeaders; body: string }> {
  return new Promise((resolve, reject) => {
    const payload = JSON.stringify(body);
    const headers: http.OutgoingHttpHeaders = {
      'Content-Type': 'application/json',
      'Accept': 'application/json, text/event-stream',
    };
    if (sessionId) {
      headers['Mcp-Session-Id'] = sessionId;
    }
    headers['Content-Length'] = Buffer.byteLength(payload);

    const req = http.request(
      {
        hostname: 'localhost',
        port: testPort,
        path: '/mcp',
        method: 'POST',
        headers,
      },
      (res) => {
        const chunks: Buffer[] = [];
        res.on('data', (chunk: Buffer) => {
          chunks.push(chunk);
        });
        // A response that fails mid-stream should fail the test, not hang it.
        res.on('error', reject);
        res.on('end', () => {
          resolve({
            statusCode: res.statusCode ?? 0,
            headers: res.headers,
            body: Buffer.concat(chunks).toString('utf8'),
          });
        });
      },
    );
    req.on('error', reject);
    req.end(payload);

    openRequests.push(req);
  });
}
151+
152+
// ---------------------------------------------------------------------------
153+
// Bug 1: onerror must NOT destroy sessions
154+
// ---------------------------------------------------------------------------
155+
describe('onerror should not destroy sessions', () => {
156+
it('should keep session alive after onerror fires on the internal transport', async () => {
157+
await transport.start();
158+
transport.onmessage = async () => {};
159+
160+
firePost(initializeBody);
161+
const sessionId = await waitForSession();
162+
163+
// Simulate the SDK firing onerror (e.g. parse error on a bad request)
164+
const internalTransport = getTransports()[sessionId];
165+
internalTransport.onerror?.(new Error('Simulated transient error'));
166+
167+
// Session must still be in the map
168+
expect(getTransports()[sessionId]).toBeDefined();
169+
});
170+
171+
it('should keep session alive after multiple onerror events', async () => {
172+
await transport.start();
173+
transport.onmessage = async () => {};
174+
175+
firePost(initializeBody);
176+
const sessionId = await waitForSession();
177+
const internalTransport = getTransports()[sessionId];
178+
179+
internalTransport.onerror?.(new Error('Error 1'));
180+
internalTransport.onerror?.(new Error('Error 2'));
181+
internalTransport.onerror?.(new Error('Error 3'));
182+
183+
expect(getTransports()[sessionId]).toBeDefined();
184+
});
185+
});
186+
187+
// ---------------------------------------------------------------------------
188+
// Bug 2: Re-initialization with stale session ID
189+
// ---------------------------------------------------------------------------
190+
describe('re-initialization with stale session ID', () => {
191+
it('should create a new session instead of returning 404', async () => {
192+
await transport.start();
193+
transport.onmessage = async () => {};
194+
195+
// Send an initialize request with a session ID that doesn't exist
196+
firePost(initializeBody, 'stale-session-id-that-does-not-exist');
197+
const sessionId = await waitForSession();
198+
199+
// A new session was created (not rejected with 404)
200+
expect(sessionId).not.toBe('stale-session-id-that-does-not-exist');
201+
expect(getTransports()[sessionId]).toBeDefined();
202+
});
203+
204+
it('should still reject non-initialize requests with unknown session IDs', async () => {
205+
await transport.start();
206+
transport.onmessage = async () => {};
207+
208+
const response = await makeRequest(
209+
{ jsonrpc: '2.0', method: 'tools/list', id: 1 },
210+
'nonexistent-session-id',
211+
);
212+
213+
expect(response.statusCode).toBe(404);
214+
expect(response.body).toContain('Session not found');
215+
});
216+
});
217+
218+
// ---------------------------------------------------------------------------
219+
// onclose SHOULD still clean up sessions (correct behavior preserved)
220+
// ---------------------------------------------------------------------------
221+
describe('onclose should still remove sessions', () => {
222+
it('should remove session when transport is closed', async () => {
223+
await transport.start();
224+
transport.onmessage = async () => {};
225+
226+
firePost(initializeBody);
227+
const sessionId = await waitForSession();
228+
expect(getTransports()[sessionId]).toBeDefined();
229+
230+
// Simulate the SDK calling close (as it does for DELETE requests)
231+
const internalTransport = getTransports()[sessionId];
232+
await internalTransport.close();
233+
234+
expect(getTransports()[sessionId]).toBeUndefined();
235+
});
236+
});
237+
238+
// ---------------------------------------------------------------------------
239+
// Broadcast send failures should not destroy sessions
240+
// ---------------------------------------------------------------------------
241+
describe('broadcast failures should not destroy sessions', () => {
242+
it('should preserve sessions after a failed broadcast send', async () => {
243+
await transport.start();
244+
transport.onmessage = async () => {};
245+
246+
firePost(initializeBody);
247+
const sessionId = await waitForSession();
248+
249+
// Monkey-patch the internal transport's send to throw
250+
const internalTransport = getTransports()[sessionId];
251+
internalTransport.send = async () => {
252+
throw new Error('Simulated send failure');
253+
};
254+
255+
// Broadcast — should NOT remove the session
256+
await transport.send({ jsonrpc: '2.0', method: 'notification/test' });
257+
258+
expect(getTransports()[sessionId]).toBeDefined();
259+
});
260+
});
261+
});

0 commit comments

Comments
 (0)