Skip to content

Commit 819da20

Browse files
author
User
committed
fix(server): handle EPIPE in stdio transport send path
1 parent af8ccdc commit 819da20

2 files changed

Lines changed: 106 additions & 17 deletions

File tree

packages/server/src/server/stdio.ts

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { process } from '@modelcontextprotocol/server/_shims';
1919
export class StdioServerTransport implements Transport {
2020
private _readBuffer: ReadBuffer = new ReadBuffer();
2121
private _started = false;
22+
private _closed = false;
2223

2324
constructor(
2425
private _stdin: Readable = process.stdin,
@@ -38,11 +39,14 @@ export class StdioServerTransport implements Transport {
3839
this.onerror?.(error);
3940
};
4041
_onstdouterror = (error: Error) => {
41-
// Handle stdout errors (e.g., EPIPE when client disconnects)
42-
// Trigger close to clean up gracefully
43-
this.close().catch(() => {
44-
// Ignore errors during close
45-
});
42+
// Handle stdout broken pipe when client disconnects.
43+
if ((error as NodeJS.ErrnoException).code === 'EPIPE') {
44+
this.close().catch(() => {
45+
// Ignore errors during close
46+
});
47+
return;
48+
}
49+
4650
this.onerror?.(error);
4751
};
4852

@@ -78,6 +82,11 @@ export class StdioServerTransport implements Transport {
7882
}
7983

8084
async close(): Promise<void> {
85+
if (this._closed) {
86+
return;
87+
}
88+
this._closed = true;
89+
8190
// Remove our event listeners first
8291
this._stdin.off('data', this._ondata);
8392
this._stdin.off('error', this._onerror);
@@ -99,23 +108,52 @@ export class StdioServerTransport implements Transport {
99108
send(message: JSONRPCMessage): Promise<void> {
100109
return new Promise((resolve, reject) => {
101110
const json = serializeMessage(message);
102-
103-
// Handle write errors (e.g., EPIPE when client disconnects)
104-
const onError = (error: Error) => {
111+
let settled = false;
112+
113+
const cleanup = () => {
105114
this._stdout.off('error', onError);
115+
this._stdout.off('drain', onDrain);
116+
};
117+
118+
const onDrain = () => {
119+
if (settled) {
120+
return;
121+
}
122+
settled = true;
123+
cleanup();
124+
resolve();
125+
};
126+
127+
const onError = (error: Error) => {
128+
if (settled) {
129+
return;
130+
}
131+
settled = true;
132+
cleanup();
133+
134+
if ((error as NodeJS.ErrnoException).code === 'EPIPE') {
135+
this.close().catch(() => {
136+
// Ignore errors during close
137+
});
138+
resolve();
139+
return;
140+
}
141+
106142
reject(error);
107143
};
108-
144+
109145
this._stdout.once('error', onError);
110-
111-
if (this._stdout.write(json)) {
112-
this._stdout.off('error', onError);
113-
resolve();
114-
} else {
115-
this._stdout.once('drain', () => {
116-
this._stdout.off('error', onError);
146+
147+
try {
148+
if (this._stdout.write(json)) {
149+
settled = true;
150+
cleanup();
117151
resolve();
118-
});
152+
} else {
153+
this._stdout.once('drain', onDrain);
154+
}
155+
} catch (error) {
156+
onError(error as Error);
119157
}
120158
});
121159
}

packages/server/test/server/stdio.test.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,54 @@ test('should read multiple messages', async () => {
102102
await finished;
103103
expect(readMessages).toEqual(messages);
104104
});
105+
106+
test('should handle EPIPE from stdout error event and close gracefully', async () => {
107+
const epipeOutput = new Writable({
108+
write(_chunk, _encoding, callback) {
109+
callback();
110+
}
111+
});
112+
113+
const server = new StdioServerTransport(input, epipeOutput);
114+
115+
let didClose = false;
116+
server.onclose = () => {
117+
didClose = true;
118+
};
119+
120+
await server.start();
121+
122+
epipeOutput.emit('error', Object.assign(new Error('broken pipe'), { code: 'EPIPE' }));
123+
124+
await new Promise(resolve => setImmediate(resolve));
125+
126+
expect(didClose).toBeTruthy();
127+
});
128+
129+
test('should resolve send and close on EPIPE write failure', async () => {
130+
const epipeWriteOutput = new Writable({
131+
write(_chunk, _encoding, callback) {
132+
callback(Object.assign(new Error('broken pipe'), { code: 'EPIPE' }));
133+
}
134+
});
135+
136+
const server = new StdioServerTransport(input, epipeWriteOutput);
137+
138+
let didClose = false;
139+
server.onclose = () => {
140+
didClose = true;
141+
};
142+
143+
await server.start();
144+
145+
await expect(
146+
server.send({
147+
jsonrpc: '2.0',
148+
method: 'notifications/initialized'
149+
})
150+
).resolves.toBeUndefined();
151+
152+
await new Promise(resolve => setImmediate(resolve));
153+
154+
expect(didClose).toBeTruthy();
155+
});

0 commit comments

Comments
 (0)