Skip to content

Commit 01003ff

Browse files
fix: abort in-flight request handlers on connection close
Previously, request handlers would continue running after the transport disconnected, wasting resources and preventing cleanup of long-running operations. Protocol._onclose() now aborts all active request handler AbortControllers with a ConnectionClosed error. Also fixes InMemoryTransport.close() firing onclose twice on the initiating side due to peer recursion. Fixes #611 Co-authored-by: Aljosa Asanovic <aljosa.a@gmail.com>
1 parent 9aed95a commit 01003ff

5 files changed

Lines changed: 82 additions & 0 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@modelcontextprotocol/core': patch
3+
---
4+
5+
Abort in-flight request handlers when the connection closes. Previously, request handlers would continue running after the transport disconnected, wasting resources and preventing proper cleanup. Also fixes `InMemoryTransport.close()` firing `onclose` twice on the initiating side.

packages/core/src/shared/protocol.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,9 @@ export abstract class Protocol<ContextT extends BaseContext> {
730730
this._taskProgressTokens.clear();
731731
this._pendingDebouncedNotifications.clear();
732732

733+
const requestHandlerAbortControllers = this._requestHandlerAbortControllers;
734+
this._requestHandlerAbortControllers = new Map();
735+
733736
const error = new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed');
734737

735738
this._transport = undefined;
@@ -738,6 +741,10 @@ export abstract class Protocol<ContextT extends BaseContext> {
738741
for (const handler of responseHandlers.values()) {
739742
handler(error);
740743
}
744+
745+
for (const controller of requestHandlerAbortControllers.values()) {
746+
controller.abort(error);
747+
}
741748
}
742749

743750
private _onerror(error: Error): void {

packages/core/src/util/inMemory.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ interface QueuedMessage {
1313
export class InMemoryTransport implements Transport {
1414
private _otherTransport?: InMemoryTransport;
1515
private _messageQueue: QueuedMessage[] = [];
16+
private _closed = false;
1617

1718
onclose?: () => void;
1819
onerror?: (error: Error) => void;
@@ -39,6 +40,9 @@ export class InMemoryTransport implements Transport {
3940
}
4041

4142
async close(): Promise<void> {
43+
if (this._closed) return;
44+
this._closed = true;
45+
4246
const other = this._otherTransport;
4347
this._otherTransport = undefined;
4448
await other?.close();

packages/core/test/inMemory.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,42 @@ describe('InMemoryTransport', () => {
9999
await expect(clientTransport.send({ jsonrpc: '2.0', method: 'test', id: 1 })).rejects.toThrow('Not connected');
100100
});
101101

102+
test('should fire onclose exactly once per transport', async () => {
103+
let clientCloseCount = 0;
104+
let serverCloseCount = 0;
105+
106+
clientTransport.onclose = () => clientCloseCount++;
107+
serverTransport.onclose = () => serverCloseCount++;
108+
109+
await clientTransport.close();
110+
111+
expect(clientCloseCount).toBe(1);
112+
expect(serverCloseCount).toBe(1);
113+
});
114+
115+
test('should handle double close idempotently', async () => {
116+
let clientCloseCount = 0;
117+
clientTransport.onclose = () => clientCloseCount++;
118+
119+
await clientTransport.close();
120+
await clientTransport.close();
121+
122+
expect(clientCloseCount).toBe(1);
123+
});
124+
125+
test('should handle concurrent close from both sides', async () => {
126+
let clientCloseCount = 0;
127+
let serverCloseCount = 0;
128+
129+
clientTransport.onclose = () => clientCloseCount++;
130+
serverTransport.onclose = () => serverCloseCount++;
131+
132+
await Promise.all([clientTransport.close(), serverTransport.close()]);
133+
134+
expect(clientCloseCount).toBe(1);
135+
expect(serverCloseCount).toBe(1);
136+
});
137+
102138
test('should queue messages sent before start', async () => {
103139
const message: JSONRPCMessage = {
104140
jsonrpc: '2.0',

packages/core/test/shared/protocol.test.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,36 @@ describe('protocol tests', () => {
209209
expect(oncloseMock).toHaveBeenCalled();
210210
});
211211

212+
test('should abort in-flight request handlers when the connection is closed', async () => {
213+
await protocol.connect(transport);
214+
215+
let abortReason: unknown;
216+
let handlerStarted = false;
217+
const handlerDone = new Promise<void>(resolve => {
218+
protocol.setRequestHandler('ping', async (_request, ctx) => {
219+
handlerStarted = true;
220+
await new Promise<void>(resolveInner => {
221+
ctx.mcpReq.signal.addEventListener('abort', () => {
222+
abortReason = ctx.mcpReq.signal.reason;
223+
resolveInner();
224+
});
225+
});
226+
resolve();
227+
return {};
228+
});
229+
});
230+
231+
transport.onmessage?.({ jsonrpc: '2.0', id: 1, method: 'ping', params: {} });
232+
233+
await vi.waitFor(() => expect(handlerStarted).toBe(true));
234+
235+
await transport.close();
236+
await handlerDone;
237+
238+
expect(abortReason).toBeInstanceOf(SdkError);
239+
expect((abortReason as SdkError).code).toBe(SdkErrorCode.ConnectionClosed);
240+
});
241+
212242
test('should not overwrite existing hooks when connecting transports', async () => {
213243
const oncloseMock = vi.fn();
214244
const onerrorMock = vi.fn();

0 commit comments

Comments
 (0)