Skip to content

Commit d8d364b

Browse files
committed
fix: harden Converse handlers with metadata events, text filtering, and stream flag
- Add optional logger to converseToCompletionRequest for warnings - Filter empty-string text blocks in all conversion paths - Warn on unexpected message roles - Change textContent || null to ?? null to preserve empty strings - Set completionReq.stream = true in handleConverseStream - Add metadata events (usage + latencyMs) to all 3 stream builders - Remove duplicate contentBlockIndex from stream event payloads - Add webSearches warning on tool-call-only responses - Add JSDoc to converseUsage explaining zero-default behavior - Fix error type || to ?? for consistency - Update test expectations and add safeResolve guard in postPartialBinary - Import shared test helpers from helpers/mock-res.ts
1 parent 72eec65 commit d8d364b

2 files changed

Lines changed: 69 additions & 70 deletions

File tree

src/__tests__/bedrock-stream.test.ts

Lines changed: 15 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
import { describe, it, expect, afterEach } from "vitest";
22
import * as http from "node:http";
33
import { crc32 } from "node:zlib";
4-
import type { Fixture, HandlerDefaults } from "../types.js";
4+
import type { Fixture } from "../types.js";
55
import { createServer, type ServerInstance } from "../server.js";
66
import {
77
converseToCompletionRequest,
88
handleConverse,
99
handleConverseStream,
1010
} from "../bedrock-converse.js";
1111
import { Journal } from "../journal.js";
12-
import { Logger } from "../logger.js";
12+
import { createMockReq, createMockRes, createDefaults } from "./helpers/mock-res.js";
1313

1414
// --- helpers ---
1515

