Skip to content

Commit 8719716

Browse files
committed
Revert "fix(client): await SSE stream open in connect() to prevent race condition"
This reverts commit d0eeff6. #1521 (review)
1 parent 6fa4688 commit 8719716

2 files changed

Lines changed: 46 additions & 6 deletions

File tree

src/client/streamableHttp.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ export class StreamableHTTPClientTransport implements Transport {
137137
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
138138
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
139139
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;
140+
private _sseStreamOpened = false; // Track if SSE stream was successfully opened
140141

141142
onclose?: () => void;
142143
onerror?: (error: Error) => void;
@@ -240,6 +241,7 @@ export class StreamableHTTPClientTransport implements Transport {
240241
throw new StreamableHTTPError(response.status, `Failed to open SSE stream: ${response.statusText}`);
241242
}
242243

244+
this._sseStreamOpened = true;
243245
this._handleSseStream(response.body, options, true);
244246
} catch (error) {
245247
this.onerror?.(error as Error);
@@ -479,10 +481,19 @@ export class StreamableHTTPClientTransport implements Transport {
479481

480482
// Handle session ID received during initialization
481483
const sessionId = response.headers.get('mcp-session-id');
484+
const hadSessionId = this._sessionId !== undefined;
482485
if (sessionId) {
483486
this._sessionId = sessionId;
484487
}
485488

489+
// If we just received a session ID for the first time and SSE stream is not open,
490+
// try to open it now. This handles the case where the initial SSE connection
491+
// during start() was rejected because the server wasn't initialized yet.
492+
// See: https://github.com/modelcontextprotocol/typescript-sdk/issues/1167
493+
if (sessionId && !hadSessionId && !this._sseStreamOpened) {
494+
this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err));
495+
}
496+
486497
if (!response.ok) {
487498
const text = await response.text().catch(() => null);
488499

@@ -561,12 +572,8 @@ export class StreamableHTTPClientTransport implements Transport {
561572
// if the accepted notification is initialized, we start the SSE stream
562573
// if it's supported by the server
563574
if (isInitializedNotification(message)) {
564-
// Await the SSE stream opening so that connect() does not resolve
565-
// until the GET listener is established. This prevents a race where
566-
// the server sends a request (e.g. roots/list) before the stream is ready.
567-
// Errors are swallowed here because _startOrAuthSse already reports
568-
// them via onerror, and the SSE stream is optional (server may 405).
569-
await this._startOrAuthSse({ resumptionToken: undefined }).catch(() => {});
575+
// Start without a lastEventId since this is a fresh connection
576+
this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err));
570577
}
571578
return;
572579
}

test/client/streamableHttp.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,19 @@ describe('StreamableHTTPClientTransport', () => {
101101
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
102102
});
103103

104+
// Mock the SSE stream GET request that happens after receiving session ID
105+
(global.fetch as Mock).mockResolvedValueOnce({
106+
ok: false,
107+
status: 405,
108+
headers: new Headers(),
109+
body: { cancel: vi.fn() }
110+
});
111+
104112
await transport.send(message);
105113

114+
// Allow the async SSE connection attempt to complete
115+
await new Promise(resolve => setTimeout(resolve, 10));
116+
106117
// Send a second message that should include the session ID
107118
(global.fetch as Mock).mockResolvedValueOnce({
108119
ok: true,
@@ -137,8 +148,19 @@ describe('StreamableHTTPClientTransport', () => {
137148
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
138149
});
139150

151+
// Mock the SSE stream GET request that happens after receiving session ID
152+
(global.fetch as Mock).mockResolvedValueOnce({
153+
ok: false,
154+
status: 405,
155+
headers: new Headers(),
156+
body: { cancel: vi.fn() }
157+
});
158+
140159
await transport.send(message);
141160

161+
// Allow the async SSE connection attempt to complete
162+
await new Promise(resolve => setTimeout(resolve, 10));
163+
142164
expect(transport.sessionId).toBe('test-session-id');
143165

144166
// Now terminate the session
@@ -178,8 +200,19 @@ describe('StreamableHTTPClientTransport', () => {
178200
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'test-session-id' })
179201
});
180202

203+
// Mock the SSE stream GET request that happens after receiving session ID
204+
(global.fetch as Mock).mockResolvedValueOnce({
205+
ok: false,
206+
status: 405,
207+
headers: new Headers(),
208+
body: { cancel: vi.fn() }
209+
});
210+
181211
await transport.send(message);
182212

213+
// Allow the async SSE connection attempt to complete
214+
await new Promise(resolve => setTimeout(resolve, 10));
215+
183216
// Now terminate the session, but server responds with 405
184217
(global.fetch as Mock).mockResolvedValueOnce({
185218
ok: false,

0 commit comments

Comments
 (0)