Skip to content
11 changes: 11 additions & 0 deletions src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export class StreamableHTTPClientTransport implements Transport {
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;
private _sseStreamOpened = false; // Track if SSE stream was successfully opened

onclose?: () => void;
onerror?: (error: Error) => void;
Expand Down Expand Up @@ -240,6 +241,7 @@ export class StreamableHTTPClientTransport implements Transport {
throw new StreamableHTTPError(response.status, `Failed to open SSE stream: ${response.statusText}`);
}

this._sseStreamOpened = true;
this._handleSseStream(response.body, options, true);
} catch (error) {
this.onerror?.(error as Error);
Expand Down Expand Up @@ -479,10 +481,19 @@ export class StreamableHTTPClientTransport implements Transport {

// Handle session ID received during initialization
const sessionId = response.headers.get('mcp-session-id');
const hadSessionId = this._sessionId !== undefined;
if (sessionId) {
this._sessionId = sessionId;
}

// If we just received a session ID for the first time and SSE stream is not open,
// try to open it now. This handles the case where the initial SSE connection
// during start() was rejected because the server wasn't initialized yet.
// See: https://github.com/modelcontextprotocol/typescript-sdk/issues/1167
if (sessionId && !hadSessionId && !this._sseStreamOpened) {
this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err));
}
Comment on lines +490 to +495
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: During normal MCP initialization, two concurrent GET SSE streams are opened. The new session-ID path (line 493) fires _startOrAuthSse when the initialize response returns a session ID, and then the existing isInitializedNotification path (line 574) fires _startOrAuthSse again unconditionally when the initialized notification gets a 202 response -- it does not check _sseStreamOpened. The server rejects the duplicate with 409 Conflict, causing a spurious error surfaced via onerror on every stateful initialization. Fix: add && !this._sseStreamOpened to the guard at line 574.

Extended reasoning...

What the bug is

This PR adds a new code path at line 493 that opens an SSE stream via _startOrAuthSse() when a session ID is received for the first time during send(). However, there is already an existing code path at line 574 that opens an SSE stream when isInitializedNotification(message) is true and the server responds with 202. During normal MCP initialization, both paths fire, creating two concurrent GET SSE connections.

The specific triggering flow

Step-by-step proof with the standard MCP handshake:

  1. Client sends initialize POST -- Server responds with 200 + mcp-session-id header.
  2. At line 484, hadSessionId is false (no prior session). At line 488, this._sessionId is set.
  3. At line 493, the condition sessionId && !hadSessionId && !this._sseStreamOpened is true, so _startOrAuthSse() fires (fire-and-forget). This opens SSE stream Better validation of data coming over the wire #1.
  4. The server is already initialized (it responded to initialize), so the GET SSE succeeds, and _sseStreamOpened is set to true at line 244.
  5. Client sends initialized notification POST -- Server responds with 202.
  6. At line 574, isInitializedNotification(message) returns true. This path calls _startOrAuthSse() without checking _sseStreamOpened. This opens SSE stream Race condition between transport start and server connection #2.

Why existing code does not prevent it

The _sseStreamOpened flag is only checked at line 493 (the new path) and set at line 244. The isInitializedNotification path at line 574 never reads this flag, so it unconditionally opens a second stream even after the first one succeeded. The _startOrAuthSse method itself has no internal guard against being called multiple times.

Impact

The server explicitly rejects duplicate GET SSE streams with 409 Conflict (webStandardStreamableHttp.ts lines 410-414: "Only one SSE stream is allowed per session"). This means:

  • Every stateful initialization produces an unnecessary network request
  • A StreamableHTTPError(409) is thrown and surfaced via onerror, creating a spurious error on every normal init
  • With third-party server implementations that do not enforce the 409 guard, actual duplicate message delivery could occur

How to fix

Add && !this._sseStreamOpened to the guard at line 574:

if (isInitializedNotification(message) && !this._sseStreamOpened) {
    this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err));
}

Alternatively, skip the new session-ID path (line 493) when the message being sent is the initialize request, since the initialized notification path will handle opening the stream.


if (!response.ok) {
const text = await response.text().catch(() => null);

Expand Down
34 changes: 34 additions & 0 deletions test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,19 @@ describe('StreamableHTTPClientTransport', () => {
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
});

// Mock the SSE stream GET request that happens after receiving session ID
(global.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 405,
headers: new Headers(),
body: { cancel: vi.fn() }
});

await transport.send(message);

// Allow the async SSE connection attempt to complete
await new Promise(resolve => setTimeout(resolve, 10));

// Send a second message that should include the session ID
(global.fetch as Mock).mockResolvedValueOnce({
ok: true,
Expand Down Expand Up @@ -137,7 +148,19 @@ describe('StreamableHTTPClientTransport', () => {
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
});

// Mock the SSE stream GET request that happens after receiving session ID
(global.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 405,
headers: new Headers(),
body: { cancel: vi.fn() }
});

await transport.send(message);

// Allow the async SSE connection attempt to complete
await new Promise(resolve => setTimeout(resolve, 10));

expect(transport.sessionId).toBe('test-session-id');

// Now terminate the session
Expand Down Expand Up @@ -177,8 +200,19 @@ describe('StreamableHTTPClientTransport', () => {
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
});

// Mock the SSE stream GET request that happens after receiving session ID
(global.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 405,
headers: new Headers(),
body: { cancel: vi.fn() }
});

await transport.send(message);

// Allow the async SSE connection attempt to complete
await new Promise(resolve => setTimeout(resolve, 10));

// Now terminate the session, but server responds with 405
(global.fetch as Mock).mockResolvedValueOnce({
ok: false,
Expand Down
47 changes: 47 additions & 0 deletions test/integration-tests/stateManagementStreamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
ListToolsResultSchema,
ListResourcesResultSchema,
ListPromptsResultSchema,
ListRootsRequestSchema,
LATEST_PROTOCOL_VERSION
} from '../../src/types.js';
import { zodTestMatrix, type ZodMatrixEntry } from '../../src/__fixtures__/zodTestMatrix.js';
Expand Down Expand Up @@ -376,6 +377,52 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
// Clean up
await transport.close();
});

it('should support server-initiated roots/list request', async () => {
// This test reproduces GitHub issue #1167
// https://github.com/modelcontextprotocol/typescript-sdk/issues/1167
//
// The bug: server.listRoots() hangs when using HTTP transport because:
// 1. Client tries to open GET SSE stream before initialization
// 2. Server rejects with 400 "Server not initialized"
// 3. Client never retries opening SSE stream after initialization
// 4. Server's send() silently returns when no SSE stream exists
// 5. listRoots() promise never resolves

// Create client with roots capability
const client = new Client({ name: 'test-client', version: '1.0.0' }, { capabilities: { roots: { listChanged: true } } });

// Register handler for roots/list requests from server
client.setRequestHandler(ListRootsRequestSchema, async () => {
return {
roots: [{ uri: 'file:///home/user/project', name: 'Test Project' }]
};
});

const transport = new StreamableHTTPClientTransport(baseUrl);
await client.connect(transport);

// Verify client has session ID (stateful mode)
expect(transport.sessionId).toBeDefined();

// Now try to call listRoots from the server
const rootsPromise = mcpServer!.server.listRoots();

// Use a short timeout to detect the hang
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error('listRoots() timed out - SSE stream not working')), 2000);
});

const result = await Promise.race([rootsPromise, timeoutPromise]);

expect(result.roots).toHaveLength(1);
expect(result.roots[0]).toEqual({
uri: 'file:///home/user/project',
name: 'Test Project'
});

await transport.close();
});
});
});
});
Loading
Loading