Skip to content

Commit 112e4c1

Browse files
cpackerletta-code
andauthored
fix(codex): make write_stdin polls abortable (#2627)
Co-authored-by: Letta Code <noreply@letta.com>
1 parent 1413aa6 commit 112e4c1

2 files changed

Lines changed: 65 additions & 4 deletions

File tree

src/tools/exec-command.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,34 @@ describe.skipIf(isWindows)("Codex unified exec tools", () => {
9797
expect(second.output).toContain("Output:\ndone");
9898
});
9999

100+
test("empty write_stdin polls abort promptly", async () => {
101+
const first = await exec_command({
102+
cmd: "sleep 2",
103+
yield_time_ms: 250,
104+
});
105+
106+
const match = first.output.match(/Process running with session ID (\d+)/);
107+
expect(match?.[1]).toBeDefined();
108+
109+
const controller = new AbortController();
110+
const timer = setTimeout(() => controller.abort(), 50);
111+
const startedAt = Date.now();
112+
try {
113+
await expect(
114+
write_stdin({
115+
session_id: Number(match?.[1]),
116+
chars: "",
117+
yield_time_ms: 30_000,
118+
signal: controller.signal,
119+
}),
120+
).rejects.toThrow("The operation was aborted");
121+
} finally {
122+
clearTimeout(timer);
123+
}
124+
125+
expect(Date.now() - startedAt).toBeLessThan(1000);
126+
});
127+
100128
test("write_stdin sends input to tty-enabled sessions", async () => {
101129
const first = await exec_command({
102130
cmd: "cat",

src/tools/impl/exec-command.ts

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ interface WriteStdinArgs {
5252
chars?: string;
5353
yield_time_ms?: number;
5454
max_output_tokens?: number;
55+
signal?: AbortSignal;
5556
onOutput?: (chunk: string, stream: "stdout" | "stderr") => void;
5657
}
5758

@@ -129,8 +130,35 @@ type ExecOutputChunk = {
129130

130131
const execSessions = new Map<string, ExecSession>();
131132

132-
function sleep(ms: number): Promise<void> {
133-
return new Promise((resolve) => setTimeout(resolve, ms));
133+
function createAbortError(): Error {
134+
const error = new Error("The operation was aborted");
135+
error.name = "AbortError";
136+
return error;
137+
}
138+
139+
function throwIfAborted(signal: AbortSignal | undefined): void {
140+
if (signal?.aborted) {
141+
throw createAbortError();
142+
}
143+
}
144+
145+
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
146+
if (!signal) {
147+
return new Promise((resolve) => setTimeout(resolve, ms));
148+
}
149+
throwIfAborted(signal);
150+
return new Promise((resolve, reject) => {
151+
const timer = setTimeout(() => {
152+
signal.removeEventListener("abort", onAbort);
153+
resolve();
154+
}, ms);
155+
const onAbort = () => {
156+
clearTimeout(timer);
157+
signal.removeEventListener("abort", onAbort);
158+
reject(createAbortError());
159+
};
160+
signal.addEventListener("abort", onAbort, { once: true });
161+
});
134162
}
135163

136164
function clampYieldTime(value: number | undefined, fallback: number): number {
@@ -568,12 +596,14 @@ async function waitForSessionOutput(params: {
568596
session: ExecSession;
569597
startOffset: number;
570598
yieldTimeMs: number;
599+
signal?: AbortSignal;
571600
onOutput?: (chunk: string, stream: "stdout" | "stderr") => void;
572601
}): Promise<{ output: string; wallTimeMs: number }> {
573602
const startTime = Date.now();
574603
const deadline = startTime + params.yieldTimeMs;
575604
let emittedOffset = params.startOffset;
576605

606+
throwIfAborted(params.signal);
577607
while (Date.now() < deadline && params.session.status === "running") {
578608
if (params.onOutput && params.session.output.length > emittedOffset) {
579609
for (const chunk of getSessionOutputChunks(
@@ -585,8 +615,9 @@ async function waitForSessionOutput(params: {
585615
}
586616
emittedOffset = params.session.output.length;
587617
}
588-
await sleep(25);
618+
await sleep(25, params.signal);
589619
}
620+
throwIfAborted(params.signal);
590621

591622
if (params.onOutput && params.session.output.length > emittedOffset) {
592623
for (const chunk of getSessionOutputChunks(
@@ -690,6 +721,7 @@ export async function exec_command(
690721
session,
691722
startOffset: 0,
692723
yieldTimeMs,
724+
signal: args.signal,
693725
onOutput: args.onOutput,
694726
});
695727

@@ -731,7 +763,7 @@ export async function write_stdin(
731763
}
732764
if (chars) {
733765
(backgroundProcess.process as ProcessLauncher).write(chars);
734-
await sleep(100);
766+
await sleep(100, args.signal);
735767
}
736768

737769
const startOffset = session.readOffset;
@@ -740,6 +772,7 @@ export async function write_stdin(
740772
session,
741773
startOffset,
742774
yieldTimeMs,
775+
signal: args.signal,
743776
onOutput: args.onOutput,
744777
});
745778

0 commit comments

Comments
 (0)