Skip to content

Commit 897c25d

Browse files
authored
fix: clean up metro companion workers (#376)
* fix: clean up metro companion workers * fix: clarify metro companion lease shutdown * test: fix installed-package metro typecheck
1 parent 3c62a2b commit 897c25d

6 files changed

Lines changed: 357 additions & 116 deletions

src/__tests__/client-metro-companion-worker.test.ts

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import { spawn } from 'node:child_process';
22
import assert from 'node:assert/strict';
33
import crypto from 'node:crypto';
4+
import fs from 'node:fs';
45
import http from 'node:http';
6+
import os from 'node:os';
7+
import path from 'node:path';
58
import type { Duplex } from 'node:stream';
69
import { setTimeout as delay } from 'node:timers/promises';
710
import { afterEach, test } from 'vitest';
@@ -488,3 +491,150 @@ test('metro companion worker reconnects after the bridge closes immediately afte
488491

489492
assert.equal(bridgeConnections, 2);
490493
});
494+
495+
test('metro companion worker exits after its state file is removed', async () => {
496+
const tempRoot = fs.mkdtempSync(path.join(os.tmpdir(), 'agent-device-metro-companion-worker-'));
497+
const statePath = path.join(tempRoot, 'metro-companion.json');
498+
fs.writeFileSync(statePath, '{}', 'utf8');
499+
cleanupTasks.push(async () => {
500+
fs.rmSync(tempRoot, { recursive: true, force: true });
501+
});
502+
503+
const bridgeSocketReady = createDeferred<void>();
504+
let bridgePort = 0;
505+
let bridgeSocketRef: Duplex | null = null;
506+
507+
const localServer = http.createServer((_, res) => {
508+
res.writeHead(404);
509+
res.end('not found');
510+
});
511+
cleanupTasks.push(() => closeServer(localServer));
512+
const localPort = await listen(localServer);
513+
514+
const bridgeServer = http.createServer((req, res) => {
515+
const url = new URL(req.url || '/', 'http://127.0.0.1');
516+
if (req.method === 'POST' && url.pathname === '/api/metro/companion/register') {
517+
req.resume();
518+
req.on('end', () => {
519+
res.writeHead(200, { 'content-type': 'application/json' });
520+
res.end(
521+
JSON.stringify({
522+
ok: true,
523+
data: { ws_url: `ws://127.0.0.1:${bridgePort}/bridge` },
524+
}),
525+
);
526+
});
527+
return;
528+
}
529+
res.writeHead(404);
530+
res.end('not found');
531+
});
532+
bridgeServer.on('upgrade', (req, socket) => {
533+
if (req.url !== '/bridge') {
534+
socket.destroy();
535+
return;
536+
}
537+
bridgeSocketRef = socket;
538+
const key = req.headers['sec-websocket-key'];
539+
if (typeof key !== 'string') {
540+
socket.destroy();
541+
return;
542+
}
543+
const accept = crypto
544+
.createHash('sha1')
545+
.update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`)
546+
.digest('base64');
547+
socket.write(
548+
[
549+
'HTTP/1.1 101 Switching Protocols',
550+
'Upgrade: websocket',
551+
'Connection: Upgrade',
552+
`Sec-WebSocket-Accept: ${accept}`,
553+
'\r\n',
554+
].join('\r\n'),
555+
);
556+
bridgeSocketReady.resolve();
557+
});
558+
cleanupTasks.push(() => closeServer(bridgeServer));
559+
cleanupTasks.push(async () => {
560+
bridgeSocketRef?.destroy();
561+
});
562+
bridgePort = await listen(bridgeServer);
563+
564+
const companion = spawn(
565+
process.execPath,
566+
['--experimental-strip-types', 'src/metro-companion.ts', '--agent-device-run-metro-companion'],
567+
{
568+
cwd: process.cwd(),
569+
env: {
570+
...process.env,
571+
AGENT_DEVICE_METRO_COMPANION_SERVER_BASE_URL: `http://127.0.0.1:${bridgePort}`,
572+
AGENT_DEVICE_METRO_COMPANION_BEARER_TOKEN: 'test-token',
573+
AGENT_DEVICE_METRO_COMPANION_LOCAL_BASE_URL: `http://127.0.0.1:${localPort}`,
574+
AGENT_DEVICE_METRO_COMPANION_STATE_PATH: statePath,
575+
},
576+
stdio: ['ignore', 'pipe', 'pipe'],
577+
},
578+
);
579+
cleanupTasks.push(() => stopChild(companion));
580+
581+
let stderr = '';
582+
companion.stderr.on('data', (chunk) => {
583+
stderr += chunk.toString();
584+
});
585+
586+
await waitFor(bridgeSocketReady.promise, 5_000, 'bridge websocket connection');
587+
fs.unlinkSync(statePath);
588+
589+
const exit = await waitFor(
590+
new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve) => {
591+
companion.once('exit', (code, signal) => resolve({ code, signal }));
592+
}),
593+
5_000,
594+
'worker exit after state cleanup',
595+
);
596+
597+
assert.equal(exit.signal, null, `unexpected worker stderr: ${stderr}`);
598+
assert.equal(exit.code, 0, `unexpected worker stderr: ${stderr}`);
599+
});
600+
601+
test('metro companion worker exits immediately when its state file is already missing', async () => {
602+
const tempRoot = fs.mkdtempSync(path.join(os.tmpdir(), 'agent-device-metro-companion-worker-'));
603+
const statePath = path.join(tempRoot, 'missing-metro-companion.json');
604+
cleanupTasks.push(async () => {
605+
fs.rmSync(tempRoot, { recursive: true, force: true });
606+
});
607+
608+
const companion = spawn(
609+
process.execPath,
610+
['--experimental-strip-types', 'src/metro-companion.ts', '--agent-device-run-metro-companion'],
611+
{
612+
cwd: process.cwd(),
613+
env: {
614+
...process.env,
615+
AGENT_DEVICE_METRO_COMPANION_SERVER_BASE_URL: 'http://127.0.0.1:1',
616+
AGENT_DEVICE_METRO_COMPANION_BEARER_TOKEN: 'test-token',
617+
AGENT_DEVICE_METRO_COMPANION_LOCAL_BASE_URL: 'http://127.0.0.1:1',
618+
AGENT_DEVICE_METRO_COMPANION_STATE_PATH: statePath,
619+
},
620+
stdio: ['ignore', 'pipe', 'pipe'],
621+
},
622+
);
623+
cleanupTasks.push(() => stopChild(companion));
624+
625+
let stderr = '';
626+
companion.stderr.on('data', (chunk) => {
627+
stderr += chunk.toString();
628+
});
629+
630+
const exit = await waitFor(
631+
new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve) => {
632+
companion.once('exit', (code, signal) => resolve({ code, signal }));
633+
}),
634+
5_000,
635+
'worker exit with missing state file',
636+
);
637+
638+
assert.equal(exit.signal, null, `unexpected worker stderr: ${stderr}`);
639+
assert.equal(exit.code, 0, `unexpected worker stderr: ${stderr}`);
640+
});

