Skip to content

Commit 4e1b07a

Browse files
authored
fix: Flush decoder state at end of NDJSON stream (#119)
A few other cleanups as well. But make sure we get the final content. Once we are done
1 parent a23c541 commit 4e1b07a

4 files changed

Lines changed: 163 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ This update provides much improved schema interfaces. The migration should be mi
160160
- Allow for more customization of error messages: https://github.com/agentclientprotocol/typescript-sdk/pull/12
161161
- Update to latest ACP JSON Schema: https://github.com/agentclientprotocol/typescript-sdk/pull/10
162162

163-
## 0.4.9 (20205-10-21)
163+
## 0.4.9 (2025-10-21)
164164

165165
- Fix: incorrect method for session/set_model client implementation.
166166

scripts/generate.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env node
22

3-
import { createClient, defineConfig } from "@hey-api/openapi-ts";
3+
import { createClient } from "@hey-api/openapi-ts";
44
import * as fs from "fs/promises";
55
import { dirname } from "path";
66
import * as prettier from "prettier";

src/stream.test.ts

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import { describe, it, expect } from "vitest";
2+
import { ndJsonStream } from "./stream.js";
3+
import type { AnyMessage } from "./jsonrpc.js";
4+
5+
function readableFromChunks(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
6+
return new ReadableStream({
7+
start(controller) {
8+
for (const chunk of chunks) {
9+
controller.enqueue(chunk);
10+
}
11+
controller.close();
12+
},
13+
});
14+
}
15+
16+
async function collectMessages(
17+
readable: ReadableStream<AnyMessage>,
18+
): Promise<AnyMessage[]> {
19+
const messages: AnyMessage[] = [];
20+
const reader = readable.getReader();
21+
while (true) {
22+
const { value, done } = await reader.read();
23+
if (done) break;
24+
messages.push(value);
25+
}
26+
return messages;
27+
}
28+
29+
describe("ndJsonStream", () => {
30+
const nullWritable = new WritableStream<Uint8Array>();
31+
32+
it("parses a single message", async () => {
33+
const msg = { jsonrpc: "2.0" as const, id: 1, method: "test" };
34+
const input = readableFromChunks([
35+
new TextEncoder().encode(JSON.stringify(msg) + "\n"),
36+
]);
37+
38+
const { readable } = ndJsonStream(nullWritable, input);
39+
const messages = await collectMessages(readable);
40+
41+
expect(messages).toEqual([msg]);
42+
});
43+
44+
it("parses multiple messages", async () => {
45+
const msg1 = { jsonrpc: "2.0" as const, id: 1, method: "first" };
46+
const msg2 = { jsonrpc: "2.0" as const, id: 2, method: "second" };
47+
const input = readableFromChunks([
48+
new TextEncoder().encode(
49+
JSON.stringify(msg1) + "\n" + JSON.stringify(msg2) + "\n",
50+
),
51+
]);
52+
53+
const { readable } = ndJsonStream(nullWritable, input);
54+
const messages = await collectMessages(readable);
55+
56+
expect(messages).toEqual([msg1, msg2]);
57+
});
58+
59+
it("parses a message split across chunks", async () => {
60+
const msg = { jsonrpc: "2.0" as const, id: 1, method: "split" };
61+
const full = JSON.stringify(msg) + "\n";
62+
const mid = Math.floor(full.length / 2);
63+
const encoder = new TextEncoder();
64+
65+
const input = readableFromChunks([
66+
encoder.encode(full.slice(0, mid)),
67+
encoder.encode(full.slice(mid)),
68+
]);
69+
70+
const { readable } = ndJsonStream(nullWritable, input);
71+
const messages = await collectMessages(readable);
72+
73+
expect(messages).toEqual([msg]);
74+
});
75+
76+
it("handles multi-byte UTF-8 characters split across chunks", async () => {
77+
const msg = {
78+
jsonrpc: "2.0" as const,
79+
id: 1,
80+
method: "test",
81+
params: { text: "héllo wörld" },
82+
};
83+
const bytes = new TextEncoder().encode(JSON.stringify(msg) + "\n");
84+
85+
// Find the byte offset of 'é' (0xC3 0xA9) and split between its two bytes
86+
const éOffset = bytes.indexOf(0xc3);
87+
expect(éOffset).toBeGreaterThan(0);
88+
89+
const input = readableFromChunks([
90+
bytes.slice(0, éOffset + 1), // includes 0xC3 but not 0xA9
91+
bytes.slice(éOffset + 1), // starts with 0xA9
92+
]);
93+
94+
const { readable } = ndJsonStream(nullWritable, input);
95+
const messages = await collectMessages(readable);
96+
97+
expect(messages).toEqual([msg]);
98+
});
99+
100+
it("parses a final message without trailing newline", async () => {
101+
const msg = { jsonrpc: "2.0" as const, id: 1, method: "unterminated" };
102+
const input = readableFromChunks([
103+
new TextEncoder().encode(JSON.stringify(msg)), // no \n
104+
]);
105+
106+
const { readable } = ndJsonStream(nullWritable, input);
107+
const messages = await collectMessages(readable);
108+
109+
expect(messages).toEqual([msg]);
110+
});
111+
112+
it("parses a final message without trailing newline with multi-byte chars split across chunks", async () => {
113+
const msg = {
114+
jsonrpc: "2.0" as const,
115+
id: 1,
116+
method: "tëst",
117+
};
118+
const bytes = new TextEncoder().encode(JSON.stringify(msg)); // no \n
119+
const éOffset = bytes.indexOf(0xc3);
120+
expect(éOffset).toBeGreaterThan(0);
121+
122+
const input = readableFromChunks([
123+
bytes.slice(0, éOffset + 1), // includes 0xC3 but not 0xAB
124+
bytes.slice(éOffset + 1),
125+
]);
126+
127+
const { readable } = ndJsonStream(nullWritable, input);
128+
const messages = await collectMessages(readable);
129+
130+
expect(messages).toEqual([msg]);
131+
});
132+
133+
it("skips malformed lines and continues parsing", async () => {
134+
const msg1 = { jsonrpc: "2.0" as const, id: 1, method: "before" };
135+
const msg2 = { jsonrpc: "2.0" as const, id: 2, method: "after" };
136+
const input = readableFromChunks([
137+
new TextEncoder().encode(
138+
JSON.stringify(msg1) +
139+
"\n" +
140+
"not valid json\n" +
141+
JSON.stringify(msg2) +
142+
"\n",
143+
),
144+
]);
145+
146+
const { readable } = ndJsonStream(nullWritable, input);
147+
const messages = await collectMessages(readable);
148+
149+
expect(messages).toEqual([msg1, msg2]);
150+
});
151+
});

src/stream.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export function ndJsonStream(
3838
while (true) {
3939
const { value, done } = await reader.read();
4040
if (done) {
41+
content += textDecoder.decode();
4142
break;
4243
}
4344
if (!value) {
@@ -63,6 +64,15 @@ export function ndJsonStream(
6364
}
6465
}
6566
}
67+
const trimmedLine = content.trim();
68+
if (trimmedLine) {
69+
try {
70+
const message = JSON.parse(trimmedLine) as AnyMessage;
71+
controller.enqueue(message);
72+
} catch (err) {
73+
console.error("Failed to parse JSON message:", trimmedLine, err);
74+
}
75+
}
6676
} catch (err) {
6777
controller.error(err);
6878
return;

0 commit comments

Comments
 (0)