Skip to content

Commit 5ba48d7

Browse files
sf-jin-kujpr5
authored andcommitted
fix: emit native Bedrock invoke stream chunks
InvokeModelWithResponseStream wraps Anthropic native stream events in Bedrock chunk frames, while ConverseStream keeps the Converse event schema.
1 parent ba845c9 commit 5ba48d7

5 files changed

Lines changed: 514 additions & 320 deletions

File tree

src/__tests__/bedrock-stream.test.ts

Lines changed: 99 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -261,38 +261,49 @@ describe("POST /model/{modelId}/invoke-with-response-stream", () => {
261261
const frames = parseFrames(res.body);
262262
expect(frames.length).toBeGreaterThanOrEqual(5);
263263

264-
// messageStart
265-
expect(frames[0].eventType).toBe("messageStart");
266-
expect(frames[0].payload).toEqual({ messageStart: { role: "assistant" } });
264+
// InvokeModelWithResponseStream wraps Anthropic-native stream events in Bedrock
265+
// EventStream chunk frames.
266+
expect(frames[0].eventType).toBe("chunk");
267+
expect(frames[0].payload).toMatchObject({
268+
type: "message_start",
269+
message: { role: "assistant", model: MODEL_ID },
270+
});
267271

268-
// contentBlockStart
269-
expect(frames[1].eventType).toBe("contentBlockStart");
272+
expect(frames[1].eventType).toBe("chunk");
270273
expect(frames[1].payload).toEqual({
271-
contentBlockIndex: 0,
272-
contentBlockStart: { contentBlockIndex: 0, start: { type: "text" } },
274+
type: "content_block_start",
275+
index: 0,
276+
content_block: { type: "text", text: "" },
273277
});
274278

275279
// Content delta(s) — collect text
276-
const deltas = frames.filter((f) => f.eventType === "contentBlockDelta");
280+
const deltas = frames.filter(
281+
(f) => (f.payload as { type?: string }).type === "content_block_delta",
282+
);
277283
expect(deltas.length).toBeGreaterThanOrEqual(1);
278284
const fullText = deltas
279-
.map(
280-
(f) =>
281-
(f.payload as { contentBlockDelta: { delta: { text: string } } }).contentBlockDelta.delta
282-
.text,
283-
)
285+
.map((f) => (f.payload as { delta: { text: string } }).delta.text)
284286
.join("");
285287
expect(fullText).toBe("Hi there!");
286288

287-
// contentBlockStop
288-
const stopBlock = frames.find((f) => f.eventType === "contentBlockStop");
289+
// content_block_stop
290+
const stopBlock = frames.find(
291+
(f) => (f.payload as { type?: string }).type === "content_block_stop",
292+
);
289293
expect(stopBlock).toBeDefined();
290-
expect(stopBlock!.payload).toEqual({ contentBlockIndex: 0 });
294+
expect(stopBlock!.payload).toEqual({ type: "content_block_stop", index: 0 });
291295

292-
// messageStop
293-
const msgStop = frames.find((f) => f.eventType === "messageStop");
296+
// message_delta/message_stop
297+
const msgDelta = frames.find(
298+
(f) => (f.payload as { type?: string }).type === "message_delta",
299+
);
300+
expect(msgDelta).toBeDefined();
301+
expect(msgDelta!.payload).toMatchObject({
302+
type: "message_delta",
303+
delta: { stop_reason: "end_turn" },
304+
});
305+
const msgStop = frames.find((f) => (f.payload as { type?: string }).type === "message_stop");
294306
expect(msgStop).toBeDefined();
295-
expect(msgStop!.payload).toEqual({ stopReason: "end_turn" });
296307
});
297308

