Skip to content

Commit c9ddaaa

Browse files
xuiocodex
andcommitted
Harden MCP result resilience
Bound progress notification delivery and clamp totals. Preserve per-agent failures in parallel results and enforce requested structured output parsing. Co-Authored-By: OpenAI Codex <noreply@openai.com>
1 parent 62ac3e9 commit c9ddaaa

13 files changed

Lines changed: 619 additions & 171 deletions

dist/index.js

Lines changed: 197 additions & 82 deletions
Large diffs are not rendered by default.

src/app-server.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
22
import { stat } from "node:fs/promises";
33
import { resolveCodexBinary, type ResolvedCodexBinary } from "./binary.js";
4+
import { parseStructuredOutput, schemaForOutputContract } from "./contracts.js";
45
import { killChildProcess, trackChildProcess } from "./lifecycle.js";
56
import {
67
defaultReasoningEffort,
@@ -452,14 +453,23 @@ export class CodexAppServerSession {
452453

453454
const finish = (status: AgentRunResult["status"], error?: string): AgentRunResult => {
454455
const final = truncate(redactSensitiveText(summary.lastAgentMessage ?? ""), maxOutputChars);
456+
const wantsStructuredOutput = Boolean(
457+
schemaForOutputContract(options.outputContract, options.outputSchema) ||
458+
(options.outputContract && options.outputContract !== "freeform"),
459+
);
460+
const structured =
461+
wantsStructuredOutput
462+
? parseStructuredOutput(summary.lastAgentMessage ?? "")
463+
: { value: undefined, error: undefined };
464+
const resultStatus = status === "completed" && structured.error ? "failed" : status;
455465
const outputArtifacts = this.activeTurn?.artifactWriter.finish({
456466
finalMessage: summary.lastAgentMessage ?? "",
457467
keep: final.truncatedChars > 0 || stdout.truncated() > 0 || stderr.truncated() > 0,
458468
});
459469
const result: AgentRunResult = {
460470
name: options.name,
461-
ok: status === "completed",
462-
status,
471+
ok: resultStatus === "completed",
472+
status: resultStatus,
463473
durationMs: Date.now() - started,
464474
codexBinary: this.codexBinary,
465475
cwd: this.cwd,
@@ -481,6 +491,8 @@ export class CodexAppServerSession {
481491
},
482492
outputArtifacts,
483493
eventSummary: cloneSummary(summary),
494+
structuredOutput: structured.value === undefined ? undefined : redactJsonValue(structured.value),
495+
structuredOutputError: structured.error,
484496
commandPreview: [this.codexBinary.path, "app-server", "--listen", "stdio://", "turn/start"],
485497
timeoutReason,
486498
codexSubagents: {
@@ -887,10 +899,19 @@ export class CodexAppServerSession {
887899
finalMessage: active.summary.lastAgentMessage ?? "",
888900
keep: final.truncatedChars > 0 || active.stdout.truncated() > 0 || active.stderr.truncated() > 0,
889901
});
902+
const wantsStructuredOutput = Boolean(
903+
schemaForOutputContract(active.options.outputContract, active.options.outputSchema) ||
904+
(active.options.outputContract && active.options.outputContract !== "freeform"),
905+
);
906+
const structured =
907+
wantsStructuredOutput
908+
? parseStructuredOutput(active.summary.lastAgentMessage ?? "")
909+
: { value: undefined, error: undefined };
910+
const resultStatus = status === "completed" && structured.error ? "failed" : status;
890911
const result: AgentRunResult = {
891912
name: active.options.name,
892-
ok: status === "completed",
893-
status,
913+
ok: resultStatus === "completed",
914+
status: resultStatus,
894915
durationMs: Date.now() - active.started,
895916
codexBinary: this.codexBinary,
896917
cwd: this.cwd,
@@ -912,6 +933,8 @@ export class CodexAppServerSession {
912933
},
913934
outputArtifacts,
914935
eventSummary: cloneSummary(active.summary),
936+
structuredOutput: structured.value === undefined ? undefined : redactJsonValue(structured.value),
937+
structuredOutputError: structured.error,
915938
commandPreview: [this.codexBinary.path, "app-server", "--listen", "stdio://", "turn/start"],
916939
timeoutReason: active.timeoutReason,
917940
codexSubagents: {

src/index.ts

Lines changed: 39 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
22
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
3-
import type { RequestHandlerExtra } from "@modelcontextprotocol/sdk/shared/protocol.js";
4-
import type { CallToolResult, JSONRPCMessage, ServerNotification, ServerRequest } from "@modelcontextprotocol/sdk/types.js";
3+
import type { CallToolResult, JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
54
import { z } from "zod";
65
import {
76
defaultModel,
@@ -28,6 +27,7 @@ import { jobManager, runQueuedAgent, runQueuedAgents } from "./jobs.js";
2827
import { cleanupRuntime, lifecycleStats, registerCleanupHandler } from "./lifecycle.js";
2928
import { errorForLog, logger, loggingDiagnostics, makeLogId, summarizeRawTrafficForLog } from "./logging.js";
3029
import { recoveryForAgentResult, recoveryForError, recoveryForWait } from "./recovery.js";
30+
import { createProgressReporter, type ProgressOptions, type ProgressReporter, type ToolExtra } from "./progress.js";
3131
import {
3232
compactAgentResultForMcp,
3333
compactAgentResultsForMcp,
@@ -37,9 +37,6 @@ import {
3737
import { sessionManager } from "./sessions.js";
3838
import { modelPresets } from "./subagents.js";
3939

40-
type ToolExtra = RequestHandlerExtra<ServerRequest, ServerNotification>;
41-
type ProgressReporter = ReturnType<typeof createProgressReporter>;
42-
4340
const usageGuide = [
4441
"Claude Code integration guide for codex-subagents:",
4542
"",
@@ -390,13 +387,28 @@ function toCodexSubagents(
390387
}
391388

392389
function jsonResult(value: Record<string, unknown>, isError = false): CallToolResult {
390+
const fullText = JSON.stringify(value, null, 2);
391+
const text =
392+
fullText.length <= 4_000
393+
? fullText
394+
: JSON.stringify(
395+
{
396+
ok: Boolean(value.ok ?? !isError),
397+
isError,
398+
note:
399+
"MCP text content was shortened to keep Claude responsive; use structuredContent for the compacted result.",
400+
keys: Object.keys(value),
401+
},
402+
null,
403+
2,
404+
);
393405
return {
394406
structuredContent: value,
395407
isError,
396408
content: [
397409
{
398410
type: "text",
399-
text: JSON.stringify(value, null, 2),
411+
text,
400412
},
401413
],
402414
};
@@ -492,60 +504,6 @@ async function loggedToolCall(
492504
}
493505
}
494506

495-
function createProgressReporter(extra: ToolExtra | undefined) {
496-
const progressToken = extra?._meta?.progressToken;
497-
let progress = 0;
498-
let pending = Promise.resolve();
499-
500-
async function send(message: string, options: { progress?: number; total?: number } = {}) {
501-
logger.rawDebug("mcp.progress", {
502-
hasProgressToken: progressToken !== undefined,
503-
message,
504-
options,
505-
});
506-
if (progressToken === undefined || !extra) return;
507-
508-
pending = pending
509-
.catch(() => {})
510-
.then(async () => {
511-
const requested = options.progress ?? progress + 1;
512-
progress = Math.max(progress + 1, requested);
513-
await extra.sendNotification({
514-
method: "notifications/progress",
515-
params: {
516-
progressToken,
517-
progress,
518-
...(options.total === undefined ? {} : { total: options.total }),
519-
message,
520-
},
521-
});
522-
logger.rawDebug("mcp.notification.sent", {
523-
method: "notifications/progress",
524-
params: {
525-
progressToken,
526-
progress,
527-
...(options.total === undefined ? {} : { total: options.total }),
528-
message,
529-
},
530-
});
531-
})
532-
.catch((error) => {
533-
logger.error("mcp.notification.failed", { error: errorForLog(error) });
534-
// Progress is best-effort; a failed notification must not fail the tool call.
535-
});
536-
537-
await pending;
538-
}
539-
540-
async function flush() {
541-
if (progressToken === undefined || !extra) return;
542-
await pending;
543-
await new Promise((resolve) => setTimeout(resolve, 0));
544-
}
545-
546-
return { send, flush };
547-
}
548-
549507
function installTransportLogging(transport: StdioServerTransport): void {
550508
const previousOnMessage = transport.onmessage;
551509
transport.onmessage = (message) => {
@@ -590,9 +548,10 @@ async function withProgressHeartbeat<T>(
590548
progress: ProgressReporter,
591549
message: string,
592550
operation: () => Promise<T>,
551+
progressOptions?: ProgressOptions,
593552
): Promise<T> {
594553
const interval = setInterval(() => {
595-
void progress.send(message);
554+
void progress.send(message, progressOptions);
596555
}, progressHeartbeatMs());
597556
interval.unref();
598557
try {
@@ -662,7 +621,7 @@ function toRunOptions(args: {
662621
skipGitRepoCheck: args.skip_git_repo_check,
663622
ignoreRules: args.ignore_rules,
664623
isolatedCodexHome: args.isolated_codex_home,
665-
mcpConfigPolicy: args.mcp_config_policy,
624+
mcpConfigPolicy: args.mcp_config_policy ?? (args.codex_mcp_servers ? "explicit" : undefined),
666625
codexMcpServers: args.codex_mcp_servers,
667626
forwardSensitiveEnv: args.forward_sensitive_env,
668627
idleTimeoutMs: args.idle_timeout_ms,
@@ -771,7 +730,10 @@ function toParallelRunOptions(args: ParallelToolInput) {
771730
skipGitRepoCheck: agent.skip_git_repo_check ?? args.skip_git_repo_check,
772731
ignoreRules: agent.ignore_rules ?? args.ignore_rules,
773732
isolatedCodexHome: agent.isolated_codex_home ?? args.isolated_codex_home,
774-
mcpConfigPolicy: agent.mcp_config_policy ?? args.mcp_config_policy,
733+
mcpConfigPolicy:
734+
agent.mcp_config_policy ??
735+
args.mcp_config_policy ??
736+
(agent.codex_mcp_servers ?? args.codex_mcp_servers ? "explicit" : undefined),
775737
codexMcpServers: agent.codex_mcp_servers ?? args.codex_mcp_servers,
776738
forwardSensitiveEnv: agent.forward_sensitive_env ?? args.forward_sensitive_env,
777739
idleTimeoutMs: agent.idle_timeout_ms ?? args.idle_timeout_ms,
@@ -1198,9 +1160,10 @@ server.registerTool(
11981160
? `Parallel Codex run completed (${completed}/${args.tasks.length})`
11991161
: `Parallel Codex run finished with errors (${completed}/${args.tasks.length})`
12001162
: `${result.ok ? "Completed" : "Finished"} ${result.name ?? "Codex agent"} (${completed}/${args.tasks.length})`;
1201-
await progress.send(message, last ? { progress: total, total } : { total });
1163+
await progress.send(message, last ? { progress: total, total } : { total, reserveFinal: true });
12021164
},
12031165
}),
1166+
{ total, reserveFinal: true },
12041167
);
12051168
const ok = results.every((result) => result.ok);
12061169
await progress.flush();
@@ -1271,9 +1234,10 @@ server.registerTool(
12711234
? `Parallel Codex run completed (${completed}/${args.agents.length})`
12721235
: `Parallel Codex run finished with errors (${completed}/${args.agents.length})`
12731236
: `${result.ok ? "Completed" : "Finished"} ${result.name ?? "Codex agent"} (${completed}/${args.agents.length})`;
1274-
await progress.send(message, last ? { progress: total, total } : { total });
1237+
await progress.send(message, last ? { progress: total, total } : { total, reserveFinal: true });
12751238
},
12761239
}),
1240+
{ total, reserveFinal: true },
12771241
);
12781242
const ok = results.every((result) => result.ok);
12791243
await progress.flush();
@@ -1333,10 +1297,11 @@ server.registerTool(
13331297
last
13341298
? `Aggregating ${completed}/${args.agents.length} Codex results`
13351299
: `Completed ${completed}/${args.agents.length} Codex agents`,
1336-
last ? { progress: total, total } : { total },
1300+
last ? { progress: total, total } : { total, reserveFinal: true },
13371301
);
13381302
},
13391303
}),
1304+
{ total, reserveFinal: true },
13401305
);
13411306
const aggregation = aggregateAgentResults(results);
13421307
await progress.flush();
@@ -1451,11 +1416,16 @@ server.registerTool(
14511416
if (job.completedAt) await progress.send(`Codex job ${job.status}`);
14521417
await progress.flush();
14531418
const waitReason = job.completedAt ? undefined : waitCancelled ? "wait_cancelled" : "wait_timeout";
1419+
const completed = Boolean(job.completedAt);
14541420
return jsonResult(
14551421
{
1422+
completed,
14561423
job: compactJobSnapshotForMcp(job),
14571424
timeoutReason: waitReason,
14581425
recovery: recoveryForWait("agent_job", waitReason),
1426+
note: completed
1427+
? undefined
1428+
: "The Codex job is still managed by this MCP server. Use get_agent_run or wait_agent_run again.",
14591429
},
14601430
waitCancelled || job.status === "failed" || job.status === "cancelled",
14611431
);
@@ -1986,7 +1956,9 @@ server.registerTool(
19861956
await progress.send(
19871957
waited.completed
19881958
? `Codex session ${args.session_id} is ready`
1989-
: `Timed out waiting for Codex session ${args.session_id}`,
1959+
: waited.timeoutReason === "wait_cancelled"
1960+
? `Cancelled wait for Codex session ${args.session_id}`
1961+
: `Timed out waiting for Codex session ${args.session_id}`,
19901962
);
19911963
await progress.flush();
19921964
if (waited.error || !waited.session) {

src/jobs.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
type AgentRunPartial,
44
type AgentRunResult,
55
type ParallelRunOptions,
6+
agentFailureResultForError,
67
runAgent,
78
} from "./runner.js";
89
import { errorForLog, logger, summarizeRawTrafficForLog } from "./logging.js";
@@ -307,24 +308,35 @@ export async function runQueuedAgents(
307308
next += 1;
308309
const agent = options.agents[index];
309310
if (!agent) continue;
310-
const result = await runQueuedAgent(
311-
{
311+
const runOptions = {
312312
...options,
313313
...agent,
314314
model: agent.model ?? options.defaultModel,
315315
modelPreset: agent.modelPreset ?? options.modelPreset,
316316
reasoningEffort: agent.reasoningEffort ?? options.defaultReasoningEffort,
317317
prompt: agent.prompt,
318318
name: agent.name ?? `agent-${index + 1}`,
319-
},
320-
{
321-
...queueOptions,
322-
onStart: (queuedMs) => queueOptions.onStart?.(queuedMs, agent.name ?? `agent-${index + 1}`),
323-
onComplete: undefined,
324-
onSnapshot: (snapshot) =>
325-
queueOptions.onSnapshot?.(snapshot, index, options.agents.length),
326-
},
327-
);
319+
};
320+
let result: AgentRunResult;
321+
try {
322+
result = await runQueuedAgent(
323+
runOptions,
324+
{
325+
...queueOptions,
326+
onStart: (queuedMs) => queueOptions.onStart?.(queuedMs, agent.name ?? `agent-${index + 1}`),
327+
onComplete: undefined,
328+
onSnapshot: (snapshot) =>
329+
queueOptions.onSnapshot?.(snapshot, index, options.agents.length),
330+
},
331+
);
332+
} catch (error) {
333+
logger.error("queue.parallel_agent_failed", {
334+
index,
335+
name: runOptions.name,
336+
error: errorForLog(error),
337+
});
338+
result = agentFailureResultForError(runOptions, error);
339+
}
328340
results[index] = result;
329341
await callQueueCallback(() => queueOptions.onComplete?.(result, index, options.agents.length));
330342
}

0 commit comments

Comments
 (0)