Skip to content

Commit bf9fda8

Browse files
committed
fix: harden Converse handlers — align stream shapes, add metadata,
unwrap inputSchema Wrap contentBlockStop and messageStop payloads to match real AWS Converse API shape. Remove duplicate top-level contentBlockIndex from contentBlockStart/contentBlockDelta. Add trailing metadata events (usage + latencyMs) to all three stream builders. Filter empty-string text blocks in converseToCompletionRequest. Unwrap inputSchema from Converse { json: {...} } wrapper. Set completionReq.stream = true in streaming handler. Add content-loss warnings for non-text blocks. Fix error type || to ??.
1 parent f3f4f5d commit bf9fda8

1 file changed

Lines changed: 91 additions & 30 deletions

File tree

src/bedrock-converse.ts

Lines changed: 91 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ function converseStopReason(
7575
return overrideFinishReason;
7676
}
7777

78+
/**
79+
* Build Converse-format usage from fixture overrides.
80+
*
81+
* When no overrides are provided (the common case for mocks), all token
82+
* counts default to zero. This is intentional — aimock is a mock server
83+
* and does not perform real tokenisation. Callers that need non-zero
84+
* usage should supply explicit `usage` overrides in their fixture.
85+
*/
7886
function converseUsage(overrides?: ResponseOverrides): {
7987
inputTokens: number;
8088
outputTokens: number;
@@ -113,49 +121,58 @@ function buildBedrockStreamTextEvents(
113121
events.push({
114122
eventType: "contentBlockStart",
115123
payload: {
116-
contentBlockIndex: blockIndex,
117124
contentBlockStart: { contentBlockIndex: blockIndex, start: { type: "thinking" } },
118125
},
119126
});
120127
for (let i = 0; i < reasoning.length; i += chunkSize) {
121128
events.push({
122129
eventType: "contentBlockDelta",
123130
payload: {
124-
contentBlockIndex: blockIndex,
125131
contentBlockDelta: {
126132
contentBlockIndex: blockIndex,
127133
delta: { type: "thinking_delta", thinking: reasoning.slice(i, i + chunkSize) },
128134
},
129135
},
130136
});
131137
}
132-
events.push({ eventType: "contentBlockStop", payload: { contentBlockIndex: blockIndex } });
138+
events.push({
139+
eventType: "contentBlockStop",
140+
payload: { contentBlockStop: { contentBlockIndex: blockIndex } },
141+
});
133142
}
134143

135144
const textBlockIndex = reasoning ? 1 : 0;
136145
events.push({
137146
eventType: "contentBlockStart",
138147
payload: {
139-
contentBlockIndex: textBlockIndex,
140148
contentBlockStart: { contentBlockIndex: textBlockIndex, start: { type: "text" } },
141149
},
142150
});
143151
for (let i = 0; i < content.length; i += chunkSize) {
144152
events.push({
145153
eventType: "contentBlockDelta",
146154
payload: {
147-
contentBlockIndex: textBlockIndex,
148155
contentBlockDelta: {
149156
contentBlockIndex: textBlockIndex,
150157
delta: { type: "text_delta", text: content.slice(i, i + chunkSize) },
151158
},
152159
},
153160
});
154161
}
155-
events.push({ eventType: "contentBlockStop", payload: { contentBlockIndex: textBlockIndex } });
162+
events.push({
163+
eventType: "contentBlockStop",
164+
payload: { contentBlockStop: { contentBlockIndex: textBlockIndex } },
165+
});
156166
events.push({
157167
eventType: "messageStop",
158-
payload: { stopReason: converseStopReason(overrides?.finishReason, "end_turn") },
168+
payload: {
169+
messageStop: { stopReason: converseStopReason(overrides?.finishReason, "end_turn") },
170+
},
171+
});
172+
const usage = converseUsage(overrides);
173+
events.push({
174+
eventType: "metadata",
175+
payload: { metadata: { usage, metrics: { latencyMs: 0 } } },
159176
});
160177
return events;
161178
}
@@ -168,19 +185,17 @@ function buildBedrockStreamContentWithToolCallsEvents(
168185
reasoning?: string,
169186
overrides?: ResponseOverrides,
170187
): Array<{ eventType: string; payload: object }> {
171-
const events = buildBedrockStreamTextEvents(content, chunkSize, reasoning, {
172-
...overrides,
173-
finishReason: "stop",
174-
});
175-
events.pop();
188+
const events = buildBedrockStreamTextEvents(content, chunkSize, reasoning, overrides);
189+
// Remove trailing metadata + messageStop events — we re-emit them after tool blocks
190+
events.pop(); // metadata
191+
events.pop(); // messageStop
176192
let blockIndex = reasoning ? 2 : 1;
177193

178194
for (const tc of toolCalls) {
179195
const toolUseId = tc.id || generateToolUseId();
180196
events.push({
181197
eventType: "contentBlockStart",
182198
payload: {
183-
contentBlockIndex: blockIndex,
184199
contentBlockStart: {
185200
contentBlockIndex: blockIndex,
186201
start: { toolUse: { toolUseId, name: tc.name } },
@@ -192,20 +207,29 @@ function buildBedrockStreamContentWithToolCallsEvents(
192207
events.push({
193208
eventType: "contentBlockDelta",
194209
payload: {
195-
contentBlockIndex: blockIndex,
196210
contentBlockDelta: {
197211
contentBlockIndex: blockIndex,
198212
delta: { toolUse: { input: argsStr.slice(i, i + chunkSize) } },
199213
},
200214
},
201215
});
202216
}
203-
events.push({ eventType: "contentBlockStop", payload: { contentBlockIndex: blockIndex } });
217+
events.push({
218+
eventType: "contentBlockStop",
219+
payload: { contentBlockStop: { contentBlockIndex: blockIndex } },
220+
});
204221
blockIndex++;
205222
}
206223
events.push({
207224
eventType: "messageStop",
208-
payload: { stopReason: converseStopReason(overrides?.finishReason, "tool_use") },
225+
payload: {
226+
messageStop: { stopReason: converseStopReason(overrides?.finishReason, "tool_use") },
227+
},
228+
});
229+
const usage = converseUsage(overrides);
230+
events.push({
231+
eventType: "metadata",
232+
payload: { metadata: { usage, metrics: { latencyMs: 0 } } },
209233
});
210234
return events;
211235
}
@@ -226,7 +250,6 @@ function buildBedrockStreamToolCallEvents(
226250
events.push({
227251
eventType: "contentBlockStart",
228252
payload: {
229-
contentBlockIndex: tcIdx,
230253
contentBlockStart: {
231254
contentBlockIndex: tcIdx,
232255
start: { toolUse: { toolUseId, name: tc.name } },
@@ -238,19 +261,28 @@ function buildBedrockStreamToolCallEvents(
238261
events.push({
239262
eventType: "contentBlockDelta",
240263
payload: {
241-
contentBlockIndex: tcIdx,
242264
contentBlockDelta: {
243265
contentBlockIndex: tcIdx,
244266
delta: { toolUse: { input: argsStr.slice(i, i + chunkSize) } },
245267
},
246268
},
247269
});
248270
}
249-
events.push({ eventType: "contentBlockStop", payload: { contentBlockIndex: tcIdx } });
271+
events.push({
272+
eventType: "contentBlockStop",
273+
payload: { contentBlockStop: { contentBlockIndex: tcIdx } },
274+
});
250275
}
251276
events.push({
252277
eventType: "messageStop",
253-
payload: { stopReason: converseStopReason(overrides?.finishReason, "tool_use") },
278+
payload: {
279+
messageStop: { stopReason: converseStopReason(overrides?.finishReason, "tool_use") },
280+
},
281+
});
282+
const usage = converseUsage(overrides);
283+
events.push({
284+
eventType: "metadata",
285+
payload: { metadata: { usage, metrics: { latencyMs: 0 } } },
254286
});
255287
return events;
256288
}
@@ -260,6 +292,7 @@ function buildBedrockStreamToolCallEvents(
260292
export function converseToCompletionRequest(
261293
req: ConverseRequest,
262294
modelId: string,
295+
logger?: Logger,
263296
): ChatCompletionRequest {
264297
const messages: ChatMessage[] = [];
265298

@@ -275,7 +308,17 @@ export function converseToCompletionRequest(
275308
if (msg.role === "user") {
276309
// Check for toolResult blocks
277310
const toolResults = msg.content.filter((b) => b.toolResult);
278-
const textBlocks = msg.content.filter((b) => b.text !== undefined && !b.toolResult);
311+
const textBlocks = msg.content.filter(
312+
(b) => b.text !== undefined && b.text !== "" && !b.toolResult,
313+
);
314+
const unsupportedBlocks = msg.content.filter(
315+
(b) => b.text === undefined && !b.toolResult && !b.toolUse,
316+
);
317+
if (unsupportedBlocks.length > 0 && logger) {
318+
logger.warn(
319+
`Converse user message contains unsupported content block types — these will be dropped during conversion`,
320+
);
321+
}
279322

280323
if (toolResults.length > 0) {
281324
for (const block of toolResults) {
@@ -298,21 +341,21 @@ export function converseToCompletionRequest(
298341

299342
// Plain user message
300343
const text = msg.content
301-
.filter((b) => b.text !== undefined)
344+
.filter((b) => b.text !== undefined && b.text !== "")
302345
.map((b) => b.text ?? "")
303346
.join("");
304347
messages.push({ role: "user", content: text });
305348
} else if (msg.role === "assistant") {
306349
const toolUseBlocks = msg.content.filter((b) => b.toolUse);
307350
const textContent = msg.content
308-
.filter((b) => b.text !== undefined)
351+
.filter((b) => b.text !== undefined && b.text !== "")
309352
.map((b) => b.text ?? "")
310353
.join("");
311354

312355
if (toolUseBlocks.length > 0) {
313356
messages.push({
314357
role: "assistant",
315-
content: textContent || null,
358+
content: textContent ?? null,
316359
tool_calls: toolUseBlocks.map((b) => ({
317360
id: b.toolUse!.toolUseId,
318361
type: "function" as const,
@@ -323,7 +366,12 @@ export function converseToCompletionRequest(
323366
})),
324367
});
325368
} else {
326-
messages.push({ role: "assistant", content: textContent || null });
369+
messages.push({ role: "assistant", content: textContent ?? null });
370+
}
371+
} else {
372+
const warnMsg = `Unexpected message role "${msg.role}" in Converse request — skipping`;
373+
if (logger) {
374+
logger.warn(warnMsg);
327375
}
328376
}
329377
}
@@ -336,7 +384,9 @@ export function converseToCompletionRequest(
336384
function: {
337385
name: t.toolSpec.name,
338386
description: t.toolSpec.description,
339-
parameters: t.toolSpec.inputSchema,
387+
parameters: (t.toolSpec.inputSchema && "json" in t.toolSpec.inputSchema
388+
? (t.toolSpec.inputSchema as Record<string, unknown>).json
389+
: t.toolSpec.inputSchema) as object | undefined,
340390
},
341391
}));
342392
}
@@ -518,7 +568,7 @@ export async function handleConverse(
518568
return;
519569
}
520570

521-
const completionReq = converseToCompletionRequest(converseReq, modelId);
571+
const completionReq = converseToCompletionRequest(converseReq, modelId, logger);
522572
completionReq._endpointType = "chat";
523573

524574
const testId = getTestId(req);
@@ -623,7 +673,7 @@ export async function handleConverse(
623673
const errBody = {
624674
type: "error",
625675
error: {
626-
type: response.error.type || "invalid_request_error",
676+
type: response.error.type ?? "invalid_request_error",
627677
message: response.error.message,
628678
},
629679
};
@@ -681,6 +731,11 @@ export async function handleConverse(
681731

682732
// Tool call response
683733
if (isToolCallResponse(response)) {
734+
if ("webSearches" in response) {
735+
logger.warn(
736+
"webSearches in fixture response are not supported for Bedrock Converse API — ignoring",
737+
);
738+
}
684739
const overrides = extractOverrides(response);
685740
journal.add({
686741
method: req.method ?? "POST",
@@ -775,7 +830,8 @@ export async function handleConverseStream(
775830
return;
776831
}
777832

778-
const completionReq = converseToCompletionRequest(converseReq, modelId);
833+
const completionReq = converseToCompletionRequest(converseReq, modelId, logger);
834+
completionReq.stream = true;
779835
completionReq._endpointType = "chat";
780836

781837
const testId = getTestId(req);
@@ -882,7 +938,7 @@ export async function handleConverseStream(
882938
const errBody = {
883939
type: "error",
884940
error: {
885-
type: response.error.type || "invalid_request_error",
941+
type: response.error.type ?? "invalid_request_error",
886942
message: response.error.message,
887943
},
888944
};
@@ -968,6 +1024,11 @@ export async function handleConverseStream(
9681024

9691025
// Tool call response — stream as Event Stream
9701026
if (isToolCallResponse(response)) {
1027+
if ("webSearches" in response) {
1028+
logger.warn(
1029+
"webSearches in fixture response are not supported for Bedrock Converse API — ignoring",
1030+
);
1031+
}
9711032
const overrides = extractOverrides(response);
9721033
const journalEntry = journal.add({
9731034
method: req.method ?? "POST",

0 commit comments

Comments
 (0)