src/__tests__/client-metro-companion.test.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ test('companion ownership is profile-scoped and consumer-counted', async () => {
8787
assert.notEqual(stagingFirst.statePath, prod.statePath);
8888
assert.equal(vi.mocked(runCmdDetached).mock.calls.length, 2);
8989
assertCompanionSpawnTarget();
90+
assert.equal(fs.existsSync(stagingFirst.logPath), true);
91+
assert.equal(fs.existsSync(prod.logPath), true);
9092

9193
const stagingState = JSON.parse(fs.readFileSync(stagingFirst.statePath, 'utf8')) as {
9294
consumers: string[];
@@ -114,6 +116,8 @@ test('companion ownership is profile-scoped and consumer-counted', async () => {
114116
assert.equal(finalStop.stopped, true);
115117
assert.equal(killSpy.mock.calls.length, 1);
116118
assert.deepEqual(killSpy.mock.calls[0], [111, 'SIGTERM']);
119+
assert.equal(fs.existsSync(stagingFirst.statePath), false);
120+
assert.equal(fs.existsSync(stagingFirst.logPath), false);
117121

118122
const prodStop = await stopMetroCompanion({
119123
projectRoot,
@@ -123,6 +127,8 @@ test('companion ownership is profile-scoped and consumer-counted', async () => {
123127
assert.equal(prodStop.stopped, true);
124128
assert.equal(killSpy.mock.calls.length, 2);
125129
assert.deepEqual(killSpy.mock.calls[1], [222, 'SIGTERM']);
130+
assert.equal(fs.existsSync(prod.statePath), false);
131+
assert.equal(fs.existsSync(prod.logPath), false);
126132
} finally {
127133
fs.rmSync(projectRoot, { recursive: true, force: true });
128134
}

src/client-metro-companion-contract.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
export const METRO_COMPANION_RUN_ARG = '--agent-device-run-metro-companion';
22
export const METRO_COMPANION_RECONNECT_DELAY_MS = 1_000;
3+
export const METRO_COMPANION_LEASE_CHECK_INTERVAL_MS = 250;
34
export const WS_READY_STATE_OPEN = 1;
45

56
export const ENV_SERVER_BASE_URL = 'AGENT_DEVICE_METRO_COMPANION_SERVER_BASE_URL';
67
export const ENV_BEARER_TOKEN = 'AGENT_DEVICE_METRO_COMPANION_BEARER_TOKEN';
78
export const ENV_LOCAL_BASE_URL = 'AGENT_DEVICE_METRO_COMPANION_LOCAL_BASE_URL';
89
export const ENV_LAUNCH_URL = 'AGENT_DEVICE_METRO_COMPANION_LAUNCH_URL';
10+
export const ENV_STATE_PATH = 'AGENT_DEVICE_METRO_COMPANION_STATE_PATH';
911

1012
export type { MetroTunnelRequestMessage as MetroCompanionRequest } from './metro.ts';
1113

@@ -14,4 +16,5 @@ export type CompanionOptions = {
1416
bearerToken: string;
1517
localBaseUrl: string;
1618
launchUrl?: string;
19+
statePath?: string;
1720
};

src/client-metro-companion-worker.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import fs from 'node:fs';
12
import { setTimeout as delay } from 'node:timers/promises';
23
import {
34
ENV_BEARER_TOKEN,
45
ENV_LAUNCH_URL,
56
ENV_LOCAL_BASE_URL,
67
ENV_SERVER_BASE_URL,
8+
ENV_STATE_PATH,
9+
METRO_COMPANION_LEASE_CHECK_INTERVAL_MS,
710
METRO_COMPANION_RECONNECT_DELAY_MS,
811
METRO_COMPANION_RUN_ARG,
912
WS_READY_STATE_OPEN,
@@ -74,6 +77,12 @@ function normalizeCloseCode(code: number | undefined): number {
7477
return 1011;
7578
}
7679

80+
function normalizeOutgoingCloseCode(code: number): number {
81+
if (code === 1000) return code;
82+
if (code >= 3000 && code <= 4999) return code;
83+
return 3001;
84+
}
85+
7786
function sendJson(socket: WebSocket, payload: object): void {
7887
if (socket.readyState !== WS_READY_STATE_OPEN) return;
7988
socket.send(JSON.stringify(payload));
@@ -126,12 +135,16 @@ async function waitForSocketShutdown(socket: WebSocket): Promise<void> {
126135

127136
function closeSocketQuietly(socket: WebSocket, code: number, reason: string): void {
128137
try {
129-
socket.close(code, reason);
138+
socket.close(normalizeOutgoingCloseCode(code), reason);
130139
} catch {
131140
// ignore shutdown races
132141
}
133142
}
134143

144+
function shouldKeepWorkerRunning(options: CompanionOptions): boolean {
145+
return !options.statePath || fs.existsSync(options.statePath);
146+
}
147+
135148
async function handleBridgeMessage(
136149
bridgeSocket: WebSocket,
137150
message: MetroCompanionRequest,
@@ -254,7 +267,16 @@ async function handleBridgeMessage(
254267

255268
export async function runMetroCompanionWorker(options: CompanionOptions): Promise<void> {
256269
const upstreamSockets = new Map<string, WebSocket>();
257-
while (true) {
270+
const lifetimeHandle = setInterval(() => {
271+
if (!shouldKeepWorkerRunning(options)) {
272+
// Node's built-in WebSocket client does not expose a force-close API. If the peer never
273+
// answers the close handshake, a detached worker can linger indefinitely, so lease expiry
274+
// uses a hard exit to guarantee teardown.
275+
process.exit(0);
276+
}
277+
}, METRO_COMPANION_LEASE_CHECK_INTERVAL_MS);
278+
lifetimeHandle.unref();
279+
while (shouldKeepWorkerRunning(options)) {
258280
try {
259281
const registration = await registerCompanion(options);
260282
const bridgeSocket = new WebSocket(registration.wsUrl);
@@ -272,10 +294,17 @@ export async function runMetroCompanionWorker(options: CompanionOptions): Promis
272294
upstreamSockets.forEach((socket) => closeSocketQuietly(socket, 1012, 'bridge disconnected'));
273295
upstreamSockets.clear();
274296
} catch (error) {
297+
if (!shouldKeepWorkerRunning(options)) {
298+
break;
299+
}
275300
console.error(error instanceof Error ? error.message : String(error));
276301
}
302+
if (!shouldKeepWorkerRunning(options)) {
303+
break;
304+
}
277305
await delay(METRO_COMPANION_RECONNECT_DELAY_MS);
278306
}
307+
clearInterval(lifetimeHandle);
279308
}
280309

281310
function readWorkerOptions(argv: string[], env: NodeJS.ProcessEnv): CompanionOptions | null {
@@ -291,6 +320,7 @@ function readWorkerOptions(argv: string[], env: NodeJS.ProcessEnv): CompanionOpt
291320
bearerToken,
292321
localBaseUrl,
293322
launchUrl: env[ENV_LAUNCH_URL]?.trim() || undefined,
323+
statePath: env[ENV_STATE_PATH]?.trim() || undefined,
294324
};
295325
}
296326

src/client-metro-companion.ts

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
ENV_LAUNCH_URL,
88
ENV_LOCAL_BASE_URL,
99
ENV_SERVER_BASE_URL,
10+
ENV_STATE_PATH,
1011
METRO_COMPANION_RUN_ARG,
1112
} from './client-metro-companion-contract.ts';
1213
import { normalizeBaseUrl } from './utils/url.ts';
@@ -129,6 +130,39 @@ function clearCompanionState(statePath: string): void {
129130
}
130131
}
131132

133+
function clearCompanionLog(logPath: string): void {
134+
try {
135+
fs.unlinkSync(logPath);
136+
} catch {
137+
// best effort cleanup
138+
}
139+
}
140+
141+
function removeDirectoryIfEmpty(dirPath: string): void {
142+
try {
143+
const entries = fs.readdirSync(dirPath);
144+
if (entries.length === 0) {
145+
fs.rmdirSync(dirPath);
146+
}
147+
} catch {
148+
// best effort cleanup
149+
}
150+
}
151+
152+
function clearCompanionArtifacts(paths: { statePath: string; logPath: string }): void {
153+
const stateDir = path.dirname(paths.statePath);
154+
const logDir = path.dirname(paths.logPath);
155+
clearCompanionState(paths.statePath);
156+
clearCompanionLog(paths.logPath);
157+
removeDirectoryIfEmpty(stateDir);
158+
if (logDir !== stateDir) {
159+
removeDirectoryIfEmpty(logDir);
160+
}
161+
if (path.basename(stateDir) === METRO_COMPANION_STATE_DIR) {
162+
removeDirectoryIfEmpty(path.dirname(stateDir));
163+
}
164+
}
165+
132166
function isMetroCompanionCommand(command: string): boolean {
133167
return command.includes(METRO_COMPANION_RUN_ARG);
134168
}
@@ -214,6 +248,7 @@ function buildCompanionEnv(
214248
[ENV_SERVER_BASE_URL]: normalizeBaseUrl(options.serverBaseUrl),
215249
[ENV_BEARER_TOKEN]: options.bearerToken,
216250
[ENV_LOCAL_BASE_URL]: normalizeBaseUrl(options.localBaseUrl),
251+
[ENV_STATE_PATH]: resolveCompanionPaths(options.projectRoot, options.profileKey).statePath,
217252
};
218253
if (options.launchUrl?.trim()) {
219254
nextEnv[ENV_LAUNCH_URL] = options.launchUrl.trim();
@@ -288,7 +323,7 @@ export async function ensureMetroCompanion(
288323

289324
if (existing) {
290325
await stopCompanionProcess(existing);
291-
clearCompanionState(paths.statePath);
326+
clearCompanionArtifacts(paths);
292327
}
293328

294329
const spawned = spawnCompanionProcess(options, paths.logPath);
@@ -308,7 +343,7 @@ export async function stopMetroCompanion(
308343
const paths = resolveCompanionPaths(options.projectRoot, options.profileKey);
309344
const existing = readCompanionState(paths.statePath);
310345
if (!existing) {
311-
clearCompanionState(paths.statePath);
346+
clearCompanionArtifacts(paths);
312347
return { stopped: false, statePath: paths.statePath };
313348
}
314349
const nextState = withoutConsumer(existing, consumerKey);
@@ -317,6 +352,6 @@ export async function stopMetroCompanion(
317352
return { stopped: false, statePath: paths.statePath };
318353
}
319354
await stopCompanionProcess(existing);
320-
clearCompanionState(paths.statePath);
355+
clearCompanionArtifacts(paths);
321356
return { stopped: true, statePath: paths.statePath };
322357
}

0 commit comments

Comments
 (0)