Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions src/__tests__/client-metro-companion-worker.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { spawn } from 'node:child_process';
import assert from 'node:assert/strict';
import crypto from 'node:crypto';
import fs from 'node:fs';
import http from 'node:http';
import os from 'node:os';
import path from 'node:path';
import type { Duplex } from 'node:stream';
import { setTimeout as delay } from 'node:timers/promises';
import { afterEach, test } from 'vitest';
Expand Down Expand Up @@ -488,3 +491,150 @@ test('metro companion worker reconnects after the bridge closes immediately afte

assert.equal(bridgeConnections, 2);
});

test('metro companion worker exits after its state file is removed', async () => {
const tempRoot = fs.mkdtempSync(path.join(os.tmpdir(), 'agent-device-metro-companion-worker-'));
const statePath = path.join(tempRoot, 'metro-companion.json');
fs.writeFileSync(statePath, '{}', 'utf8');
cleanupTasks.push(async () => {
fs.rmSync(tempRoot, { recursive: true, force: true });
});

const bridgeSocketReady = createDeferred<void>();
let bridgePort = 0;
let bridgeSocketRef: Duplex | null = null;

const localServer = http.createServer((_, res) => {
res.writeHead(404);
res.end('not found');
});
cleanupTasks.push(() => closeServer(localServer));
const localPort = await listen(localServer);

const bridgeServer = http.createServer((req, res) => {
const url = new URL(req.url || '/', 'http://127.0.0.1');
if (req.method === 'POST' && url.pathname === '/api/metro/companion/register') {
req.resume();
req.on('end', () => {
res.writeHead(200, { 'content-type': 'application/json' });
res.end(
JSON.stringify({
ok: true,
data: { ws_url: `ws://127.0.0.1:${bridgePort}/bridge` },
}),
);
});
return;
}
res.writeHead(404);
res.end('not found');
});
bridgeServer.on('upgrade', (req, socket) => {
if (req.url !== '/bridge') {
socket.destroy();
return;
}
bridgeSocketRef = socket;
const key = req.headers['sec-websocket-key'];
if (typeof key !== 'string') {
socket.destroy();
return;
}
const accept = crypto
.createHash('sha1')
.update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`)
.digest('base64');
socket.write(
[
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
`Sec-WebSocket-Accept: ${accept}`,
'\r\n',
].join('\r\n'),
);
bridgeSocketReady.resolve();
});
cleanupTasks.push(() => closeServer(bridgeServer));
cleanupTasks.push(async () => {
bridgeSocketRef?.destroy();
});
bridgePort = await listen(bridgeServer);

const companion = spawn(
process.execPath,
['--experimental-strip-types', 'src/metro-companion.ts', '--agent-device-run-metro-companion'],
{
cwd: process.cwd(),
env: {
...process.env,
AGENT_DEVICE_METRO_COMPANION_SERVER_BASE_URL: `http://127.0.0.1:${bridgePort}`,
AGENT_DEVICE_METRO_COMPANION_BEARER_TOKEN: 'test-token',
AGENT_DEVICE_METRO_COMPANION_LOCAL_BASE_URL: `http://127.0.0.1:${localPort}`,
AGENT_DEVICE_METRO_COMPANION_STATE_PATH: statePath,
},
stdio: ['ignore', 'pipe', 'pipe'],
},
);
cleanupTasks.push(() => stopChild(companion));

let stderr = '';
companion.stderr.on('data', (chunk) => {
stderr += chunk.toString();
});

await waitFor(bridgeSocketReady.promise, 5_000, 'bridge websocket connection');
fs.unlinkSync(statePath);

const exit = await waitFor(
new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve) => {
companion.once('exit', (code, signal) => resolve({ code, signal }));
}),
5_000,
'worker exit after state cleanup',
);

assert.equal(exit.signal, null, `unexpected worker stderr: ${stderr}`);
assert.equal(exit.code, 0, `unexpected worker stderr: ${stderr}`);
});

