-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat(client): add reconnectionScheduler to StreamableHTTPClientTransport #1763
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
2d1622d
3dcd989
d18c9b3
b12a469
3ac7bb4
6e24e54
85d4cc0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| '@modelcontextprotocol/client': minor | ||
| --- | ||
|
|
||
| Add `reconnectionScheduler` option to `StreamableHTTPClientTransport`. Lets non-persistent environments (serverless, mobile, desktop sleep/wake) override the default `setTimeout`-based SSE reconnection scheduling. The scheduler may return a cancel function that is invoked on `transport.close()`. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -78,6 +78,31 @@ | |
| maxRetries: number; | ||
| } | ||
|
|
||
| /** | ||
| * Custom scheduler for SSE stream reconnection attempts. | ||
| * | ||
| * Called instead of `setTimeout` when the transport needs to schedule a reconnection. | ||
| * Useful in environments where `setTimeout` is unsuitable (serverless functions that | ||
| * terminate before the timer fires, mobile apps that need platform background scheduling, | ||
| * desktop apps handling sleep/wake). | ||
| * | ||
| * @param reconnect - Call this to perform the reconnection attempt. | ||
| * @param delay - Suggested delay in milliseconds (from backoff calculation). | ||
| * @param attemptCount - Zero-indexed retry attempt number. | ||
| * @returns An optional cancel function. If returned, it will be called on | ||
| * {@linkcode StreamableHTTPClientTransport.close | transport.close()} to abort the | ||
| * pending reconnection. | ||
| * | ||
| * @example | ||
| * ```ts | ||
| * const scheduler: ReconnectionScheduler = (reconnect, delay) => { | ||
| * const id = platformBackgroundTask.schedule(reconnect, delay); | ||
| * return () => platformBackgroundTask.cancel(id); | ||
| * }; | ||
| * ``` | ||
|
Check warning on line 102 in packages/client/src/client/streamableHttp.ts
|
||
| */ | ||
| export type ReconnectionScheduler = (reconnect: () => void, delay: number, attemptCount: number) => (() => void) | void; | ||
|
|
||
| /** | ||
| * Configuration options for the {@linkcode StreamableHTTPClientTransport}. | ||
| */ | ||
|
|
@@ -116,6 +141,12 @@ | |
| */ | ||
| reconnectionOptions?: StreamableHTTPReconnectionOptions; | ||
|
|
||
| /** | ||
| * Custom scheduler for reconnection attempts. If not provided, `setTimeout` is used. | ||
| * See {@linkcode ReconnectionScheduler}. | ||
| */ | ||
| reconnectionScheduler?: ReconnectionScheduler; | ||
|
|
||
| /** | ||
| * Session ID for the connection. This is used to identify the session on the server. | ||
| * When not provided and connecting to a server that supports session IDs, the server will generate a new session ID. | ||
|
|
@@ -150,7 +181,8 @@ | |
| private _protocolVersion?: string; | ||
| private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping. | ||
| private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field | ||
| private _reconnectionTimeout?: ReturnType<typeof setTimeout>; | ||
| private readonly _reconnectionScheduler?: ReconnectionScheduler; | ||
| private _cancelReconnection?: () => void; | ||
|
|
||
| onclose?: () => void; | ||
| onerror?: (error: Error) => void; | ||
|
|
@@ -172,6 +204,7 @@ | |
| this._sessionId = opts?.sessionId; | ||
| this._protocolVersion = opts?.protocolVersion; | ||
| this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS; | ||
| this._reconnectionScheduler = opts?.reconnectionScheduler; | ||
| } | ||
|
|
||
| private async _commonHeaders(): Promise<Headers> { | ||
|
|
@@ -305,15 +338,28 @@ | |
| // Calculate next delay based on current attempt count | ||
| const delay = this._getNextReconnectionDelay(attemptCount); | ||
|
|
||
| // Schedule the reconnection | ||
| this._reconnectionTimeout = setTimeout(() => { | ||
| // Use the last event ID to resume where we left off | ||
| const reconnect = (): void => { | ||
| this._cancelReconnection = undefined; | ||
|
felixweinberger marked this conversation as resolved.
|
||
| if (this._abortController?.signal.aborted) return; | ||
| this._startOrAuthSse(options).catch(error => { | ||
| this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); | ||
| // Schedule another attempt if this one failed, incrementing the attempt counter | ||
| this._scheduleReconnection(options, attemptCount + 1); | ||
| try { | ||
| this._scheduleReconnection(options, attemptCount + 1); | ||
| } catch (scheduleError) { | ||
| this.onerror?.(scheduleError instanceof Error ? scheduleError : new Error(String(scheduleError))); | ||
| } | ||
| }); | ||
| }, delay); | ||
| }; | ||
|
|
||
| if (this._reconnectionScheduler) { | ||
| const cancel = this._reconnectionScheduler(reconnect, delay, attemptCount); | ||
| if (typeof cancel === 'function') { | ||
| this._cancelReconnection = cancel; | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Nit: When the custom Extended reasoning...When Compare this with the default Step-by-step proof: Consider two SSE streams (GET + POST) that disconnect concurrently:
The practical impact is very low. The The fix is a one-line change to unconditionally assign this._cancelReconnection = typeof cancel === "function" ? cancel : undefined;This maintains the invariant that |
||
| } else { | ||
| const handle = setTimeout(reconnect, delay); | ||
| this._cancelReconnection = () => clearTimeout(handle); | ||
| } | ||
| } | ||
|
|
||
| private _handleSseStream(stream: ReadableStream<Uint8Array> | null, options: StartSSEOptions, isReconnectable: boolean): void { | ||
|
|
@@ -458,12 +504,13 @@ | |
| } | ||
|
|
||
| async close(): Promise<void> { | ||
| if (this._reconnectionTimeout) { | ||
| clearTimeout(this._reconnectionTimeout); | ||
| this._reconnectionTimeout = undefined; | ||
| try { | ||
| this._cancelReconnection?.(); | ||
| } finally { | ||
| this._cancelReconnection = undefined; | ||
| this._abortController?.abort(); | ||
| this.onclose?.(); | ||
| } | ||
| this._abortController?.abort(); | ||
| this.onclose?.(); | ||
| } | ||
|
|
||
| async send( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.