Skip to content

Commit b681cd4

Browse files
committed
feat: stream replay test progress
1 parent 7a2428e commit b681cd4

14 files changed

Lines changed: 575 additions & 28 deletions

src/__tests__/cli-network.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ test('test command prints suite summary and exits non-zero on failures', async (
103103

104104
assert.equal(result.code, 1);
105105
assert.equal(result.calls.length, 1);
106+
assert.equal(result.calls[0]?.meta?.requestProgress, 'replay-test');
106107
assert.match(result.stderr, /Running replay suite\.\.\./);
107108
assert.doesNotMatch(result.stdout, /PASS \/tmp\/01-pass\.ad/);
108109
assert.match(result.stdout, /FAIL \/tmp\/02-fail\.ad after 2 attempts \(5ms\)/);

src/cli.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,11 @@ export async function runCli(argv: string[], deps: CliDeps = DEFAULT_CLI_DEPS):
288288
? startDaemonLogTail(daemonPaths.logPath)
289289
: null;
290290
const client = createAgentDeviceClient(buildClientConfig(effectiveFlags, resolvedRuntime), {
291-
transport: deps.sendToDaemon as AgentDeviceDaemonTransport,
291+
transport: createCliDaemonTransport({
292+
command,
293+
flags: effectiveFlags,
294+
transport: deps.sendToDaemon as AgentDeviceDaemonTransport,
295+
}),
292296
});
293297
if (command === 'batch') {
294298
if (!parsedBatchSteps) {
@@ -479,6 +483,23 @@ function hasExplicitMetroRuntimeOverrides(explicitFlagKeys: Set<FlagKey>): boole
479483
return false;
480484
}
481485

486+
function createCliDaemonTransport(options: {
487+
command: string;
488+
flags: CliFlags;
489+
transport: AgentDeviceDaemonTransport;
490+
}): AgentDeviceDaemonTransport {
491+
const { command, flags, transport } = options;
492+
if (command !== 'test' || flags.json) return transport;
493+
return async (req) =>
494+
await transport({
495+
...req,
496+
meta: {
497+
...req.meta,
498+
requestProgress: 'replay-test',
499+
},
500+
});
501+
}
502+
482503
function guessSessionFromArgv(argv: string[]): string | null {
483504
for (let i = 0; i < argv.length; i += 1) {
484505
const token = argv[i]!;

src/contracts.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ export type DaemonRequestMeta = {
5757
materializationId?: string;
5858
lockPolicy?: DaemonLockPolicy;
5959
lockPlatform?: 'ios' | 'macos' | 'android' | 'linux' | 'apple';
60+
requestProgress?: 'replay-test';
6061
};
6162

6263
export type DaemonRequest = {

src/daemon-client-progress.ts

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import http from 'node:http';
2+
import { AppError } from './utils/errors.ts';
3+
import type { DaemonRequest } from './daemon/types.ts';
4+
import type { RequestProgressEvent } from './daemon/request-progress.ts';
5+
import {
6+
formatRequestProgressEvent,
7+
isDaemonProgressEnvelope,
8+
isDaemonRpcResponseEnvelope,
9+
shouldStreamRequestProgress,
10+
} from './daemon/request-progress-protocol.ts';
11+
12+
export function writeRequestProgressEvent(event: RequestProgressEvent): void {
13+
const line = formatRequestProgressEvent(event);
14+
if (line) process.stderr.write(`${line}\n`);
15+
}
16+
17+
export function shouldReadDaemonProgressStream(
18+
req: DaemonRequest,
19+
contentType: string | string[] | undefined,
20+
): boolean {
21+
return (
22+
shouldStreamRequestProgress(req) &&
23+
String(Array.isArray(contentType) ? contentType.join(',') : (contentType ?? '')).includes(
24+
'application/x-ndjson',
25+
)
26+
);
27+
}
28+
29+
export function readDaemonHttpProgressResponse(
30+
res: http.IncomingMessage,
31+
options: {
32+
req: DaemonRequest;
33+
handleResponseBody: (body: string) => void;
34+
reject: (error: unknown) => void;
35+
clearTimeout: () => void;
36+
},
37+
): void {
38+
const { req, handleResponseBody, reject, clearTimeout } = options;
39+
let buffer = '';
40+
let settled = false;
41+
const rejectInvalidLine = (line: string, error: unknown) => {
42+
settled = true;
43+
clearTimeout();
44+
reject(
45+
new AppError(
46+
'COMMAND_FAILED',
47+
'Invalid daemon response',
48+
{
49+
requestId: req.meta?.requestId,
50+
line,
51+
},
52+
error instanceof Error ? error : undefined,
53+
),
54+
);
55+
};
56+
57+
const handleLine = (line: string): boolean => {
58+
try {
59+
const message = JSON.parse(line) as unknown;
60+
if (isDaemonProgressEnvelope(message)) {
61+
writeRequestProgressEvent(message.event);
62+
return false;
63+
}
64+
if (isDaemonRpcResponseEnvelope(message)) {
65+
settled = true;
66+
clearTimeout();
67+
handleResponseBody(JSON.stringify(message.response));
68+
return true;
69+
}
70+
throw new Error('Missing daemon progress response envelope');
71+
} catch (error) {
72+
rejectInvalidLine(line, error);
73+
return true;
74+
}
75+
};
76+
77+
res.setEncoding('utf8');
78+
res.on('data', (chunk) => {
79+
if (settled) return;
80+
buffer += chunk;
81+
let idx = buffer.indexOf('\n');
82+
while (idx !== -1) {
83+
const line = buffer.slice(0, idx).trim();
84+
buffer = buffer.slice(idx + 1);
85+
if (line && handleLine(line)) return;
86+
idx = buffer.indexOf('\n');
87+
}
88+
});
89+
res.on('end', () => {
90+
if (settled) return;
91+
const line = buffer.trim();
92+
if (line && handleLine(line)) return;
93+
settled = true;
94+
clearTimeout();
95+
reject(
96+
new AppError('COMMAND_FAILED', 'Invalid daemon response', {
97+
requestId: req.meta?.requestId,
98+
line,
99+
}),
100+
);
101+
});
102+
res.on('error', (error) => {
103+
if (settled) return;
104+
settled = true;
105+
clearTimeout();
106+
reject(
107+
new AppError(
108+
'COMMAND_FAILED',
109+
'Failed to read daemon response',
110+
{ requestId: req.meta?.requestId },
111+
error instanceof Error ? error : undefined,
112+
),
113+
);
114+
});
115+
}

src/daemon-client.ts

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ import {
2828
import { uploadArtifact } from './upload-client.ts';
2929
import { computeDaemonCodeSignature } from './daemon/code-signature.ts';
3030
import { PUBLIC_COMMANDS } from './command-catalog.ts';
31+
import {
32+
readDaemonHttpProgressResponse,
33+
shouldReadDaemonProgressStream,
34+
writeRequestProgressEvent,
35+
} from './daemon-client-progress.ts';
36+
import {
37+
isDaemonProgressEnvelope,
38+
isDaemonResponseEnvelope,
39+
} from './daemon/request-progress-protocol.ts';
3140
export { computeDaemonCodeSignature } from './daemon/code-signature.ts';
3241
export type DaemonRequest = SharedDaemonRequest;
3342
export type DaemonResponse = SharedDaemonResponse;
@@ -1152,28 +1161,41 @@ async function sendSocketRequest(
11521161
socket.setEncoding('utf8');
11531162
socket.on('data', (chunk) => {
11541163
buffer += chunk;
1155-
const idx = buffer.indexOf('\n');
1156-
if (idx === -1) return;
1157-
const line = buffer.slice(0, idx).trim();
1158-
if (!line) return;
1159-
try {
1160-
const response = JSON.parse(line) as DaemonResponse;
1161-
socket.end();
1162-
if (timeoutHandle) clearTimeout(timeoutHandle);
1163-
resolve(response);
1164-
} catch (err) {
1165-
if (timeoutHandle) clearTimeout(timeoutHandle);
1166-
reject(
1167-
new AppError(
1168-
'COMMAND_FAILED',
1169-
'Invalid daemon response',
1170-
{
1171-
requestId: req.meta?.requestId,
1172-
line,
1173-
},
1174-
err instanceof Error ? err : undefined,
1175-
),
1176-
);
1164+
let idx = buffer.indexOf('\n');
1165+
while (idx !== -1) {
1166+
const line = buffer.slice(0, idx).trim();
1167+
buffer = buffer.slice(idx + 1);
1168+
if (!line) {
1169+
idx = buffer.indexOf('\n');
1170+
continue;
1171+
}
1172+
try {
1173+
const message = JSON.parse(line) as unknown;
1174+
if (isDaemonProgressEnvelope(message)) {
1175+
writeRequestProgressEvent(message.event);
1176+
idx = buffer.indexOf('\n');
1177+
continue;
1178+
}
1179+
const response = isDaemonResponseEnvelope(message) ? message.response : message;
1180+
socket.end();
1181+
if (timeoutHandle) clearTimeout(timeoutHandle);
1182+
resolve(response as DaemonResponse);
1183+
return;
1184+
} catch (err) {
1185+
if (timeoutHandle) clearTimeout(timeoutHandle);
1186+
reject(
1187+
new AppError(
1188+
'COMMAND_FAILED',
1189+
'Invalid daemon response',
1190+
{
1191+
requestId: req.meta?.requestId,
1192+
line,
1193+
},
1194+
err instanceof Error ? err : undefined,
1195+
),
1196+
);
1197+
return;
1198+
}
11771199
}
11781200
});
11791201

@@ -1220,6 +1242,18 @@ async function sendHttpRequest(
12201242
headers,
12211243
},
12221244
(res) => {
1245+
if (shouldReadDaemonProgressStream(req, res.headers?.['content-type'])) {
1246+
readDaemonHttpProgressResponse(res, {
1247+
req,
1248+
reject,
1249+
clearTimeout: () => {
1250+
if (timeoutHandle) clearTimeout(timeoutHandle);
1251+
},
1252+
handleResponseBody: (body) =>
1253+
handleDaemonHttpResponseBody(body, { info, req, resolve, reject }),
1254+
});
1255+
return;
1256+
}
12231257
void readNodeHttpResponseBody(res)
12241258
.then((body) => {
12251259
if (timeoutHandle) clearTimeout(timeoutHandle);

src/daemon/handlers/__tests__/session-test-suite.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import path from 'node:path';
55
import { handleSessionCommands } from '../session.ts';
66
import { SessionStore } from '../../session-store.ts';
77
import type { DaemonRequest, DaemonResponse, DaemonResponseData } from '../../types.ts';
8+
import { type RequestProgressEvent, withRequestProgressSink } from '../../request-progress.ts';
89

910
function makeSessionStore(): SessionStore {
1011
const root = fs.mkdtempSync(path.join(os.tmpdir(), 'agent-device-session-test-suite-'));
@@ -91,3 +92,61 @@ test('test discovers Maestro YAML suites when replay backend is set', async () =
9192
expect(data.passed).toBe(1);
9293
expect(data.failed).toBe(0);
9394
});
95+
96+
test('test emits progress when attempts retry and pass', async () => {
97+
const sessionStore = makeSessionStore();
98+
const root = fs.mkdtempSync(path.join(os.tmpdir(), 'agent-device-test-suite-progress-'));
99+
fs.writeFileSync(path.join(root, '01-progress.ad'), 'context platform=ios\nopen "Demo"\n');
100+
101+
const events: RequestProgressEvent[] = [];
102+
let attempts = 0;
103+
const response = await withRequestProgressSink(
104+
(event) => events.push(event),
105+
async () =>
106+
await handleSessionCommands({
107+
req: {
108+
token: 't',
109+
session: 'default',
110+
command: 'test',
111+
positionals: [root],
112+
meta: { cwd: root, requestId: 'suite-progress' },
113+
flags: { retries: 1 },
114+
},
115+
sessionName: 'default',
116+
logPath: path.join(os.tmpdir(), 'daemon.log'),
117+
sessionStore,
118+
invoke: async () => {
119+
attempts += 1;
120+
if (attempts === 1) {
121+
return {
122+
ok: false,
123+
error: { code: 'COMMAND_FAILED', message: 'first attempt failed' },
124+
};
125+
}
126+
return { ok: true, data: { replayed: 1, healed: 0 } };
127+
},
128+
}),
129+
);
130+
131+
const data = expectOkData(response);
132+
expect(data.passed).toBe(1);
133+
expect(events.map((event) => event.status)).toEqual(['fail', 'pass']);
134+
expect(events[0]).toMatchObject({
135+
type: 'replay-test',
136+
status: 'fail',
137+
index: 1,
138+
total: 1,
139+
attempt: 1,
140+
maxAttempts: 2,
141+
retrying: true,
142+
message: 'Replay failed at step 1 (open "Demo"): first attempt failed',
143+
});
144+
expect(events[1]).toMatchObject({
145+
type: 'replay-test',
146+
status: 'pass',
147+
index: 1,
148+
total: 1,
149+
attempt: 2,
150+
maxAttempts: 2,
151+
});
152+
});

0 commit comments

Comments
 (0)