Skip to content

Commit 8efd867

Browse files
tombeckenhamclaude
andcommitted
fix(ai-client): port #508 robustness fixes onto fetcher-alt branch
The fetcher path uses the same SSE parsing and connect-wrapper plumbing as the stream() path on #508, so the polish that landed during #508's review applies directly here. Carry it over so this branch has the same robustness. - Skip SSE control lines (`:` comments, `event:` / `id:` / `retry:`) in responseToSSEChunks. Proxies and CDNs inject these as keepalives; letting them through would feed JSON.parse a non-payload line. - Drop unterminated trailing buffer in readStreamLines. A non-empty buffer at stream end means the connection was cut mid-line, so the data is partial — yielding it would surface a misleading RUN_ERROR for what is really a transport-layer issue. - Surface JSON.parse failures in responseToSSEChunks and fetchHttpStream. Stop swallowing them behind console.warn; let SyntaxError propagate so the connect-wrapper turns it into a visible RUN_ERROR. - Drop unsafe `as unknown as StreamChunk` casts in normalizeConnectionAdapter's synthesized RUN_FINISHED / RUN_ERROR events. Use EventType + RunFinishedEvent / RunErrorEvent so missing required fields are caught by the compiler. Track upstream threadId/runId from chunks and reuse them in the synthesis instead of fabricating both ids unconditionally. - Forward optional abortSignal third arg through stream() and rpcStream() factory signatures. Backwards-compatible for existing callers; lets long-running factories cancel when useChat aborts. Mirrors what fetcherToConnectionAdapter already does. Tests: - Update the two `should handle malformed JSON gracefully` tests to assert SyntaxError throws instead of silent drop. - Update stream() / rpcStream() factory mock assertions to expect the new third arg. - Add chat-fetcher test asserting a fetcher returning a malformed-SSE Response surfaces as a RUN_ERROR via onError. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent bb488ea commit 8efd867

3 files changed

Lines changed: 110 additions & 65 deletions

File tree

packages/typescript/ai-client/src/connection-adapters.ts

Lines changed: 60 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
import type { ModelMessage, StreamChunk, UIMessage } from '@tanstack/ai'
1+
import { EventType } from '@tanstack/ai'
2+
import type {
3+
ModelMessage,
4+
RunErrorEvent,
5+
RunFinishedEvent,
6+
StreamChunk,
7+
UIMessage,
8+
} from '@tanstack/ai'
29
import type { ChatFetcher } from './types'
310

411
/**
@@ -54,9 +61,14 @@ async function* readStreamLines(
5461
}
5562
}
5663

57-
// Process any remaining data in the buffer
64+
// Drop any unterminated trailing buffer. A non-empty buffer at stream end
65+
// means the connection was cut mid-line (server crash, dropped TCP), so
66+
// the content is by definition partial — yielding it would feed truncated
67+
// JSON to downstream parsers and produce a confusing RUN_ERROR.
5868
if (buffer.trim()) {
59-
yield buffer
69+
console.warn(
70+
'[@tanstack/ai-client] Stream ended with unterminated trailing data; discarding. The connection was likely cut short.',
71+
)
6072
}
6173
} finally {
6274
reader.releaseLock()
@@ -68,6 +80,14 @@ async function* readStreamLines(
6880
*
6981
* Used by both `fetchServerSentEvents` (HTTP path) and
7082
* `fetcherToConnectionAdapter` (when a fetcher returns a Response).
83+
*
84+
* Accepts either `data: {...}` SSE lines or bare JSON lines (legacy/raw mode).
85+
* Skips non-payload SSE fields (comments starting with `:`, and `event:` /
86+
* `id:` / `retry:` lines) — proxies and CDNs may inject these as keepalives,
87+
* and they are not malformed JSON.
88+
*
89+
* A JSON parse failure on an actual payload line throws (surfacing as
90+
* RUN_ERROR through the connect-wrapper) rather than being silently dropped.
7191
*/
7292
async function* responseToSSEChunks(
7393
response: Response,
@@ -83,20 +103,22 @@ async function* responseToSSEChunks(
83103
throw new Error('Response body is not readable')
84104
}
85105
for await (const line of readStreamLines(reader, abortSignal)) {
106+
if (
107+
line.startsWith(':') ||
108+
line.startsWith('event:') ||
109+
line.startsWith('id:') ||
110+
line.startsWith('retry:')
111+
) {
112+
continue
113+
}
86114
const data = line.startsWith('data: ') ? line.slice(6) : line
87-
88115
if (data === '[DONE]') {
89116
console.warn(
90117
'[@tanstack/ai-client] Received [DONE] sentinel. This is deprecated — upgrade your @tanstack/ai server package. RUN_FINISHED is the stream terminator.',
91118
)
92119
continue
93120
}
94-
95-
try {
96-
yield JSON.parse(data) as StreamChunk
97-
} catch (parseError) {
98-
console.warn('Failed to parse SSE chunk:', data)
99-
}
121+
yield JSON.parse(data) as StreamChunk
100122
}
101123
}
102124

