Skip to content

Commit 89fb094

Browse files
fix(core): consolidate per-request cleanup in _requestWithSchema (#1790)
1 parent 8822c96 commit 89fb094

File tree

3 files changed

+84
-9
lines changed

3 files changed

+84
-9
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@modelcontextprotocol/core': patch
3+
---
4+
5+
Consolidate per-request cleanup in `_requestWithSchema` into a single `.finally()` block. This fixes an abort signal listener leak (listeners accumulated when a caller reused one `AbortSignal` across requests) and two cases where `_responseHandlers` entries leaked on send-failure paths.

packages/core/src/shared/protocol.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,9 @@ export abstract class Protocol<ContextT extends BaseContext> {
800800
): Promise<SchemaOutput<T>> {
801801
const { relatedRequestId, resumptionToken, onresumptiontoken } = options ?? {};
802802

803+
let onAbort: (() => void) | undefined;
804+
let cleanupMessageId: number | undefined;
805+
803806
// Send the request
804807
return new Promise<SchemaOutput<T>>((resolve, reject) => {
805808
const earlyReject = (error: unknown) => {
@@ -823,6 +826,7 @@ export abstract class Protocol<ContextT extends BaseContext> {
823826
options?.signal?.throwIfAborted();
824827

825828
const messageId = this._requestMessageId++;
829+
cleanupMessageId = messageId;
826830
const jsonrpcRequest: JSONRPCRequest = {
827831
...request,
828832
jsonrpc: '2.0',
@@ -841,9 +845,7 @@ export abstract class Protocol<ContextT extends BaseContext> {
841845
}
842846

843847
const cancel = (reason: unknown) => {
844-
this._responseHandlers.delete(messageId);
845848
this._progressHandlers.delete(messageId);
846-
this._cleanupTimeout(messageId);
847849

848850
this._transport
849851
?.send(
@@ -885,9 +887,8 @@ export abstract class Protocol<ContextT extends BaseContext> {
885887
}
886888
});
887889

888-
options?.signal?.addEventListener('abort', () => {
889-
cancel(options?.signal?.reason);
890-
});
890+
onAbort = () => cancel(options?.signal?.reason);
891+
options?.signal?.addEventListener('abort', onAbort, { once: true });
891892

892893
const timeout = options?.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC;
893894
const timeoutHandler = () => cancel(new SdkError(SdkErrorCode.RequestTimeout, 'Request timed out', { timeout }));
@@ -907,27 +908,38 @@ export abstract class Protocol<ContextT extends BaseContext> {
907908
let outboundQueued = false;
908909
try {
909910
const taskResult = this._taskManager.processOutboundRequest(jsonrpcRequest, options, messageId, responseHandler, error => {
910-
this._cleanupTimeout(messageId);
911+
this._progressHandlers.delete(messageId);
911912
reject(error);
912913
});
913914
if (taskResult.queued) {
914915
outboundQueued = true;
915916
}
916917
} catch (error) {
917-
this._responseHandlers.delete(messageId);
918918
this._progressHandlers.delete(messageId);
919-
this._cleanupTimeout(messageId);
920919
reject(error);
921920
return;
922921
}
923922

924923
if (!outboundQueued) {
925924
// No related task or no module - send through transport normally
926925
this._transport.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => {
927-
this._cleanupTimeout(messageId);
926+
this._progressHandlers.delete(messageId);
928927
reject(error);
929928
});
930929
}
930+
}).finally(() => {
931+
// Per-request cleanup that must run on every exit path. Consolidated
932+
// here so new exit paths added to the promise body can't forget it.
933+
// _progressHandlers is NOT cleaned up here: _onresponse deletes it
934+
// conditionally (preserveProgress for task flows), and error paths
935+
// above delete it inline since no task exists in those cases.
936+
if (onAbort) {
937+
options?.signal?.removeEventListener('abort', onAbort);
938+
}
939+
if (cleanupMessageId !== undefined) {
940+
this._responseHandlers.delete(cleanupMessageId);
941+
this._cleanupTimeout(cleanupMessageId);
942+
}
931943
});
932944
}
933945

packages/core/test/shared/protocol.test.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,64 @@ describe('protocol tests', () => {
247247
expect((abortReason as SdkError).code).toBe(SdkErrorCode.ConnectionClosed);
248248
});
249249

250+
test('should remove abort listener from caller signal when request settles', async () => {
251+
await protocol.connect(transport);
252+
253+
const controller = new AbortController();
254+
const addSpy = vi.spyOn(controller.signal, 'addEventListener');
255+
const removeSpy = vi.spyOn(controller.signal, 'removeEventListener');
256+
257+
const mockSchema = z.object({ result: z.string() });
258+
const reqPromise = testRequest(protocol, { method: 'example', params: {} }, mockSchema, {
259+
signal: controller.signal
260+
});
261+
262+
expect(addSpy).toHaveBeenCalledTimes(1);
263+
const listener = addSpy.mock.calls[0]![1];
264+
265+
transport.onmessage?.({ jsonrpc: '2.0', id: 0, result: { result: 'ok' } });
266+
await reqPromise;
267+
268+
expect(removeSpy).toHaveBeenCalledWith('abort', listener);
269+
});
270+
271+
test('should not accumulate abort listeners when reusing a signal across requests', async () => {
272+
await protocol.connect(transport);
273+
274+
const controller = new AbortController();
275+
const addSpy = vi.spyOn(controller.signal, 'addEventListener');
276+
const removeSpy = vi.spyOn(controller.signal, 'removeEventListener');
277+
278+
const mockSchema = z.object({ result: z.string() });
279+
for (let i = 0; i < 5; i++) {
280+
const reqPromise = testRequest(protocol, { method: 'example', params: {} }, mockSchema, {
281+
signal: controller.signal
282+
});
283+
transport.onmessage?.({ jsonrpc: '2.0', id: i, result: { result: 'ok' } });
284+
await reqPromise;
285+
}
286+
287+
expect(addSpy).toHaveBeenCalledTimes(5);
288+
expect(removeSpy).toHaveBeenCalledTimes(5);
289+
});
290+
291+
test('should remove abort listener when request rejects', async () => {
292+
await protocol.connect(transport);
293+
294+
const controller = new AbortController();
295+
const removeSpy = vi.spyOn(controller.signal, 'removeEventListener');
296+
297+
const mockSchema = z.object({ result: z.string() });
298+
await expect(
299+
testRequest(protocol, { method: 'example', params: {} }, mockSchema, {
300+
signal: controller.signal,
301+
timeout: 0
302+
})
303+
).rejects.toThrow();
304+
305+
expect(removeSpy).toHaveBeenCalledWith('abort', expect.any(Function));
306+
});
307+
250308
test('should not overwrite existing hooks when connecting transports', async () => {
251309
const oncloseMock = vi.fn();
252310
const onerrorMock = vi.fn();

0 commit comments

Comments
 (0)