feat(ai-client): support TanStack Start server functions in stream() adapter #508

tombeckenham wants to merge 11 commits into TanStack:main
Conversation
…adapter
The stream() factory now accepts Promise<AsyncIterable<StreamChunk>> and
Promise<Response> in addition to the existing AsyncIterable shape, so a
TanStack Start server function (which is just an async API endpoint) can
be wired directly into useChat without a route handler:
```ts
useChat({
  connection: stream((messages) => chatFn({ data: { messages } })),
})
```
When the factory returns a Response (e.g. via toServerSentEventsResponse),
the adapter parses the SSE body into chunks. rpcStream() likewise accepts
a Promise-returning RPC call.
Adds unit tests for both new shapes and a docs section in
chat/connection-adapters.md.
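The three accepted shapes collapse into one normalization step. This is a minimal sketch under assumed names — `normalizeFactoryResult` and `parseSSEBody` are illustrative, not the adapter's actual internals:

```typescript
type StreamChunk = { type: string } & Record<string, unknown>

type FactoryResult =
  | AsyncIterable<StreamChunk>
  | Promise<AsyncIterable<StreamChunk>>
  | Promise<Response>

async function* normalizeFactoryResult(
  result: FactoryResult,
): AsyncIterable<StreamChunk> {
  // Awaiting a plain AsyncIterable is a no-op, so one await covers all shapes.
  const resolved = await result
  if (resolved instanceof Response) {
    // Response branch: parse the SSE body into chunks.
    yield* parseSSEBody(resolved)
  } else {
    yield* resolved
  }
}

// Simplified SSE parsing; the real adapter streams incrementally.
async function* parseSSEBody(res: Response): AsyncIterable<StreamChunk> {
  const text = await res.text()
  for (const line of text.split('\n')) {
    if (line.startsWith('data: ')) yield JSON.parse(line.slice(6))
  }
}
```

Because the await is a no-op for a bare iterable, existing callers keep working unchanged.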
Adds a working /server-fn-chat route that wires useChat to a TanStack
Start server function via the stream() connection adapter:
```ts
useChat({
  connection: stream((messages) =>
    chatFn({ data: { messages: messages as UIMessage[] } }),
  ),
})
```
The new chatFn handler in lib/server-fns.ts returns
toServerSentEventsResponse(chat({ ... })) — the stream() adapter awaits
the server function and parses the SSE response into chunks.
Sits alongside the existing index.tsx pattern (fetchServerSentEvents
to a route handler) so users can compare the two invocation styles.
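On the handler side, an SSE Response can be produced from a chunk stream roughly like this — `toSSEResponse` is a simplified stand-in for `toServerSentEventsResponse`, whose real implementation may differ:

```typescript
type StreamChunk = { type: string } & Record<string, unknown>

// Serialize an async chunk stream into a text/event-stream Response,
// one `data:` line per chunk, blank-line separated per the SSE format.
function toSSEResponse(chunks: AsyncIterable<StreamChunk>): Response {
  const body = new ReadableStream<Uint8Array>({
    async start(controller) {
      const enc = new TextEncoder()
      for await (const chunk of chunks) {
        controller.enqueue(enc.encode(`data: ${JSON.stringify(chunk)}\n\n`))
      }
      controller.close()
    },
  })
  return new Response(body, {
    headers: { 'content-type': 'text/event-stream' },
  })
}
```

The stream() adapter only needs the Response body to be parseable SSE; where the Response came from (route handler or server function) is irrelevant to it.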
Lead with what stream() does (typed RPC into useChat), instead of calling a server function "just a fancy/async API endpoint." Same edits applied to the changeset and the stream() JSDoc. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…server-functions-pBsj5 (conflicts: examples/ts-react-chat/src/components/Header.tsx, examples/ts-react-chat/src/lib/server-fns.ts, packages/typescript/ai-client/src/connection-adapters.ts)
Nx Cloud CI pipeline ran for commit cb39b84.
The previous merge commit accidentally included stale references in @tanstack/ai-fal that this PR shouldn't have touched. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Type-check was failing on CI because the new server-function and RPC async-iterable test fixtures yielded raw string literals for chunk type, which don't satisfy the EventType enum required by StreamChunk. Switch to the enum and add the required threadId to RUN_FINISHED. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
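The fixture fix can be pictured with a minimal sketch — the `EventType` members and the exact `RUN_FINISHED` field set here are assumptions inferred from the commit text, not the actual @tanstack/ai definitions:

```typescript
// Assumed enum; the real EventType lives in the @tanstack/ai packages.
enum EventType {
  TEXT_MESSAGE_CONTENT = 'TEXT_MESSAGE_CONTENT',
  RUN_FINISHED = 'RUN_FINISHED',
}

// Discriminated union: a bare string literal for `type` would not satisfy it.
type StreamChunk =
  | { type: EventType.TEXT_MESSAGE_CONTENT; delta: string }
  | { type: EventType.RUN_FINISHED; threadId: string; runId: string }

async function* fixture(): AsyncIterable<StreamChunk> {
  yield { type: EventType.TEXT_MESSAGE_CONTENT, delta: 'hi' }
  // The enum member plus the required threadId is what the type-check demands.
  yield { type: EventType.RUN_FINISHED, threadId: 't1', runId: 'r1' }
}
```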
…abortSignal

Address review findings on the server-function stream() adapter PR:

- Synthesized RUN_FINISHED/RUN_ERROR events in normalizeConnectionAdapter no longer use `as unknown as StreamChunk`. Track threadId/runId from upstream chunks during iteration and reuse them; fall back to synthesized IDs only when no upstream chunk carried them. Use the EventType enum and typed RunFinishedEvent/RunErrorEvent so missing required fields are caught by the compiler instead of papered over.
- Stop swallowing JSON.parse failures in parseSSEChunks and fetchHttpStream. A malformed mid-stream chunk is a protocol error; let it throw so it surfaces as RUN_ERROR via the connect-wrapper's catch path instead of silently dropping data behind a console.warn the user never sees.
- Widen stream() and rpcStream() factory signatures with an optional abortSignal third arg and pass it through. Backwards-compatible — callers that ignore the third parameter are unaffected. Lets long-running server functions cancel in-flight work when useChat aborts.

Tests updated to assert SyntaxError propagation rather than silent dropping on malformed JSON, and to expect the new third call argument on factory mocks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
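The id-tracking idea can be sketched in isolation — `ensureRunFinished` is a hypothetical stand-in for the relevant slice of normalizeConnectionAdapter, and the fallback id values are illustrative:

```typescript
enum EventType {
  RUN_STARTED = 'RUN_STARTED',
  RUN_FINISHED = 'RUN_FINISHED',
}

interface BaseChunk {
  type: EventType
  threadId?: string
  runId?: string
}

// Pass chunks through while remembering the last upstream threadId/runId;
// if the stream ends without RUN_FINISHED, synthesize one reusing those ids.
async function* ensureRunFinished(
  source: AsyncIterable<BaseChunk>,
): AsyncIterable<BaseChunk> {
  let threadId: string | undefined
  let runId: string | undefined
  let finished = false
  for await (const chunk of source) {
    if (chunk.threadId) threadId = chunk.threadId
    if (chunk.runId) runId = chunk.runId
    if (chunk.type === EventType.RUN_FINISHED) finished = true
    yield chunk
  }
  if (!finished) {
    // Fabricated ids are a last resort, used only when no upstream chunk carried them.
    yield {
      type: EventType.RUN_FINISHED,
      threadId: threadId ?? 'synthesized-thread',
      runId: runId ?? 'synthesized-run',
    }
  }
}
```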
…railing buffer

Refine the SSE parser so the throw-on-parse-failure behavior doesn't regress on legitimate SSE traffic:

- parseSSEChunks now skips SSE comment lines (`:`) and non-data fields (`event:`, `id:`, `retry:`), which proxies and CDNs commonly inject as keepalives. Previously these would have flowed into JSON.parse and thrown, killing otherwise-healthy streams behind any infrastructure that injects SSE control lines.
- readStreamLines no longer yields the unterminated trailing buffer at stream end. A non-empty buffer means the connection was cut mid-line, so the content is partial by definition — yielding it would feed truncated JSON to the parser and surface a misleading RUN_ERROR for what is really a transport-layer issue. Warn and discard instead.

Bare-line JSON (legacy/raw mode) is still accepted to preserve the existing public behavior covered by the `should handle SSE format without data: prefix` test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
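The filtering rules can be condensed into a single line classifier — `parseSSELine` is an illustrative helper, not the adapter's actual parseSSEChunks:

```typescript
// Classify one SSE line: return a parsed chunk, or undefined for control lines.
// Malformed JSON throws by design — a protocol error, not a silent warning.
function parseSSELine(line: string): unknown | undefined {
  if (line === '' || line.startsWith(':')) return undefined // comment/keepalive
  if (/^(event|id|retry):/.test(line)) return undefined // non-data SSE fields
  // `data:` payloads and bare-line JSON (legacy/raw mode) are both accepted.
  const payload = line.startsWith('data:') ? line.slice(5).trimStart() : line
  return JSON.parse(payload)
}
```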
…t UIMessage[] in chat() Drops the `as any` / `as Array<UIMessage>` casts previously needed when wiring useChat through a TanStack Start server function into chat(). The stream() factory now declares Array<UIMessage> (with a runtime assert matching the ChatClient invariant), and chat()'s messages option accepts UIMessage[] directly alongside ConstrainedModelMessage[] — the runtime already normalised both via convertMessagesToModelMessages. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pulls the @tanstack/ai changes back out — the chat() messages-option widening to accept UIMessage[] is a separate concern from the stream() server-function feature this PR is about. Restores the example's `as any` cast with a comment, drops the @tanstack/ai minor bump from the changeset, and reverts chat/index.ts to its pre-PR state. Also bumps the example adapter to gpt-5.2. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ranch

The fetcher path uses the same SSE parsing and connect-wrapper plumbing as the stream() path on TanStack#508, so the polish that landed during TanStack#508's review applies directly here. Carry it over so this branch has the same robustness.

- Skip SSE control lines (`:` comments, `event:` / `id:` / `retry:`) in responseToSSEChunks. Proxies and CDNs inject these as keepalives; letting them through would feed JSON.parse a non-payload line.
- Drop the unterminated trailing buffer in readStreamLines. A non-empty buffer at stream end means the connection was cut mid-line, so the data is partial — yielding it would surface a misleading RUN_ERROR for what is really a transport-layer issue.
- Surface JSON.parse failures in responseToSSEChunks and fetchHttpStream. Stop swallowing them behind console.warn; let SyntaxError propagate so the connect-wrapper turns it into a visible RUN_ERROR.
- Drop unsafe `as unknown as StreamChunk` casts in normalizeConnectionAdapter's synthesized RUN_FINISHED / RUN_ERROR events. Use EventType + RunFinishedEvent / RunErrorEvent so missing required fields are caught by the compiler. Track upstream threadId/runId from chunks and reuse them in the synthesis instead of fabricating both ids unconditionally.
- Forward the optional abortSignal third arg through stream() and rpcStream() factory signatures. Backwards-compatible for existing callers; lets long-running factories cancel when useChat aborts. Mirrors what fetcherToConnectionAdapter already does.

Tests:

- Update the two `should handle malformed JSON gracefully` tests to assert SyntaxError throws instead of silent drop.
- Update stream() / rpcStream() factory mock assertions to expect the new third arg.
- Add a chat-fetcher test asserting a fetcher returning a malformed-SSE Response surfaces as a RUN_ERROR via onError.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🎯 Changes
Closes #509.
Wiring `useChat` to a TanStack Start server function currently fails to typecheck because `stream()`'s factory is typed as `() => AsyncIterable<StreamChunk>`, but a server function call returns `Promise<AsyncIterable<StreamChunk>>` (handler returns the chat stream) or `Promise<Response>` (handler returns `toServerSentEventsResponse(stream)`).

This PR widens `stream()` (and `rpcStream()`) to accept all three shapes — awaiting the result and parsing SSE if a `Response` is returned — so a server function can be wired directly into `useChat` with end-to-end type safety from the call site to the handler.

Reported by @vfshera in Discord.
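A type-level sketch of the widened shape — `StreamFactoryResult` is the exported name listed under "Files touched", but its exact declaration here is an assumption:

```typescript
type StreamChunk = { type: string } & Record<string, unknown>

// Assumed declaration: the union of the three factory return shapes.
type StreamFactoryResult =
  | AsyncIterable<StreamChunk>
  | Promise<AsyncIterable<StreamChunk>>
  | Promise<Response>

async function* chunks(): AsyncIterable<StreamChunk> {
  yield { type: 'RUN_FINISHED' }
}

// All three shapes are assignable without casts:
const direct: StreamFactoryResult = chunks()
const wrapped: StreamFactoryResult = Promise.resolve(chunks())
const asResponse: StreamFactoryResult = Promise.resolve(
  new Response('data: {"type":"RUN_FINISHED"}\n\n'),
)
```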
Files touched
- `packages/typescript/ai-client/src/connection-adapters.ts` — extend `stream()` and `rpcStream()`; factor out a shared SSE-from-Response helper.
- `packages/typescript/ai-client/src/index.ts` — export the new `StreamFactoryResult` type.
- `packages/typescript/ai-client/tests/connection-adapters.test.ts` — 4 new tests (Promise<AsyncIterable>, Promise<Response>, Response error, rpcStream Promise).
- `docs/chat/connection-adapters.md` — new "TanStack Start Server Functions" section.
- `examples/ts-react-chat` — new `/server-fn-chat` route + `chatFn` server function demonstrating the pattern.
- `.changeset/stream-adapter-server-functions.md` — `@tanstack/ai-client` minor.

✅ Checklist
- `pnpm run test:pr` (`test:lib`, `test:types`, `test:eslint` all pass; manual `/server-fn-chat` smoke test was not run — sandbox couldn't launch a browser).

🚀 Release Impact
- `@tanstack/ai-client` minor.

cc @vfshera