Skip to content

Commit d322291

Browse files
author
Kripa Dev
committed
client: avoid duplicate abort onerror during close
1 parent e86b183 commit d322291

2 files changed

Lines changed: 70 additions & 10 deletions

File tree

packages/client/src/client/streamableHttp.ts

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,32 @@ export class StreamableHTTPClientTransport implements Transport {
151151
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
152152
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
153153
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;
154+
private _isClosing = false;
154155

155156
onclose?: () => void;
156157
onerror?: (error: Error) => void;
157158
onmessage?: (message: JSONRPCMessage) => void;
158159

160+
private _isAbortError(error: unknown): boolean {
161+
if (!(error instanceof Error)) {
162+
return false;
163+
}
164+
165+
if (error.name === 'AbortError') {
166+
return true;
167+
}
168+
169+
return error.message.includes('aborted');
170+
}
171+
172+
private _emitError(error: Error): void {
173+
if (this._isClosing && this._isAbortError(error)) {
174+
return;
175+
}
176+
177+
this.onerror?.(error);
178+
}
179+
159180
constructor(url: URL, opts?: StreamableHTTPClientTransportOptions) {
160181
this._url = url;
161182
this._resourceMetadataUrl = undefined;
@@ -260,7 +281,7 @@ export class StreamableHTTPClientTransport implements Transport {
260281

261282
this._handleSseStream(response.body, options, true);
262283
} catch (error) {
263-
this.onerror?.(error as Error);
284+
this._emitError(error as Error);
264285
throw error;
265286
}
266287
}
@@ -298,7 +319,7 @@ export class StreamableHTTPClientTransport implements Transport {
298319

299320
// Check if we've exceeded maximum retry attempts
300321
if (attemptCount >= maxRetries) {
301-
this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`));
322+
this._emitError(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`));
302323
return;
303324
}
304325

@@ -309,7 +330,7 @@ export class StreamableHTTPClientTransport implements Transport {
309330
this._reconnectionTimeout = setTimeout(() => {
310331
// Use the last event ID to resume where we left off
311332
this._startOrAuthSse(options).catch(error => {
312-
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
333+
this._emitError(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
313334
// Schedule another attempt if this one failed, incrementing the attempt counter
314335
this._scheduleReconnection(options, attemptCount + 1);
315336
});
@@ -377,7 +398,7 @@ export class StreamableHTTPClientTransport implements Transport {
377398
}
378399
this.onmessage?.(message);
379400
} catch (error) {
380-
this.onerror?.(error as Error);
401+
this._emitError(error as Error);
381402
}
382403
}
383404
}
@@ -400,7 +421,7 @@ export class StreamableHTTPClientTransport implements Transport {
400421
}
401422
} catch (error) {
402423
// Handle stream errors - likely a network disconnect
403-
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));
424+
this._emitError(new Error(`SSE stream disconnected: ${error}`));
404425

405426
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
406427
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
@@ -419,7 +440,7 @@ export class StreamableHTTPClientTransport implements Transport {
419440
0
420441
);
421442
} catch (error) {
422-
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
443+
this._emitError(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
423444
}
424445
}
425446
}
@@ -434,6 +455,7 @@ export class StreamableHTTPClientTransport implements Transport {
434455
);
435456
}
436457

458+
this._isClosing = false;
437459
this._abortController = new AbortController();
438460
}
439461

@@ -462,6 +484,7 @@ export class StreamableHTTPClientTransport implements Transport {
462484
clearTimeout(this._reconnectionTimeout);
463485
this._reconnectionTimeout = undefined;
464486
}
487+
this._isClosing = true;
465488
this._abortController?.abort();
466489
this.onclose?.();
467490
}
@@ -484,7 +507,7 @@ export class StreamableHTTPClientTransport implements Transport {
484507
if (resumptionToken) {
485508
// If we have a last event ID, we need to reconnect the SSE stream
486509
this._startOrAuthSse({ resumptionToken, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch(
487-
error => this.onerror?.(error)
510+
error => this._emitError(error)
488511
);
489512
return;
490513
}
@@ -593,7 +616,7 @@ export class StreamableHTTPClientTransport implements Transport {
593616
// if it's supported by the server
594617
if (isInitializedNotification(message)) {
595618
// Start without a lastEventId since this is a fresh connection
596-
this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this.onerror?.(error));
619+
this._startOrAuthSse({ resumptionToken: undefined }).catch(error => this._emitError(error));
597620
}
598621
return;
599622
}
@@ -633,7 +656,7 @@ export class StreamableHTTPClientTransport implements Transport {
633656
await response.text?.().catch(() => {});
634657
}
635658
} catch (error) {
636-
this.onerror?.(error as Error);
659+
this._emitError(error as Error);
637660
throw error;
638661
}
639662
}
@@ -682,7 +705,7 @@ export class StreamableHTTPClientTransport implements Transport {
682705

683706
this._sessionId = undefined;
684707
} catch (error) {
685-
this.onerror?.(error as Error);
708+
this._emitError(error as Error);
686709
throw error;
687710
}
688711
}

packages/client/test/client/streamableHttp.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1715,5 +1715,42 @@ describe('StreamableHTTPClientTransport', () => {
17151715
refresh_token: 'refresh-token' // Refresh token is preserved
17161716
});
17171717
});
1718+
1719+
it('should suppress abort-driven onerror callbacks during close()', async () => {
1720+
const abortError = new Error('The operation was aborted.');
1721+
abortError.name = 'AbortError';
1722+
1723+
const fetchMock = globalThis.fetch as Mock;
1724+
fetchMock.mockRejectedValueOnce(abortError);
1725+
1726+
const onerror = vi.fn();
1727+
transport.onerror = onerror;
1728+
1729+
await transport.start();
1730+
const sendPromise = transport.send({ jsonrpc: '2.0', method: 'test', params: {}, id: '1' } as JSONRPCMessage);
1731+
1732+
await transport.close();
1733+
await expect(sendPromise).rejects.toThrow('The operation was aborted.');
1734+
1735+
expect(onerror).not.toHaveBeenCalled();
1736+
});
1737+
1738+
it('should still report abort errors when not closing', async () => {
1739+
const abortError = new Error('The operation was aborted.');
1740+
abortError.name = 'AbortError';
1741+
1742+
const fetchMock = globalThis.fetch as Mock;
1743+
fetchMock.mockRejectedValueOnce(abortError);
1744+
1745+
const onerror = vi.fn();
1746+
transport.onerror = onerror;
1747+
1748+
await transport.start();
1749+
await expect(transport.send({ jsonrpc: '2.0', method: 'test', params: {}, id: '1' } as JSONRPCMessage)).rejects.toThrow(
1750+
'The operation was aborted.'
1751+
);
1752+
1753+
expect(onerror).toHaveBeenCalledTimes(1);
1754+
});
17181755
});
17191756
});

0 commit comments

Comments
 (0)