Skip to content

Commit 3d2f957

Browse files
xuiocodex
andcommitted
Prevent orphaned MCP stdio spin
Co-Authored-By: OpenAI Codex <noreply@openai.com>
1 parent fab9f5b commit 3d2f957

8 files changed

Lines changed: 227 additions & 19 deletions

File tree

dist/index.js

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21548,8 +21548,16 @@ function loggingDiagnostics(env = process.env) {
2154821548
};
2154921549
}
2155021550
function writeDefaultLog(line) {
21551-
process.stderr.write(`${line}
21552-
`);
21551+
try {
21552+
if (!process.stderr.destroyed && process.stderr.writable) {
21553+
process.stderr.write(`${line}
21554+
`, (error2) => {
21555+
if (error2) lastLogFileError = error2.message;
21556+
});
21557+
}
21558+
} catch (error2) {
21559+
lastLogFileError = error2 instanceof Error ? error2.message : String(error2);
21560+
}
2155321561
const logFile = process.env.CODEX_SUBAGENTS_LOG_FILE?.trim();
2155421562
if (!logFile) return;
2155521563
try {
@@ -23503,6 +23511,21 @@ function createProgressReporter(extra, options = {}) {
2350323511
return { send, flush };
2350423512
}
2350523513

23514+
// src/stdio.ts
23515+
var brokenStdioCodes = /* @__PURE__ */ new Set([
23516+
"EPIPE",
23517+
"ECONNRESET",
23518+
"ERR_STREAM_DESTROYED",
23519+
"ERR_STREAM_WRITE_AFTER_END"
23520+
]);
23521+
function isBrokenStdioError(error2) {
23522+
if (!error2 || typeof error2 !== "object") return false;
23523+
const value = error2;
23524+
if (typeof value.code === "string" && brokenStdioCodes.has(value.code)) return true;
23525+
const message = typeof value.message === "string" ? value.message.toLowerCase() : "";
23526+
return message.includes("broken pipe") || message.includes("stream has been destroyed") || message.includes("write after end");
23527+
}
23528+
2350623529
// src/response.ts
2350723530
var singleAgentLimits = {
2350823531
finalMessageChars: 12e3,
@@ -26222,7 +26245,7 @@ async function loggedToolCall(tool, args, extra, run) {
2622226245
return errorResult(error2, tool);
2622326246
}
2622426247
}
26225-
function installTransportLogging(transport) {
26248+
function installTransportLogging(transport, shutdown) {
2622626249
const previousOnMessage = transport.onmessage;
2622726250
transport.onmessage = (message) => {
2622826251
logger.rawDebug("mcp.transport.inbound", {
@@ -26232,6 +26255,10 @@ function installTransportLogging(transport) {
2623226255
};
2623326256
const previousOnError = transport.onerror;
2623426257
transport.onerror = (error2) => {
26258+
if (isBrokenStdioError(error2)) {
26259+
shutdown("mcp_transport_broken_stdio", 0, 500);
26260+
return;
26261+
}
2623526262
logger.error("mcp.transport.error", { error: errorForLog(error2) });
2623626263
previousOnError?.(error2);
2623726264
};
@@ -26243,6 +26270,10 @@ function installTransportLogging(transport) {
2624326270
try {
2624426271
await send(message);
2624526272
} catch (error2) {
26273+
if (isBrokenStdioError(error2)) {
26274+
shutdown("mcp_transport_send_broken_stdio", 0, 500);
26275+
return;
26276+
}
2624626277
logger.error("mcp.transport.send_failed", { error: errorForLog(error2) });
2624726278
throw error2;
2624826279
}
@@ -28456,34 +28487,59 @@ registerCleanupHandler(async (reason) => {
2845628487
});
2845728488
function installProcessCleanup() {
2845828489
let shutdownStarted = false;
28459-
const shutdown = (reason, exitCode) => {
28490+
const shutdown = (reason, exitCode, graceMs = 2500) => {
2846028491
if (shutdownStarted) return;
2846128492
shutdownStarted = true;
28462-
void cleanupRuntime(reason).finally(() => {
28493+
const forceExit = exitCode === void 0 ? void 0 : setTimeout(() => process.exit(exitCode), Math.max(1e3, graceMs + 1e3));
28494+
forceExit?.unref();
28495+
void cleanupRuntime(reason, graceMs).finally(() => {
28496+
if (forceExit) clearTimeout(forceExit);
2846328497
if (exitCode !== void 0) process.exit(exitCode);
2846428498
});
2846528499
};
28500+
const shutdownOnBrokenStdio = (reason) => (error2) => {
28501+
if (isBrokenStdioError(error2)) {
28502+
shutdown(reason, 0, 500);
28503+
return;
28504+
}
28505+
logger.error(`${reason}.error`, { error: errorForLog(error2) });
28506+
};
2846628507
process.once("SIGINT", () => shutdown("SIGINT", 130));
2846728508
process.once("SIGTERM", () => shutdown("SIGTERM", 143));
28468-
process.stdin.once("close", () => shutdown("stdin_close"));
28509+
process.stdin.once("close", () => shutdown("stdin_close", 0, 500));
28510+
process.stdin.once("end", () => shutdown("stdin_end", 0, 500));
28511+
process.stdin.once("error", shutdownOnBrokenStdio("stdin"));
28512+
process.stdout.once("error", shutdownOnBrokenStdio("stdout"));
28513+
process.stderr.once("error", shutdownOnBrokenStdio("stderr"));
28514+
return shutdown;
2846928515
}
2847028516
async function main() {
28471-
installProcessCleanup();
28517+
const shutdown = installProcessCleanup();
2847228518
process.on("unhandledRejection", (error2) => {
28519+
if (isBrokenStdioError(error2)) {
28520+
shutdown("unhandled_broken_stdio", 0, 500);
28521+
return;
28522+
}
2847328523
logger.error("process.unhandled_rejection", { error: errorForLog(error2) });
2847428524
});
2847528525
process.on("uncaughtException", (error2) => {
28526+
if (isBrokenStdioError(error2)) {
28527+
shutdown("uncaught_broken_stdio", 0, 500);
28528+
return;
28529+
}
2847628530
logger.error("process.uncaught_exception", { error: errorForLog(error2) });
28531+
shutdown("uncaught_exception", 1, 500);
2847728532
});
2847828533
logger.info("server.starting", {
2847928534
logging: loggingDiagnostics()
2848028535
});
2848128536
const transport = new StdioServerTransport();
28482-
installTransportLogging(transport);
28537+
installTransportLogging(transport, shutdown);
2848328538
await server.connect(transport);
2848428539
logger.info("server.connected", { transport: "stdio" });
2848528540
}
2848628541
main().catch((error2) => {
28542+
if (isBrokenStdioError(error2)) process.exit(0);
2848728543
logger.error("server.start_failed", { error: errorForLog(error2) });
2848828544
process.exit(1);
2848928545
});

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"test:stress": "node test/stress-mcp.mjs",
3434
"test:progress": "node test/progress-mcp.mjs",
3535
"test:advanced": "node test/advanced-mcp.mjs",
36+
"test:stdio-shutdown": "node test/stdio-shutdown.mjs",
3637
"test:dev-link": "node test/dev-link.mjs",
3738
"test:plugin-manifest": "node test/plugin-manifest.mjs",
3839
"test:codex-runtime": "node test/codex-runtime-probe.mjs",
@@ -46,8 +47,8 @@
4647
"test:claude-real-codex": "node test/claude-real-codex.mjs",
4748
"test:claude-real-session": "node test/claude-real-session.mjs",
4849
"test:real-soak": "node test/real-soak.mjs",
49-
"test:ci": "npm run build && npm run test:plugin-manifest && npm test && npm run smoke:mcp && npm run test:reliability && npm run test:stress && npm run test:progress && npm run test:advanced && npm run test:dev-link",
50-
"test:comprehensive": "npm run build && npm test && npm run smoke:mcp && npm run test:reliability && npm run test:stress && npm run test:progress && npm run test:advanced && npm run test:codex-runtime && npm run test:app-server-contract && npm run test:real-matrix && npm run validate:plugin && npm run test:claude-desktop",
50+
"test:ci": "npm run build && npm run test:plugin-manifest && npm test && npm run test:stdio-shutdown && npm run smoke:mcp && npm run test:reliability && npm run test:stress && npm run test:progress && npm run test:advanced && npm run test:dev-link",
51+
"test:comprehensive": "npm run build && npm test && npm run test:stdio-shutdown && npm run smoke:mcp && npm run test:reliability && npm run test:stress && npm run test:progress && npm run test:advanced && npm run test:codex-runtime && npm run test:app-server-contract && npm run test:real-matrix && npm run validate:plugin && npm run test:claude-desktop",
5152
"test:claude-desktop": "node test/claude-desktop-cli.mjs"
5253
},
5354
"keywords": [

src/index.ts

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import { errorForLog, logger, loggingDiagnostics, makeLogId, summarizeRawTraffic
3131
import { recoveryForAgentResult, recoveryForError, recoveryForWait } from "./recovery.js";
3232
import { redactJsonValue, redactSensitiveText } from "./redaction.js";
3333
import { createProgressReporter, type ProgressOptions, type ProgressReporter, type ToolExtra } from "./progress.js";
34+
import { isBrokenStdioError } from "./stdio.js";
3435
import {
3536
compactAgentResultForMcp,
3637
compactAgentResultsForMcp,
@@ -1002,7 +1003,7 @@ async function loggedToolCall(
10021003
}
10031004
}
10041005

1005-
function installTransportLogging(transport: StdioServerTransport): void {
1006+
function installTransportLogging(transport: StdioServerTransport, shutdown: (reason: string, exitCode?: number, graceMs?: number) => void): void {
10061007
const previousOnMessage = transport.onmessage;
10071008
transport.onmessage = (message) => {
10081009
logger.rawDebug("mcp.transport.inbound", {
@@ -1013,6 +1014,10 @@ function installTransportLogging(transport: StdioServerTransport): void {
10131014

10141015
const previousOnError = transport.onerror;
10151016
transport.onerror = (error) => {
1017+
if (isBrokenStdioError(error)) {
1018+
shutdown("mcp_transport_broken_stdio", 0, 500);
1019+
return;
1020+
}
10161021
logger.error("mcp.transport.error", { error: errorForLog(error) });
10171022
previousOnError?.(error);
10181023
};
@@ -1025,6 +1030,10 @@ function installTransportLogging(transport: StdioServerTransport): void {
10251030
try {
10261031
await send(message);
10271032
} catch (error) {
1033+
if (isBrokenStdioError(error)) {
1034+
shutdown("mcp_transport_send_broken_stdio", 0, 500);
1035+
return;
1036+
}
10281037
logger.error("mcp.transport.send_failed", { error: errorForLog(error) });
10291038
throw error;
10301039
}
@@ -3788,40 +3797,67 @@ registerCleanupHandler(async (reason) => {
37883797
await sessionManager.shutdown(reason);
37893798
});
37903799

3791-
function installProcessCleanup(): void {
3800+
function installProcessCleanup(): (reason: string, exitCode?: number, graceMs?: number) => void {
37923801
let shutdownStarted = false;
3793-
const shutdown = (reason: string, exitCode?: number) => {
3802+
const shutdown = (reason: string, exitCode?: number, graceMs = 2_500) => {
37943803
if (shutdownStarted) return;
37953804
shutdownStarted = true;
3796-
void cleanupRuntime(reason).finally(() => {
3805+
const forceExit = exitCode === undefined
3806+
? undefined
3807+
: setTimeout(() => process.exit(exitCode), Math.max(1_000, graceMs + 1_000));
3808+
forceExit?.unref();
3809+
void cleanupRuntime(reason, graceMs).finally(() => {
3810+
if (forceExit) clearTimeout(forceExit);
37973811
if (exitCode !== undefined) process.exit(exitCode);
37983812
});
37993813
};
3814+
const shutdownOnBrokenStdio = (reason: string) => (error: unknown) => {
3815+
if (isBrokenStdioError(error)) {
3816+
shutdown(reason, 0, 500);
3817+
return;
3818+
}
3819+
logger.error(`${reason}.error`, { error: errorForLog(error) });
3820+
};
38003821

38013822
process.once("SIGINT", () => shutdown("SIGINT", 130));
38023823
process.once("SIGTERM", () => shutdown("SIGTERM", 143));
3803-
process.stdin.once("close", () => shutdown("stdin_close"));
3824+
process.stdin.once("close", () => shutdown("stdin_close", 0, 500));
3825+
process.stdin.once("end", () => shutdown("stdin_end", 0, 500));
3826+
process.stdin.once("error", shutdownOnBrokenStdio("stdin"));
3827+
process.stdout.once("error", shutdownOnBrokenStdio("stdout"));
3828+
process.stderr.once("error", shutdownOnBrokenStdio("stderr"));
3829+
return shutdown;
38043830
}
38053831

38063832
async function main(): Promise<void> {
3807-
installProcessCleanup();
3833+
const shutdown = installProcessCleanup();
38083834
process.on("unhandledRejection", (error) => {
3835+
if (isBrokenStdioError(error)) {
3836+
shutdown("unhandled_broken_stdio", 0, 500);
3837+
return;
3838+
}
38093839
logger.error("process.unhandled_rejection", { error: errorForLog(error) });
38103840
});
38113841
process.on("uncaughtException", (error) => {
3842+
if (isBrokenStdioError(error)) {
3843+
shutdown("uncaught_broken_stdio", 0, 500);
3844+
return;
3845+
}
38123846
logger.error("process.uncaught_exception", { error: errorForLog(error) });
3847+
shutdown("uncaught_exception", 1, 500);
38133848
});
38143849

38153850
logger.info("server.starting", {
38163851
logging: loggingDiagnostics(),
38173852
});
38183853
const transport = new StdioServerTransport();
3819-
installTransportLogging(transport);
3854+
installTransportLogging(transport, shutdown);
38203855
await server.connect(transport);
38213856
logger.info("server.connected", { transport: "stdio" });
38223857
}
38233858

38243859
main().catch((error) => {
3860+
if (isBrokenStdioError(error)) process.exit(0);
38253861
logger.error("server.start_failed", { error: errorForLog(error) });
38263862
process.exit(1);
38273863
});

src/logging.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,16 @@ export function loggingDiagnostics(env: NodeJS.ProcessEnv = process.env): Record
203203
}
204204

205205
function writeDefaultLog(line: string): void {
206-
process.stderr.write(`${line}\n`);
206+
try {
207+
if (!process.stderr.destroyed && process.stderr.writable) {
208+
process.stderr.write(`${line}\n`, (error) => {
209+
if (error) lastLogFileError = error.message;
210+
});
211+
}
212+
} catch (error) {
213+
lastLogFileError = error instanceof Error ? error.message : String(error);
214+
}
215+
207216
const logFile = process.env.CODEX_SUBAGENTS_LOG_FILE?.trim();
208217
if (!logFile) return;
209218
try {

src/stdio.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
const brokenStdioCodes = new Set([
2+
"EPIPE",
3+
"ECONNRESET",
4+
"ERR_STREAM_DESTROYED",
5+
"ERR_STREAM_WRITE_AFTER_END",
6+
]);
7+
8+
export function isBrokenStdioError(error: unknown): boolean {
9+
if (!error || typeof error !== "object") return false;
10+
const value = error as { code?: unknown; message?: unknown };
11+
if (typeof value.code === "string" && brokenStdioCodes.has(value.code)) return true;
12+
const message = typeof value.message === "string" ? value.message.toLowerCase() : "";
13+
return (
14+
message.includes("broken pipe") ||
15+
message.includes("stream has been destroyed") ||
16+
message.includes("write after end")
17+
);
18+
}

test/app-server-hardening.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ describe("app-server hardening", () => {
263263
return calls.some((call) => call.method === "process/sigterm");
264264
}, 10_000);
265265
expect(sawSigterm).toBe(true);
266-
});
266+
}, 15_000);
267267

268268
it("keeps concurrent app-server sessions isolated", async () => {
269269
const manager = new CodexSessionManager();

test/stdio-shutdown.mjs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { spawn } from "node:child_process";
2+
import { once } from "node:events";
3+
import { mkdtemp, rm } from "node:fs/promises";
4+
import os from "node:os";
5+
import path from "node:path";
6+
import { setTimeout as delay } from "node:timers/promises";
7+
8+
const root = process.cwd();
9+
const stateDir = await mkdtemp(path.join(os.tmpdir(), "codex-subagents-stdio-"));
10+
11+
function assert(condition, message, details) {
12+
if (!condition) {
13+
throw new Error(`${message}${details ? `\n${JSON.stringify(details, null, 2)}` : ""}`);
14+
}
15+
}
16+
17+
async function waitForExit(child, timeoutMs) {
18+
const exited = once(child, "exit").then(([code, signal]) => ({ code, signal }));
19+
return Promise.race([exited, delay(timeoutMs).then(() => undefined)]);
20+
}
21+
22+
try {
23+
const child = spawn(path.join(root, "dist/index.js"), [], {
24+
cwd: root,
25+
stdio: ["pipe", "pipe", "pipe"],
26+
env: {
27+
...process.env,
28+
CODEX_SUBAGENTS_SESSION_STATE_FILE: path.join(stateDir, "sessions.json"),
29+
CODEX_SUBAGENTS_LOG_LEVEL: "debug",
30+
},
31+
});
32+
33+
let stderr = "";
34+
child.stderr.setEncoding("utf8");
35+
child.stderr.on("data", (chunk) => {
36+
stderr += chunk;
37+
});
38+
39+
await delay(250);
40+
child.stdout.destroy();
41+
child.stdin.write(`${JSON.stringify({
42+
jsonrpc: "2.0",
43+
id: 1,
44+
method: "initialize",
45+
params: {
46+
protocolVersion: "2025-06-18",
47+
capabilities: {},
48+
clientInfo: { name: "stdio-shutdown-test", version: "0.1.0" },
49+
},
50+
})}\n`);
51+
52+
const result = await waitForExit(child, 5_000);
53+
if (!result) {
54+
child.kill("SIGKILL");
55+
throw new Error(`MCP server did not exit after stdout disconnect.\n${stderr}`);
56+
}
57+
assert(result.code === 0, "MCP server should exit cleanly after stdout disconnect", {
58+
result,
59+
stderr,
60+
});
61+
assert(
62+
!stderr.includes("uncaught_exception") && !stderr.includes("unhandled_rejection"),
63+
"broken stdio should not loop through uncaught exception logging",
64+
stderr,
65+
);
66+
} finally {
67+
await rm(stateDir, { recursive: true, force: true });
68+
}
69+
70+
console.log("Stdio shutdown test passed");

0 commit comments

Comments
 (0)