Skip to content

Commit 52a9869

Browse files
committed
feat(ai): implement streaming response handling in HonoHttpServer and encode Vercel data stream in AI routes
1 parent afee778 commit 52a9869

2 files changed

Lines changed: 63 additions & 12 deletions

File tree

packages/plugins/plugin-hono-server/src/adapter.ts

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,24 @@ export class HonoHttpServer implements IHttpServer {
3131
private wrap(handler: RouteHandler) {
3232
return async (c: any) => {
3333
let body: any = {};
34-
34+
3535
// Try to parse JSON body first if content-type is JSON
3636
if (c.req.header('content-type')?.includes('application/json')) {
37-
try {
38-
body = await c.req.json();
37+
try {
38+
body = await c.req.json();
3939
} catch(e) {
4040
// If JSON parsing fails, try parseBody
41-
try {
42-
body = await c.req.parseBody();
41+
try {
42+
body = await c.req.parseBody();
4343
} catch(e2) {}
4444
}
4545
} else {
4646
// For non-JSON content types, use parseBody
47-
try {
48-
body = await c.req.parseBody();
47+
try {
48+
body = await c.req.parseBody();
4949
} catch(e) {}
5050
}
51-
51+
5252
const req = {
5353
params: c.req.param(),
5454
query: c.req.query(),
@@ -59,16 +59,66 @@ export class HonoHttpServer implements IHttpServer {
5959
};
6060

6161
let capturedResponse: any;
62+
let streamController: ReadableStreamDefaultController | null = null;
63+
let streamEncoder: TextEncoder | null = null;
64+
let streamHeaders: Record<string, string> = {};
65+
let isStreaming = false;
6266

6367
const res = {
6468
json: (data: any) => { capturedResponse = c.json(data); },
6569
send: (data: string) => { capturedResponse = c.html(data); },
6670
status: (code: number) => { c.status(code); return res; },
67-
header: (name: string, value: string) => { c.header(name, value); return res; }
71+
header: (name: string, value: string) => {
72+
c.header(name, value);
73+
streamHeaders[name] = value;
74+
return res;
75+
},
76+
write: (chunk: string | Uint8Array) => {
77+
isStreaming = true;
78+
if (streamController && streamEncoder) {
79+
const data = typeof chunk === 'string' ? streamEncoder.encode(chunk) : chunk;
80+
streamController.enqueue(data);
81+
}
82+
},
83+
end: () => {
84+
if (streamController) {
85+
streamController.close();
86+
}
87+
},
6888
};
6989

70-
await handler(req as any, res as any);
71-
return capturedResponse;
90+
// Create a streaming response wrapper — if handler calls res.write(),
91+
// we return a ReadableStream; otherwise fall back to capturedResponse.
92+
const streamPromise = new Promise<Response | null>((resolve) => {
93+
const stream = new ReadableStream({
94+
start(controller) {
95+
streamController = controller;
96+
streamEncoder = new TextEncoder();
97+
},
98+
});
99+
100+
// Run the handler; once it's done, check if streaming was used
101+
const result = handler(req as any, res as any);
102+
const done = result instanceof Promise ? result : Promise.resolve(result);
103+
done.then(() => {
104+
if (isStreaming) {
105+
resolve(new Response(stream, {
106+
status: 200,
107+
headers: streamHeaders,
108+
}));
109+
} else {
110+
// Not streaming — close the unused stream and return null
111+
streamController?.close();
112+
resolve(null);
113+
}
114+
}).catch(() => {
115+
streamController?.close();
116+
resolve(null);
117+
});
118+
});
119+
120+
const streamResponse = await streamPromise;
121+
return streamResponse ?? capturedResponse;
72122
};
73123
}
74124

packages/services/service-ai/src/routes/ai-routes.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import type { IAIService, IAIConversationService, ModelMessage } from '@objectstack/spec/contracts';
44
import type { Logger } from '@objectstack/spec/contracts';
5+
import { encodeVercelDataStream } from '../stream/vercel-stream-encoder.js';
56

67
/**
78
* Minimal HTTP handler abstraction so routes stay framework-agnostic.
@@ -255,7 +256,7 @@ export function buildAIRoutes(
255256
return { status: 501, body: { error: 'Streaming is not supported by the configured AI service' } };
256257
}
257258
const events = aiService.streamChat(finalMessages, resolvedOptions as any);
258-
return { status: 200, stream: true, vercelDataStream: true, events };
259+
return { status: 200, stream: true, vercelDataStream: true, events: encodeVercelDataStream(events) };
259260
} catch (err) {
260261
logger.error('[AI Route] /chat stream error', err instanceof Error ? err : undefined);
261262
return { status: 500, body: { error: 'Internal AI service error' } };

0 commit comments

Comments
 (0)