@@ -161,6 +161,13 @@ function postPartialBinary(
161161
const parsed = new URL(url);
162162
const chunks: Buffer[] = [];
163163
let aborted = false;
164+
let resolved = false;
165+
const safeResolve = (value: { body: Buffer; aborted: boolean }) => {
166+
if (!resolved) {
167+
resolved = true;
168+
resolve(value);
169+
}
170+
};
164171
const req = http.request(
165172
{
166173
hostname: parsed.hostname,
@@ -175,7 +182,7 @@ function postPartialBinary(
175182
(res) => {
176183
res.on("data", (c: Buffer) => chunks.push(c));
177184
res.on("end", () => {
178-
resolve({ body: Buffer.concat(chunks), aborted });
185+
safeResolve({ body: Buffer.concat(chunks), aborted });
179186
});
180187
res.on("error", () => {
181188
aborted = true;
@@ -184,13 +191,13 @@ function postPartialBinary(
184191
aborted = true;
185192
});
186193
res.on("close", () => {
187-
resolve({ body: Buffer.concat(chunks), aborted });
194+
safeResolve({ body: Buffer.concat(chunks), aborted });
188195
});
189196
},
190197
);
191198
req.on("error", () => {
192199
aborted = true;
193-
resolve({ body: Buffer.concat(chunks), aborted });
200+
safeResolve({ body: Buffer.concat(chunks), aborted });
194201
});
195202
req.write(data);
196203
req.end();
@@ -1472,7 +1479,7 @@ describe("converseToCompletionRequest (edge cases)", () => {
14721479
},
14731480
"model",
14741481
);
1475-
expect(result.messages[0]).toEqual({ role: "assistant", content: null });
1482+
expect(result.messages[0]).toEqual({ role: "assistant", content: "" });
14761483
});
14771484

14781485
it("handles user tool result with missing text in content items (text ?? '' fallback)", () => {
@@ -1583,8 +1590,8 @@ describe("converseToCompletionRequest (edge cases)", () => {
15831590
"model",
15841591
);
15851592
expect(result.messages[0].tool_calls).toHaveLength(1);
1586-
// Empty text → content is null (falsy)
1587-
expect(result.messages[0].content).toBeNull();
1593+
// Empty text → content is "" (nullish coalescing preserves empty string)
1594+
expect(result.messages[0].content).toBe("");
15881595
});
15891596
});
15901597

@@ -1723,50 +1730,6 @@ describe("POST /model/{modelId}/invoke-with-response-stream (error fixture no ex
17231730

17241731
// ─── Direct handler tests for req.method/req.url fallback branches ──────────
17251732

1726-
function createMockReq(overrides: Partial<http.IncomingMessage> = {}): http.IncomingMessage {
1727-
return {
1728-
method: undefined,
1729-
url: undefined,
1730-
headers: {},
1731-
...overrides,
1732-
} as unknown as http.IncomingMessage;
1733-
}
1734-
1735-
function createMockRes(): http.ServerResponse & { _written: string; _status: number } {
1736-
const res = {
1737-
_written: "",
1738-
_status: 0,
1739-
writableEnded: false,
1740-
statusCode: 0,
1741-
writeHead(status: number) {
1742-
res._status = status;
1743-
res.statusCode = status;
1744-
},
1745-
setHeader() {},
1746-
write(data: string) {
1747-
res._written += data;
1748-
return true;
1749-
},
1750-
end(data?: string) {
1751-
if (data) res._written += data;
1752-
res.writableEnded = true;
1753-
},
1754-
destroy() {
1755-
res.writableEnded = true;
1756-
},
1757-
};
1758-
return res as unknown as http.ServerResponse & { _written: string; _status: number };
1759-
}
1760-
1761-
function createDefaults(overrides: Partial<HandlerDefaults> = {}): HandlerDefaults {
1762-
return {
1763-
latency: 0,
1764-
chunkSize: 100,
1765-
logger: new Logger("silent"),
1766-
...overrides,
1767-
};
1768-
}
1769-
17701733
describe("handleConverse (direct handler call, method/url fallbacks)", () => {
17711734
it("uses fallback for text response with undefined method/url", async () => {
17721735
const fixture: Fixture = {

src/bedrock-converse.ts

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ function converseStopReason(
7575
return overrideFinishReason;
7676
}
7777

78+
/**
79+
* Build Converse-format usage from fixture overrides.
80+
*
81+
* When no overrides are provided (the common case for mocks), all token
82+
* counts default to zero. This is intentional — aimock is a mock server
83+
* and does not perform real tokenisation. Callers that need non-zero
84+
* usage should supply explicit `usage` overrides in their fixture.
85+
*/
7886
function converseUsage(overrides?: ResponseOverrides): {
7987
inputTokens: number;
8088
outputTokens: number;
@@ -113,15 +121,13 @@ function buildBedrockStreamTextEvents(
113121
events.push({
114122
eventType: "contentBlockStart",
115123
payload: {
116-
contentBlockIndex: blockIndex,
117124
contentBlockStart: { contentBlockIndex: blockIndex, start: { type: "thinking" } },
118125
},
119126
});
120127
for (let i = 0; i < reasoning.length; i += chunkSize) {
121128
events.push({
122129
eventType: "contentBlockDelta",
123130
payload: {
124-
contentBlockIndex: blockIndex,
125131
contentBlockDelta: {
126132
contentBlockIndex: blockIndex,
127133
delta: { type: "thinking_delta", thinking: reasoning.slice(i, i + chunkSize) },
@@ -136,15 +142,13 @@ function buildBedrockStreamTextEvents(
136142
events.push({
137143
eventType: "contentBlockStart",
138144
payload: {
139-
contentBlockIndex: textBlockIndex,
140145
contentBlockStart: { contentBlockIndex: textBlockIndex, start: { type: "text" } },
141146
},
142147
});
143148
for (let i = 0; i < content.length; i += chunkSize) {
144149
events.push({
145150
eventType: "contentBlockDelta",
146151
payload: {
147-
contentBlockIndex: textBlockIndex,
148152
contentBlockDelta: {
149153
contentBlockIndex: textBlockIndex,
150154
delta: { type: "text_delta", text: content.slice(i, i + chunkSize) },
@@ -157,6 +161,11 @@ function buildBedrockStreamTextEvents(
157161
eventType: "messageStop",
158162
payload: { stopReason: converseStopReason(overrides?.finishReason, "end_turn") },
159163
});
164+
const usage = converseUsage(overrides);
165+
events.push({
166+
eventType: "metadata",
167+
payload: { metadata: { usage, metrics: { latencyMs: 0 } } },
168+
});
160169
return events;
161170
}
162171

@@ -172,15 +181,16 @@ function buildBedrockStreamContentWithToolCallsEvents(
172181
...overrides,
173182
finishReason: "stop",
174183
});
175-
events.pop();
184+
// Remove trailing metadata + messageStop events — we re-emit them after tool blocks
185+
events.pop(); // metadata
186+
events.pop(); // messageStop
176187
let blockIndex = reasoning ? 2 : 1;
177188

178189
for (const tc of toolCalls) {
179190
const toolUseId = tc.id || generateToolUseId();
180191
events.push({
181192
eventType: "contentBlockStart",
182193
payload: {
183-
contentBlockIndex: blockIndex,
184194
contentBlockStart: {
185195
contentBlockIndex: blockIndex,
186196
start: { toolUse: { toolUseId, name: tc.name } },
@@ -192,7 +202,6 @@ function buildBedrockStreamContentWithToolCallsEvents(
192202
events.push({
193203
eventType: "contentBlockDelta",
194204
payload: {
195-
contentBlockIndex: blockIndex,
196205
contentBlockDelta: {
197206
contentBlockIndex: blockIndex,
198207
delta: { toolUse: { input: argsStr.slice(i, i + chunkSize) } },
@@ -207,6 +216,11 @@ function buildBedrockStreamContentWithToolCallsEvents(
207216
eventType: "messageStop",
208217
payload: { stopReason: converseStopReason(overrides?.finishReason, "tool_use") },
209218
});
219+
const usage = converseUsage(overrides);
220+
events.push({
221+
eventType: "metadata",
222+
payload: { metadata: { usage, metrics: { latencyMs: 0 } } },
223+
});
210224
return events;
211225
}
212226

@@ -226,7 +240,6 @@ function buildBedrockStreamToolCallEvents(
226240
events.push({
227241
eventType: "contentBlockStart",
228242
payload: {
229-
contentBlockIndex: tcIdx,
230243
contentBlockStart: {
231244
contentBlockIndex: tcIdx,
232245
start: { toolUse: { toolUseId, name: tc.name } },
@@ -238,7 +251,6 @@ function buildBedrockStreamToolCallEvents(
238251
events.push({
239252
eventType: "contentBlockDelta",
240253
payload: {
241-
contentBlockIndex: tcIdx,
242254
contentBlockDelta: {
243255
contentBlockIndex: tcIdx,
244256
delta: { toolUse: { input: argsStr.slice(i, i + chunkSize) } },
@@ -252,6 +264,11 @@ function buildBedrockStreamToolCallEvents(
252264
eventType: "messageStop",
253265
payload: { stopReason: converseStopReason(overrides?.finishReason, "tool_use") },
254266
});
267+
const usage = converseUsage(overrides);
268+
events.push({
269+
eventType: "metadata",
270+
payload: { metadata: { usage, metrics: { latencyMs: 0 } } },
271+
});
255272
return events;
256273
}
257274

@@ -260,6 +277,7 @@ function buildBedrockStreamToolCallEvents(
260277
export function converseToCompletionRequest(
261278
req: ConverseRequest,
262279
modelId: string,
280+
logger?: Logger,
263281
): ChatCompletionRequest {
264282
const messages: ChatMessage[] = [];
265283

@@ -275,7 +293,9 @@ export function converseToCompletionRequest(
275293
if (msg.role === "user") {
276294
// Check for toolResult blocks
277295
const toolResults = msg.content.filter((b) => b.toolResult);
278-
const textBlocks = msg.content.filter((b) => b.text !== undefined && !b.toolResult);
296+
const textBlocks = msg.content.filter(
297+
(b) => b.text !== undefined && b.text !== "" && !b.toolResult,
298+
);
279299

280300
if (toolResults.length > 0) {
281301
for (const block of toolResults) {
@@ -298,21 +318,21 @@ export function converseToCompletionRequest(
298318

299319
// Plain user message
300320
const text = msg.content
301-
.filter((b) => b.text !== undefined)
321+
.filter((b) => b.text !== undefined && b.text !== "")
302322
.map((b) => b.text ?? "")
303323
.join("");
304324
messages.push({ role: "user", content: text });
305325
} else if (msg.role === "assistant") {
306326
const toolUseBlocks = msg.content.filter((b) => b.toolUse);
307327
const textContent = msg.content
308-
.filter((b) => b.text !== undefined)
328+
.filter((b) => b.text !== undefined && b.text !== "")
309329
.map((b) => b.text ?? "")
310330
.join("");
311331

312332
if (toolUseBlocks.length > 0) {
313333
messages.push({
314334
role: "assistant",
315-
content: textContent || null,
335+
content: textContent ?? null,
316336
tool_calls: toolUseBlocks.map((b) => ({
317337
id: b.toolUse!.toolUseId,
318338
type: "function" as const,
@@ -323,7 +343,12 @@ export function converseToCompletionRequest(
323343
})),
324344
});
325345
} else {
326-
messages.push({ role: "assistant", content: textContent || null });
346+
messages.push({ role: "assistant", content: textContent ?? null });
347+
}
348+
} else {
349+
const warnMsg = `Unexpected message role "${msg.role}" in Converse request — skipping`;
350+
if (logger) {
351+
logger.warn(warnMsg);
327352
}
328353
}
329354
}
@@ -518,7 +543,7 @@ export async function handleConverse(
518543
return;
519544
}
520545

521-
const completionReq = converseToCompletionRequest(converseReq, modelId);
546+
const completionReq = converseToCompletionRequest(converseReq, modelId, logger);
522547
completionReq._endpointType = "chat";
523548

524549
const testId = getTestId(req);
@@ -623,7 +648,7 @@ export async function handleConverse(
623648
const errBody = {
624649
type: "error",
625650
error: {
626-
type: response.error.type || "invalid_request_error",
651+
type: response.error.type ?? "invalid_request_error",
627652
message: response.error.message,
628653
},
629654
};
@@ -681,6 +706,11 @@ export async function handleConverse(
681706

682707
// Tool call response
683708
if (isToolCallResponse(response)) {
709+
if ("webSearches" in response) {
710+
logger.warn(
711+
"webSearches in fixture response are not supported for Bedrock Converse API — ignoring",
712+
);
713+
}
684714
const overrides = extractOverrides(response);
685715
journal.add({
686716
method: req.method ?? "POST",
@@ -775,7 +805,8 @@ export async function handleConverseStream(
775805
return;
776806
}
777807

778-
const completionReq = converseToCompletionRequest(converseReq, modelId);
808+
const completionReq = converseToCompletionRequest(converseReq, modelId, logger);
809+
completionReq.stream = true;
779810
completionReq._endpointType = "chat";
780811

781812
const testId = getTestId(req);
@@ -882,7 +913,7 @@ export async function handleConverseStream(
882913
const errBody = {
883914
type: "error",
884915
error: {
885-
type: response.error.type || "invalid_request_error",
916+
type: response.error.type ?? "invalid_request_error",
886917
message: response.error.message,
887918
},
888919
};
@@ -968,6 +999,11 @@ export async function handleConverseStream(
968999

9691000
// Tool call response — stream as Event Stream
9701001
if (isToolCallResponse(response)) {
1002+
if ("webSearches" in response) {
1003+
logger.warn(
1004+
"webSearches in fixture response are not supported for Bedrock Converse API — ignoring",
1005+
);
1006+
}
9711007
const overrides = extractOverrides(response);
9721008
const journalEntry = journal.add({
9731009
method: req.method ?? "POST",

0 commit comments

Comments
 (0)