Skip to content

Commit 9165f30

Browse files
committed
fix: guard scope cleanup and abort reasons
1 parent b7c0143 commit 9165f30

8 files changed

Lines changed: 263 additions & 58 deletions

File tree

mise.lock

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated - this file is auto-generated by `mise lock` https://mise.jdx.dev/dev-tools/mise-lock.html
1+
# @generated - this file is auto-generated by `mise lock` https://mise.en.dev/dev-tools/mise-lock.html
22

33
[[tools.actionlint]]
44
version = "1.7.12"
@@ -97,7 +97,6 @@ url_api = "https://api.github.com/repos/jdx/communique/releases/assets/413753918
9797
checksum = "sha256:61b46cf231882e0a10e0489df310ec9b875e83da0f38b60d6ac7bd67e09a15dc"
9898
url = "https://github.com/jdx/communique/releases/download/v1.1.3/communique-x86_64-unknown-linux-gnu.tar.gz"
9999
url_api = "https://api.github.com/repos/jdx/communique/releases/assets/413753688"
100-
provenance = "github-attestations"
101100

102101
[tools.communique."platforms.linux-x64-musl"]
103102
checksum = "sha256:9c8d784d8bb91e83cd467ed0d26be6ed3290a61a482413d42b12321aee20d5c0"

src/host/hostMain.ts

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ import {
5151
} from '../storage/sessionPaths.js';
5252
import {
5353
addAbortListener,
54-
makeAbortError,
54+
makeAbortReason,
5555
throwIfAborted,
5656
waitForScopedOperation,
5757
} from '../util/abort.js';
@@ -177,7 +177,6 @@ export async function runHost(sessionId: string): Promise<void> {
177177
let lastOutputAt = Date.now();
178178
let lastActivityAt = lastOutputAt;
179179
const hostAbortController = new AbortController();
180-
let idleTimeoutHandle: ReturnType<typeof setInterval> | null = null;
181180
let idleTimeoutScope: ResourceScope | null = null;
182181
let rpcListenPromise: Promise<void> | null = null;
183182
let shutdownPromise: Promise<void> | null = null;
@@ -193,6 +192,10 @@ export async function runHost(sessionId: string): Promise<void> {
193192
});
194193
let ptyIngestionQueue: Promise<void> = Promise.resolve();
195194

