@@ -78,6 +78,31 @@ export interface StreamableHTTPReconnectionOptions {
7878 maxRetries : number ;
7979}
8080
81+ /**
82+ * Custom scheduler for SSE stream reconnection attempts.
83+ *
84+ * Called instead of `setTimeout` when the transport needs to schedule a reconnection.
85+ * Useful in environments where `setTimeout` is unsuitable (serverless functions that
86+ * terminate before the timer fires, mobile apps that need platform background scheduling,
87+ * desktop apps handling sleep/wake).
88+ *
89+ * @param reconnect - Call this to perform the reconnection attempt.
90+ * @param delay - Suggested delay in milliseconds (from backoff calculation).
91+ * @param attemptCount - Zero-indexed retry attempt number.
92+ * @returns An optional cancel function. If returned, it will be called on
93+ * {@linkcode StreamableHTTPClientTransport.close | transport.close()} to abort the
94+ * pending reconnection.
95+ *
96+ * @example
97+ * ```ts source="./streamableHttp.examples.ts#ReconnectionScheduler_basicUsage"
98+ * const scheduler: ReconnectionScheduler = (reconnect, delay) => {
99+ * const id = platformBackgroundTask.schedule(reconnect, delay);
100+ * return () => platformBackgroundTask.cancel(id);
101+ * };
102+ * ```
103+ */
104+ export type ReconnectionScheduler = ( reconnect : ( ) => void , delay : number , attemptCount : number ) => ( ( ) => void ) | void ;
105+
81106/**
82107 * Configuration options for the {@linkcode StreamableHTTPClientTransport}.
83108 */
@@ -116,6 +141,12 @@ export type StreamableHTTPClientTransportOptions = {
116141 */
117142 reconnectionOptions ?: StreamableHTTPReconnectionOptions ;
118143
144+ /**
145+ * Custom scheduler for reconnection attempts. If not provided, `setTimeout` is used.
146+ * See {@linkcode ReconnectionScheduler}.
147+ */
148+ reconnectionScheduler ?: ReconnectionScheduler ;
149+
119150 /**
120151 * Session ID for the connection. This is used to identify the session on the server.
121152 * When not provided and connecting to a server that supports session IDs, the server will generate a new session ID.
@@ -150,7 +181,8 @@ export class StreamableHTTPClientTransport implements Transport {
150181 private _protocolVersion ?: string ;
151182 private _lastUpscopingHeader ?: string ; // Track last upscoping header to prevent infinite upscoping.
152183 private _serverRetryMs ?: number ; // Server-provided retry delay from SSE retry field
153- private _reconnectionTimeout ?: ReturnType < typeof setTimeout > ;
184+ private readonly _reconnectionScheduler ?: ReconnectionScheduler ;
185+ private _cancelReconnection ?: ( ) => void ;
154186
155187 onclose ?: ( ) => void ;
156188 onerror ?: ( error : Error ) => void ;
@@ -172,6 +204,7 @@ export class StreamableHTTPClientTransport implements Transport {
172204 this . _sessionId = opts ?. sessionId ;
173205 this . _protocolVersion = opts ?. protocolVersion ;
174206 this . _reconnectionOptions = opts ?. reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS ;
207+ this . _reconnectionScheduler = opts ?. reconnectionScheduler ;
175208 }
176209
177210 private async _commonHeaders ( ) : Promise < Headers > {
@@ -305,15 +338,26 @@ export class StreamableHTTPClientTransport implements Transport {
305338 // Calculate next delay based on current attempt count
306339 const delay = this . _getNextReconnectionDelay ( attemptCount ) ;
307340
308- // Schedule the reconnection
309- this . _reconnectionTimeout = setTimeout ( ( ) => {
310- // Use the last event ID to resume where we left off
341+ const reconnect = ( ) : void => {
342+ this . _cancelReconnection = undefined ;
343+ if ( this . _abortController ?. signal . aborted ) return ;
311344 this . _startOrAuthSse ( options ) . catch ( error => {
312345 this . onerror ?.( new Error ( `Failed to reconnect SSE stream: ${ error instanceof Error ? error . message : String ( error ) } ` ) ) ;
313- // Schedule another attempt if this one failed, incrementing the attempt counter
314- this . _scheduleReconnection ( options , attemptCount + 1 ) ;
346+ try {
347+ this . _scheduleReconnection ( options , attemptCount + 1 ) ;
348+ } catch ( scheduleError ) {
349+ this . onerror ?.( scheduleError instanceof Error ? scheduleError : new Error ( String ( scheduleError ) ) ) ;
350+ }
315351 } ) ;
316- } , delay ) ;
352+ } ;
353+
354+ if ( this . _reconnectionScheduler ) {
355+ const cancel = this . _reconnectionScheduler ( reconnect , delay , attemptCount ) ;
356+ this . _cancelReconnection = typeof cancel === 'function' ? cancel : undefined ;
357+ } else {
358+ const handle = setTimeout ( reconnect , delay ) ;
359+ this . _cancelReconnection = ( ) => clearTimeout ( handle ) ;
360+ }
317361 }
318362
319363 private _handleSseStream ( stream : ReadableStream < Uint8Array > | null , options : StartSSEOptions , isReconnectable : boolean ) : void {
@@ -458,12 +502,13 @@ export class StreamableHTTPClientTransport implements Transport {
458502 }
459503
460504 async close ( ) : Promise < void > {
461- if ( this . _reconnectionTimeout ) {
462- clearTimeout ( this . _reconnectionTimeout ) ;
463- this . _reconnectionTimeout = undefined ;
505+ try {
506+ this . _cancelReconnection ?.( ) ;
507+ } finally {
508+ this . _cancelReconnection = undefined ;
509+ this . _abortController ?. abort ( ) ;
510+ this . onclose ?.( ) ;
464511 }
465- this . _abortController ?. abort ( ) ;
466- this . onclose ?.( ) ;
467512 }
468513
469514 async send (
0 commit comments