Skip to content

Commit 7158bd5

Browse files
committed
fix(client): avoid stale-session 404 race and stop retry loop on session-bound 404
1 parent 8f88769 commit 7158bd5

2 files changed

Lines changed: 152 additions & 7 deletions

File tree

packages/client/src/client/streamableHttp.ts

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,23 @@ const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOp
2525
maxRetries: 2
2626
};
2727

28+
const SESSION_BOUND_404_ERROR = Symbol('sessionBound404Error');
29+
30+
type SessionBound404Error = Error & { [SESSION_BOUND_404_ERROR]?: true };
31+
32+
function markSessionBound404Error(error: Error): Error {
33+
(error as SessionBound404Error)[SESSION_BOUND_404_ERROR] = true;
34+
return error;
35+
}
36+
37+
function isSessionBound404Error(error: unknown): boolean {
38+
return Boolean(
39+
error &&
40+
typeof error === 'object' &&
41+
(error as SessionBound404Error)[SESSION_BOUND_404_ERROR] === true
42+
);
43+
}
44+
2845
/**
2946
* Options for starting or authenticating an SSE connection
3047
*/
@@ -237,7 +254,7 @@ export class StreamableHTTPClientTransport implements Transport {
237254
// Try to open an initial SSE stream with GET to listen for server messages
238255
// This is optional according to the spec - server may not support it
239256
const headers = await this._commonHeaders();
240-
const sentWithSession = headers.has('mcp-session-id');
257+
const sentSessionId = headers.get('mcp-session-id');
241258
const userAccept = headers.get('accept');
242259
const types = [...(userAccept?.split(',').map(s => s.trim().toLowerCase()) ?? []), 'text/event-stream'];
243260
headers.set('accept', [...new Set(types)].join(', '));
@@ -255,7 +272,9 @@ export class StreamableHTTPClientTransport implements Transport {
255272
});
256273

