Skip to content

Commit 5083033

Browse files
feat(stream): document and publicly export readStream and toStream
Surface the readStream/toStream helpers under the `stream` namespace alongside the other stream utilities, and add full TSDoc describing their abort/cancel semantics so they are discoverable and supported for user code. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 8a91a0b commit 5083033

3 files changed

Lines changed: 65 additions & 2 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@livekit/agents': patch
3+
---
4+
5+
Document and publicly export the `readStream` and `toStream` stream helpers. `readStream` consumes a `ReadableStream` as an abortable async generator, and `toStream` adapts an `AsyncIterable` into a `ReadableStream`. Both are now surfaced under the `stream` namespace alongside the other stream utilities and carry full TSDoc describing their abort/cancel semantics.

agents/src/stream/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ export { IdentityTransform } from './identity_transform.js';
66
export { mergeReadableStreams } from './merge_readable_streams.js';
77
export { MultiInputStream } from './multi_input_stream.js';
88
export { createStreamChannel, type StreamChannel } from './stream_channel.js';
9+
export { readStream, toStream } from '../utils.js';

agents/src/utils.ts

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,8 +1311,35 @@ export async function waitForTrackPublication({
13111311
}
13121312

13131313
/**
1314-
* Yields values from a ReadableStream until the stream ends or the signal is aborted.
1315-
* Handles reader cleanup and stream-release errors internally.
1314+
* Consume a {@link ReadableStream} as an async generator, yielding each chunk in
1315+
* order until the stream ends or the optional {@link AbortSignal} is aborted.
1316+
*
1317+
* This is a convenience wrapper around the Web Streams reader API that handles the
1318+
* boilerplate of acquiring a reader, looping over `reader.read()`, and releasing the
1319+
* reader lock when iteration finishes.
1320+
*
1321+
* Key behaviors:
1322+
* - Acquires a reader for the lifetime of the iteration and always releases the lock
1323+
* when the generator completes, throws, or is returned early (e.g. `break`).
1324+
* - If a `signal` is provided, each read races against the abort signal; once aborted
1325+
* the generator stops yielding and returns cleanly without throwing. If the signal is
1326+
* already aborted before the first read, no values are yielded.
1327+
* - Errors raised by the underlying stream propagate to the caller; errors thrown while
1328+
* releasing the reader lock during cleanup are swallowed (the source may already be
1329+
* closed).
1330+
*
1331+
* @typeParam T - The type of the chunks emitted by the stream.
1332+
* @param stream - The readable stream to consume. The stream must not already be locked
1333+
* to another reader.
1334+
* @param signal - Optional abort signal used to stop iteration early.
1335+
* @returns An async generator that yields each chunk read from the stream.
1336+
*
1337+
* @example
1338+
* ```ts
1339+
* for await (const chunk of readStream(response.body, controller.signal)) {
1340+
* process(chunk);
1341+
* }
1342+
* ```
13161343
*/
13171344
export async function* readStream<T>(
13181345
stream: ReadableStream<T>,
@@ -1352,6 +1379,36 @@ export async function* readStream<T>(
13521379
}
13531380
}
13541381

1382+
/**
1383+
* Adapt an {@link AsyncIterable} into a {@link ReadableStream}, enqueuing each value the
1384+
* iterable produces and closing the stream when the iterable is exhausted.
1385+
*
1386+
* This is the inverse of {@link readStream}: it lets an async generator or any other
1387+
* async-iterable source be consumed anywhere a Web {@link ReadableStream} is expected
1388+
* (piping, teeing, `Response` bodies, etc.).
1389+
*
1390+
* Key behaviors:
1391+
* - Pulls from the iterable eagerly in the stream's `start` callback and enqueues every
1392+
* value until the iterable completes, at which point the stream is closed.
1393+
* - If the iterable throws, the error is forwarded to the stream's consumer via
1394+
* `controller.error()`.
1395+
* - When the consumer cancels the stream, the underlying iterator's `return()` is invoked
1396+
* (with the cancel reason) so the source can release resources, and no further values
1397+
* are enqueued.
1398+
*
1399+
* @typeParam T - The type of the values produced by the iterable and emitted by the stream.
1400+
* @param iterable - The async iterable to adapt.
1401+
* @returns A readable stream that emits the values produced by `iterable`.
1402+
*
1403+
* @example
1404+
* ```ts
1405+
* async function* numbers() {
1406+
* yield 1;
1407+
* yield 2;
1408+
* }
1409+
* const stream = toStream(numbers());
1410+
* ```
1411+
*/
13551412
export function toStream<T>(iterable: AsyncIterable<T>): ReadableStream<T> {
13561413
let iterator: AsyncIterator<T> | undefined;
13571414
let cancelled = false;

0 commit comments

Comments
 (0)