Skip to content

Commit e7c906b

Browse files
committed
feat: implement server-sent events consumer
1 parent 4f16d3e commit e7c906b

File tree

1 file changed

+127
-0
lines changed

1 file changed

+127
-0
lines changed

src/streaming.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* FastFetch Server-Sent Events (SSE) streaming support.
3+
*
4+
* Consumes a `ReadableStream` from a `fetch()` Response body and calls
5+
* `handler` for each well-formed SSE event received.
6+
*
7+
* SSE spec reference: https://html.spec.whatwg.org/#server-sent-events
8+
*/
9+
10+
// ---------------------------------------------------------------------------
11+
// Types
12+
// ---------------------------------------------------------------------------
13+
14+
export interface SSEEvent {
15+
/** Event type field (defaults to `"message"` if not specified by server). */
16+
type: string;
17+
/** The `data` field value. Multi-line data fields are joined with `\n`. */
18+
data: string;
19+
/** Optional `id` field from the server. */
20+
id?: string;
21+
/** Optional `retry` field in milliseconds. */
22+
retry?: number;
23+
}
24+
25+
export type SSEHandler = (event: SSEEvent) => void | Promise<void>;
26+
27+
// ---------------------------------------------------------------------------
28+
// consumeSSE
29+
// ---------------------------------------------------------------------------
30+
31+
/**
32+
* Consume a Server-Sent Events stream and invoke `handler` for each event.
33+
*
34+
* The function resolves when the stream ends (or `signal` is aborted).
35+
* The underlying reader is always released in the `finally` block.
36+
*
37+
* @param reader A `ReadableStreamDefaultReader<Uint8Array>` from `response.body.getReader()`.
38+
* @param handler Called for each complete SSE event.
39+
* @param signal Optional `AbortSignal` to stop consuming early.
40+
*
41+
* @example
42+
* ```ts
43+
* const res = await fetch('/api/stream');
44+
* if (!res.body) throw new Error('No body');
45+
* await consumeSSE(res.body.getReader(), (event) => {
46+
* console.log(event.type, event.data);
47+
* });
48+
* ```
49+
*/
50+
export async function consumeSSE(
51+
reader: ReadableStreamDefaultReader<Uint8Array>,
52+
handler: SSEHandler,
53+
signal?: AbortSignal,
54+
): Promise<void> {
55+
const decoder = new TextDecoder("utf-8");
56+
let buffer = "";
57+
58+
function parseAndEmit(block: string): void {
59+
// SSE field defaults
60+
let eventType = "message";
61+
let data = "";
62+
let id: string | undefined;
63+
let retry: number | undefined;
64+
65+
for (const line of block.split("\n")) {
66+
const colon = line.indexOf(":");
67+
if (colon === -1) continue; // ignore malformed lines
68+
69+
const field = line.slice(0, colon).trim();
70+
// A space immediately after ":" is part of the spec and must be stripped.
71+
const value = line.slice(colon + 1).replace(/^ /, "");
72+
73+
switch (field) {
74+
case "event":
75+
eventType = value;
76+
break;
77+
case "data":
78+
data = data ? `${data}\n${value}` : value;
79+
break;
80+
case "id":
81+
id = value;
82+
break;
83+
case "retry": {
84+
const ms = parseInt(value, 10);
85+
if (!isNaN(ms)) retry = ms;
86+
break;
87+
}
88+
// "comment" lines (field === "") are silently ignored per spec
89+
}
90+
}
91+
92+
// Dispatch only if a data field was present (spec §9.2.6)
93+
if (data !== "") {
94+
void handler({ type: eventType, data, id, retry });
95+
}
96+
}
97+
98+
try {
99+
while (true) {
100+
if (signal?.aborted) break;
101+
102+
const { done, value } = await reader.read();
103+
if (done) break;
104+
105+
buffer += decoder.decode(value, { stream: true });
106+
107+
// Events are separated by two newlines (\n\n or \r\n\r\n)
108+
// Split on double-newline — keep trailing incomplete chunk in buffer.
109+
const parts = buffer.split(/\n\n|\r\n\r\n/);
110+
buffer = parts.pop() ?? "";
111+
112+
for (const block of parts) {
113+
const trimmed = block.trim();
114+
if (trimmed) {
115+
parseAndEmit(trimmed);
116+
}
117+
}
118+
}
119+
120+
// Flush any remaining data in the buffer
121+
if (buffer.trim()) {
122+
parseAndEmit(buffer.trim());
123+
}
124+
} finally {
125+
reader.releaseLock();
126+
}
127+
}

0 commit comments

Comments
 (0)