Skip to content

Commit dbae35c

Browse files
committed
fix: walk fal queue upstream during recording, persist final job body
The recorder shortcut wrote the IN_QUEUE submit envelope to fixtures and never seeded falQueueStates, so replay's GET /requests/<id> returned the envelope instead of the model output and broke fal.subscribe() consumers. The recorder now walks submit → status → result upstream and persists the final body. sync-run keeps the single-call generic recorder. Ported the same fix to the legacy /fal/queue/submit/{model} audio path. Adds RecordConfig.fal.{pollIntervalMs, timeoutMs} (defaults 1s / 120s), extracts persistFixture from proxyAndRecord, and exports buildFixtureMatch so the new walker stays consistent with the generic recording path.
1 parent ad24791 commit dbae35c

5 files changed

Lines changed: 906 additions & 222 deletions

File tree

src/__tests__/fal.test.ts

Lines changed: 235 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,6 @@ import * as os from "node:os";
55
import * as path from "node:path";
66
import { LLMock } from "../llmock.js";
77

8-
// Spin up a tiny stub upstream that responds with whatever JSON the test
9-
// hands it; lets us exercise the record-and-replay path without depending on
10-
// fal.ai itself.
11-
function startStubUpstream(
12-
handler: (req: http.IncomingMessage, body: string) => { status?: number; body: unknown },
13-
): Promise<{ url: string; close: () => Promise<void> }> {
14-
return new Promise((resolve) => {
15-
const server = http.createServer((req, res) => {
16-
const chunks: Buffer[] = [];
17-
req.on("data", (c: Buffer) => chunks.push(c));
18-
req.on("end", () => {
19-
const body = Buffer.concat(chunks).toString();
20-
const result = handler(req, body);
21-
res.writeHead(result.status ?? 200, { "Content-Type": "application/json" });
22-
res.end(JSON.stringify(result.body));
23-
});
24-
});
25-
server.listen(0, "127.0.0.1", () => {
26-
const { port } = server.address() as { port: number };
27-
resolve({
28-
url: `http://127.0.0.1:${port}`,
29-
close: () =>
30-
new Promise<void>((r) => {
31-
server.close(() => r());
32-
}),
33-
});
34-
});
35-
});
36-
}
37-
388
describe("fal.ai general handler — fixture lookup", () => {
399
let mock: LLMock;
4010

@@ -231,85 +201,289 @@ describe("fal.ai general handler — fixture lookup", () => {
231201
});
232202
});
233203

