Skip to content

Commit 47f27a2

Browse files
committed
Revert "fix(client): await SSE stream open in connect() to prevent race condition"
This reverts commit 88db838.
1 parent 9b681d2 commit 47f27a2

2 files changed

Lines changed: 46 additions & 6 deletions

File tree

packages/client/src/client/streamableHttp.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ export class StreamableHTTPClientTransport implements Transport {
184184
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
185185
private readonly _reconnectionScheduler?: ReconnectionScheduler;
186186
private _cancelReconnection?: () => void;
187+
private _sseStreamOpened = false; // Track if SSE stream was successfully opened
187188

188189
onclose?: () => void;
189190
onerror?: (error: Error) => void;
@@ -292,6 +293,7 @@ export class StreamableHTTPClientTransport implements Transport {
292293
});
293294
}
294295

296+
this._sseStreamOpened = true;
295297
this._handleSseStream(response.body, options, true);
296298
} catch (error) {
297299
this.onerror?.(error as Error);
@@ -552,10 +554,19 @@ export class StreamableHTTPClientTransport implements Transport {
552554

553555
// Handle session ID received during initialization
554556
const sessionId = response.headers.get('mcp-session-id');
557+
const hadSessionId = this._sessionId !== undefined;
555558
if (sessionId) {
556559
this._sessionId = sessionId;
557560
}
558561

562+
// If we just received a session ID for the first time and SSE stream is not open,
563+
// try to open it now. This handles the case where the initial SSE connection
564+
// during start() was rejected because the server wasn't initialized yet.
565+
// See: https://github.com/modelcontextprotocol/typescript-sdk/issues/1167
566+
if (sessionId && !hadSessionId && !this._sseStreamOpened) {
567+
this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this.onerror?.(error));
568+
}
569+
559570
if (!response.ok) {
560571
if (response.status === 401 && this._authProvider) {
561572
// Store WWW-Authenticate params for interactive finishAuth() path
@@ -639,12 +650,8 @@ export class StreamableHTTPClientTransport implements Transport {
639650
// if the accepted notification is initialized, we start the SSE stream
640651
// if it's supported by the server
641652
if (isInitializedNotification(message)) {
642-
// Await the SSE stream opening so that connect() does not resolve
643-
// until the GET listener is established. This prevents a race where
644-
// the server sends a request (e.g. roots/list) before the stream is ready.
645-
// Errors are swallowed here because _startOrAuthSse already reports
646-
// them via onerror, and the SSE stream is optional (server may 405).
647-
await this._startOrAuthSse({ resumptionToken: undefined }).catch(() => {});
653+
// Start without a lastEventId since this is a fresh connection
654+
this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this.onerror?.(error));
648655
}
649656
return;
650657
}

packages/client/test/client/streamableHttp.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,19 @@ describe('StreamableHTTPClientTransport', () => {
104104
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
105105
});
106106

107+
// Mock the SSE stream GET request that happens after receiving session ID
108+
(globalThis.fetch as Mock).mockResolvedValueOnce({
109+
ok: false,
110+
status: 405,
111+
headers: new Headers(),
112+
body: { cancel: vi.fn() }
113+
});
114+
107115
await transport.send(message);
108116

117+
// Allow the async SSE connection attempt to complete
118+
await new Promise(resolve => setTimeout(resolve, 10));
119+
109120
// Send a second message that should include the session ID
110121
(globalThis.fetch as Mock).mockResolvedValueOnce({
111122
ok: true,
@@ -167,8 +178,19 @@ describe('StreamableHTTPClientTransport', () => {
167178
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
168179
});
169180

181+
// Mock the SSE stream GET request that happens after receiving session ID
182+
(globalThis.fetch as Mock).mockResolvedValueOnce({
183+
ok: false,
184+
status: 405,
185+
headers: new Headers(),
186+
body: { cancel: vi.fn() }
187+
});
188+
170189
await transport.send(message);
171190

191+
// Allow the async SSE connection attempt to complete
192+
await new Promise(resolve => setTimeout(resolve, 10));
193+
172194
expect(transport.sessionId).toBe('test-session-id');
173195

174196
// Now terminate the session
@@ -208,8 +230,19 @@ describe('StreamableHTTPClientTransport', () => {
208230
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
209231
});
210232

233+
// Mock the SSE stream GET request that happens after receiving session ID
234+
(globalThis.fetch as Mock).mockResolvedValueOnce({
235+
ok: false,
236+
status: 405,
237+
headers: new Headers(),
238+
body: { cancel: vi.fn() }
239+
});
240+
211241
await transport.send(message);
212242

243+
// Allow the async SSE connection attempt to complete
244+
await new Promise(resolve => setTimeout(resolve, 10));
245+
213246
// Now terminate the session, but server responds with 405
214247
(globalThis.fetch as Mock).mockResolvedValueOnce({
215248
ok: false,

0 commit comments

Comments
 (0)