test('metro companion worker exits immediately when its state file is already missing', async () => {
const tempRoot = fs.mkdtempSync(path.join(os.tmpdir(), 'agent-device-metro-companion-worker-'));
const statePath = path.join(tempRoot, 'missing-metro-companion.json');
cleanupTasks.push(async () => {
fs.rmSync(tempRoot, { recursive: true, force: true });
});

const companion = spawn(
process.execPath,
['--experimental-strip-types', 'src/metro-companion.ts', '--agent-device-run-metro-companion'],
{
cwd: process.cwd(),
env: {
...process.env,
AGENT_DEVICE_METRO_COMPANION_SERVER_BASE_URL: 'http://127.0.0.1:1',
AGENT_DEVICE_METRO_COMPANION_BEARER_TOKEN: 'test-token',
AGENT_DEVICE_METRO_COMPANION_LOCAL_BASE_URL: 'http://127.0.0.1:1',
AGENT_DEVICE_METRO_COMPANION_STATE_PATH: statePath,
},
stdio: ['ignore', 'pipe', 'pipe'],
},
);
cleanupTasks.push(() => stopChild(companion));

let stderr = '';
companion.stderr.on('data', (chunk) => {
stderr += chunk.toString();
});

const exit = await waitFor(
new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve) => {
companion.once('exit', (code, signal) => resolve({ code, signal }));
}),
5_000,
'worker exit with missing state file',
);

