Skip to content

Commit 2534cca

Browse files
committed
fix: harden recorder proxy relay and data integrity
- Forward HTTP method to upstream instead of hardcoding POST - Clear response timeout after successful completion (guard null socket) - Fix client-disconnect handler to check writableFinished before destroying - Wrap onHookBypassed and beforeWriteResponse callbacks in try/catch - Override audio content-type to application/json on non-2xx error relay - Atomic write (tmp+rename) for snapshot-mode fixture files - Sanitize undefined toolCall name/arguments before saving fixtures - Tighten video detection heuristic to exclude LLM provider fields - Clarify Float32Array alignment copy behavior
1 parent 449382f commit 2534cca

1 file changed

Lines changed: 67 additions & 22 deletions

File tree

src/recorder.ts

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ export async function proxyAndRecord(
165165
let streamedToClient = false;
166166
let clientDisconnected = false;
167167
try {
168-
const result = await makeUpstreamRequest(target, forwardHeaders, requestBody, res);
168+
const result = await makeUpstreamRequest(target, forwardHeaders, requestBody, res, req.method);
169169
upstreamStatus = result.status;
170170
upstreamHeaders = result.headers;
171171
upstreamBody = result.body;
@@ -247,15 +247,20 @@ export async function proxyAndRecord(
247247
} else {
248248
const reasoningSpread = collapsed.reasoning ? { reasoning: collapsed.reasoning } : {};
249249
if (collapsed.toolCalls && collapsed.toolCalls.length > 0) {
250+
const sanitizedToolCalls = collapsed.toolCalls.map((tc) => ({
251+
...tc,
252+
name: tc.name ?? "",
253+
arguments: tc.arguments ?? "{}",
254+
}));
250255
if (collapsed.content) {
251256
// Both content and toolCalls present — save as ContentWithToolCallsResponse
252257
fixtureResponse = {
253258
content: collapsed.content,
254-
toolCalls: collapsed.toolCalls,
259+
toolCalls: sanitizedToolCalls,
255260
...reasoningSpread,
256261
};
257262
} else {
258-
fixtureResponse = { toolCalls: collapsed.toolCalls, ...reasoningSpread };
263+
fixtureResponse = { toolCalls: sanitizedToolCalls, ...reasoningSpread };
259264
}
260265
} else {
261266
fixtureResponse = { content: collapsed.content ?? "", ...reasoningSpread };
@@ -373,6 +378,9 @@ export async function proxyAndRecord(
373378
try {
374379
// Create the target directory (must be inside try/catch so filesystem
375380
// errors don't prevent the upstream response from being relayed).
381+
// Keep synchronous: for streamed responses the HTTP reply is already on
382+
// the wire, so any async yield lets callers observe the filesystem before
383+
// the fixture is written.
376384
if (isSnapshotMode) {
377385
fs.mkdirSync(path.dirname(filepath), { recursive: true });
378386
} else {
@@ -412,7 +420,12 @@ export async function proxyAndRecord(
412420
fileContent._warning = warnings.join("; ");
413421
}
414422

415-
fs.writeFileSync(filepath, JSON.stringify(fileContent, null, 2), "utf-8");
423+
// Atomic write: write to temp file then rename to avoid read-modify-write races
424+
// Keep synchronous — for streamed responses the HTTP response is already on the
425+
// wire, so async writes would race with callers checking the filesystem.
426+
const tmpPath = filepath + ".tmp." + process.pid;
427+
fs.writeFileSync(tmpPath, JSON.stringify(fileContent, null, 2), "utf-8");
428+
fs.renameSync(tmpPath, filepath);
416429
writtenToDisk = true;
417430
} catch (err) {
418431
const msg = err instanceof Error ? err.message : "Unknown filesystem error";
@@ -450,31 +463,50 @@ export async function proxyAndRecord(
450463
: ctString.toLowerCase().includes("application/x-ndjson")
451464
? "ndjson_streamed"
452465
: "sse_streamed";
453-
options.onHookBypassed(bypassReason);
466+
try {
467+
options.onHookBypassed(bypassReason);
468+
} catch (err) {
469+
defaults.logger.warn(
470+
`onHookBypassed callback threw: ${err instanceof Error ? err.message : String(err)}`,
471+
);
472+
}
454473
}
455474
} else {
456475
// Give the caller a chance to mutate or replace the response before relay.
457476
// Used by the chaos layer to turn a successful proxy into a malformed body.
458477
// `body` is the raw upstream bytes so binary payloads survive round-tripping.
459478
if (options?.beforeWriteResponse) {
460-
const handled = await options.beforeWriteResponse({
461-
status: upstreamStatus,
462-
contentType: ctString,
463-
body: rawBuffer,
464-
});
479+
let handled: boolean | undefined;
480+
try {
481+
handled = await options.beforeWriteResponse({
482+
status: upstreamStatus,
483+
contentType: ctString,
484+
body: rawBuffer,
485+
});
486+
} catch (err) {
487+
const msg = err instanceof Error ? err.message : String(err);
488+
throw new Error(`beforeWriteResponse hook failed for ${providerKey}: ${msg}`);
489+
}
465490
if (handled) return "handled_by_hook";
466491
}
467492

468-
const relayHeaders: Record<string, string> = {};
469-
if (ctString) {
470-
relayHeaders["Content-Type"] = ctString;
471-
}
472493
// Normalize status codes for the client: aimock acts as a gateway, so
473494
// upstream provider details (429 rate-limits, 503 outages, etc.) should
474495
// not leak. Successes → 200, errors → 502 (Bad Gateway).
475496
const clientStatus = upstreamStatus >= 200 && upstreamStatus < 300 ? 200 : 502;
476-
res.writeHead(clientStatus, relayHeaders);
477497
const isAudioRelay = ctString.toLowerCase().startsWith("audio/");
498+
// When an upstream error (non-2xx) is relayed for an audio endpoint, the
499+
// body is typically a JSON error object — override the content-type so
500+
// clients don't try to decode JSON as audio.
501+
const relayHeaders: Record<string, string> = {};
502+
const clientCt =
503+
(clientStatus >= 200 && clientStatus < 300) || !isAudioRelay
504+
? (ctString ?? "application/json")
505+
: "application/json";
506+
if (clientCt) {
507+
relayHeaders["Content-Type"] = clientCt;
508+
}
509+
res.writeHead(clientStatus, relayHeaders);
478510
res.end(isBinaryStream || isAudioRelay ? rawBuffer : upstreamBody);
479511
}
480512

@@ -490,6 +522,7 @@ function makeUpstreamRequest(
490522
headers: Record<string, string>,
491523
body: string,
492524
clientRes?: http.ServerResponse,
525+
method: string = "POST",
493526
): Promise<{
494527
status: number;
495528
headers: http.IncomingHttpHeaders;
@@ -505,7 +538,7 @@ function makeUpstreamRequest(
505538
const req = transport.request(
506539
target,
507540
{
508-
method: "POST",
541+
method,
509542
timeout: UPSTREAM_TIMEOUT_MS,
510543
headers: {
511544
...headers,
@@ -548,10 +581,14 @@ function makeUpstreamRequest(
548581
// before the first data chunk arrives.
549582
if (typeof clientRes.flushHeaders === "function") clientRes.flushHeaders();
550583
streamedToClient = true;
551-
// Stop relaying if the client disconnects mid-stream
584+
// Stop relaying if the client disconnects mid-stream.
585+
// Check writableFinished to distinguish normal completion (where
586+
// "close" also fires) from premature client disconnects.
552587
clientRes.on("close", () => {
553-
clientDisconnected = true;
554-
req.destroy();
588+
if (!clientRes.writableFinished) {
589+
clientDisconnected = true;
590+
req.destroy();
591+
}
555592
});
556593
}
557594
const chunks: Buffer[] = [];
@@ -574,6 +611,7 @@ function makeUpstreamRequest(
574611
});
575612
res.on("error", reject);
576613
res.on("end", () => {
614+
if (res.socket) res.setTimeout(0);
577615
const rawBuffer = Buffer.concat(chunks);
578616
if (
579617
streamedToClient &&
@@ -661,8 +699,10 @@ function buildFixtureResponse(
661699
// Malformed embedding — return a zero-dimension embedding fixture
662700
return { embedding: [] };
663701
}
664-
const aligned = new Uint8Array(buf).buffer; // Always offset 0
665-
const floats = new Float32Array(aligned, 0, buf.byteLength / 4);
702+
// Uint8Array constructor copies Buffer data to a fresh ArrayBuffer at offset 0,
703+
// guaranteeing the alignment Float32Array requires.
704+
const copied = new Uint8Array(buf);
705+
const floats = new Float32Array(copied.buffer, 0, buf.byteLength / 4);
666706
return { embedding: Array.from(floats) };
667707
}
668708
// OpenAI image generation: { created, data: [{ url, b64_json, revised_prompt }] }
@@ -748,7 +788,12 @@ function buildFixtureResponse(
748788
!("message" in obj) &&
749789
!("data" in obj) &&
750790
!("object" in obj) &&
751-
!("outputs" in obj)
791+
!("outputs" in obj) &&
792+
!("model" in obj) &&
793+
!("response" in obj) &&
794+
!("done" in obj) &&
795+
!("usage" in obj) &&
796+
!("error" in obj)
752797
) {
753798
if (obj.status === "completed" && obj.url) {
754799
return {

0 commit comments

Comments
 (0)