@@ -213,9 +235,17 @@ export function normalizeConnectionAdapter(
213235
},
214236
async send(messages, data, abortSignal) {
215237
let hasTerminalEvent = false
238+
let upstreamThreadId: string | undefined
239+
let upstreamRunId: string | undefined
216240
try {
217241
const stream = connection.connect(messages, data, abortSignal)
218242
for await (const chunk of stream) {
243+
if ('threadId' in chunk && typeof chunk.threadId === 'string') {
244+
upstreamThreadId = chunk.threadId
245+
}
246+
if ('runId' in chunk && typeof chunk.runId === 'string') {
247+
upstreamRunId = chunk.runId
248+
}
219249
if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') {
220250
hasTerminalEvent = true
221251
}
@@ -225,28 +255,26 @@ export function normalizeConnectionAdapter(
225255
// If the connect stream ended cleanly without a terminal event,
226256
// synthesize RUN_FINISHED so request-scoped consumers can complete.
227257
if (!abortSignal?.aborted && !hasTerminalEvent) {
228-
push({
229-
type: 'RUN_FINISHED',
230-
runId: `run-${Date.now()}`,
258+
const synthetic: RunFinishedEvent = {
259+
type: EventType.RUN_FINISHED,
260+
threadId: upstreamThreadId ?? `thread-${Date.now()}`,
261+
runId: upstreamRunId ?? `run-${Date.now()}`,
231262
model: 'connect-wrapper',
232263
timestamp: Date.now(),
233264
finishReason: 'stop',
234-
} as unknown as StreamChunk)
265+
}
266+
push(synthetic)
235267
}
236268
} catch (err) {
237269
if (!abortSignal?.aborted && !hasTerminalEvent) {
238-
push({
239-
type: 'RUN_ERROR',
270+
const message =
271+
err instanceof Error ? err.message : 'Unknown error in connect()'
272+
const synthetic: RunErrorEvent = {
273+
type: EventType.RUN_ERROR,
240274
timestamp: Date.now(),
241-
message:
242-
err instanceof Error ? err.message : 'Unknown error in connect()',
243-
error: {
244-
message:
245-
err instanceof Error
246-
? err.message
247-
: 'Unknown error in connect()',
248-
},
249-
} as unknown as StreamChunk)
275+
message,
276+
}
277+
push(synthetic)
250278
}
251279
throw err
252280
}
@@ -421,12 +449,7 @@ export function fetchHttpStream(
421449
}
422450

423451
for await (const line of readStreamLines(reader, abortSignal)) {
424-
try {
425-
const parsed: StreamChunk = JSON.parse(line)
426-
yield parsed
427-
} catch (parseError) {
428-
console.warn('Failed to parse HTTP stream chunk:', line)
429-
}
452+
yield JSON.parse(line) as StreamChunk
430453
}
431454
},
432455
}
@@ -450,13 +473,14 @@ export function stream(
450473
streamFactory: (
451474
messages: Array<UIMessage> | Array<ModelMessage>,
452475
data?: Record<string, any>,
476+
abortSignal?: AbortSignal,
453477
) => AsyncIterable<StreamChunk>,
454478
): ConnectConnectionAdapter {
455479
return {
456-
async *connect(messages, data) {
480+
async *connect(messages, data, abortSignal) {
457481
// Pass messages as-is (UIMessages with parts preserved)
458482
// Server-side chat() handles conversion to ModelMessages
459-
yield* streamFactory(messages, data)
483+
yield* streamFactory(messages, data, abortSignal)
460484
},
461485
}
462486
}
@@ -520,13 +544,14 @@ export function rpcStream(
520544
rpcCall: (
521545
messages: Array<UIMessage> | Array<ModelMessage>,
522546
data?: Record<string, any>,
547+
abortSignal?: AbortSignal,
523548
) => AsyncIterable<StreamChunk>,
524549
): ConnectConnectionAdapter {
525550
return {
526-
async *connect(messages, data) {
551+
async *connect(messages, data, abortSignal) {
527552
// Pass messages as-is (UIMessages with parts preserved)
528553
// Server-side chat() handles conversion to ModelMessages
529-
yield* rpcCall(messages, data)
554+
yield* rpcCall(messages, data, abortSignal)
530555
},
531556
}
532557
}

packages/typescript/ai-client/tests/chat-fetcher.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,34 @@ describe('ChatClient — fetcher transport', () => {
131131
expect(client.getStatus()).toBe('error')
132132
})
133133

134+
it('surfaces a malformed-SSE Response as a ChatClient error', async () => {
135+
// A fetcher that returns a Response whose body has a malformed JSON line.
136+
// The new behavior is to throw SyntaxError from the SSE parser; the
137+
// chat client should surface that as an error rather than silently
138+
// dropping the bad chunk.
139+
const sseBody = 'data: { not valid json\n\n'
140+
const fetcher: ChatFetcher = async () => {
141+
return new Response(sseBody, {
142+
status: 200,
143+
headers: { 'content-type': 'text/event-stream' },
144+
})
145+
}
146+
147+
let observedError: Error | undefined
148+
const client = new ChatClient({
149+
fetcher,
150+
onError: (err) => {
151+
observedError = err
152+
},
153+
})
154+
155+
await client.sendMessage('hi')
156+
157+
expect(observedError).toBeDefined()
158+
expect(observedError!.name).toBe('SyntaxError')
159+
expect(client.getStatus()).toBe('error')
160+
})
161+
134162
it('passes UIMessages and merged body to the fetcher', async () => {
135163
const fetcher = vi.fn<ChatFetcher>(async function* () {
136164
yield {

packages/typescript/ai-client/tests/connection-adapters.test.ts

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,7 @@ describe('connection-adapters', () => {
144144
warnSpy.mockRestore()
145145
})
146146

147-
it('should handle malformed JSON gracefully', async () => {
148-
const consoleWarnSpy = vi
149-
.spyOn(console, 'warn')
150-
.mockImplementation(() => {})
151-
147+
it('should throw a SyntaxError on malformed JSON', async () => {
152148
const mockReader = {
153149
read: vi
154150
.fn()
@@ -170,17 +166,16 @@ describe('connection-adapters', () => {
170166
fetchMock.mockResolvedValue(mockResponse as any)
171167

172168
const adapter = fetchServerSentEvents('/api/chat')
173-
const chunks: Array<StreamChunk> = []
174169

175-
for await (const chunk of adapter.connect([
176-
{ role: 'user', content: 'Hello' },
177-
])) {
178-
chunks.push(chunk)
179-
}
180-
181-
expect(chunks).toHaveLength(0)
182-
expect(consoleWarnSpy).toHaveBeenCalled()
183-
consoleWarnSpy.mockRestore()
170+
await expect(
171+
(async () => {
172+
for await (const _ of adapter.connect([
173+
{ role: 'user', content: 'Hello' },
174+
])) {
175+
// Consume
176+
}
177+
})(),
178+
).rejects.toThrow(SyntaxError)
184179
})
185180

186181
it('should handle HTTP errors', async () => {
@@ -550,11 +545,7 @@ describe('connection-adapters', () => {
550545
expect(chunks).toHaveLength(1)
551546
})
552547

553-
it('should handle malformed JSON gracefully', async () => {
554-
const consoleWarnSpy = vi
555-
.spyOn(console, 'warn')
556-
.mockImplementation(() => {})
557-
548+
it('should throw a SyntaxError on malformed JSON', async () => {
558549
const mockReader = {
559550
read: vi
560551
.fn()
@@ -576,17 +567,16 @@ describe('connection-adapters', () => {
576567
fetchMock.mockResolvedValue(mockResponse as any)
577568

578569
const adapter = fetchHttpStream('/api/chat')
579-
const chunks: Array<StreamChunk> = []
580570

581-
for await (const chunk of adapter.connect([
582-
{ role: 'user', content: 'Hello' },
583-
])) {
584-
chunks.push(chunk)
585-
}
586-
587-
expect(chunks).toHaveLength(0)
588-
expect(consoleWarnSpy).toHaveBeenCalled()
589-
consoleWarnSpy.mockRestore()
571+
await expect(
572+
(async () => {
573+
for await (const _ of adapter.connect([
574+
{ role: 'user', content: 'Hello' },
575+
])) {
576+
// Consume
577+
}
578+
})(),
579+
).rejects.toThrow(SyntaxError)
590580
})
591581

592582
it('should handle HTTP errors', async () => {
@@ -836,6 +826,7 @@ describe('connection-adapters', () => {
836826
expect(streamFactory).toHaveBeenCalledWith(
837827
expect.arrayContaining([expect.objectContaining({ role: 'user' })]),
838828
data,
829+
undefined,
839830
)
840831
})
841832
})
@@ -1023,6 +1014,7 @@ describe('connection-adapters', () => {
10231014
expect(rpcCall).toHaveBeenCalledWith(
10241015
expect.arrayContaining([expect.objectContaining({ role: 'user' })]),
10251016
data,
1017+
undefined,
10261018
)
10271019
})
10281020
})

0 commit comments

Comments
 (0)