298309
it("returns tool call response as binary Event Stream frames", async () => {
@@ -306,38 +317,40 @@ describe("POST /model/{modelId}/invoke-with-response-stream", () => {
306317
expect(res.status).toBe(200);
307318
const frames = parseFrames(res.body);
308319

309-
// messageStart
310-
expect(frames[0].eventType).toBe("messageStart");
311-
expect(frames[0].payload).toEqual({ messageStart: { role: "assistant" } });
320+
expect(frames[0].eventType).toBe("chunk");
321+
expect(frames[0].payload).toMatchObject({ type: "message_start" });
312322

313-
// contentBlockStart with toolUse
314-
expect(frames[1].eventType).toBe("contentBlockStart");
323+
// content_block_start with tool_use
324+
expect(frames[1].eventType).toBe("chunk");
315325
const startPayload = frames[1].payload as {
316-
contentBlockIndex: number;
317-
contentBlockStart: {
318-
contentBlockIndex: number;
319-
start: { toolUse: { toolUseId: string; name: string } };
320-
};
326+
type: string;
327+
index: number;
328+
content_block: { type: string; id: string; name: string; input: object };
321329
};
322-
expect(startPayload.contentBlockIndex).toBe(0);
323-
expect(startPayload.contentBlockStart.start.toolUse.name).toBe("get_weather");
324-
expect(startPayload.contentBlockStart.start.toolUse.toolUseId).toBeDefined();
330+
expect(startPayload).toMatchObject({
331+
type: "content_block_start",
332+
index: 0,
333+
content_block: { type: "tool_use", name: "get_weather", input: {} },
334+
});
335+
expect(startPayload.content_block.id).toBeDefined();
325336

326-
// contentBlockDelta(s) with toolUse input
327-
const deltas = frames.filter((f) => f.eventType === "contentBlockDelta");
337+
// content_block_delta(s) with input_json_delta
338+
const deltas = frames.filter(
339+
(f) => (f.payload as { type?: string }).type === "content_block_delta",
340+
);
328341
expect(deltas.length).toBeGreaterThanOrEqual(1);
329342
const fullJson = deltas
330-
.map(
331-
(f) =>
332-
(f.payload as { contentBlockDelta: { delta: { toolUse: { input: string } } } })
333-
.contentBlockDelta.delta.toolUse.input,
334-
)
343+
.map((f) => (f.payload as { delta: { partial_json: string } }).delta.partial_json)
335344
.join("");
336345
expect(JSON.parse(fullJson)).toEqual({ city: "SF" });
337346

338-
// messageStop
339-
const msgStop = frames.find((f) => f.eventType === "messageStop");
340-
expect(msgStop!.payload).toEqual({ stopReason: "tool_use" });
347+
const msgDelta = frames.find(
348+
(f) => (f.payload as { type?: string }).type === "message_delta",
349+
);
350+
expect(msgDelta!.payload).toMatchObject({
351+
type: "message_delta",
352+
delta: { stop_reason: "tool_use" },
353+
});
341354
});
342355

343356
it("Content-Type is application/vnd.amazon.eventstream", async () => {
@@ -467,41 +480,41 @@ describe("POST /model/{modelId}/invoke-with-response-stream (multiple tool calls
467480
expect(res.status).toBe(200);
468481
const frames = parseFrames(res.body);
469482

470-
// Find contentBlockStart frames
471-
const blockStarts = frames.filter((f) => f.eventType === "contentBlockStart");
483+
// Find content_block_start frames
484+
const blockStarts = frames.filter(
485+
(f) => (f.payload as { type?: string }).type === "content_block_start",
486+
);
472487
expect(blockStarts.length).toBeGreaterThanOrEqual(2);
473488

474-
// First tool at contentBlockIndex 0
489+
// First tool at index 0
475490
const start0 = blockStarts[0].payload as {
476-
contentBlockIndex: number;
477-
contentBlockStart: {
478-
contentBlockIndex: number;
479-
start: { toolUse: { name: string } };
480-
};
491+
index: number;
492+
content_block: { name: string };
481493
};
482-
expect(start0.contentBlockIndex).toBe(0);
483-
expect(start0.contentBlockStart.start.toolUse.name).toBe("get_weather");
494+
expect(start0.index).toBe(0);
495+
expect(start0.content_block.name).toBe("get_weather");
484496

485-
// Second tool at contentBlockIndex 1
497+
// Second tool at index 1
486498
const start1 = blockStarts[1].payload as {
487-
contentBlockIndex: number;
488-
contentBlockStart: {
489-
contentBlockIndex: number;
490-
start: { toolUse: { name: string } };
491-
};
499+
index: number;
500+
content_block: { name: string };
492501
};
493-
expect(start1.contentBlockIndex).toBe(1);
494-
expect(start1.contentBlockStart.start.toolUse.name).toBe("get_time");
502+
expect(start1.index).toBe(1);
503+
expect(start1.content_block.name).toBe("get_time");
495504

496-
// contentBlockStop should also have correct indices
497-
const blockStops = frames.filter((f) => f.eventType === "contentBlockStop");
505+
// content_block_stop should also have correct indices
506+
const blockStops = frames.filter(
507+
(f) => (f.payload as { type?: string }).type === "content_block_stop",
508+
);
498509
expect(blockStops.length).toBeGreaterThanOrEqual(2);
499-
expect((blockStops[0].payload as { contentBlockIndex: number }).contentBlockIndex).toBe(0);
500-
expect((blockStops[1].payload as { contentBlockIndex: number }).contentBlockIndex).toBe(1);
510+
expect((blockStops[0].payload as { index: number }).index).toBe(0);
511+
expect((blockStops[1].payload as { index: number }).index).toBe(1);
501512

502-
// messageStop should indicate tool_use
503-
const msgStop = frames.find((f) => f.eventType === "messageStop");
504-
expect(msgStop!.payload).toEqual({ stopReason: "tool_use" });
513+
// message_delta should indicate tool_use
514+
const msgDelta = frames.find(
515+
(f) => (f.payload as { type?: string }).type === "message_delta",
516+
);
517+
expect(msgDelta!.payload).toMatchObject({ delta: { stop_reason: "tool_use" } });
505518
});
506519
});
507520

@@ -1025,15 +1038,12 @@ describe("POST /model/{modelId}/invoke-with-response-stream (malformed tool args
10251038
expect(res.status).toBe(200);
10261039
const frames = parseFrames(res.body);
10271040

1028-
// Find contentBlockDelta frames with toolUse input
1029-
const deltas = frames.filter((f) => f.eventType === "contentBlockDelta");
1041+
// Find Anthropic-native content_block_delta frames with input_json_delta
1042+
const deltas = frames.filter(
1043+
(f) => (f.payload as { type?: string }).type === "content_block_delta",
1044+
);
10301045
const fullJson = deltas
1031-
.map((f) => {
1032-
const payload = f.payload as {
1033-
contentBlockDelta: { delta: { toolUse?: { input: string } } };
1034-
};
1035-
return payload.contentBlockDelta.delta.toolUse?.input ?? "";
1036-
})
1046+
.map((f) => (f.payload as { delta: { partial_json?: string } }).delta.partial_json ?? "")
10371047
.join("");
10381048
// Malformed arguments should fall back to "{}"
10391049
expect(fullJson).toBe("{}");
@@ -1060,14 +1070,22 @@ describe("POST /model/{modelId}/invoke-with-response-stream (empty content)", ()
10601070
expect(res.status).toBe(200);
10611071
const frames = parseFrames(res.body);
10621072

1063-
// Should still have messageStart, contentBlockStart, contentBlockStop, messageStop
1064-
expect(frames[0].eventType).toBe("messageStart");
1065-
expect(frames.find((f) => f.eventType === "contentBlockStart")).toBeDefined();
1066-
expect(frames.find((f) => f.eventType === "contentBlockStop")).toBeDefined();
1067-
expect(frames.find((f) => f.eventType === "messageStop")).toBeDefined();
1073+
// Should still have message_start, content_block_start, content_block_stop, message_stop
1074+
// payloads inside Bedrock EventStream chunk frames.
1075+
expect(frames[0].eventType).toBe("chunk");
1076+
expect(frames[0].payload).toMatchObject({ type: "message_start" });
1077+
expect(
1078+
frames.find((f) => (f.payload as { type?: string }).type === "content_block_start"),
1079+
).toBeDefined();
1080+
expect(
1081+
frames.find((f) => (f.payload as { type?: string }).type === "content_block_stop"),
1082+
).toBeDefined();
1083+
expect(frames.find((f) => (f.payload as { type?: string }).type === "message_stop")).toBeDefined();
10681084

10691085
// Content deltas should be zero (empty string → no chunks)
1070-
const deltas = frames.filter((f) => f.eventType === "contentBlockDelta");
1086+
const deltas = frames.filter(
1087+
(f) => (f.payload as { type?: string }).type === "content_block_delta",
1088+
);
10711089
expect(deltas).toHaveLength(0);
10721090
});
10731091
});

