Skip to content

Commit 8e0b918

Browse files
fix(client): reset retry/upscoping state on close() and prevent stale reconnects after restart
- close() now resets _serverRetryMs and _lastUpscopingHeader so a restarted transport does not inherit the previous session's server-provided retry delay or upscoping-loop guard state. - _scheduleReconnection and _handleSseStream capture the abort signal at schedule time instead of re-reading this._abortController. After close() + start() the field points at the new session's controller, so the previous read allowed a stale reconnect to fire into the restarted transport. - Added tests for both cases.
1 parent 043e452 commit 8e0b918

2 files changed

Lines changed: 53 additions & 3 deletions

File tree

packages/client/src/client/streamableHttp.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -341,10 +341,16 @@ export class StreamableHTTPClientTransport implements Transport {
341341
// Calculate next delay based on current attempt count
342342
const delay = this._getNextReconnectionDelay(attemptCount);
343343

344+
// Capture the signal active when this reconnection was scheduled. close() + start()
345+
// replaces this._abortController, so re-reading it later would see the new session's
346+
// controller and allow a stale reconnect to fire into the restarted transport.
347+
const signal = this._abortController?.signal;
348+
344349
const reconnect = (): void => {
345350
this._cancelReconnection = undefined;
346-
if (this._abortController?.signal.aborted) return;
351+
if (signal?.aborted) return;
347352
this._startOrAuthSse(options).catch(error => {
353+
if (signal?.aborted) return;
348354
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
349355
try {
350356
this._scheduleReconnection(options, attemptCount + 1);
@@ -369,6 +375,9 @@ export class StreamableHTTPClientTransport implements Transport {
369375
}
370376
const { onresumptiontoken, replayMessageId } = options;
371377

378+
// Capture the signal this stream is bound to so we don't reconnect into a restarted transport.
379+
const signal = this._abortController?.signal;
380+
372381
let lastEventId: string | undefined;
373382
// Track whether we've received a priming event (event with ID)
374383
// Per spec, server SHOULD send a priming event with ID before closing
@@ -436,7 +445,7 @@ export class StreamableHTTPClientTransport implements Transport {
436445
// BUT don't reconnect if we already received a response - the request is complete
437446
const canResume = isReconnectable || hasPrimingEvent;
438447
const needsReconnect = canResume && !receivedResponse;
439-
if (needsReconnect && this._abortController && !this._abortController.signal.aborted) {
448+
if (needsReconnect && signal && !signal.aborted) {
440449
this._scheduleReconnection(
441450
{
442451
resumptionToken: lastEventId,
@@ -455,7 +464,7 @@ export class StreamableHTTPClientTransport implements Transport {
455464
// BUT don't reconnect if we already received a response - the request is complete
456465
const canResume = isReconnectable || hasPrimingEvent;
457466
const needsReconnect = canResume && !receivedResponse;
458-
if (needsReconnect && this._abortController && !this._abortController.signal.aborted) {
467+
if (needsReconnect && signal && !signal.aborted) {
459468
// Use the exponential backoff reconnection strategy
460469
try {
461470
this._scheduleReconnection(
@@ -512,6 +521,8 @@ export class StreamableHTTPClientTransport implements Transport {
512521
this._cancelReconnection = undefined;
513522
this._abortController?.abort();
514523
this._sessionId = undefined;
524+
this._lastUpscopingHeader = undefined;
525+
this._serverRetryMs = undefined;
515526
this.onclose?.();
516527
}
517528
}

packages/client/test/client/streamableHttp.test.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,6 +1998,31 @@ describe('StreamableHTTPClientTransport', () => {
19981998
expect(onerror).not.toHaveBeenCalled();
19991999
});
20002000

2001+
it('ignores a late-firing reconnect after close() + start()', async () => {
2002+
let capturedReconnect: (() => void) | undefined;
2003+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
2004+
reconnectionOptions,
2005+
reconnectionScheduler: reconnect => {
2006+
capturedReconnect = reconnect;
2007+
}
2008+
});
2009+
const onerror = vi.fn();
2010+
transport.onerror = onerror;
2011+
const fetchMock = globalThis.fetch as Mock;
2012+
2013+
await transport.start();
2014+
triggerReconnection(transport);
2015+
await transport.close();
2016+
await transport.start();
2017+
2018+
fetchMock.mockClear();
2019+
capturedReconnect?.();
2020+
await vi.runAllTimersAsync();
2021+
2022+
expect(fetchMock).not.toHaveBeenCalled();
2023+
expect(onerror).not.toHaveBeenCalled();
2024+
});
2025+
20012026
it('still aborts and fires onclose if the cancel function throws', async () => {
20022027
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
20032028
reconnectionOptions,
@@ -2059,5 +2084,19 @@ describe('StreamableHTTPClientTransport', () => {
20592084
const postRestartHeaders = fetchMock.mock.calls[1]![1]?.headers as Headers;
20602085
expect(postRestartHeaders.get('mcp-session-id')).toBeNull();
20612086
});
2087+
2088+
it('should reset server-provided retry delay and upscoping header on close()', async () => {
2089+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'));
2090+
await transport.start();
2091+
2092+
const internal = transport as unknown as { _serverRetryMs?: number; _lastUpscopingHeader?: string };
2093+
internal._serverRetryMs = 3000;
2094+
internal._lastUpscopingHeader = 'Bearer realm="x"';
2095+
2096+
await transport.close();
2097+
2098+
expect(internal._serverRetryMs).toBeUndefined();
2099+
expect(internal._lastUpscopingHeader).toBeUndefined();
2100+
});
20622101
});
20632102
});

0 commit comments

Comments
 (0)