Skip to content

Commit e280345

Browse files
cursoragentarul28
andcommitted
fix(acp): bounded acquire retries; clear waitForTerminalExit kill timer on close
- Replace recursive acquireAcpCliConnection with retry loop (max attempts + backoff) - Clear SIGKILL timeout when process closes before WAIT_FOR_TERMINAL_EXIT_MAX_MS Co-authored-by: Arul Sharma <arul28@users.noreply.github.com>
1 parent d5f1144 commit e280345

2 files changed

Lines changed: 162 additions & 146 deletions

File tree

apps/desktop/src/main/services/chat/acpCliPool.ts

Lines changed: 141 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ const pools = new Map<string, { ref: number; pooled: AcpCliPooled }>();
3939
const pendingInit = new Map<string, Promise<void>>();
4040

4141
const STDERR_LOG_MAX = 8_192;
42+
const ACP_CLI_ACQUIRE_MAX_ATTEMPTS = 12;
43+
const ACP_CLI_ACQUIRE_RETRY_BACKOFF_MS = 25;
4244

4345
function killProcQuiet(proc: ChildProcessWithoutNullStreams | null): void {
4446
if (!proc) return;
@@ -73,147 +75,158 @@ function evictPoolEntry(poolKey: string, reason: string, err?: unknown): void {
7375

7476
export async function acquireAcpCliConnection(options: AcpCliPoolOptions): Promise<AcpCliPooled> {
7577
const key = options.poolKey;
76-
const existing = pools.get(key);
77-
if (existing) {
78-
existing.ref += 1;
79-
return existing.pooled;
80-
}
8178

82-
let initOwner = false;
83-
let init = pendingInit.get(key);
84-
if (!init) {
85-
initOwner = true;
86-
init = (async () => {
87-
let proc: ChildProcessWithoutNullStreams | null = null;
88-
const stderrChunks: Buffer[] = [];
89-
const appendStderr = (d: Buffer | string): void => {
90-
const buf = Buffer.isBuffer(d) ? d : Buffer.from(String(d), "utf8");
91-
stderrChunks.push(buf);
92-
let total = 0;
93-
for (const c of stderrChunks) total += c.length;
94-
while (total > STDERR_LOG_MAX && stderrChunks.length > 1) {
95-
total -= stderrChunks.shift()!.length;
96-
}
97-
};
98-
99-
try {
100-
proc = spawn(options.spawn.command, options.spawn.args, {
101-
stdio: ["pipe", "pipe", "pipe"],
102-
env: options.spawn.env ?? { ...process.env },
103-
cwd: options.spawn.cwd,
104-
detached: process.platform !== "win32",
105-
});
106-
107-
let failureHandled = false;
108-
const onProcFailure = (label: string, err?: unknown) => {
109-
if (failureHandled) return;
110-
failureHandled = true;
111-
const tail = Buffer.concat(stderrChunks).toString("utf8").trim();
112-
if (tail) {
113-
console.error(`${options.logPrefix} ${label} stderr (tail) poolKey=${key}:`, tail);
79+
for (let attempt = 0; attempt < ACP_CLI_ACQUIRE_MAX_ATTEMPTS; attempt += 1) {
80+
if (attempt > 0) {
81+
await new Promise((r) => setTimeout(r, ACP_CLI_ACQUIRE_RETRY_BACKOFF_MS));
82+
}
83+
84+
const existing = pools.get(key);
85+
if (existing) {
86+
existing.ref += 1;
87+
return existing.pooled;
88+
}
89+
90+
let initOwner = false;
91+
let init = pendingInit.get(key);
92+
if (!init) {
93+
initOwner = true;
94+
init = (async () => {
95+
let proc: ChildProcessWithoutNullStreams | null = null;
96+
const stderrChunks: Buffer[] = [];
97+
const appendStderr = (d: Buffer | string): void => {
98+
const buf = Buffer.isBuffer(d) ? d : Buffer.from(String(d), "utf8");
99+
stderrChunks.push(buf);
100+
let total = 0;
101+
for (const c of stderrChunks) total += c.length;
102+
while (total > STDERR_LOG_MAX && stderrChunks.length > 1) {
103+
total -= stderrChunks.shift()!.length;
114104
}
115-
killProcQuiet(proc);
116-
evictPoolEntry(key, `${options.logPrefix} ${label}`, err);
117-
};
118-
119-
proc.once("error", (err) => {
120-
onProcFailure("process error", err);
121-
});
122-
proc.once("close", (code, signal) => {
123-
if (!pools.has(key)) return;
124-
onProcFailure(`process closed code=${code} signal=${signal}`);
125-
});
126-
127-
proc.stderr?.on("data", appendStderr);
128-
129-
const terminals = new Map<string, AcpHostTermState>();
130-
const bridge: AcpHostBridge = {
131-
onPermission: null,
132-
onSessionUpdate: null,
133-
getRootPath: () => "",
134-
getDirtyFileText: null,
135-
onTerminalOutputDelta: null,
136-
flushTerminalOutput: null,
137-
onTerminalDisposed: null,
138105
};
139106

140-
const client = createAcpHostClient(bridge, terminals, { logPrefix: options.logPrefix });
141-
const toAgentStdin = Writable.toWeb(proc.stdin as Writable);
142-
const fromAgentStdout = Readable.toWeb(proc.stdout as Readable);
143-
const stream = ndJsonStream(
144-
toAgentStdin as unknown as WritableStream<Uint8Array>,
145-
fromAgentStdout as unknown as ReadableStream<Uint8Array>,
146-
);
147-
const connection = new ClientSideConnection(() => client, stream);
148-
149-
const initResult = await connection.initialize({
150-
protocolVersion: PROTOCOL_VERSION,
151-
clientInfo: { name: "ade", title: "ADE", version: options.appVersion },
152-
clientCapabilities: {
153-
fs: { readTextFile: true, writeTextFile: true },
154-
terminal: true,
155-
},
156-
});
157-
158-
if (options.afterInitialize) {
159-
await options.afterInitialize({ connection, initResult });
160-
}
161-
162-
const pooled: AcpCliPooled = {
163-
connection,
164-
bridge,
165-
terminals,
166-
dispose: () => {
167-
for (const termId of terminals.keys()) {
168-
bridge.onTerminalDisposed?.(termId);
107+
try {
108+
proc = spawn(options.spawn.command, options.spawn.args, {
109+
stdio: ["pipe", "pipe", "pipe"],
110+
env: options.spawn.env ?? { ...process.env },
111+
cwd: options.spawn.cwd,
112+
detached: process.platform !== "win32",
113+
});
114+
115+
let failureHandled = false;
116+
const onProcFailure = (label: string, err?: unknown) => {
117+
if (failureHandled) return;
118+
failureHandled = true;
119+
const tail = Buffer.concat(stderrChunks).toString("utf8").trim();
120+
if (tail) {
121+
console.error(`${options.logPrefix} ${label} stderr (tail) poolKey=${key}:`, tail);
169122
}
170-
for (const t of terminals.values()) {
123+
killProcQuiet(proc);
124+
evictPoolEntry(key, `${options.logPrefix} ${label}`, err);
125+
};
126+
127+
proc.once("error", (err) => {
128+
onProcFailure("process error", err);
129+
});
130+
proc.once("close", (code, signal) => {
131+
if (!pools.has(key)) return;
132+
onProcFailure(`process closed code=${code} signal=${signal}`);
133+
});
134+
135+
proc.stderr?.on("data", appendStderr);
136+
137+
const terminals = new Map<string, AcpHostTermState>();
138+
const bridge: AcpHostBridge = {
139+
onPermission: null,
140+
onSessionUpdate: null,
141+
getRootPath: () => "",
142+
getDirtyFileText: null,
143+
onTerminalOutputDelta: null,
144+
flushTerminalOutput: null,
145+
onTerminalDisposed: null,
146+
};
147+
148+
const client = createAcpHostClient(bridge, terminals, { logPrefix: options.logPrefix });
149+
const toAgentStdin = Writable.toWeb(proc.stdin as Writable);
150+
const fromAgentStdout = Readable.toWeb(proc.stdout as Readable);
151+
const stream = ndJsonStream(
152+
toAgentStdin as unknown as WritableStream<Uint8Array>,
153+
fromAgentStdout as unknown as ReadableStream<Uint8Array>,
154+
);
155+
const connection = new ClientSideConnection(() => client, stream);
156+
157+
const initResult = await connection.initialize({
158+
protocolVersion: PROTOCOL_VERSION,
159+
clientInfo: { name: "ade", title: "ADE", version: options.appVersion },
160+
clientCapabilities: {
161+
fs: { readTextFile: true, writeTextFile: true },
162+
terminal: true,
163+
},
164+
});
165+
166+
if (options.afterInitialize) {
167+
await options.afterInitialize({ connection, initResult });
168+
}
169+
170+
const pooled: AcpCliPooled = {
171+
connection,
172+
bridge,
173+
terminals,
174+
dispose: () => {
175+
for (const termId of terminals.keys()) {
176+
bridge.onTerminalDisposed?.(termId);
177+
}
178+
for (const t of terminals.values()) {
179+
try {
180+
if (!t.exited) t.proc.kill("SIGKILL");
181+
} catch {
182+
// ignore
183+
}
184+
}
185+
terminals.clear();
171186
try {
172-
if (!t.exited) t.proc.kill("SIGKILL");
187+
proc?.kill("SIGTERM");
173188
} catch {
174189
// ignore
175190
}
176-
}
177-
terminals.clear();
178-
try {
179-
proc?.kill("SIGTERM");
180-
} catch {
181-
// ignore
182-
}
183-
},
184-
};
191+
},
192+
};
185193

186-
pools.set(key, { ref: 1, pooled });
187-
} catch (err) {
188-
const tail = Buffer.concat(stderrChunks).toString("utf8").trim();
189-
if (tail) {
190-
console.error(`${options.logPrefix} init failed stderr (tail) poolKey=${key}:`, tail);
194+
pools.set(key, { ref: 1, pooled });
195+
} catch (err) {
196+
const tail = Buffer.concat(stderrChunks).toString("utf8").trim();
197+
if (tail) {
198+
console.error(`${options.logPrefix} init failed stderr (tail) poolKey=${key}:`, tail);
199+
}
200+
killProcQuiet(proc);
201+
evictPoolEntry(key, `${options.logPrefix} initialization failed`, err);
202+
throw err;
191203
}
192-
killProcQuiet(proc);
193-
evictPoolEntry(key, `${options.logPrefix} initialization failed`, err);
194-
throw err;
195-
}
196-
})().finally(() => {
197-
pendingInit.delete(key);
198-
});
199-
pendingInit.set(key, init);
204+
})().finally(() => {
205+
pendingInit.delete(key);
206+
});
207+
pendingInit.set(key, init);
208+
}
209+
210+
try {
211+
await init;
212+
} catch (err) {
213+
if (initOwner) throw err;
214+
continue;
215+
}
216+
217+
const entry = pools.get(key);
218+
if (!entry) {
219+
continue;
220+
}
221+
if (!initOwner) {
222+
entry.ref += 1;
223+
}
224+
return entry.pooled;
200225
}
201226

202-
try {
203-
await init;
204-
} catch (err) {
205-
if (initOwner) throw err;
206-
return acquireAcpCliConnection(options);
207-
}
208-
209-
const entry = pools.get(key);
210-
if (!entry) {
211-
return acquireAcpCliConnection(options);
212-
}
213-
if (!initOwner) {
214-
entry.ref += 1;
215-
}
216-
return entry.pooled;
227+
throw new Error(
228+
`acpCliPool: exceeded ${ACP_CLI_ACQUIRE_MAX_ATTEMPTS} acquire attempts for poolKey=${key} (init or pool entry never became ready).`,
229+
);
217230
}
218231

219232
export function releaseAcpCliConnection(poolKey: string): void {

apps/desktop/src/main/services/chat/acpHostClient.ts

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -228,24 +228,27 @@ export function createAcpHostClient(
228228
return { exitCode: -1, signal: null };
229229
}
230230
if (!t.exited) {
231-
await Promise.race([
232-
new Promise<void>((resolve) => {
233-
t.proc.once("close", resolve);
234-
}),
235-
new Promise<void>((resolve) => {
236-
setTimeout(() => {
237-
try {
238-
if (!t.exited) t.proc.kill("SIGKILL");
239-
} catch {
240-
// ignore
241-
}
242-
console.warn(
243-
`${logPrefix} waitForTerminalExit exceeded ${WAIT_FOR_TERMINAL_EXIT_MAX_MS}ms; sent SIGKILL`,
244-
);
245-
resolve();
246-
}, WAIT_FOR_TERMINAL_EXIT_MAX_MS);
247-
}),
248-
]);
231+
let killTimer: ReturnType<typeof setTimeout> | undefined;
232+
const closed = new Promise<void>((resolve) => {
233+
t.proc.once("close", () => {
234+
if (killTimer !== undefined) clearTimeout(killTimer);
235+
resolve();
236+
});
237+
});
238+
const timedOut = new Promise<void>((resolve) => {
239+
killTimer = setTimeout(() => {
240+
try {
241+
if (!t.exited) t.proc.kill("SIGKILL");
242+
} catch {
243+
// ignore
244+
}
245+
console.warn(
246+
`${logPrefix} waitForTerminalExit exceeded ${WAIT_FOR_TERMINAL_EXIT_MAX_MS}ms; sent SIGKILL`,
247+
);
248+
resolve();
249+
}, WAIT_FOR_TERMINAL_EXIT_MAX_MS);
250+
});
251+
await Promise.race([closed, timedOut]);
249252
if (!t.exited) {
250253
await new Promise<void>((resolve) => {
251254
const tmo = setTimeout(resolve, 15_000);

0 commit comments

Comments
 (0)