src/__tests__/bedrock.test.ts

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,31 +1448,29 @@ import { buildBedrockStreamTextEvents, buildBedrockStreamToolCallEvents } from "
14481448

14491449
describe("buildBedrockStreamTextEvents", () => {
14501450
it("creates correct event sequence for empty content", () => {
1451-
const events = buildBedrockStreamTextEvents("", 10);
1452-
// Should have: messageStart, contentBlockStart, contentBlockStop, messageStop (no deltas)
1453-
expect(events).toHaveLength(4);
1454-
expect(events[0].eventType).toBe("messageStart");
1455-
expect(events[1].eventType).toBe("contentBlockStart");
1456-
expect(events[2].eventType).toBe("contentBlockStop");
1457-
expect(events[3].eventType).toBe("messageStop");
1451+
const events = buildBedrockStreamTextEvents("", "model-id", 10);
1452+
// Should have: message_start, content_block_start, content_block_stop,
1453+
// message_delta, message_stop (no deltas), all wrapped in chunk events.
1454+
expect(events).toHaveLength(5);
1455+
expect(events.every((event) => event.eventType === "chunk")).toBe(true);
1456+
expect(events.map((event) => (event.payload as { type: string }).type)).toEqual([
1457+
"message_start",
1458+
"content_block_start",
1459+
"content_block_stop",
1460+
"message_delta",
1461+
"message_stop",
1462+
]);
14581463
});
14591464

14601465
it("chunks content according to chunkSize", () => {
1461-
const events = buildBedrockStreamTextEvents("ABCDEF", 2);
1462-
const deltas = events.filter((e) => e.eventType === "contentBlockDelta");
1466+
const events = buildBedrockStreamTextEvents("ABCDEF", "model-id", 2);
1467+
const deltas = events.filter(
1468+
(e) => (e.payload as { type?: string }).type === "content_block_delta",
1469+
);
14631470
expect(deltas).toHaveLength(3);
1464-
expect(
1465-
(deltas[0].payload as { contentBlockDelta: { delta: { text: string } } }).contentBlockDelta
1466-
.delta.text,
1467-
).toBe("AB");
1468-
expect(
1469-
(deltas[1].payload as { contentBlockDelta: { delta: { text: string } } }).contentBlockDelta
1470-
.delta.text,
1471-
).toBe("CD");
1472-
expect(
1473-
(deltas[2].payload as { contentBlockDelta: { delta: { text: string } } }).contentBlockDelta
1474-
.delta.text,
1475-
).toBe("EF");
1471+
expect((deltas[0].payload as { delta: { text: string } }).delta.text).toBe("AB");
1472+
expect((deltas[1].payload as { delta: { text: string } }).delta.text).toBe("CD");
1473+
expect((deltas[2].payload as { delta: { text: string } }).delta.text).toBe("EF");
14761474
});
14771475
});
14781476

