Skip to content

Commit 2d1622d

Browse files
feat(client): add reconnectionScheduler to StreamableHTTPClientTransport
Adds a ReconnectionScheduler callback option so non-persistent environments can override the default setTimeout-based SSE reconnection scheduling. The scheduler receives (reconnect, delay, attemptCount) and may return a cancel function that is invoked on transport.close(). This ensures pending custom-scheduled reconnections can be aborted the same way the built-in setTimeout is cleared. Replaces the _reconnectionTimeout field with a unified _cancelReconnection callback that works for both the default and custom scheduler paths. Fixes #1162 Closes #1177 Co-authored-by: CHOIJEWON <alsrn6040@kakao.com>
1 parent 40174d2 commit 2d1622d

File tree

3 files changed

+157
-15
lines changed

3 files changed

+157
-15
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@modelcontextprotocol/client': minor
3+
---
4+
5+
Add `reconnectionScheduler` option to `StreamableHTTPClientTransport`. Lets non-persistent environments (serverless, mobile, desktop sleep/wake) override the default `setTimeout`-based SSE reconnection scheduling. The scheduler may return a cancel function that is invoked on `transport.close()`.

packages/client/src/client/streamableHttp.ts

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,31 @@ export interface StreamableHTTPReconnectionOptions {
7878
maxRetries: number;
7979
}
8080

