Skip to content

Commit 1da0a0e

Browse files
authored
fix: thread abort signals through host waits (#94)
* fix: thread abort signals through host waits * test: cover abort signal cleanup paths * fix: guard scope cleanup and abort reasons
1 parent 416956a commit 1da0a0e

10 files changed

Lines changed: 986 additions & 164 deletions

File tree

src/host/hostMain.ts

Lines changed: 129 additions & 83 deletions
Large diffs are not rendered by default.

src/host/lifecycle.ts

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,41 @@ import {
2525
sessionDir,
2626
socketPath,
2727
} from '../storage/sessionPaths.js';
28+
import { makeAbortError, throwIfAborted } from '../util/abort.js';
2829
import { invariant } from '../util/assert.js';
2930
import { sendRpc } from './rpcClient.js';
3031

3132
const DESTROY_POLL_INTERVAL_MS = 100;
3233
const DESTROY_MAX_ATTEMPTS = 50;
3334

35+
interface PollOptions {
36+
readonly signal?: AbortSignal;
37+
}
38+
39+
function pollOptions(signal?: AbortSignal): PollOptions {
40+
return signal === undefined ? {} : { signal };
41+
}
42+
43+
function delayOptions(
44+
signal?: AbortSignal,
45+
): { signal: AbortSignal } | undefined {
46+
return signal === undefined ? undefined : { signal };
47+
}
48+
49+
async function pollDelay(
50+
intervalMs: number,
51+
signal?: AbortSignal,
52+
): Promise<void> {
53+
try {
54+
await delay(intervalMs, undefined, delayOptions(signal));
55+
} catch (error) {
56+
if (signal?.aborted === true) {
57+
throw makeAbortError(signal);
58+
}
59+
throw error;
60+
}
61+
}
62+
3463
interface NodeError extends Error {
3564
code?: string;
3665
}
@@ -246,6 +275,7 @@ async function waitForTerminalManifest(
246275
manifestFile: string,
247276
maxAttempts: number = DESTROY_MAX_ATTEMPTS,
248277
intervalMs: number = DESTROY_POLL_INTERVAL_MS,
278+
options: PollOptions = {},
249279
): Promise<SessionRecord | null> {
250280
invariant(
251281
Number.isInteger(maxAttempts) && maxAttempts > 0,
@@ -256,15 +286,19 @@ async function waitForTerminalManifest(
256286
'intervalMs must be a non-negative integer',
257287
);
258288

289+
const { signal } = options;
290+
throwIfAborted(signal);
291+
259292
for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
293+
throwIfAborted(signal);
260294
const manifest = await readManifest(manifestFile);
261295

262296
if (isTerminalSessionStatus(manifest.status)) {
263297
return manifest;
264298
}
265299

266300
if (attempt + 1 < maxAttempts) {
267-
await delay(intervalMs);
301+
await pollDelay(intervalMs, signal);
268302
}
269303
}
270304

@@ -277,6 +311,7 @@ async function waitForProcessAndSocketShutdown(
277311
socketFile: string,
278312
maxAttempts: number = DESTROY_MAX_ATTEMPTS,
279313
intervalMs: number = DESTROY_POLL_INTERVAL_MS,
314+
options: PollOptions = {},
280315
): Promise<boolean> {
281316
invariant(
282317
Number.isInteger(maxAttempts) && maxAttempts > 0,
@@ -287,7 +322,11 @@ async function waitForProcessAndSocketShutdown(
287322
'intervalMs must be a non-negative integer',
288323
);
289324

325+
const { signal } = options;
326+
throwIfAborted(signal);
327+
290328
for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
329+
throwIfAborted(signal);
291330
const hostAlive = isProcessAlive(hostPid);
292331
const childAlive = isProcessAlive(childPid);
293332
const socketPresent = await pathExists(socketFile);
@@ -297,7 +336,7 @@ async function waitForProcessAndSocketShutdown(
297336
}
298337

299338
if (attempt + 1 < maxAttempts) {
300-
await delay(intervalMs);
339+
await pollDelay(intervalMs, signal);
301340
}
302341
}
303342

@@ -458,10 +497,17 @@ export function launchHost(config: LaunchHostConfig): number {
458497
return child.pid;
459498
}
460499

500+
export interface DestroySessionOptions {
501+
readonly signal?: AbortSignal;
502+
}
503+
461504
export async function destroySession(
462505
sessionId: string,
463506
force?: boolean,
507+
options: DestroySessionOptions = {},
464508
): Promise<void> {
509+
const { signal } = options;
510+
throwIfAborted(signal);
465511
const { sessionDirectory, manifestFile, socketFile } =
466512
getSessionPaths(sessionId);
467513
const manifest = await readSessionManifestOrThrow(sessionId, manifestFile);
@@ -493,6 +539,9 @@ export async function destroySession(
493539
manifest.hostPid,
494540
manifest.childPid,
495541
socketFile,
542+
DESTROY_MAX_ATTEMPTS,
543+
DESTROY_POLL_INTERVAL_MS,
544+
pollOptions(signal),
496545
);
497546
await reconcileSession(sessionDirectory);
498547

@@ -514,7 +563,8 @@ export async function destroySession(
514563
}
515564

516565
try {
517-
await sendRpc(socketFile, 'destroy');
566+
throwIfAborted(signal);
567+
await sendRpc(socketFile, 'destroy', undefined, undefined, signal);
518568
} catch (error) {
519569
if (
520570
!(error instanceof CliError) ||
@@ -535,7 +585,12 @@ export async function destroySession(
535585
throw error;
536586
}
537587

538-
const terminalManifest = await waitForTerminalManifest(manifestFile);
588+
const terminalManifest = await waitForTerminalManifest(
589+
manifestFile,
590+
DESTROY_MAX_ATTEMPTS,
591+
DESTROY_POLL_INTERVAL_MS,
592+
pollOptions(signal),
593+
);
539594
if (terminalManifest !== null) {
540595
return;
541596
}

src/host/rpcClient.ts

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,14 @@ import {
1313
RpcResponseSchema,
1414
type RpcMethod,
1515
} from '../protocol/messages.js';
16+
import {
17+
addAbortListener,
18+
createResourceScopedSettlers,
19+
makeAbortError,
20+
throwIfAborted,
21+
} from '../util/abort.js';
1622
import { invariant } from '../util/assert.js';
23+
import { ResourceScope } from '../util/resourceScope.js';
1724

1825
const DEFAULT_TIMEOUT_MS = 5_000;
1926
const MAX_RPC_BUFFER_BYTES = 1_048_576;
@@ -95,7 +102,9 @@ export async function sendRpc(
95102
method: string,
96103
params?: Record<string, unknown>,
97104
timeoutMs?: number,
105+
signal?: AbortSignal,
98106
): Promise<unknown> {
107+
throwIfAborted(signal);
99108
const effectiveTimeoutMs = timeoutMs ?? DEFAULT_TIMEOUT_MS;
100109
invariant(
101110
Number.isFinite(effectiveTimeoutMs) && effectiveTimeoutMs >= 0,
@@ -116,35 +125,25 @@ export async function sendRpc(
116125

117126
return await new Promise<unknown>((resolve, reject) => {
118127
const socket = net.connect({ path: socketPath });
119-
let settled = false;
128+
const scope = new ResourceScope();
129+
scope.add('rpc client socket', () => {
130+
socket.destroy();
131+
});
132+
const settlers = createResourceScopedSettlers(scope, resolve, reject);
120133
let responseHandled = false;
121134
let buffer = '';
122135

123-
const rejectWithCliError = (error: CliError): void => {
124-
if (settled) {
125-
return;
126-
}
127-
128-
settled = true;
129-
socket.destroy();
130-
reject(error);
131-
};
132-
133136
const rejectWithTransportError = (error: unknown): void => {
134-
rejectWithCliError(
137+
settlers.reject(
135138
toTransportCliError(error, socketPath, method, effectiveTimeoutMs),
136139
);
137140
};
138141

139-
const resolveWithResult = (result: unknown): void => {
140-
if (settled) {
141-
return;
142-
}
143-
144-
settled = true;
145-
socket.destroy();
146-
resolve(result);
147-
};
142+
if (signal !== undefined) {
143+
addAbortListener(scope, 'rpc client abort listener', signal, () => {
144+
settlers.reject(makeAbortError(signal));
145+
});
146+
}
148147

149148
socket.setEncoding('utf8');
150149
socket.setTimeout(effectiveTimeoutMs);
@@ -154,7 +153,7 @@ export async function sendRpc(
154153
});
155154

156155
socket.on('timeout', () => {
157-
rejectWithCliError(
156+
settlers.reject(
158157
makeCliError(ERROR_CODES.HOST_TIMEOUT, {
159158
message: `RPC request timed out after ${String(effectiveTimeoutMs)}ms.`,
160159
details: {
@@ -176,7 +175,7 @@ export async function sendRpc(
176175
}
177176

178177
if (buffer.length + chunk.length > MAX_RPC_BUFFER_BYTES) {
179-
rejectWithCliError(
178+
settlers.reject(
180179
makeCliError(ERROR_CODES.RPC_ERROR, {
181180
message: 'RPC response exceeds maximum buffer size.',
182181
details: { method, socketPath },
@@ -200,7 +199,7 @@ export async function sendRpc(
200199
const responseResult = RpcResponseSchema.safeParse(rawResponse);
201200

202201
if (!responseResult.success) {
203-
rejectWithCliError(
202+
settlers.reject(
204203
makeCliError(ERROR_CODES.RPC_ERROR, {
205204
message: 'RPC response failed schema validation.',
206205
details: {
@@ -216,7 +215,7 @@ export async function sendRpc(
216215
const response = responseResult.data;
217216

218217
if (response.id !== request.id) {
219-
rejectWithCliError(
218+
settlers.reject(
220219
makeCliError(ERROR_CODES.RPC_ERROR, {
221220
message: `RPC response id mismatch for method "${method}".`,
222221
details: {
@@ -237,7 +236,7 @@ export async function sendRpc(
237236
);
238237

239238
if (!resultResult.success) {
240-
rejectWithCliError(
239+
settlers.reject(
241240
makeCliError(ERROR_CODES.RPC_ERROR, {
242241
message: `RPC result failed validation for method "${method}".`,
243242
details: {
@@ -250,19 +249,19 @@ export async function sendRpc(
250249
return;
251250
}
252251

253-
resolveWithResult(resultResult.data);
252+
settlers.resolve(resultResult.data);
254253
return;
255254
}
256255

257-
resolveWithResult(response.result);
256+
settlers.resolve(response.result);
258257
return;
259258
}
260259

261-
rejectWithCliError(
260+
settlers.reject(
262261
toResponseCliError(response.error.code, response.error.message),
263262
);
264263
} catch (error) {
265-
rejectWithCliError(
264+
settlers.reject(
266265
makeCliError(ERROR_CODES.RPC_ERROR, {
267266
message: toErrorMessage(
268267
error,
@@ -279,11 +278,11 @@ export async function sendRpc(
279278
});
280279

281280
socket.on('end', () => {
282-
if (settled || responseHandled) {
281+
if (settlers.isSettled() || responseHandled) {
283282
return;
284283
}
285284

286-
rejectWithCliError(
285+
settlers.reject(
287286
makeCliError(ERROR_CODES.RPC_ERROR, {
288287
message: `RPC connection closed before a complete response was received for method "${method}".`,
289288
details: {

0 commit comments

Comments
 (0)