204+
// Queue-protocol-aware stub upstream. Implements the three endpoints fal's
205+
// queue uses: POST submit → IN_QUEUE envelope, GET .../status (polled) →
206+
// IN_QUEUE/IN_PROGRESS until the configured threshold is reached, then
207+
// COMPLETED, and GET .../<id> → the supplied final body. Tracks call counts
208+
// per endpoint so tests can assert what hit the wire vs. the in-memory cache.
209+
function startFalQueueUpstream(opts: {
210+
finalBody: unknown;
211+
pollsBeforeCompleted?: number;
212+
upstreamRequestId?: string;
213+
}): Promise<{
214+
url: string;
215+
close: () => Promise<void>;
216+
counts: { submit: number; status: number; result: number };
217+
}> {
218+
const upstreamRequestId = opts.upstreamRequestId ?? "upstream-req-id";
219+
const pollsBeforeCompleted = opts.pollsBeforeCompleted ?? 2;
220+
const counts = { submit: 0, status: 0, result: 0 };
221+
const statusPolls = new Map<string, number>();
222+
const statusRe = /^\/(.+)\/requests\/([^/]+)\/status$/;
223+
const resultRe = /^\/(.+)\/requests\/([^/]+)$/;
224+
225+
return new Promise((resolve) => {
226+
let selfUrl = "http://stub";
227+
const server = http.createServer((req, res) => {
228+
const chunks: Buffer[] = [];
229+
req.on("data", (c: Buffer) => chunks.push(c));
230+
req.on("end", () => {
231+
const url = new URL(req.url ?? "/", selfUrl);
232+
const send = (status: number, body: unknown) => {
233+
res.writeHead(status, { "Content-Type": "application/json" });
234+
res.end(JSON.stringify(body));
235+
};
236+
237+
const statusMatch = url.pathname.match(statusRe);
238+
const resultMatch = url.pathname.match(resultRe);
239+
240+
if (req.method === "GET" && statusMatch) {
241+
counts.status++;
242+
const reqId = statusMatch[2];
243+
const n = (statusPolls.get(reqId) ?? 0) + 1;
244+
statusPolls.set(reqId, n);
245+
const status = n >= pollsBeforeCompleted ? "COMPLETED" : "IN_QUEUE";
246+
send(200, {
247+
status,
248+
request_id: reqId,
249+
...(status === "IN_QUEUE" ? { queue_position: 1 } : {}),
250+
});
251+
return;
252+
}
253+
if (req.method === "GET" && resultMatch && !statusMatch) {
254+
counts.result++;
255+
send(200, opts.finalBody);
256+
return;
257+
}
258+
if (req.method === "POST") {
259+
counts.submit++;
260+
const modelPath = url.pathname.replace(/^\/+/, "");
261+
const base = `${selfUrl}/${modelPath}/requests/${upstreamRequestId}`;
262+
send(200, {
263+
request_id: upstreamRequestId,
264+
response_url: base,
265+
status_url: `${base}/status`,
266+
cancel_url: `${base}/cancel`,
267+
status: "IN_QUEUE",
268+
queue_position: 1,
269+
});
270+
return;
271+
}
272+
send(404, { error: { message: "stub: unhandled", path: url.pathname } });
273+
});
274+
});
275+
server.listen(0, "127.0.0.1", () => {
276+
const { port } = server.address() as { port: number };
277+
selfUrl = `http://127.0.0.1:${port}`;
278+
resolve({
279+
url: selfUrl,
280+
counts,
281+
close: () =>
282+
new Promise<void>((r) => {
283+
server.close(() => r());
284+
}),
285+
});
286+
});
287+
});
288+
}
289+
234290
describe("fal.ai general handler — record and replay", () => {
235291
let mock: LLMock;
236292
let upstream: { url: string; close: () => Promise<void> } | undefined;
293+
let queueUpstream:
294+
| {
295+
url: string;
296+
close: () => Promise<void>;
297+
counts: { submit: number; status: number; result: number };
298+
}
299+
| undefined;
237300
let tmpDir: string | undefined;
238301

239302
afterEach(async () => {
240303
await mock?.stop();
241304
await upstream?.close();
305+
await queueUpstream?.close();
242306
upstream = undefined;
307+
queueUpstream = undefined;
243308
if (tmpDir) {
244309
fs.rmSync(tmpDir, { recursive: true, force: true });
245310
tmpDir = undefined;
246311
}
247312
});
248313

249-
test("proxies unmatched fal request to upstream and saves fixture", async () => {
250-
upstream = await startStubUpstream(() => ({
251-
body: { images: [{ url: "https://example.com/recorded.png" }] },
252-
}));
253-
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-fal-record-"));
314+
test("walks the queue upstream during recording and persists the FINAL body, not the submit envelope", async () => {
315+
const FINAL_BODY = {
316+
images: [{ url: "https://mock.fal.media/files/recorded-cat.png" }],
317+
seed: 42,
318+
};
319+
queueUpstream = await startFalQueueUpstream({
320+
finalBody: FINAL_BODY,
321+
pollsBeforeCompleted: 2,
322+
upstreamRequestId: "upstream-req-1",
323+
});
324+
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-fal-queue-record-"));
254325

255326
mock = new LLMock({
256327
port: 0,
257-
record: { providers: { fal: upstream.url }, fixturePath: tmpDir },
328+
record: {
329+
providers: { fal: queueUpstream.url },
330+
fixturePath: tmpDir,
331+
fal: { pollIntervalMs: 5, timeoutMs: 5000 },
332+
},
258333
});
259334
await mock.start();
260335

261-
// In record mode the proxy is transparent — what upstream says is what
262-
// the client gets. Queue envelope synthesis only happens in fixture mode.
263-
const res = await fetch(`${mock.url}/fal/fal-ai/flux/dev`, {
336+
// Submit — client should see a synthesised envelope (aimock requestId),
337+
// NOT upstream's IN_QUEUE envelope. The whole point of the fix.
338+
const submit = await fetch(`${mock.url}/fal/fal-ai/flux/dev`, {
264339
method: "POST",
265340
headers: { "Content-Type": "application/json", "x-fal-target-host": "queue.fal.run" },
266341
body: JSON.stringify({ input: { prompt: "a cat" } }),
267342
});
268-
expect(res.status).toBe(200);
269-
const body = await res.json();
270-
expect(body).toEqual({ images: [{ url: "https://example.com/recorded.png" }] });
343+
expect(submit.status).toBe(200);
344+
const envelope = await submit.json();
345+
expect(typeof envelope.request_id).toBe("string");
346+
expect(envelope.request_id).not.toBe("upstream-req-1");
347+
expect(envelope.status_url).toContain(envelope.request_id);
348+
349+
// Status — local job seeded with the final body, so this is COMPLETED.
350+
const status = await fetch(
351+
`${mock.url}/fal/fal-ai/flux/dev/requests/${envelope.request_id}/status`,
352+
{ headers: { "x-fal-target-host": "queue.fal.run" } },
353+
);
354+
expect(status.status).toBe(200);
355+
expect((await status.json()).status).toBe("COMPLETED");
356+
357+
// Result — must be the FINAL body, not the upstream submit envelope.
358+
// This is the assertion that fails before the fix: on main, the recorder
359+
// persisted the IN_QUEUE envelope, so this returned `{ request_id: ..., status: "IN_QUEUE", ... }`.
360+
const result = await fetch(`${mock.url}/fal/fal-ai/flux/dev/requests/${envelope.request_id}`, {
361+
headers: { "x-fal-target-host": "queue.fal.run" },
362+
});
363+
expect(result.status).toBe(200);
364+
expect(await result.json()).toEqual(FINAL_BODY);
365+
366+
expect(queueUpstream.counts.submit).toBe(1);
367+
expect(queueUpstream.counts.status).toBeGreaterThanOrEqual(2);
368+
expect(queueUpstream.counts.result).toBe(1);
271369

370+
// Persisted fixture: response.json must be the FINAL body, not the envelope.
272371
const files = fs.readdirSync(tmpDir);
273372
const falFixtures = files.filter((f) => f.startsWith("fal-") && f.endsWith(".json"));
274-
expect(falFixtures.length).toBeGreaterThanOrEqual(1);
275-
373+
expect(falFixtures.length).toBe(1);
276374
const recorded = JSON.parse(fs.readFileSync(path.join(tmpDir, falFixtures[0]), "utf-8"));
277375
expect(recorded.fixtures[0].match.endpoint).toBe("fal");
278-
expect(recorded.fixtures[0].response.json).toEqual({
279-
images: [{ url: "https://example.com/recorded.png" }],
280-
});
376+
expect(recorded.fixtures[0].response.json).toEqual(FINAL_BODY);
281377
});
282378

283-
test("replays from in-memory fixture on second identical request (no second proxy)", async () => {
284-
let upstreamCalls = 0;
285-
upstream = await startStubUpstream(() => {
286-
upstreamCalls++;
287-
return { body: { images: [{ url: "https://example.com/replay.png" }] } };
379+
test("replays from in-memory fixture on second identical request without a second queue walk", async () => {
380+
queueUpstream = await startFalQueueUpstream({
381+
finalBody: { images: [{ url: "https://example.com/replay.png" }] },
382+
pollsBeforeCompleted: 1,
288383
});
289-
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-fal-replay-"));
384+
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-fal-queue-replay-"));
290385

291386
mock = new LLMock({
292387
port: 0,
293-
record: { providers: { fal: upstream.url }, fixturePath: tmpDir },
388+
record: {
389+
providers: { fal: queueUpstream.url },
390+
fixturePath: tmpDir,
391+
fal: { pollIntervalMs: 5, timeoutMs: 5000 },
392+
},
294393
});
295394
await mock.start();
296395

297-
// First call — hits upstream
298-
await fetch(`${mock.url}/fal/fal-ai/flux/dev`, {
396+
// First call: records via a full queue walk
397+
const firstSubmit = await fetch(`${mock.url}/fal/fal-ai/flux/dev`, {
299398
method: "POST",
300399
headers: { "Content-Type": "application/json", "x-fal-target-host": "queue.fal.run" },
301400
body: JSON.stringify({ input: { prompt: "a cat" } }),
302401
});
303-
expect(upstreamCalls).toBe(1);
402+
expect(firstSubmit.status).toBe(200);
403+
expect(queueUpstream.counts.submit).toBe(1);
304404

305-
// Second call — should match recorded fixture, no upstream hit
306-
const res2 = await fetch(`${mock.url}/fal/fal-ai/flux/dev`, {
405+
// Second call with the same body — should match the cached fixture, no
406+
// upstream walk. Submit, status, result all served locally.
407+
const beforeReplay = { ...queueUpstream.counts };
408+
const replaySubmit = await fetch(`${mock.url}/fal/fal-ai/flux/dev`, {
307409
method: "POST",
308410
headers: { "Content-Type": "application/json", "x-fal-target-host": "queue.fal.run" },
309411
body: JSON.stringify({ input: { prompt: "a cat" } }),
310412
});
311-
expect(res2.status).toBe(200);
312-
expect(upstreamCalls).toBe(1);
413+
expect(replaySubmit.status).toBe(200);
414+
const replayEnvelope = await replaySubmit.json();
415+
const replayResult = await fetch(
416+
`${mock.url}/fal/fal-ai/flux/dev/requests/${replayEnvelope.request_id}`,
417+
{ headers: { "x-fal-target-host": "queue.fal.run" } },
418+
);
419+
expect(replayResult.status).toBe(200);
420+
expect(await replayResult.json()).toEqual({
421+
images: [{ url: "https://example.com/replay.png" }],
422+
});
423+
424+
expect(queueUpstream.counts).toEqual(beforeReplay);
425+
});
426+
427+
test("queue walk failure surfaces 502 and does not write a fixture", async () => {
428+
// Upstream returns a submit envelope, but status calls 500. Recorder must
429+
// give up cleanly: client sees 502, no fixture is persisted (a partial
430+
// fixture would shadow real requests on the next run).
431+
upstream = await new Promise((resolve) => {
432+
const server = http.createServer((req, res) => {
433+
const chunks: Buffer[] = [];
434+
req.on("data", (c: Buffer) => chunks.push(c));
435+
req.on("end", () => {
436+
const url = new URL(req.url ?? "/", "http://stub");
437+
if (req.method === "POST" && !url.pathname.includes("/requests/")) {
438+
res.writeHead(200, { "Content-Type": "application/json" });
439+
res.end(
440+
JSON.stringify({
441+
request_id: "x",
442+
status_url: `http://stub${url.pathname}/requests/x/status`,
443+
response_url: `http://stub${url.pathname}/requests/x`,
444+
}),
445+
);
446+
return;
447+
}
448+
res.writeHead(500, { "Content-Type": "application/json" });
449+
res.end(JSON.stringify({ error: { message: "upstream broke" } }));
450+
});
451+
});
452+
server.listen(0, "127.0.0.1", () => {
453+
const { port } = server.address() as { port: number };
454+
resolve({
455+
url: `http://127.0.0.1:${port}`,
456+
close: () =>
457+
new Promise<void>((r) => {
458+
server.close(() => r());
459+
}),
460+
});
461+
});
462+
});
463+
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "aimock-fal-queue-fail-"));
464+
465+
mock = new LLMock({
466+
port: 0,
467+
record: {
468+
providers: { fal: upstream.url },
469+
fixturePath: tmpDir,
470+
fal: { pollIntervalMs: 5, timeoutMs: 2000 },
471+
},
472+
});
473+
await mock.start();
474+
475+
const res = await fetch(`${mock.url}/fal/fal-ai/flux/dev`, {
476+
method: "POST",
477+
headers: { "Content-Type": "application/json", "x-fal-target-host": "queue.fal.run" },
478+
body: JSON.stringify({ input: { prompt: "a cat" } }),
479+
});
480+
expect(res.status).toBe(502);
481+
const body = await res.json();
482+
expect(body.error.type).toBe("proxy_error");
483+
484+
const files = fs.existsSync(tmpDir) ? fs.readdirSync(tmpDir) : [];
485+
const falFixtures = files.filter((f) => f.startsWith("fal-") && f.endsWith(".json"));
486+
expect(falFixtures.length).toBe(0);
313487
});
314488
});
315489

0 commit comments

Comments
 (0)