Skip to content
5 changes: 5 additions & 0 deletions .changeset/reconnection-scheduler.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@modelcontextprotocol/client': minor
---

Add `reconnectionScheduler` option to `StreamableHTTPClientTransport`. Lets non-persistent environments (serverless, mobile, desktop sleep/wake) override the default `setTimeout`-based SSE reconnection scheduling. The scheduler may return a cancel function that is invoked on `transport.close()`.
71 changes: 59 additions & 12 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,31 @@
maxRetries: number;
}

/**
* Custom scheduler for SSE stream reconnection attempts.
*
* Called instead of `setTimeout` when the transport needs to schedule a reconnection.
* Useful in environments where `setTimeout` is unsuitable (serverless functions that
* terminate before the timer fires, mobile apps that need platform background scheduling,
* desktop apps handling sleep/wake).
*
* @param reconnect - Call this to perform the reconnection attempt.
* @param delay - Suggested delay in milliseconds (from backoff calculation).
* @param attemptCount - Zero-indexed retry attempt number.
* @returns An optional cancel function. If returned, it will be called on
* {@linkcode StreamableHTTPClientTransport.close | transport.close()} to abort the
* pending reconnection.
*
* @example
* ```ts
* const scheduler: ReconnectionScheduler = (reconnect, delay) => {
* const id = platformBackgroundTask.schedule(reconnect, delay);
* return () => platformBackgroundTask.cancel(id);
* };
* ```

Check warning on line 102 in packages/client/src/client/streamableHttp.ts

View check run for this annotation

Claude / Claude Code Review

Inline @example violates .examples.ts convention

Nit: The `@example` block on `ReconnectionScheduler` (lines 96–102) uses inline code, but [CLAUDE.md L46](../CLAUDE.md) requires JSDoc examples to reference companion `.examples.ts` files with `source="./streamableHttp.examples.ts#ReconnectionScheduler_basicUsage"`. All other files in this directory (`auth.ts`, `client.ts`, `middleware.ts`, `authExtensions.ts`) follow this convention.
Comment thread
claude[bot] marked this conversation as resolved.
*/
export type ReconnectionScheduler = (reconnect: () => void, delay: number, attemptCount: number) => (() => void) | void;

/**
* Configuration options for the {@linkcode StreamableHTTPClientTransport}.
*/
Expand Down Expand Up @@ -116,6 +141,12 @@
*/
reconnectionOptions?: StreamableHTTPReconnectionOptions;

/**
* Custom scheduler for reconnection attempts. If not provided, `setTimeout` is used.
* See {@linkcode ReconnectionScheduler}.
*/
reconnectionScheduler?: ReconnectionScheduler;