257274
if (!response.ok) {
258-
if (response.status === 404 && sentWithSession) {
275+
const shouldClearSessionFor404 =
276+
response.status === 404 && sentSessionId !== null && this._sessionId === sentSessionId;
277+
if (shouldClearSessionFor404) {
259278
this._sessionId = undefined;
260279
}
261280

@@ -293,10 +312,15 @@ export class StreamableHTTPClientTransport implements Transport {
293312
return;
294313
}
295314

296-
throw new SdkError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${response.statusText}`, {
315+
const error = new SdkError(
316+
SdkErrorCode.ClientHttpFailedToOpenStream,
317+
`Failed to open SSE stream: ${response.statusText}`,
318+
{
297319
status: response.status,
298320
statusText: response.statusText
299-
});
321+
}
322+
);
323+
throw shouldClearSessionFor404 ? markSessionBound404Error(error) : error;
300324
}
301325

302326
this._handleSseStream(response.body, options, true);
@@ -350,7 +374,11 @@ export class StreamableHTTPClientTransport implements Transport {
350374
this._cancelReconnection = undefined;
351375
if (this._abortController?.signal.aborted) return;
352376
this._startOrAuthSse(options).catch(error => {
353-
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
377+
const reconnectError = error instanceof Error ? error : new Error(String(error));
378+
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${reconnectError.message}`));
379+
if (isSessionBound404Error(reconnectError)) {
380+
return;
381+
}
354382
try {
355383
this._scheduleReconnection(options, attemptCount + 1);
356384
} catch (scheduleError) {
@@ -544,7 +572,7 @@ export class StreamableHTTPClientTransport implements Transport {
544572
}
545573

546574
const headers = await this._commonHeaders();
547-
const sentWithSession = headers.has('mcp-session-id');
575+
const sentSessionId = headers.get('mcp-session-id');
548576
headers.set('content-type', 'application/json');
549577
const userAccept = headers.get('accept');
550578
const types = [...(userAccept?.split(',').map(s => s.trim().toLowerCase()) ?? []), 'application/json', 'text/event-stream'];
@@ -567,7 +595,7 @@ export class StreamableHTTPClientTransport implements Transport {
567595
}
568596

569597
if (!response.ok) {
570-
if (response.status === 404 && sentWithSession) {
598+
if (response.status === 404 && sentSessionId !== null && this._sessionId === sentSessionId) {
571599
this._sessionId = undefined;
572600
}
573601

packages/client/test/client/streamableHttp.test.ts

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,49 @@ describe('StreamableHTTPClientTransport', () => {
303303
expect(lastCall[1].headers.get('mcp-session-id')).toBeNull();
304304
});
305305

306+
it('should not clear a newer session ID when a stale session-bound POST request returns 404', async () => {
307+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
308+
sessionId: 'stale-session-A'
309+
});
310+
311+
const message: JSONRPCMessage = {
312+
jsonrpc: '2.0',
313+
method: 'tools/list',
314+
params: {},
315+
id: 'test-id'
316+
};
317+
318+
let resolveFetch!: (value: unknown) => void;
319+
const deferredFetch = new Promise(resolve => {
320+
resolveFetch = resolve;
321+
});
322+
323+
(globalThis.fetch as Mock).mockImplementationOnce(() => {
324+
// Simulate another in-flight request establishing a fresh session while this request is pending.
325+
(transport as unknown as { _sessionId?: string })._sessionId = 'fresh-session-B';
326+
return deferredFetch;
327+
});
328+
329+
const sendPromise = transport.send(message);
330+
331+
resolveFetch({
332+
ok: false,
333+
status: 404,
334+
statusText: 'Not Found',
335+
text: () => Promise.resolve('Session not found'),
336+
headers: new Headers()
337+
});
338+
339+
await expect(sendPromise).rejects.toMatchObject({
340+
code: SdkErrorCode.ClientHttpNotImplemented,
341+
data: expect.objectContaining({
342+
status: 404
343+
})
344+
});
345+
346+
expect(transport.sessionId).toBe('fresh-session-B');
347+
});
348+
306349
it('should handle non-streaming JSON response', async () => {
307350
const message: JSONRPCMessage = {
308351
jsonrpc: '2.0',
@@ -395,6 +438,46 @@ describe('StreamableHTTPClientTransport', () => {
395438
expect(getCall[1].headers.get('mcp-session-id')).toBe('stale-session-id');
396439
});
397440

441+
it('should not clear a newer session ID when a stale session-bound GET request returns 404', async () => {
442+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
443+
sessionId: 'stale-session-A'
444+
});
445+
await transport.start();
446+
447+
let resolveFetch!: (value: unknown) => void;
448+
const deferredFetch = new Promise(resolve => {
449+
resolveFetch = resolve;
450+
});
451+
452+
(globalThis.fetch as Mock).mockImplementationOnce(() => {
453+
// Simulate another in-flight request establishing a fresh session while this request is pending.
454+
(transport as unknown as { _sessionId?: string })._sessionId = 'fresh-session-B';
455+
return deferredFetch;
456+
});
457+
458+
const startPromise = (
459+
transport as unknown as { _startOrAuthSse: (opts: StartSSEOptions) => Promise<void> }
460+
)._startOrAuthSse({});
461+
462+
resolveFetch({
463+
ok: false,
464+
status: 404,
465+
statusText: 'Not Found',
466+
text: () => Promise.resolve('Session not found'),
467+
headers: new Headers()
468+
});
469+
470+
await expect(startPromise).rejects.toMatchObject({
471+
code: SdkErrorCode.ClientHttpFailedToOpenStream,
472+
data: expect.objectContaining({
473+
status: 404,
474+
statusText: 'Not Found'
475+
})
476+
});
477+
478+
expect(transport.sessionId).toBe('fresh-session-B');
479+
});
480+
398481
it('should handle successful initial GET connection for SSE', async () => {
399482
// Set up readable stream for SSE events
400483
const encoder = new TextEncoder();
@@ -1022,6 +1105,40 @@ describe('StreamableHTTPClientTransport', () => {
10221105
expect(fetchMock.mock.calls[1]![1]?.method).toBe('GET');
10231106
});
10241107

1108+
it('should stop retrying GET reconnection after a session-bound 404 clears the stale session', async () => {
1109+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1110+
sessionId: 'stale-session-id',
1111+
reconnectionOptions: {
1112+
initialReconnectionDelay: 10,
1113+
maxRetries: 3,
1114+
maxReconnectionDelay: 1000,
1115+
reconnectionDelayGrowFactor: 1
1116+
}
1117+
});
1118+
await transport.start();
1119+
1120+
const fetchMock = globalThis.fetch as Mock;
1121+
fetchMock.mockResolvedValue({
1122+
ok: false,
1123+
status: 404,
1124+
statusText: 'Not Found',
1125+
headers: new Headers(),
1126+
text: () => Promise.resolve('Session not found')
1127+
});
1128+
1129+
(
1130+
transport as unknown as {
1131+
_scheduleReconnection: (opts: StartSSEOptions, attemptCount?: number) => void;
1132+
}
1133+
)._scheduleReconnection({}, 0);
1134+
1135+
await vi.advanceTimersByTimeAsync(20);
1136+
await vi.advanceTimersByTimeAsync(100);
1137+
1138+
expect(fetchMock).toHaveBeenCalledTimes(1);
1139+
expect(transport.sessionId).toBeUndefined();
1140+
});
1141+
10251142
it('should NOT reconnect a POST-initiated stream that fails', async () => {
10261143
// ARRANGE
10271144
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {

0 commit comments

Comments
 (0)