81+
/**
82+
* Custom scheduler for SSE stream reconnection attempts.
83+
*
84+
* Called instead of `setTimeout` when the transport needs to schedule a reconnection.
85+
* Useful in environments where `setTimeout` is unsuitable (serverless functions that
86+
* terminate before the timer fires, mobile apps that need platform background scheduling,
87+
* desktop apps handling sleep/wake).
88+
*
89+
* @param reconnect - Call this to perform the reconnection attempt.
90+
* @param delay - Suggested delay in milliseconds (from backoff calculation).
91+
* @param attemptCount - Zero-indexed retry attempt number.
92+
* @returns An optional cancel function. If returned, it will be called on
93+
* {@linkcode StreamableHTTPClientTransport.close | transport.close()} to abort the
94+
* pending reconnection.
95+
*
96+
* @example
97+
* ```ts
98+
* const scheduler: ReconnectionScheduler = (reconnect, delay) => {
99+
* const id = platformBackgroundTask.schedule(reconnect, delay);
100+
* return () => platformBackgroundTask.cancel(id);
101+
* };
102+
* ```
103+
*/
104+
export type ReconnectionScheduler = (reconnect: () => void, delay: number, attemptCount: number) => (() => void) | void;
105+
81106
/**
82107
* Configuration options for the {@linkcode StreamableHTTPClientTransport}.
83108
*/
@@ -116,6 +141,12 @@ export type StreamableHTTPClientTransportOptions = {
116141
*/
117142
reconnectionOptions?: StreamableHTTPReconnectionOptions;
118143

144+
/**
145+
* Custom scheduler for reconnection attempts. If not provided, `setTimeout` is used.
146+
* See {@linkcode ReconnectionScheduler}.
147+
*/
148+
reconnectionScheduler?: ReconnectionScheduler;
149+
119150
/**
120151
* Session ID for the connection. This is used to identify the session on the server.
121152
* When not provided and connecting to a server that supports session IDs, the server will generate a new session ID.
@@ -150,7 +181,8 @@ export class StreamableHTTPClientTransport implements Transport {
150181
private _protocolVersion?: string;
151182
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
152183
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
153-
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;
184+
private readonly _reconnectionScheduler?: ReconnectionScheduler;
185+
private _cancelReconnection?: () => void;
154186

155187
onclose?: () => void;
156188
onerror?: (error: Error) => void;
@@ -172,6 +204,7 @@ export class StreamableHTTPClientTransport implements Transport {
172204
this._sessionId = opts?.sessionId;
173205
this._protocolVersion = opts?.protocolVersion;
174206
this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS;
207+
this._reconnectionScheduler = opts?.reconnectionScheduler;
175208
}
176209

177210
private async _commonHeaders(): Promise<Headers> {
@@ -305,15 +338,23 @@ export class StreamableHTTPClientTransport implements Transport {
305338
// Calculate next delay based on current attempt count
306339
const delay = this._getNextReconnectionDelay(attemptCount);
307340

308-
// Schedule the reconnection
309-
this._reconnectionTimeout = setTimeout(() => {
310-
// Use the last event ID to resume where we left off
341+
const reconnect = (): void => {
342+
this._cancelReconnection = undefined;
311343
this._startOrAuthSse(options).catch(error => {
312344
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
313-
// Schedule another attempt if this one failed, incrementing the attempt counter
314345
this._scheduleReconnection(options, attemptCount + 1);
315346
});
316-
}, delay);
347+
};
348+
349+
if (this._reconnectionScheduler) {
350+
const cancel = this._reconnectionScheduler(reconnect, delay, attemptCount);
351+
if (typeof cancel === 'function') {
352+
this._cancelReconnection = cancel;
353+
}
354+
} else {
355+
const handle = setTimeout(reconnect, delay);
356+
this._cancelReconnection = () => clearTimeout(handle);
357+
}
317358
}
318359

319360
private _handleSseStream(stream: ReadableStream<Uint8Array> | null, options: StartSSEOptions, isReconnectable: boolean): void {
@@ -458,9 +499,9 @@ export class StreamableHTTPClientTransport implements Transport {
458499
}
459500

460501
async close(): Promise<void> {
461-
if (this._reconnectionTimeout) {
462-
clearTimeout(this._reconnectionTimeout);
463-
this._reconnectionTimeout = undefined;
502+
if (this._cancelReconnection) {
503+
this._cancelReconnection();
504+
this._cancelReconnection = undefined;
464505
}
465506
this._abortController?.abort();
466507
this.onclose?.();

packages/client/test/client/streamableHttp.test.ts

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { Mock, Mocked } from 'vitest';
44

55
import type { OAuthClientProvider } from '../../src/client/auth.js';
66
import { UnauthorizedError } from '../../src/client/auth.js';
7-
import type { StartSSEOptions, StreamableHTTPReconnectionOptions } from '../../src/client/streamableHttp.js';
7+
import type { ReconnectionScheduler, StartSSEOptions, StreamableHTTPReconnectionOptions } from '../../src/client/streamableHttp.js';
88
import { StreamableHTTPClientTransport } from '../../src/client/streamableHttp.js';
99

1010
describe('StreamableHTTPClientTransport', () => {
@@ -1617,8 +1617,8 @@ describe('StreamableHTTPClientTransport', () => {
16171617
})
16181618
);
16191619

1620-
// Verify no timeout was scheduled (no reconnection attempt)
1621-
expect(transport['_reconnectionTimeout']).toBeUndefined();
1620+
// Verify no reconnection was scheduled
1621+
expect(transport['_cancelReconnection']).toBeUndefined();
16221622
});
16231623

16241624
it('should schedule reconnection when maxRetries is greater than 0', async () => {
@@ -1640,10 +1640,10 @@ describe('StreamableHTTPClientTransport', () => {
16401640

16411641
// ASSERT - should schedule a reconnection, not report error yet
16421642
expect(errorSpy).not.toHaveBeenCalled();
1643-
expect(transport['_reconnectionTimeout']).toBeDefined();
1643+
expect(transport['_cancelReconnection']).toBeDefined();
16441644

1645-
// Clean up the timeout to avoid test pollution
1646-
clearTimeout(transport['_reconnectionTimeout']);
1645+
// Clean up the pending reconnection to avoid test pollution
1646+
transport['_cancelReconnection']?.();
16471647
});
16481648
});
16491649

@@ -1716,4 +1716,100 @@ describe('StreamableHTTPClientTransport', () => {
17161716
});
17171717
});
17181718
});
1719+
1720+
describe('reconnectionScheduler', () => {
1721+
const reconnectionOptions: StreamableHTTPReconnectionOptions = {
1722+
initialReconnectionDelay: 1000,
1723+
maxReconnectionDelay: 5000,
1724+
reconnectionDelayGrowFactor: 2,
1725+
maxRetries: 3
1726+
};
1727+
1728+
function triggerReconnection(t: StreamableHTTPClientTransport): void {
1729+
(t as unknown as { _scheduleReconnection(opts: StartSSEOptions, attempt?: number): void })._scheduleReconnection({}, 0);
1730+
}
1731+
1732+
beforeEach(() => {
1733+
vi.useFakeTimers();
1734+
});
1735+
1736+
afterEach(() => {
1737+
vi.useRealTimers();
1738+
});
1739+
1740+
it('invokes the custom scheduler with reconnect, delay, and attemptCount', () => {
1741+
const scheduler = vi.fn<ReconnectionScheduler>();
1742+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1743+
reconnectionOptions,
1744+
reconnectionScheduler: scheduler
1745+
});
1746+
1747+
triggerReconnection(transport);
1748+
1749+
expect(scheduler).toHaveBeenCalledTimes(1);
1750+
expect(scheduler).toHaveBeenCalledWith(expect.any(Function), 1000, 0);
1751+
});
1752+
1753+
it('falls back to setTimeout when no scheduler is provided', () => {
1754+
const setTimeoutSpy = vi.spyOn(global, 'setTimeout');
1755+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1756+
reconnectionOptions
1757+
});
1758+
1759+
triggerReconnection(transport);
1760+
1761+
expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 1000);
1762+
});
1763+
1764+
it('does not use setTimeout when a custom scheduler is provided', () => {
1765+
const setTimeoutSpy = vi.spyOn(global, 'setTimeout');
1766+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1767+
reconnectionOptions,
1768+
reconnectionScheduler: vi.fn()
1769+
});
1770+
1771+
triggerReconnection(transport);
1772+
1773+
expect(setTimeoutSpy).not.toHaveBeenCalled();
1774+
});
1775+
1776+
it('calls the returned cancel function on close()', async () => {
1777+
const cancel = vi.fn();
1778+
const scheduler: ReconnectionScheduler = vi.fn(() => cancel);
1779+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1780+
reconnectionOptions,
1781+
reconnectionScheduler: scheduler
1782+
});
1783+
1784+
triggerReconnection(transport);
1785+
expect(cancel).not.toHaveBeenCalled();
1786+
1787+
await transport.close();
1788+
expect(cancel).toHaveBeenCalledTimes(1);
1789+
});
1790+
1791+
it('tolerates schedulers that return void (no cancel function)', async () => {
1792+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1793+
reconnectionOptions,
1794+
reconnectionScheduler: () => {
1795+
/* no return */
1796+
}
1797+
});
1798+
1799+
triggerReconnection(transport);
1800+
await expect(transport.close()).resolves.toBeUndefined();
1801+
});
1802+
1803+
it('clears the default setTimeout on close() when no scheduler is provided', async () => {
1804+
const clearTimeoutSpy = vi.spyOn(global, 'clearTimeout');
1805+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1806+
reconnectionOptions
1807+
});
1808+
1809+
triggerReconnection(transport);
1810+
await transport.close();
1811+
1812+
expect(clearTimeoutSpy).toHaveBeenCalledTimes(1);
1813+
});
1814+
});
17191815
});

0 commit comments

Comments
 (0)