/**
* Session ID for the connection. This is used to identify the session on the server.
* When not provided and connecting to a server that supports session IDs, the server will generate a new session ID.
Expand Down Expand Up @@ -150,7 +181,8 @@
private _protocolVersion?: string;
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;
private readonly _reconnectionScheduler?: ReconnectionScheduler;
private _cancelReconnection?: () => void;

onclose?: () => void;
onerror?: (error: Error) => void;
Expand All @@ -172,6 +204,7 @@
this._sessionId = opts?.sessionId;
this._protocolVersion = opts?.protocolVersion;
this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS;
this._reconnectionScheduler = opts?.reconnectionScheduler;
}

private async _commonHeaders(): Promise<Headers> {
Expand Down Expand Up @@ -305,15 +338,28 @@
// Calculate next delay based on current attempt count
const delay = this._getNextReconnectionDelay(attemptCount);

// Schedule the reconnection
this._reconnectionTimeout = setTimeout(() => {
// Use the last event ID to resume where we left off
const reconnect = (): void => {
this._cancelReconnection = undefined;
Comment thread
felixweinberger marked this conversation as resolved.
if (this._abortController?.signal.aborted) return;
this._startOrAuthSse(options).catch(error => {
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
// Schedule another attempt if this one failed, incrementing the attempt counter
this._scheduleReconnection(options, attemptCount + 1);
try {
this._scheduleReconnection(options, attemptCount + 1);
} catch (scheduleError) {
this.onerror?.(scheduleError instanceof Error ? scheduleError : new Error(String(scheduleError)));
}
});
}, delay);
};

if (this._reconnectionScheduler) {
const cancel = this._reconnectionScheduler(reconnect, delay, attemptCount);
if (typeof cancel === 'function') {
this._cancelReconnection = cancel;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Nit: When the custom reconnectionScheduler returns void (no cancel function), _cancelReconnection is not reset to undefined, so it can retain a stale cancel function from a prior _scheduleReconnection call. The setTimeout path (line 360) always sets _cancelReconnection, but the scheduler path only sets it conditionally. Fix: this._cancelReconnection = typeof cancel === "function" ? cancel : undefined;

Extended reasoning...

When _scheduleReconnection is called and a custom reconnectionScheduler is provided, the return value is checked with typeof cancel === "function". If it is a function, _cancelReconnection is set. However, if the scheduler returns void, _cancelReconnection is left untouched — it retains whatever value it held from a previous call.

Compare this with the default setTimeout path, which unconditionally sets _cancelReconnection = () => clearTimeout(handle). The two branches have inconsistent behavior: one always updates the field, the other only conditionally updates it.

Step-by-step proof: Consider two SSE streams (GET + POST) that disconnect concurrently:

  1. Stream A disconnects. _scheduleReconnection is called. The custom scheduler returns cancelA. Now _cancelReconnection = cancelA.
  2. Before cancelA fires, Stream B disconnects. _scheduleReconnection is called again. This time the scheduler returns void (e.g., it delegates to a fire-and-forget platform API). Since typeof undefined \!== "function", the if branch is skipped. _cancelReconnection still holds cancelA — a stale reference.
  3. On close(), this._cancelReconnection?.() calls the stale cancelA instead of having no cancel for Stream B's reconnection. Stream B's pending reconnection cannot be cancelled via this mechanism.

The practical impact is very low. The reconnect closure already guards against a closed transport with if (this._abortController?.signal.aborted) return (line 343), so a late-firing reconnect after close() is safely no-oped. Calling a stale cancel function is typically harmless (a no-op on an already-fired callback). The scenario also requires specific concurrent stream timing that is uncommon in practice.

The fix is a one-line change to unconditionally assign _cancelReconnection:

this._cancelReconnection = typeof cancel === "function" ? cancel : undefined;

This maintains the invariant that _cancelReconnection always reflects the currently scheduled reconnection, consistent with the setTimeout path.

} else {
const handle = setTimeout(reconnect, delay);
this._cancelReconnection = () => clearTimeout(handle);
}
}

private _handleSseStream(stream: ReadableStream<Uint8Array> | null, options: StartSSEOptions, isReconnectable: boolean): void {
Expand Down Expand Up @@ -458,12 +504,13 @@
}

async close(): Promise<void> {
if (this._reconnectionTimeout) {
clearTimeout(this._reconnectionTimeout);
this._reconnectionTimeout = undefined;
try {
this._cancelReconnection?.();
} finally {
this._cancelReconnection = undefined;
this._abortController?.abort();
this.onclose?.();
}
this._abortController?.abort();
this.onclose?.();
}

async send(
Expand Down
7 changes: 6 additions & 1 deletion packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ export type { SSEClientTransportOptions } from './client/sse.js';
export { SSEClientTransport, SseError } from './client/sse.js';
export type { StdioServerParameters } from './client/stdio.js';
export { DEFAULT_INHERITED_ENV_VARS, getDefaultEnvironment, StdioClientTransport } from './client/stdio.js';
export type { StartSSEOptions, StreamableHTTPClientTransportOptions, StreamableHTTPReconnectionOptions } from './client/streamableHttp.js';
export type {
ReconnectionScheduler,
StartSSEOptions,
StreamableHTTPClientTransportOptions,
StreamableHTTPReconnectionOptions
} from './client/streamableHttp.js';
export { StreamableHTTPClientTransport } from './client/streamableHttp.js';
export { WebSocketClientTransport } from './client/websocket.js';

Expand Down
148 changes: 142 additions & 6 deletions packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { Mock, Mocked } from 'vitest';

import type { OAuthClientProvider } from '../../src/client/auth.js';
import { UnauthorizedError } from '../../src/client/auth.js';
import type { StartSSEOptions, StreamableHTTPReconnectionOptions } from '../../src/client/streamableHttp.js';
import type { ReconnectionScheduler, StartSSEOptions, StreamableHTTPReconnectionOptions } from '../../src/client/streamableHttp.js';
import { StreamableHTTPClientTransport } from '../../src/client/streamableHttp.js';

describe('StreamableHTTPClientTransport', () => {
Expand Down Expand Up @@ -1617,8 +1617,8 @@ describe('StreamableHTTPClientTransport', () => {
})
);

// Verify no timeout was scheduled (no reconnection attempt)
expect(transport['_reconnectionTimeout']).toBeUndefined();
// Verify no reconnection was scheduled
expect(transport['_cancelReconnection']).toBeUndefined();
});

it('should schedule reconnection when maxRetries is greater than 0', async () => {
Expand All @@ -1640,10 +1640,10 @@ describe('StreamableHTTPClientTransport', () => {

// ASSERT - should schedule a reconnection, not report error yet
expect(errorSpy).not.toHaveBeenCalled();
expect(transport['_reconnectionTimeout']).toBeDefined();
expect(transport['_cancelReconnection']).toBeDefined();

// Clean up the timeout to avoid test pollution
clearTimeout(transport['_reconnectionTimeout']);
// Clean up the pending reconnection to avoid test pollution
transport['_cancelReconnection']?.();
});
});

Expand Down Expand Up @@ -1716,4 +1716,140 @@ describe('StreamableHTTPClientTransport', () => {
});
});
});

describe('reconnectionScheduler', () => {
const reconnectionOptions: StreamableHTTPReconnectionOptions = {
initialReconnectionDelay: 1000,
maxReconnectionDelay: 5000,
reconnectionDelayGrowFactor: 2,
maxRetries: 3
};

function triggerReconnection(t: StreamableHTTPClientTransport): void {
(t as unknown as { _scheduleReconnection(opts: StartSSEOptions, attempt?: number): void })._scheduleReconnection({}, 0);
}

beforeEach(() => {
vi.useFakeTimers();
});

afterEach(() => {
vi.useRealTimers();
});

it('invokes the custom scheduler with reconnect, delay, and attemptCount', () => {
const scheduler = vi.fn<ReconnectionScheduler>();
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions,
reconnectionScheduler: scheduler
});

triggerReconnection(transport);

expect(scheduler).toHaveBeenCalledTimes(1);
expect(scheduler).toHaveBeenCalledWith(expect.any(Function), 1000, 0);
});

it('falls back to setTimeout when no scheduler is provided', () => {
const setTimeoutSpy = vi.spyOn(global, 'setTimeout');
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions
});

triggerReconnection(transport);

expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 1000);
});

it('does not use setTimeout when a custom scheduler is provided', () => {
const setTimeoutSpy = vi.spyOn(global, 'setTimeout');
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions,
reconnectionScheduler: vi.fn()
});

triggerReconnection(transport);

expect(setTimeoutSpy).not.toHaveBeenCalled();
});

it('calls the returned cancel function on close()', async () => {
const cancel = vi.fn();
const scheduler: ReconnectionScheduler = vi.fn(() => cancel);
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions,
reconnectionScheduler: scheduler
});

triggerReconnection(transport);
expect(cancel).not.toHaveBeenCalled();

await transport.close();
expect(cancel).toHaveBeenCalledTimes(1);
});

it('tolerates schedulers that return void (no cancel function)', async () => {
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions,
reconnectionScheduler: () => {
/* no return */
}
});

