Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export class StreamableHTTPClientTransport implements Transport {
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private readonly _reconnectionScheduler?: ReconnectionScheduler;
private _cancelReconnection?: () => void;
private _sseStreamOpened = false; // Track if SSE stream was successfully opened

onclose?: () => void;
onerror?: (error: Error) => void;
Expand Down Expand Up @@ -294,6 +295,7 @@ export class StreamableHTTPClientTransport implements Transport {
});
}

this._sseStreamOpened = true;
this._handleSseStream(response.body, options, true);
} catch (error) {
this.onerror?.(error as Error);
Expand Down Expand Up @@ -556,10 +558,19 @@ export class StreamableHTTPClientTransport implements Transport {

// Handle session ID received during initialization
const sessionId = response.headers.get('mcp-session-id');
const hadSessionId = this._sessionId !== undefined;
if (sessionId) {
this._sessionId = sessionId;
}

// If we just received a session ID for the first time and SSE stream is not open,
// try to open it now. This handles the case where the initial SSE connection
// during start() was rejected because the server wasn't initialized yet.
// See: https://github.com/modelcontextprotocol/typescript-sdk/issues/1167
if (sessionId && !hadSessionId && !this._sseStreamOpened) {
this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this.onerror?.(error));
}

if (!response.ok) {
if (response.status === 401 && this._authProvider) {
// Store WWW-Authenticate params for interactive finishAuth() path
Expand Down
34 changes: 34 additions & 0 deletions packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,19 @@ describe('StreamableHTTPClientTransport', () => {
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
});

// Mock the SSE stream GET request that happens after receiving session ID
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 405,
headers: new Headers(),
body: { cancel: vi.fn() }
});

await transport.send(message);

// Allow the async SSE connection attempt to complete
await new Promise(resolve => setTimeout(resolve, 10));

// Send a second message that should include the session ID
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: true,
Expand Down Expand Up @@ -167,7 +178,19 @@ describe('StreamableHTTPClientTransport', () => {
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
});

// Mock the SSE stream GET request that happens after receiving session ID
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 405,
headers: new Headers(),
body: { cancel: vi.fn() }
});

await transport.send(message);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: spurious newline

// Allow the async SSE connection attempt to complete
await new Promise(resolve => setTimeout(resolve, 10));

expect(transport.sessionId).toBe('test-session-id');

// Now terminate the session
Expand Down Expand Up @@ -207,8 +230,19 @@ describe('StreamableHTTPClientTransport', () => {
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
});

// Mock the SSE stream GET request that happens after receiving session ID
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: false,
status: 405,
headers: new Headers(),
body: { cancel: vi.fn() }
});

await transport.send(message);

// Allow the async SSE connection attempt to complete
await new Promise(resolve => setTimeout(resolve, 10));

// Now terminate the session, but server responds with 405
(globalThis.fetch as Mock).mockResolvedValueOnce({
ok: false,
Expand Down
Loading
Loading