Skip to content
This repository was archived by the owner on Jun 14, 2026. It is now read-only.

Commit 75706cf

Browse files
committed
refactor(agent): export parseSSEStream
1 parent 9ad2355 commit 75706cf

6 files changed

Lines changed: 74 additions & 76 deletions

File tree

packages/agent/__tests__/run-handle.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@ import {
99
createScriptedProvider,
1010
makeFakeAssistantMessage,
1111
makeFakeModel,
12-
parseSSEStream,
1312
} from '@test/index';
1413

15-
import { Agent, type AgentEvent, type AgentTool } from '../src';
14+
import { Agent, type AgentEvent, type AgentTool, parseSSEStream } from '../src';
1615

1716
describe('AgentRunHandle', () => {
1817
describe('events()', () => {

packages/agent/__tests__/sse.test.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
// Exercises the `parseSSEStream` helper from tools/test/, not a production
2-
// parser. The kit ships no SSE parser today; consumers parse on their side.
3-
// These tests pin down the helper's edge-case behavior so future parser work
4-
// has a baseline to match.
5-
import { parseSSEStream } from '@test/index';
6-
7-
import type { AgentEvent } from '../src';
1+
// Exercises `parseSSEStream` exported from `@agentic-kit/agent`. Symmetric to
2+
// the SSE producer in `toResponse()` — these tests pin down the parser's
3+
// edge-case behavior so the wire-format contract has a baseline.
4+
import { type AgentEvent, parseSSEStream } from '../src';
85

96
const encoder = new TextEncoder();
107

packages/agent/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from './agent.js';
22
export * from './run-handle.js';
3+
export * from './sse.js';
34
export * from './types.js';
45
export * from './validation.js';

packages/agent/src/sse.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import type { AgentEvent } from './types.js';
2+
3+
export async function* parseSSEStream(
4+
stream: ReadableStream<Uint8Array>
5+
): AsyncIterable<AgentEvent> {
6+
const reader = stream.getReader();
7+
const decoder = new TextDecoder('utf-8');
8+
let buffer = '';
9+
10+
try {
11+
while (true) {
12+
const { done, value } = await reader.read();
13+
if (done) {
14+
break;
15+
}
16+
17+
buffer += decoder.decode(value, { stream: true });
18+
buffer = buffer.replace(/\r\n/g, '\n').replace(/\r/g, '\n');
19+
20+
let blankIdx = buffer.indexOf('\n\n');
21+
while (blankIdx !== -1) {
22+
const rawEvent = buffer.slice(0, blankIdx);
23+
buffer = buffer.slice(blankIdx + 2);
24+
const event = parseEvent(rawEvent);
25+
if (event) {
26+
yield event;
27+
}
28+
blankIdx = buffer.indexOf('\n\n');
29+
}
30+
}
31+
} finally {
32+
reader.releaseLock();
33+
}
34+
}
35+
36+
function parseEvent(raw: string): AgentEvent | null {
37+
const dataLines: string[] = [];
38+
for (const line of raw.split('\n')) {
39+
if (line === '' || line.startsWith(':')) {
40+
continue;
41+
}
42+
const colon = line.indexOf(':');
43+
const field = colon === -1 ? line : line.slice(0, colon);
44+
let value = colon === -1 ? '' : line.slice(colon + 1);
45+
if (value.startsWith(' ')) {
46+
value = value.slice(1);
47+
}
48+
if (field === 'data') {
49+
dataLines.push(value);
50+
}
51+
}
52+
53+
if (dataLines.length === 0) {
54+
return null;
55+
}
56+
57+
const data = dataLines.join('\n');
58+
if (data === '[DONE]') {
59+
return null;
60+
}
61+
62+
try {
63+
return JSON.parse(data) as AgentEvent;
64+
} catch {
65+
return null;
66+
}
67+
}

tools/test/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
export { makeFakeAssistantMessage, makeFakeModel } from './fixtures';
22
export { createScriptedProvider, type ScriptedProviderOptions } from './scripted-provider';
3-
export { createScriptedSSEResponse, parseSSEStream } from './scripted-sse';
3+
export { createScriptedSSEResponse } from './scripted-sse';

tools/test/scripted-sse.ts

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -20,69 +20,3 @@ export function createScriptedSSEResponse(events: AgentEvent[]): Response {
2020
},
2121
});
2222
}
23-
24-
export async function* parseSSEStream(
25-
stream: ReadableStream<Uint8Array>
26-
): AsyncIterable<AgentEvent> {
27-
const reader = stream.getReader();
28-
const decoder = new TextDecoder('utf-8');
29-
let buffer = '';
30-
31-
try {
32-
while (true) {
33-
const { done, value } = await reader.read();
34-
if (done) {
35-
break;
36-
}
37-
38-
buffer += decoder.decode(value, { stream: true });
39-
buffer = buffer.replace(/\r\n/g, '\n').replace(/\r/g, '\n');
40-
41-
let blankIdx = buffer.indexOf('\n\n');
42-
while (blankIdx !== -1) {
43-
const rawEvent = buffer.slice(0, blankIdx);
44-
buffer = buffer.slice(blankIdx + 2);
45-
const event = parseEvent(rawEvent);
46-
if (event) {
47-
yield event;
48-
}
49-
blankIdx = buffer.indexOf('\n\n');
50-
}
51-
}
52-
} finally {
53-
reader.releaseLock();
54-
}
55-
}
56-
57-
function parseEvent(raw: string): AgentEvent | null {
58-
const dataLines: string[] = [];
59-
for (const line of raw.split('\n')) {
60-
if (line === '' || line.startsWith(':')) {
61-
continue;
62-
}
63-
const colon = line.indexOf(':');
64-
const field = colon === -1 ? line : line.slice(0, colon);
65-
let value = colon === -1 ? '' : line.slice(colon + 1);
66-
if (value.startsWith(' ')) {
67-
value = value.slice(1);
68-
}
69-
if (field === 'data') {
70-
dataLines.push(value);
71-
}
72-
}
73-
74-
if (dataLines.length === 0) {
75-
return null;
76-
}
77-
78-
const data = dataLines.join('\n');
79-
if (data === '[DONE]') {
80-
return null;
81-
}
82-
83-
try {
84-
return JSON.parse(data) as AgentEvent;
85-
} catch {
86-
return null;
87-
}
88-
}

0 commit comments

Comments
 (0)