@@ -1482,55 +1480,63 @@ describe("buildBedrockStreamToolCallEvents", () => {
14821480
it("falls back to '{}' for malformed JSON arguments", () => {
14831481
const events = buildBedrockStreamToolCallEvents(
14841482
[{ name: "fn", arguments: "NOT VALID" }],
1483+
"model-id",
14851484
100,
14861485
logger,
14871486
);
1488-
const deltas = events.filter((e) => e.eventType === "contentBlockDelta");
1487+
const deltas = events.filter(
1488+
(e) => (e.payload as { type?: string }).type === "content_block_delta",
1489+
);
14891490
const fullJson = deltas
1490-
.map(
1491-
(e) =>
1492-
(e.payload as { contentBlockDelta: { delta: { toolUse: { input: string } } } })
1493-
.contentBlockDelta.delta.toolUse.input,
1494-
)
1491+
.map((e) => (e.payload as { delta: { partial_json: string } }).delta.partial_json)
14951492
.join("");
14961493
expect(fullJson).toBe("{}");
14971494
});
14981495

14991496
it("generates tool use id when not provided", () => {
15001497
const events = buildBedrockStreamToolCallEvents(
15011498
[{ name: "fn", arguments: '{"x":1}' }],
1499+
"model-id",
15021500
100,
15031501
logger,
15041502
);
1505-
const startEvent = events.find((e) => e.eventType === "contentBlockStart");
1503+
const startEvent = events.find(
1504+
(e) => (e.payload as { type?: string }).type === "content_block_start",
1505+
);
15061506
const payload = startEvent!.payload as {
1507-
contentBlockStart: { start: { toolUse: { toolUseId: string } } };
1507+
content_block: { id: string };
15081508
};
1509-
expect(payload.contentBlockStart.start.toolUse.toolUseId).toMatch(/^toolu_/);
1509+
expect(payload.content_block.id).toMatch(/^toolu_/);
15101510
});
15111511

15121512
it("uses provided tool id", () => {
15131513
const events = buildBedrockStreamToolCallEvents(
15141514
[{ name: "fn", arguments: '{"x":1}', id: "custom_id" }],
1515+
"model-id",
15151516
100,
15161517
logger,
15171518
);
1518-
const startEvent = events.find((e) => e.eventType === "contentBlockStart");
1519+
const startEvent = events.find(
1520+
(e) => (e.payload as { type?: string }).type === "content_block_start",
1521+
);
15191522
const payload = startEvent!.payload as {
1520-
contentBlockStart: { start: { toolUse: { toolUseId: string } } };
1523+
content_block: { id: string };
15211524
};
1522-
expect(payload.contentBlockStart.start.toolUse.toolUseId).toBe("custom_id");
1525+
expect(payload.content_block.id).toBe("custom_id");
15231526
});
15241527

15251528
it("uses '{}' when arguments is empty string", () => {
1526-
const events = buildBedrockStreamToolCallEvents([{ name: "fn", arguments: "" }], 100, logger);
1527-
const deltas = events.filter((e) => e.eventType === "contentBlockDelta");
1529+
const events = buildBedrockStreamToolCallEvents(
1530+
[{ name: "fn", arguments: "" }],
1531+
"model-id",
1532+
100,
1533+
logger,
1534+
);
1535+
const deltas = events.filter(
1536+
(e) => (e.payload as { type?: string }).type === "content_block_delta",
1537+
);
15281538
const fullJson = deltas
1529-
.map(
1530-
(e) =>
1531-
(e.payload as { contentBlockDelta: { delta: { toolUse: { input: string } } } })
1532-
.contentBlockDelta.delta.toolUse.input,
1533-
)
1539+
.map((e) => (e.payload as { delta: { partial_json: string } }).delta.partial_json)
15341540
.join("");
15351541
expect(fullJson).toBe("{}");
15361542
});

0 commit comments

Comments
 (0)