Skip to content

Commit 8b8d294

Browse files
committed
fix: improve server error handling, fal body relay, and chaos evaluation
- Use consumed flag + deferred splice for one-shot error fixtures - Guard Azure model injection catch to only swallow SyntaxError - Pass matched fixture to evaluateChaos for non-completions endpoints - Cache fal request body to prevent double-consumption on passthrough - Include PUT in fal queue body reading
1 parent 2534cca commit 8b8d294

1 file changed

Lines changed: 147 additions & 18 deletions

File tree

src/server.ts

Lines changed: 147 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -354,14 +354,21 @@ async function handleControlAPI(
354354
};
355355
// Insert at front so it matches before everything else
356356
fixtures.unshift(errorFixture);
357-
// Remove synchronously on first match to prevent race conditions where
358-
// two concurrent requests both match before the removal fires.
357+
// One-shot: match once then self-remove. We use a `consumed` flag to
358+
// prevent double-matching from concurrent requests and defer the actual
359+
// splice via queueMicrotask so it never mutates the fixtures array while
360+
// matchFixture is iterating over it.
361+
let consumed = false;
359362
const original = errorFixture.match.predicate!;
360363
errorFixture.match.predicate = (req) => {
364+
if (consumed) return false;
361365
const result = original(req);
362366
if (result) {
363-
const idx = fixtures.indexOf(errorFixture);
364-
if (idx !== -1) fixtures.splice(idx, 1);
367+
consumed = true;
368+
queueMicrotask(() => {
369+
const idx = fixtures.indexOf(errorFixture);
370+
if (idx !== -1) fixtures.splice(idx, 1);
371+
});
365372
}
366373
return result;
367374
};
@@ -1193,8 +1200,13 @@ export async function createServer(
11931200
parsed.model = deploymentId;
11941201
raw = JSON.stringify(parsed);
11951202
}
1196-
} catch {
1197-
// Fall through — let handleEmbeddings report the parse error
1203+
} catch (err) {
1204+
if (!(err instanceof SyntaxError)) {
1205+
defaults.logger.error(
1206+
`Unexpected error in Azure model injection: ${err instanceof Error ? err.message : String(err)}`,
1207+
);
1208+
}
1209+
// Fall through for parse errors — let handleEmbeddings report them
11981210
}
11991211
}
12001212
await handleEmbeddings(
@@ -1729,12 +1741,36 @@ export async function createServer(
17291741
setCorsHeaders(res);
17301742
try {
17311743
const raw = await readBody(req);
1732-
const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger);
1744+
// Try to match a fixture so chaos evaluation can use fixture-level overrides
1745+
let elSoundFixture: Fixture | null = null;
1746+
try {
1747+
const parsed = JSON.parse(raw) as Record<string, unknown>;
1748+
const syntheticReq: ChatCompletionRequest = {
1749+
model: (parsed.model_id as string) ?? "eleven_text_to_sound_v2",
1750+
messages: [{ role: "user", content: (parsed.text as string) ?? "" }],
1751+
_endpointType: "audio-gen",
1752+
};
1753+
const testId = getTestId(req);
1754+
elSoundFixture = matchFixture(
1755+
fixtures,
1756+
syntheticReq,
1757+
journal.getFixtureMatchCountsForTest(testId),
1758+
defaults.requestTransform,
1759+
);
1760+
} catch {
1761+
// JSON parse failure — fixture matching not possible, handler will report the error
1762+
}
1763+
const chaosAction = evaluateChaos(
1764+
elSoundFixture,
1765+
defaults.chaos,
1766+
req.headers,
1767+
defaults.logger,
1768+
);
17331769
if (chaosAction) {
17341770
applyChaosAction(
17351771
chaosAction,
17361772
res,
1737-
null,
1773+
elSoundFixture,
17381774
journal,
17391775
{
17401776
method: req.method ?? "POST",
@@ -1770,12 +1806,43 @@ export async function createServer(
17701806
const musicSubType = musicMatch[1] ?? "music";
17711807
try {
17721808
const raw = await readBody(req);
1773-
const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger);
1809+
// Try to match a fixture so chaos evaluation can use fixture-level overrides
1810+
let elMusicFixture: Fixture | null = null;
1811+
try {
1812+
const parsed = JSON.parse(raw) as Record<string, unknown>;
1813+
const prompt =
1814+
(typeof parsed.prompt === "string" ? parsed.prompt : null) ??
1815+
(parsed.composition_plan != null
1816+
? typeof parsed.composition_plan === "string"
1817+
? parsed.composition_plan
1818+
: JSON.stringify(parsed.composition_plan)
1819+
: "");
1820+
const syntheticReq: ChatCompletionRequest = {
1821+
model: (parsed.model_id as string) ?? "music_v1",
1822+
messages: [{ role: "user", content: prompt }],
1823+
_endpointType: "audio-gen",
1824+
};
1825+
const testId = getTestId(req);
1826+
elMusicFixture = matchFixture(
1827+
fixtures,
1828+
syntheticReq,
1829+
journal.getFixtureMatchCountsForTest(testId),
1830+
defaults.requestTransform,
1831+
);
1832+
} catch {
1833+
// JSON parse failure — fixture matching not possible, handler will report the error
1834+
}
1835+
const chaosAction = evaluateChaos(
1836+
elMusicFixture,
1837+
defaults.chaos,
1838+
req.headers,
1839+
defaults.logger,
1840+
);
17741841
if (chaosAction) {
17751842
applyChaosAction(
17761843
chaosAction,
17771844
res,
1778-
null,
1845+
elMusicFixture,
17791846
journal,
17801847
{
17811848
method: req.method ?? "POST",
@@ -1804,14 +1871,19 @@ export async function createServer(
18041871
return;
18051872
}
18061873

1874+
// Body read by the general fal handler; preserved so legacy fal-audio
1875+
// routes below don't double-consume the stream on passthrough.
1876+
let falBody: string | undefined;
1877+
18071878
// /fal/* with `x-fal-target-host` header — general fal.ai routing
18081879
// (queue.fal.run, fal.run, rest.fal.ai, rest.alpha.fal.ai).
18091880
// Matches the requestMiddleware path-mirror convention used by
18101881
// @fal-ai/client when proxyUrl can't be honoured server-side.
18111882
if (FAL_PREFIX_RE.test(pathname) && req.headers["x-fal-target-host"]) {
18121883
setCorsHeaders(res);
18131884
try {
1814-
const raw = req.method === "POST" || req.method === "PUT" ? await readBody(req) : "";
1885+
falBody = req.method === "POST" || req.method === "PUT" ? await readBody(req) : "";
1886+
const raw = falBody;
18151887
const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger);
18161888
if (chaosAction) {
18171889
applyChaosAction(
@@ -1853,13 +1925,41 @@ export async function createServer(
18531925
if (falQueueSubmitMatch && req.method === "POST") {
18541926
setCorsHeaders(res);
18551927
try {
1856-
const raw = await readBody(req);
1857-
const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger);
1928+
const raw = falBody ?? (await readBody(req));
1929+
// Try to match a fixture so chaos evaluation can use fixture-level overrides
1930+
let falSubmitFixture: Fixture | null = null;
1931+
try {
1932+
const parsed = raw.trim() ? (JSON.parse(raw) as Record<string, unknown>) : {};
1933+
const prompt =
1934+
(typeof parsed.prompt === "string" ? parsed.prompt : null) ??
1935+
(typeof parsed.text === "string" ? parsed.text : null) ??
1936+
"";
1937+
const syntheticReq: ChatCompletionRequest = {
1938+
model: falQueueSubmitMatch[1],
1939+
messages: [{ role: "user", content: prompt }],
1940+
_endpointType: "fal-audio",
1941+
};
1942+
const testId = getTestId(req);
1943+
falSubmitFixture = matchFixture(
1944+
fixtures,
1945+
syntheticReq,
1946+
journal.getFixtureMatchCountsForTest(testId),
1947+
defaults.requestTransform,
1948+
);
1949+
} catch {
1950+
// JSON parse failure — fixture matching not possible, handler will report the error
1951+
}
1952+
const chaosAction = evaluateChaos(
1953+
falSubmitFixture,
1954+
defaults.chaos,
1955+
req.headers,
1956+
defaults.logger,
1957+
);
18581958
if (chaosAction) {
18591959
applyChaosAction(
18601960
chaosAction,
18611961
res,
1862-
null,
1962+
falSubmitFixture,
18631963
journal,
18641964
{
18651965
method: req.method ?? "POST",
@@ -1896,7 +1996,8 @@ export async function createServer(
18961996
) {
18971997
setCorsHeaders(res);
18981998
try {
1899-
const raw = req.method === "POST" ? await readBody(req) : "{}";
1999+
const raw =
2000+
req.method === "POST" || req.method === "PUT" ? (falBody ?? (await readBody(req))) : "{}";
19002001
const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger);
19012002
if (chaosAction) {
19022003
applyChaosAction(
@@ -1936,13 +2037,41 @@ export async function createServer(
19362037
if (falRunMatch && req.method === "POST") {
19372038
setCorsHeaders(res);
19382039
try {
1939-
const raw = await readBody(req);
1940-
const chaosAction = evaluateChaos(null, defaults.chaos, req.headers, defaults.logger);
2040+
const raw = falBody ?? (await readBody(req));
2041+
// Try to match a fixture so chaos evaluation can use fixture-level overrides
2042+
let falRunFixture: Fixture | null = null;
2043+
try {
2044+
const parsed = raw.trim() ? (JSON.parse(raw) as Record<string, unknown>) : {};
2045+
const prompt =
2046+
(typeof parsed.prompt === "string" ? parsed.prompt : null) ??
2047+
(typeof parsed.text === "string" ? parsed.text : null) ??
2048+
"";
2049+
const syntheticReq: ChatCompletionRequest = {
2050+
model: falRunMatch[1],
2051+
messages: [{ role: "user", content: prompt }],
2052+
_endpointType: "fal-audio",
2053+
};
2054+
const testId = getTestId(req);
2055+
falRunFixture = matchFixture(
2056+
fixtures,
2057+
syntheticReq,
2058+
journal.getFixtureMatchCountsForTest(testId),
2059+
defaults.requestTransform,
2060+
);
2061+
} catch {
2062+
// JSON parse failure — fixture matching not possible, handler will report the error
2063+
}
2064+
const chaosAction = evaluateChaos(
2065+
falRunFixture,
2066+
defaults.chaos,
2067+
req.headers,
2068+
defaults.logger,
2069+
);
19412070
if (chaosAction) {
19422071
applyChaosAction(
19432072
chaosAction,
19442073
res,
1945-
null,
2074+
falRunFixture,
19462075
journal,
19472076
{
19482077
method: req.method ?? "POST",

0 commit comments

Comments
 (0)