triggerReconnection(transport);
await expect(transport.close()).resolves.toBeUndefined();
});

it('clears the default setTimeout on close() when no scheduler is provided', async () => {
const clearTimeoutSpy = vi.spyOn(global, 'clearTimeout');
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions
});

triggerReconnection(transport);
await transport.close();

expect(clearTimeoutSpy).toHaveBeenCalledTimes(1);
});

it('ignores a late-firing reconnect after close()', async () => {
let capturedReconnect: (() => void) | undefined;
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions,
reconnectionScheduler: reconnect => {
capturedReconnect = reconnect;
}
});
const onerror = vi.fn();
transport.onerror = onerror;

await transport.start();
triggerReconnection(transport);
await transport.close();

capturedReconnect?.();
await vi.runAllTimersAsync();

expect(onerror).not.toHaveBeenCalled();
});

it('still aborts and fires onclose if the cancel function throws', async () => {
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions,
reconnectionScheduler: () => () => {
throw new Error('cancel failed');
}
});
const onclose = vi.fn();
transport.onclose = onclose;

await transport.start();
triggerReconnection(transport);
const abortController = transport['_abortController'];

await expect(transport.close()).rejects.toThrow('cancel failed');
expect(abortController?.signal.aborted).toBe(true);
expect(onclose).toHaveBeenCalledTimes(1);
});
});
});
Loading