195+
// Per-client wait-exit callbacks, cleaned up individually via ResourceScope.
196+
// Using ptyExitPromise.then() would permanently attach to the shared promise.
197+
const ptyExitWaiters = new Set<() => void>();
198+
196199
const ptyExitPromise = new Promise<void>((resolve) => {
197200
markPtyExited = (): void => {
198201
if (ptyHasExited) {
@@ -201,6 +204,11 @@ export async function runHost(sessionId: string): Promise<void> {
201204

202205
ptyHasExited = true;
203206
resolve();
207+
const waiters = [...ptyExitWaiters];
208+
ptyExitWaiters.clear();
209+
for (const waiter of waiters) {
210+
waiter();
211+
}
204212
};
205213
});
206214

@@ -282,6 +290,7 @@ export async function runHost(sessionId: string): Promise<void> {
282290
const scope = new ResourceScope();
283291
idleTimeoutScope = scope;
284292
const checkIntervalMs = Math.min(idleTimeoutMs, IDLE_CHECK_CAP_MS);
293+
let idleTimeoutHandle: ReturnType<typeof setInterval> | null = null;
285294
idleTimeoutHandle = setInterval(() => {
286295
if (hostAbortController.signal.aborted) {
287296
clearIdleTimeout();
@@ -322,7 +331,7 @@ export async function runHost(sessionId: string): Promise<void> {
322331

323332
shutdownPromise = (async () => {
324333
try {
325-
hostAbortController.abort(makeAbortError());
334+
hostAbortController.abort(makeAbortReason('Host is shutting down.'));
326335
clearIdleTimeout();
327336
if (isSessionCommandable(state)) {
328337
pty.kill();
@@ -391,6 +400,15 @@ export async function runHost(sessionId: string): Promise<void> {
391400
})().catch(rethrowAsync);
392401
};
393402

403+
const makeWaitExitOutcome = (): WaitOutcome => {
404+
const snapshot = state.snapshot();
405+
const result: WaitOutcome = { timedOut: false };
406+
if (snapshot.exitCode !== null) {
407+
result.exitCode = snapshot.exitCode;
408+
}
409+
return result;
410+
};
411+
394412
const handlers: Record<string, MethodHandler> = {
395413
inspect: () => Promise.resolve({ session: state.snapshot() }),
396414
snapshot: async (params: unknown) => {
@@ -723,21 +741,17 @@ export async function runHost(sessionId: string): Promise<void> {
723741

724742
if (hasExit) {
725743
if (ptyHasExited) {
726-
const snapshot = state.snapshot();
727-
const result: WaitOutcome = { timedOut: false };
728-
if (snapshot.exitCode !== null) {
729-
result.exitCode = snapshot.exitCode;
730-
}
731-
return result;
744+
return makeWaitExitOutcome();
732745
}
733746

734-
waitCondition = ptyExitPromise.then(() => {
735-
const snapshot = state.snapshot();
736-
const result: WaitOutcome = { timedOut: false };
737-
if (snapshot.exitCode !== null) {
738-
result.exitCode = snapshot.exitCode;
739-
}
740-
return result;
747+
waitCondition = new Promise<WaitOutcome>((resolve) => {
748+
const waiter = (): void => {
749+
resolve(makeWaitExitOutcome());
750+
};
751+
ptyExitWaiters.add(waiter);
752+
waitScope.add('wait exit waiter', () => {
753+
ptyExitWaiters.delete(waiter);
754+
});
741755
});
742756
} else {
743757
assertSessionCommandable(state);

src/host/lifecycle.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import {
2525
sessionDir,
2626
socketPath,
2727
} from '../storage/sessionPaths.js';
28-
import { throwIfAborted } from '../util/abort.js';
28+
import { makeAbortError, throwIfAborted } from '../util/abort.js';
2929
import { invariant } from '../util/assert.js';
3030
import { sendRpc } from './rpcClient.js';
3131

@@ -46,6 +46,20 @@ function delayOptions(
4646
return signal === undefined ? undefined : { signal };
4747
}
4848

49+
async function pollDelay(
50+
intervalMs: number,
51+
signal?: AbortSignal,
52+
): Promise<void> {
53+
try {
54+
await delay(intervalMs, undefined, delayOptions(signal));
55+
} catch (error) {
56+
if (signal?.aborted === true) {
57+
throw makeAbortError(signal);
58+
}
59+
throw error;
60+
}
61+
}
62+
4963
interface NodeError extends Error {
5064
code?: string;
5165
}
@@ -284,7 +298,7 @@ async function waitForTerminalManifest(
284298
}
285299

286300
if (attempt + 1 < maxAttempts) {
287-
await delay(intervalMs, undefined, delayOptions(signal));
301+
await pollDelay(intervalMs, signal);
288302
}
289303
}
290304

@@ -322,7 +336,7 @@ async function waitForProcessAndSocketShutdown(
322336
}
323337

324338
if (attempt + 1 < maxAttempts) {
325-
await delay(intervalMs, undefined, delayOptions(signal));
339+
await pollDelay(intervalMs, signal);
326340
}
327341
}
328342

src/host/rpcClient.ts

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -133,20 +133,12 @@ export async function sendRpc(
133133
let responseHandled = false;
134134
let buffer = '';
135135

136-
const rejectWithCliError = (error: CliError): void => {
137-
settlers.reject(error);
138-
};
139-
140136
const rejectWithTransportError = (error: unknown): void => {
141-
rejectWithCliError(
137+
settlers.reject(
142138
toTransportCliError(error, socketPath, method, effectiveTimeoutMs),
143139
);
144140
};
145141

146-
const resolveWithResult = (result: unknown): void => {
147-
settlers.resolve(result);
148-
};
149-
150142
if (signal !== undefined) {
151143
addAbortListener(scope, 'rpc client abort listener', signal, () => {
152144
settlers.reject(makeAbortError(signal));
@@ -161,7 +153,7 @@ export async function sendRpc(
161153
});
162154

163155
socket.on('timeout', () => {
164-
rejectWithCliError(
156+
settlers.reject(
165157
makeCliError(ERROR_CODES.HOST_TIMEOUT, {
166158
message: `RPC request timed out after ${String(effectiveTimeoutMs)}ms.`,
167159
details: {
@@ -183,7 +175,7 @@ export async function sendRpc(
183175
}
184176

185177
if (buffer.length + chunk.length > MAX_RPC_BUFFER_BYTES) {
186-
rejectWithCliError(
178+
settlers.reject(
187179
makeCliError(ERROR_CODES.RPC_ERROR, {
188180
message: 'RPC response exceeds maximum buffer size.',
189181
details: { method, socketPath },
@@ -207,7 +199,7 @@ export async function sendRpc(
207199
const responseResult = RpcResponseSchema.safeParse(rawResponse);
208200

209201
if (!responseResult.success) {
210-
rejectWithCliError(
202+
settlers.reject(
211203
makeCliError(ERROR_CODES.RPC_ERROR, {
212204
message: 'RPC response failed schema validation.',
213205
details: {
@@ -223,7 +215,7 @@ export async function sendRpc(
223215
const response = responseResult.data;
224216

225217
if (response.id !== request.id) {
226-
rejectWithCliError(
218+
settlers.reject(
227219
makeCliError(ERROR_CODES.RPC_ERROR, {
228220
message: `RPC response id mismatch for method "${method}".`,
229221
details: {
@@ -244,7 +236,7 @@ export async function sendRpc(
244236
);
245237

246238
if (!resultResult.success) {
247-
rejectWithCliError(
239+
settlers.reject(
248240
makeCliError(ERROR_CODES.RPC_ERROR, {
249241
message: `RPC result failed validation for method "${method}".`,
250242
details: {
@@ -257,19 +249,19 @@ export async function sendRpc(
257249
return;
258250
}
259251

260-
resolveWithResult(resultResult.data);
252+
settlers.resolve(resultResult.data);
261253
return;
262254
}
263255

264-
resolveWithResult(response.result);
256+
settlers.resolve(response.result);
265257
return;
266258
}
267259

268-
rejectWithCliError(
260+
settlers.reject(
269261
toResponseCliError(response.error.code, response.error.message),
270262
);
271263
} catch (error) {
272-
rejectWithCliError(
264+
settlers.reject(
273265
makeCliError(ERROR_CODES.RPC_ERROR, {
274266
message: toErrorMessage(
275267
error,
@@ -290,7 +282,7 @@ export async function sendRpc(
290282
return;
291283
}
292284

293-
rejectWithCliError(
285+
settlers.reject(
294286
makeCliError(ERROR_CODES.RPC_ERROR, {
295287
message: `RPC connection closed before a complete response was received for method "${method}".`,
296288
details: {

src/host/rpcServer.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import {
1010
type RpcMethod,
1111
type RpcResponse,
1212
} from '../protocol/messages.js';
13-
import { createResourceScopedSettlers, makeAbortError } from '../util/abort.js';
13+
import {
14+
createResourceScopedSettlers,
15+
makeAbortReason,
16+
} from '../util/abort.js';
1417
import { invariant } from '../util/assert.js';
1518
import { ResourceScope } from '../util/resourceScope.js';
1619

@@ -20,6 +23,12 @@ const MAX_RPC_BUFFER_BYTES = 1_048_576;
2023
const SOCKET_LIVENESS_PROBE_TIMEOUT_MS = 1_000;
2124
const UNKNOWN_REQUEST_ID = 'unknown';
2225

26+
/**
27+
* Per-request context passed to RPC method handlers.
28+
*
29+
* `signal` aborts when the client socket closes, indicating the caller is no
30+
* longer waiting for a response.
31+
*/
2332
export interface MethodContext {
2433
readonly signal: AbortSignal;
2534
}
@@ -392,7 +401,9 @@ export class RpcServer {
392401
const requestScope = new ResourceScope();
393402
const requestAbortController = new AbortController();
394403
const abortRequest = (): void => {
395-
requestAbortController.abort(makeAbortError());
404+
requestAbortController.abort(
405+
makeAbortReason('RPC client socket closed.'),
406+
);
396407
};
397408
socket.once('close', abortRequest);
398409
requestScope.add('rpc request close listener', () => {
@@ -440,7 +451,11 @@ export class RpcServer {
440451
),
441452
);
442453
} finally {
443-
await requestScope.close();
454+
try {
455+
await requestScope.close();
456+
} catch (error) {
457+
console.debug('RPC request ResourceScope cleanup failed:', error);
458+
}
444459
}
445460
}
446461

src/host/runCompletionCoordinator.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ export interface PreparedWaitedRun {
5050
marker: string;
5151
}
5252

53-
/** Registered completion state returned after `input_run` appends successfully. */
5453
export interface RunCompletionWaitOptions {
5554
readonly signal?: AbortSignal;
5655
}
5756

57+
/** Registered completion state returned after `input_run` appends successfully. */
5858
export interface RegisteredWaitedRunCompletion {
5959
postamble: string;
6060
sentinel: string;
@@ -291,7 +291,7 @@ export class RunCompletionCoordinator {
291291
operationName: 'run completion',
292292
operation: completionPromise,
293293
scope: new ResourceScope(),
294-
...(options.signal === undefined ? {} : { signal: options.signal }),
294+
signal: options.signal,
295295
timeoutMs,
296296
timeoutResult: () => {
297297
forgetWaiter();

0 commit comments

Comments
 (0)