Skip to content

Commit d410a7b

Browse files
xuiocodex
andcommitted
Harden app-server recovery edge cases
Avoid exec fallback once app-server turn/start may have accepted a prompt. Preserve timeout status for interrupted timeout completions and merge durable session-state writes. Co-Authored-By: OpenAI Codex <noreply@openai.com>
1 parent 7e1bfc5 commit d410a7b

6 files changed

Lines changed: 97 additions & 13 deletions

File tree

dist/index.js

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24103,7 +24103,7 @@ var CodexAppServerSession = class _CodexAppServerSession {
2410324103
}
2410424104
if (method === "turn/completed") {
2410524105
const turn = params?.turn;
24106-
const status = resultStatusFromTurn(turn?.status);
24106+
const status = active.timeoutReason && turn?.status === "interrupted" ? "timeout" : resultStatusFromTurn(turn?.status);
2410724107
if (hasTurnErrorStatus(turn?.status) && turn?.error) {
2410824108
active.summary.errors.push(JSON.stringify(turn.error));
2410924109
}
@@ -24241,13 +24241,18 @@ var SessionStateStore = class {
2424124241
return [];
2424224242
}
2424324243
}
24244-
save(sessions) {
24244+
save(sessions, options = {}) {
2424524245
mkdirSync3(path7.dirname(this.file), { recursive: true });
2424624246
const temp = `${this.file}.${process.pid}.tmp`;
24247+
const replaceIds = new Set(options.replaceIds ?? sessions.map((session) => session.id));
24248+
const merged = [
24249+
...this.load().filter((session) => !replaceIds.has(session.id)),
24250+
...sessions
24251+
];
2424724252
const payload = {
2424824253
version: 1,
2424924254
updatedAt: (/* @__PURE__ */ new Date()).toISOString(),
24250-
sessions
24255+
sessions: merged
2425124256
};
2425224257
writeFileSync2(temp, `${JSON.stringify(payload, null, 2)}
2425324258
`, "utf8");
@@ -24356,6 +24361,7 @@ function readPositiveInt2(value, fallback, max) {
2435624361
var CodexSessionManager = class {
2435724362
sessions = /* @__PURE__ */ new Map();
2435824363
stateStore;
24364+
persistedSessionIds = /* @__PURE__ */ new Set();
2435924365
completedTtlSeconds = readPositiveInt2(
2436024366
process.env.CODEX_SUBAGENTS_SESSION_COMPLETED_TTL_SECONDS,
2436124367
3600,
@@ -24460,6 +24466,7 @@ var CodexSessionManager = class {
2446024466
stateFile: this.stateStore?.file
2446124467
};
2446224468
this.sessions.set(session.id, session);
24469+
if (this.stateStore) this.persistedSessionIds.add(session.id);
2446324470
const turn = this.enqueueTurn(session, {
2446424471
prompt: options.prompt,
2446524472
overrides: {
@@ -24950,15 +24957,13 @@ var CodexSessionManager = class {
2495024957
{ sessionTurnId: session.activeTurn?.id }
2495124958
);
2495224959
} catch (error2) {
24953-
if (session.turns === 0 && shouldFallbackToExec(error2)) {
24960+
if (session.turns === 0 && !session.appServer && shouldFallbackToExec(error2)) {
2495424961
session.appServerFallbackReason = error2 instanceof Error ? error2.message : String(error2);
2495524962
logger.warn("session.app_server_fallback_to_exec", {
2495624963
sessionId: session.id,
2495724964
appServerFallbackReason: session.appServerFallbackReason,
2495824965
error: errorForLog(error2)
2495924966
});
24960-
await session.appServer?.close().catch(() => {
24961-
});
2496224967
session.appServer = void 0;
2496324968
session.protocol = "exec";
2496424969
return this.runExecTurn(session, {
@@ -25080,6 +25085,7 @@ var CodexSessionManager = class {
2508025085
const record2 = this.recordFromState(persisted);
2508125086
if (!record2) continue;
2508225087
this.sessions.set(record2.id, record2);
25088+
this.persistedSessionIds.add(record2.id);
2508325089
}
2508425090
logger.info("session.state.loaded", { stateFile: store.file, sessions: this.sessions.size });
2508525091
}
@@ -25139,7 +25145,7 @@ var CodexSessionManager = class {
2513925145
error: session.error
2514025146
}));
2514125147
try {
25142-
store.save(states);
25148+
store.save(states, { replaceIds: this.persistedSessionIds });
2514325149
} catch (error2) {
2514425150
logger.error("session.state.save_failed", { stateFile: store.file, error: errorForLog(error2) });
2514525151
}

src/app-server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,9 @@ export class CodexAppServerSession {
853853

854854
if (method === "turn/completed") {
855855
const turn = params?.turn as JsonObject | undefined;
856-
const status = resultStatusFromTurn(turn?.status);
856+
const status = active.timeoutReason && turn?.status === "interrupted"
857+
? "timeout"
858+
: resultStatusFromTurn(turn?.status);
857859
if (hasTurnErrorStatus(turn?.status) && turn?.error) {
858860
active.summary.errors.push(JSON.stringify(turn.error));
859861
}

src/session-state.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,18 @@ export class SessionStateStore {
4747
}
4848
}
4949

50-
save(sessions: DurableSessionState[]): void {
50+
save(sessions: DurableSessionState[], options: { replaceIds?: Iterable<string> } = {}): void {
5151
mkdirSync(path.dirname(this.file), { recursive: true });
5252
const temp = `${this.file}.${process.pid}.tmp`;
53+
const replaceIds = new Set(options.replaceIds ?? sessions.map((session) => session.id));
54+
const merged = [
55+
...this.load().filter((session) => !replaceIds.has(session.id)),
56+
...sessions,
57+
];
5358
const payload: SessionStateFile = {
5459
version: 1,
5560
updatedAt: new Date().toISOString(),
56-
sessions,
61+
sessions: merged,
5762
};
5863
writeFileSync(temp, `${JSON.stringify(payload, null, 2)}\n`, "utf8");
5964
renameSync(temp, this.file);

src/sessions.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ function readPositiveInt(value: string | undefined, fallback: number, max: numbe
180180
export class CodexSessionManager {
181181
private readonly sessions = new Map<string, CodexSessionRecord>();
182182
private readonly stateStore: SessionStateStore | undefined;
183+
private readonly persistedSessionIds = new Set<string>();
183184
private readonly completedTtlSeconds = readPositiveInt(
184185
process.env.CODEX_SUBAGENTS_SESSION_COMPLETED_TTL_SECONDS,
185186
3600,
@@ -304,6 +305,7 @@ export class CodexSessionManager {
304305
stateFile: this.stateStore?.file,
305306
};
306307
this.sessions.set(session.id, session);
308+
if (this.stateStore) this.persistedSessionIds.add(session.id);
307309
const turn = this.enqueueTurn(session, {
308310
prompt: options.prompt,
309311
overrides: {
@@ -872,14 +874,13 @@ export class CodexSessionManager {
872874
{ sessionTurnId: session.activeTurn?.id },
873875
);
874876
} catch (error) {
875-
if (session.turns === 0 && shouldFallbackToExec(error)) {
877+
if (session.turns === 0 && !session.appServer && shouldFallbackToExec(error)) {
876878
session.appServerFallbackReason = error instanceof Error ? error.message : String(error);
877879
logger.warn("session.app_server_fallback_to_exec", {
878880
sessionId: session.id,
879881
appServerFallbackReason: session.appServerFallbackReason,
880882
error: errorForLog(error),
881883
});
882-
await session.appServer?.close().catch(() => {});
883884
session.appServer = undefined;
884885
session.protocol = "exec";
885886
return this.runExecTurn(session, {
@@ -1036,6 +1037,7 @@ export class CodexSessionManager {
10361037
const record = this.recordFromState(persisted);
10371038
if (!record) continue;
10381039
this.sessions.set(record.id, record);
1040+
this.persistedSessionIds.add(record.id);
10391041
}
10401042
logger.info("session.state.loaded", { stateFile: store.file, sessions: this.sessions.size });
10411043
}
@@ -1099,7 +1101,7 @@ export class CodexSessionManager {
10991101
error: session.error,
11001102
}));
11011103
try {
1102-
store.save(states);
1104+
store.save(states, { replaceIds: this.persistedSessionIds });
11031105
} catch (error) {
11041106
logger.error("session.state.save_failed", { stateFile: store.file, error: errorForLog(error) });
11051107
}

test/app-server-hardening.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import os from "node:os";
33
import path from "node:path";
44
import { afterEach, describe, expect, it } from "vitest";
55
import { AgentRunQueue, BackpressureError, CodexJobManager } from "../src/jobs.js";
6+
import { SessionStateStore, type DurableSessionState } from "../src/session-state.js";
67
import { CodexSessionManager } from "../src/sessions.js";
78

89
const fakeCodex = path.resolve("test/fixtures/fake-codex.mjs");
@@ -132,6 +133,24 @@ describe("app-server hardening", () => {
132133
manager.cancel(session.id);
133134
});
134135

136+
it("preserves timeout status when a timed-out turn completes as interrupted", async () => {
137+
const manager = new CodexSessionManager();
138+
const projectDir = await tempDir("codex-subagents-app-timeout-interrupted-project-");
139+
140+
const { result, session } = await manager.start({
141+
prompt: "timeout interrupted probe DELAY_MS=500",
142+
projectDir,
143+
codexBin: fakeCodex,
144+
timeoutMs: 30,
145+
terminateGraceMs: 100,
146+
});
147+
148+
expect(result.ok).toBe(false);
149+
expect(result.status).toBe("timeout");
150+
expect(result.timeoutReason).toBe("timeout");
151+
manager.cancel(session.id);
152+
});
153+
135154
it("terminates the app-server child process when a session is cancelled", async () => {
136155
const manager = new CodexSessionManager();
137156
const projectDir = await tempDir("codex-subagents-app-cancel-project-");
@@ -229,6 +248,53 @@ describe("app-server hardening", () => {
229248
manager.cancel(session.id);
230249
});
231250

251+
it("does not fall back to exec after turn/start may have been accepted", async () => {
252+
const manager = new CodexSessionManager();
253+
const projectDir = await tempDir("codex-subagents-app-turn-timeout-project-");
254+
const recordDir = await tempDir("codex-subagents-app-turn-timeout-record-");
255+
256+
const { session, turn } = manager.startAsync({
257+
prompt: "TURN_START_NO_RESPONSE",
258+
projectDir,
259+
codexBin: fakeCodex,
260+
spawnTimeoutMs: 50,
261+
env: {
262+
FAKE_CODEX_RECORD_DIR: recordDir,
263+
},
264+
});
265+
266+
const waited = await manager.wait(session.id, 2_000, turn.id);
267+
expect(waited.completed).toBe(true);
268+
expect(waited.turn?.status).toBe("failed");
269+
expect(waited.session?.protocol).toBe("app-server");
270+
271+
const calls = await recordedCalls(recordDir);
272+
expect(calls.some((call) => call.protocol === "app-server" && call.method === "turn/start")).toBe(true);
273+
expect(calls.some((call) => call.protocol === "exec")).toBe(false);
274+
manager.cancel(session.id);
275+
});
276+
277+
it("merges durable session state instead of overwriting unknown sessions", async () => {
278+
const stateDir = await tempDir("codex-subagents-state-merge-");
279+
const store = new SessionStateStore(path.join(stateDir, "sessions.json"));
280+
const now = new Date().toISOString();
281+
const state = (id: string): DurableSessionState => ({
282+
id,
283+
status: "active",
284+
createdAt: now,
285+
updatedAt: now,
286+
codexThreadId: `thread-${id}`,
287+
protocol: "app-server",
288+
turns: 1,
289+
baseOptions: { projectDir: stateDir },
290+
});
291+
292+
store.save([state("external")], { replaceIds: ["external"] });
293+
store.save([state("local")], { replaceIds: ["local"] });
294+
295+
expect(store.load().map((session) => session.id).sort()).toEqual(["external", "local"]);
296+
});
297+
232298
it("marks live steering unsupported when turn/steer fails", async () => {
233299
const manager = new CodexSessionManager();
234300
const projectDir = await tempDir("codex-subagents-app-steer-fail-project-");

test/fixtures/fake-codex.mjs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,9 @@ if (args[0] === "app-server") {
272272
return;
273273
}
274274
recordCall({ protocol: "app-server", method, prompt: activePrompt, threadId, turnId: activeTurn });
275+
if (hasMode("TURN_START_NO_RESPONSE")) {
276+
return;
277+
}
275278
send({
276279
id,
277280
result: {

0 commit comments

Comments
 (0)