Skip to content

Commit a2e5037

Browse files
fix: abort in-flight request handlers on connection close (#1735)
Co-authored-by: Aljosa Asanovic <aljosa.a@gmail.com>
1 parent 5516c1b commit a2e5037

File tree

6 files changed

+118
-12
lines changed

6 files changed

+118
-12
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@modelcontextprotocol/core': patch
3+
---
4+
5+
Abort in-flight request handlers when the connection closes. Previously, request handlers would continue running after the transport disconnected, wasting resources and preventing proper cleanup. Also fixes `InMemoryTransport.close()` firing `onclose` twice on the initiating side.

packages/core/src/shared/protocol.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,11 @@ export abstract class Protocol<ContextT extends BaseContext> {
457457
this._transport = transport;
458458
const _onclose = this.transport?.onclose;
459459
this._transport.onclose = () => {
460-
_onclose?.();
461-
this._onclose();
460+
try {
461+
_onclose?.();
462+
} finally {
463+
this._onclose();
464+
}
462465
};
463466

464467
const _onerror = this.transport?.onerror;
@@ -494,13 +497,28 @@ export abstract class Protocol<ContextT extends BaseContext> {
494497
this._taskManager.onClose();
495498
this._pendingDebouncedNotifications.clear();
496499

500+
for (const info of this._timeoutInfo.values()) {
501+
clearTimeout(info.timeoutId);
502+
}
503+
this._timeoutInfo.clear();
504+
505+
const requestHandlerAbortControllers = this._requestHandlerAbortControllers;
506+
this._requestHandlerAbortControllers = new Map();
507+
497508
const error = new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed');
498509

499510
this._transport = undefined;
500-
this.onclose?.();
501511

502-
for (const handler of responseHandlers.values()) {
503-
handler(error);
512+
try {
513+
this.onclose?.();
514+
} finally {
515+
for (const handler of responseHandlers.values()) {
516+
handler(error);
517+
}
518+
519+
for (const controller of requestHandlerAbortControllers.values()) {
520+
controller.abort(error);
521+
}
504522
}
505523
}
506524

@@ -642,7 +660,9 @@ export abstract class Protocol<ContextT extends BaseContext> {
642660
)
643661
.catch(error => this._onerror(new Error(`Failed to send response: ${error}`)))
644662
.finally(() => {
645-
this._requestHandlerAbortControllers.delete(request.id);
663+
if (this._requestHandlerAbortControllers.get(request.id) === abortController) {
664+
this._requestHandlerAbortControllers.delete(request.id);
665+
}
646666
});
647667
}
648668

packages/core/src/shared/taskManager.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,7 @@ export class TaskManager {
801801

802802
onClose(): void {
803803
this._taskProgressTokens.clear();
804+
this._requestResolvers.clear();
804805
}
805806

806807
// -- Private helpers --
@@ -893,8 +894,4 @@ export class NullTaskManager extends TaskManager {
893894
): Promise<{ queued: boolean; jsonrpcNotification?: JSONRPCNotification }> {
894895
return { queued: false, jsonrpcNotification: { ...notification, jsonrpc: '2.0' } };
895896
}
896-
897-
override onClose(): void {
898-
// No-op
899-
}
900897
}

packages/core/src/util/inMemory.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ interface QueuedMessage {
1313
export class InMemoryTransport implements Transport {
1414
private _otherTransport?: InMemoryTransport;
1515
private _messageQueue: QueuedMessage[] = [];
16+
private _closed = false;
1617

1718
onclose?: () => void;
1819
onerror?: (error: Error) => void;
@@ -39,10 +40,16 @@ export class InMemoryTransport implements Transport {
3940
}
4041

4142
async close(): Promise<void> {
43+
if (this._closed) return;
44+
this._closed = true;
45+
4246
const other = this._otherTransport;
4347
this._otherTransport = undefined;
44-
await other?.close();
45-
this.onclose?.();
48+
try {
49+
await other?.close();
50+
} finally {
51+
this.onclose?.();
52+
}
4653
}
4754

4855
/**

packages/core/test/inMemory.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,53 @@ describe('InMemoryTransport', () => {
9999
await expect(clientTransport.send({ jsonrpc: '2.0', method: 'test', id: 1 })).rejects.toThrow('Not connected');
100100
});
101101

102+
test('should fire onclose exactly once per transport', async () => {
103+
let clientCloseCount = 0;
104+
let serverCloseCount = 0;
105+
106+
clientTransport.onclose = () => clientCloseCount++;
107+
serverTransport.onclose = () => serverCloseCount++;
108+
109+
await clientTransport.close();
110+
111+
expect(clientCloseCount).toBe(1);
112+
expect(serverCloseCount).toBe(1);
113+
});
114+
115+
test('should handle double close idempotently', async () => {
116+
let clientCloseCount = 0;
117+
clientTransport.onclose = () => clientCloseCount++;
118+
119+
await clientTransport.close();
120+
await clientTransport.close();
121+
122+
expect(clientCloseCount).toBe(1);
123+
});
124+
125+
test('should handle concurrent close from both sides', async () => {
126+
let clientCloseCount = 0;
127+
let serverCloseCount = 0;
128+
129+
clientTransport.onclose = () => clientCloseCount++;
130+
serverTransport.onclose = () => serverCloseCount++;
131+
132+
await Promise.all([clientTransport.close(), serverTransport.close()]);
133+
134+
expect(clientCloseCount).toBe(1);
135+
expect(serverCloseCount).toBe(1);
136+
});
137+
138+
test('should fire onclose even if peer onclose throws', async () => {
139+
let clientCloseCount = 0;
140+
clientTransport.onclose = () => clientCloseCount++;
141+
serverTransport.onclose = () => {
142+
throw new Error('boom');
143+
};
144+
145+
await expect(clientTransport.close()).rejects.toThrow('boom');
146+
expect(clientCloseCount).toBe(1);
147+
});
148+
102149
test('should queue messages sent before start', async () => {
103150
const message: JSONRPCMessage = {
104151
jsonrpc: '2.0',

packages/core/test/shared/protocol.test.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,36 @@ describe('protocol tests', () => {
217217
expect(oncloseMock).toHaveBeenCalled();
218218
});
219219

220+
test('should abort in-flight request handlers when the connection is closed', async () => {
221+
await protocol.connect(transport);
222+
223+
let abortReason: unknown;
224+
let handlerStarted = false;
225+
const handlerDone = new Promise<void>(resolve => {
226+
protocol.setRequestHandler('ping', async (_request, ctx) => {
227+
handlerStarted = true;
228+
await new Promise<void>(resolveInner => {
229+
ctx.mcpReq.signal.addEventListener('abort', () => {
230+
abortReason = ctx.mcpReq.signal.reason;
231+
resolveInner();
232+
});
233+
});
234+
resolve();
235+
return {};
236+
});
237+
});
238+
239+
transport.onmessage?.({ jsonrpc: '2.0', id: 1, method: 'ping', params: {} });
240+
241+
await vi.waitFor(() => expect(handlerStarted).toBe(true));
242+
243+
await transport.close();
244+
await handlerDone;
245+
246+
expect(abortReason).toBeInstanceOf(SdkError);
247+
expect((abortReason as SdkError).code).toBe(SdkErrorCode.ConnectionClosed);
248+
});
249+
220250
test('should not overwrite existing hooks when connecting transports', async () => {
221251
const oncloseMock = vi.fn();
222252
const onerrorMock = vi.fn();

0 commit comments

Comments
 (0)