assert.equal(exit.signal, null, `unexpected worker stderr: ${stderr}`);
assert.equal(exit.code, 0, `unexpected worker stderr: ${stderr}`);
});
6 changes: 6 additions & 0 deletions src/__tests__/client-metro-companion.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ test('companion ownership is profile-scoped and consumer-counted', async () => {
assert.notEqual(stagingFirst.statePath, prod.statePath);
assert.equal(vi.mocked(runCmdDetached).mock.calls.length, 2);
assertCompanionSpawnTarget();
assert.equal(fs.existsSync(stagingFirst.logPath), true);
assert.equal(fs.existsSync(prod.logPath), true);

const stagingState = JSON.parse(fs.readFileSync(stagingFirst.statePath, 'utf8')) as {
consumers: string[];
Expand Down Expand Up @@ -114,6 +116,8 @@ test('companion ownership is profile-scoped and consumer-counted', async () => {
assert.equal(finalStop.stopped, true);
assert.equal(killSpy.mock.calls.length, 1);
assert.deepEqual(killSpy.mock.calls[0], [111, 'SIGTERM']);
assert.equal(fs.existsSync(stagingFirst.statePath), false);
assert.equal(fs.existsSync(stagingFirst.logPath), false);

const prodStop = await stopMetroCompanion({
projectRoot,
Expand All @@ -123,6 +127,8 @@ test('companion ownership is profile-scoped and consumer-counted', async () => {
assert.equal(prodStop.stopped, true);
assert.equal(killSpy.mock.calls.length, 2);
assert.deepEqual(killSpy.mock.calls[1], [222, 'SIGTERM']);
assert.equal(fs.existsSync(prod.statePath), false);
assert.equal(fs.existsSync(prod.logPath), false);
} finally {
fs.rmSync(projectRoot, { recursive: true, force: true });
}
Expand Down
3 changes: 3 additions & 0 deletions src/client-metro-companion-contract.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
export const METRO_COMPANION_RUN_ARG = '--agent-device-run-metro-companion';
export const METRO_COMPANION_RECONNECT_DELAY_MS = 1_000;
export const METRO_COMPANION_LEASE_CHECK_INTERVAL_MS = 250;
export const WS_READY_STATE_OPEN = 1;

export const ENV_SERVER_BASE_URL = 'AGENT_DEVICE_METRO_COMPANION_SERVER_BASE_URL';
export const ENV_BEARER_TOKEN = 'AGENT_DEVICE_METRO_COMPANION_BEARER_TOKEN';
export const ENV_LOCAL_BASE_URL = 'AGENT_DEVICE_METRO_COMPANION_LOCAL_BASE_URL';
export const ENV_LAUNCH_URL = 'AGENT_DEVICE_METRO_COMPANION_LAUNCH_URL';
export const ENV_STATE_PATH = 'AGENT_DEVICE_METRO_COMPANION_STATE_PATH';

export type { MetroTunnelRequestMessage as MetroCompanionRequest } from './metro.ts';

Expand All @@ -14,4 +16,5 @@ export type CompanionOptions = {
bearerToken: string;
localBaseUrl: string;
launchUrl?: string;
statePath?: string;
};
34 changes: 32 additions & 2 deletions src/client-metro-companion-worker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import fs from 'node:fs';
import { setTimeout as delay } from 'node:timers/promises';
import {
ENV_BEARER_TOKEN,
ENV_LAUNCH_URL,
ENV_LOCAL_BASE_URL,
ENV_SERVER_BASE_URL,
ENV_STATE_PATH,
METRO_COMPANION_LEASE_CHECK_INTERVAL_MS,
METRO_COMPANION_RECONNECT_DELAY_MS,
METRO_COMPANION_RUN_ARG,
WS_READY_STATE_OPEN,
Expand Down Expand Up @@ -74,6 +77,12 @@ function normalizeCloseCode(code: number | undefined): number {
return 1011;
}

function normalizeOutgoingCloseCode(code: number): number {
if (code === 1000) return code;
if (code >= 3000 && code <= 4999) return code;
return 3001;
}

function sendJson(socket: WebSocket, payload: object): void {
if (socket.readyState !== WS_READY_STATE_OPEN) return;
socket.send(JSON.stringify(payload));
Expand Down Expand Up @@ -126,12 +135,16 @@ async function waitForSocketShutdown(socket: WebSocket): Promise<void> {

function closeSocketQuietly(socket: WebSocket, code: number, reason: string): void {
try {
socket.close(code, reason);
socket.close(normalizeOutgoingCloseCode(code), reason);
} catch {
// ignore shutdown races
}
}

function shouldKeepWorkerRunning(options: CompanionOptions): boolean {
return !options.statePath || fs.existsSync(options.statePath);
}

async function handleBridgeMessage(
bridgeSocket: WebSocket,
message: MetroCompanionRequest,
Expand Down Expand Up @@ -254,7 +267,16 @@ async function handleBridgeMessage(

export async function runMetroCompanionWorker(options: CompanionOptions): Promise<void> {
const upstreamSockets = new Map<string, WebSocket>();
while (true) {
const lifetimeHandle = setInterval(() => {
if (!shouldKeepWorkerRunning(options)) {
// Node's built-in WebSocket client does not expose a force-close API. If the peer never
// answers the close handshake, a detached worker can linger indefinitely, so lease expiry
// uses a hard exit to guarantee teardown.
process.exit(0);
}
}, METRO_COMPANION_LEASE_CHECK_INTERVAL_MS);
lifetimeHandle.unref();
while (shouldKeepWorkerRunning(options)) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid lease check before first worker iteration

The new while (shouldKeepWorkerRunning(options)) gate can make a freshly spawned companion exit immediately on cold start, because ensureMetroCompanion still writes the lease/state file only after spawning the process (src/client-metro-companion.ts), so if the child reaches this condition first it sees no file and returns without ever registering. This introduces a startup race that can leave a dead PID in state and cause metro tunnel setup to fail intermittently until retried.

Useful? React with 👍 / 👎.

try {
const registration = await registerCompanion(options);
const bridgeSocket = new WebSocket(registration.wsUrl);
Expand All @@ -272,10 +294,17 @@ export async function runMetroCompanionWorker(options: CompanionOptions): Promis
upstreamSockets.forEach((socket) => closeSocketQuietly(socket, 1012, 'bridge disconnected'));
upstreamSockets.clear();
} catch (error) {
if (!shouldKeepWorkerRunning(options)) {
break;
}
console.error(error instanceof Error ? error.message : String(error));
}
if (!shouldKeepWorkerRunning(options)) {
break;
}
await delay(METRO_COMPANION_RECONNECT_DELAY_MS);
}
clearInterval(lifetimeHandle);
}

function readWorkerOptions(argv: string[], env: NodeJS.ProcessEnv): CompanionOptions | null {
Expand All @@ -291,6 +320,7 @@ function readWorkerOptions(argv: string[], env: NodeJS.ProcessEnv): CompanionOpt
bearerToken,
localBaseUrl,
launchUrl: env[ENV_LAUNCH_URL]?.trim() || undefined,
statePath: env[ENV_STATE_PATH]?.trim() || undefined,
};
}

Expand Down
41 changes: 38 additions & 3 deletions src/client-metro-companion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ENV_LAUNCH_URL,
ENV_LOCAL_BASE_URL,
ENV_SERVER_BASE_URL,
ENV_STATE_PATH,
METRO_COMPANION_RUN_ARG,
} from './client-metro-companion-contract.ts';
import { normalizeBaseUrl } from './utils/url.ts';
Expand Down Expand Up @@ -129,6 +130,39 @@ function clearCompanionState(statePath: string): void {
}
}

function clearCompanionLog(logPath: string): void {
try {
fs.unlinkSync(logPath);
} catch {
// best effort cleanup
}
}

function removeDirectoryIfEmpty(dirPath: string): void {
try {
const entries = fs.readdirSync(dirPath);
if (entries.length === 0) {
fs.rmdirSync(dirPath);
}
} catch {
// best effort cleanup
}
}

function clearCompanionArtifacts(paths: { statePath: string; logPath: string }): void {
const stateDir = path.dirname(paths.statePath);
const logDir = path.dirname(paths.logPath);
clearCompanionState(paths.statePath);
clearCompanionLog(paths.logPath);
removeDirectoryIfEmpty(stateDir);
if (logDir !== stateDir) {
removeDirectoryIfEmpty(logDir);
}
if (path.basename(stateDir) === METRO_COMPANION_STATE_DIR) {
removeDirectoryIfEmpty(path.dirname(stateDir));
}
}

function isMetroCompanionCommand(command: string): boolean {
return command.includes(METRO_COMPANION_RUN_ARG);
}
Expand Down Expand Up @@ -214,6 +248,7 @@ function buildCompanionEnv(
[ENV_SERVER_BASE_URL]: normalizeBaseUrl(options.serverBaseUrl),
[ENV_BEARER_TOKEN]: options.bearerToken,
[ENV_LOCAL_BASE_URL]: normalizeBaseUrl(options.localBaseUrl),
[ENV_STATE_PATH]: resolveCompanionPaths(options.projectRoot, options.profileKey).statePath,
};
if (options.launchUrl?.trim()) {
nextEnv[ENV_LAUNCH_URL] = options.launchUrl.trim();
Expand Down Expand Up @@ -288,7 +323,7 @@ export async function ensureMetroCompanion(

if (existing) {
await stopCompanionProcess(existing);
clearCompanionState(paths.statePath);
clearCompanionArtifacts(paths);
}

const spawned = spawnCompanionProcess(options, paths.logPath);
Expand All @@ -308,7 +343,7 @@ export async function stopMetroCompanion(
const paths = resolveCompanionPaths(options.projectRoot, options.profileKey);
const existing = readCompanionState(paths.statePath);
if (!existing) {
clearCompanionState(paths.statePath);
clearCompanionArtifacts(paths);
return { stopped: false, statePath: paths.statePath };
}
const nextState = withoutConsumer(existing, consumerKey);
Expand All @@ -317,6 +352,6 @@ export async function stopMetroCompanion(
return { stopped: false, statePath: paths.statePath };
}
await stopCompanionProcess(existing);
clearCompanionState(paths.statePath);
clearCompanionArtifacts(paths);
return { stopped: true, statePath: paths.statePath };
}
Loading
Loading