Skip to content
Merged
5 changes: 5 additions & 0 deletions .changeset/fix-abort-listener-leak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@modelcontextprotocol/core': patch
---

Fix abort signal listener leak in outbound requests. When a caller reuses a single `AbortSignal` across multiple requests (common for session-scoped cancellation), the SDK previously attached a new listener per request without ever removing it. The listener is now detached when the request settles.
13 changes: 10 additions & 3 deletions packages/core/src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,8 @@ export abstract class Protocol<ContextT extends BaseContext> {
): Promise<SchemaOutput<T>> {
const { relatedRequestId, resumptionToken, onresumptiontoken } = options ?? {};

let onAbort: (() => void) | undefined;

// Send the request
return new Promise<SchemaOutput<T>>((resolve, reject) => {
const earlyReject = (error: unknown) => {
Expand Down Expand Up @@ -885,9 +887,8 @@ export abstract class Protocol<ContextT extends BaseContext> {
}
});

options?.signal?.addEventListener('abort', () => {
cancel(options?.signal?.reason);
});
onAbort = () => cancel(options?.signal?.reason);
options?.signal?.addEventListener('abort', onAbort, { once: true });

const timeout = options?.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC;
const timeoutHandler = () => cancel(new SdkError(SdkErrorCode.RequestTimeout, 'Request timed out', { timeout }));
Expand Down Expand Up @@ -928,6 +929,12 @@ export abstract class Protocol<ContextT extends BaseContext> {
reject(error);
});
}
}).finally(() => {
// Detach the abort listener once the request settles so it doesn't
// accumulate on a caller-supplied signal reused across requests.
if (onAbort) {
options?.signal?.removeEventListener('abort', onAbort);
}
});
}

Expand Down
41 changes: 41 additions & 0 deletions packages/core/test/shared/protocol.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,47 @@ describe('protocol tests', () => {
expect((abortReason as SdkError).code).toBe(SdkErrorCode.ConnectionClosed);
});

test('should remove abort listener from caller signal when request settles', async () => {
await protocol.connect(transport);

const controller = new AbortController();
const addSpy = vi.spyOn(controller.signal, 'addEventListener');
const removeSpy = vi.spyOn(controller.signal, 'removeEventListener');

const mockSchema = z.object({ result: z.string() });
const reqPromise = testRequest(protocol, { method: 'example', params: {} }, mockSchema, {
signal: controller.signal
});

expect(addSpy).toHaveBeenCalledTimes(1);
const listener = addSpy.mock.calls[0]![1];

transport.onmessage?.({ jsonrpc: '2.0', id: 0, result: { result: 'ok' } });
await reqPromise;

expect(removeSpy).toHaveBeenCalledWith('abort', listener);
});

test('should not accumulate abort listeners when reusing a signal across requests', async () => {
await protocol.connect(transport);

const controller = new AbortController();
const addSpy = vi.spyOn(controller.signal, 'addEventListener');
const removeSpy = vi.spyOn(controller.signal, 'removeEventListener');

const mockSchema = z.object({ result: z.string() });
for (let i = 0; i < 5; i++) {
const reqPromise = testRequest(protocol, { method: 'example', params: {} }, mockSchema, {
signal: controller.signal
});
transport.onmessage?.({ jsonrpc: '2.0', id: i, result: { result: 'ok' } });
await reqPromise;
}

expect(addSpy).toHaveBeenCalledTimes(5);
expect(removeSpy).toHaveBeenCalledTimes(5);
});

test('should not overwrite existing hooks when connecting transports', async () => {
const oncloseMock = vi.fn();
const onerrorMock = vi.fn();
Expand Down
Loading