diff --git a/packages/client/src/client/sse.ts b/packages/client/src/client/sse.ts index 133aa0004..d34450d24 100644 --- a/packages/client/src/client/sse.ts +++ b/packages/client/src/client/sse.ts @@ -75,6 +75,7 @@ export class SSEClientTransport implements Transport { private _fetch?: FetchLike; private _fetchWithInit: FetchLike; private _protocolVersion?: string; + private _pendingRequests = new Set>(); onclose?: () => void; onerror?: (error: Error) => void; @@ -238,6 +239,15 @@ export class SSEClientTransport implements Transport { } async close(): Promise { + // Wait for any in-flight requests to complete before aborting, so that + // successful responses are not marked as aborted by Undici/OpenTelemetry. + if (this._pendingRequests.size > 0) { + const GRACEFUL_CLOSE_TIMEOUT = 2000; + await Promise.race([ + Promise.allSettled(this._pendingRequests), + new Promise(resolve => setTimeout(resolve, GRACEFUL_CLOSE_TIMEOUT)) + ]); + } this._abortController?.abort(); this._eventSource?.close(); this.onclose?.(); @@ -248,6 +258,18 @@ export class SSEClientTransport implements Transport { throw new SdkError(SdkErrorCode.NotConnected, 'Not connected'); } + const requestPromise = this._doSend(message); + // Track the promise for graceful close, but suppress unhandled rejections + // on the tracked copy since the caller handles the actual rejection. + const tracked = requestPromise.catch(() => {}); + this._pendingRequests.add(tracked); + tracked.finally(() => { + this._pendingRequests.delete(tracked); + }); + return requestPromise; + } + + private async _doSend(message: JSONRPCMessage): Promise { try { const headers = await this._commonHeaders(); headers.set('content-type', 'application/json'); @@ -259,7 +281,7 @@ export class SSEClientTransport implements Transport { signal: this._abortController?.signal }; - const response = await (this._fetch ?? fetch)(this._endpoint, init); + const response = await (this._fetch ?? fetch)(this._endpoint!, init); if (!response.ok) { const text = await response.text?.().catch(() => null); diff --git a/packages/client/src/client/streamableHttp.ts b/packages/client/src/client/streamableHttp.ts index dab9b37ab..d9753e198 100644 --- a/packages/client/src/client/streamableHttp.ts +++ b/packages/client/src/client/streamableHttp.ts @@ -141,6 +141,7 @@ export class StreamableHTTPClientTransport implements Transport { private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping. private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field private _reconnectionTimeout?: ReturnType; + private _pendingRequests = new Set>(); onclose?: () => void; onerror?: (error: Error) => void; @@ -452,6 +453,15 @@ export class StreamableHTTPClientTransport implements Transport { clearTimeout(this._reconnectionTimeout); this._reconnectionTimeout = undefined; } + // Wait for any in-flight requests to complete before aborting, so that + // successful responses are not marked as aborted by Undici/OpenTelemetry. + if (this._pendingRequests.size > 0) { + const GRACEFUL_CLOSE_TIMEOUT = 2000; + await Promise.race([ + Promise.allSettled(this._pendingRequests), + new Promise(resolve => setTimeout(resolve, GRACEFUL_CLOSE_TIMEOUT)) + ]); + } this._abortController?.abort(); this.onclose?.(); } @@ -459,6 +469,21 @@ export class StreamableHTTPClientTransport implements Transport { async send( message: JSONRPCMessage | JSONRPCMessage[], options?: { resumptionToken?: string; onresumptiontoken?: (token: string) => void } + ): Promise { + const requestPromise = this._doSend(message, options); + // Track the promise for graceful close, but suppress unhandled rejections + // on the tracked copy since the caller handles the actual rejection. + const tracked = requestPromise.catch(() => {}); + this._pendingRequests.add(tracked); + tracked.finally(() => { + this._pendingRequests.delete(tracked); + }); + return requestPromise; + } + + private async _doSend( + message: JSONRPCMessage | JSONRPCMessage[], + options?: { resumptionToken?: string; onresumptiontoken?: (token: string) => void } ): Promise { try { const { resumptionToken, onresumptiontoken } = options || {};