Skip to content

Commit 0becf76

Browse files
committed
stream: make stream.Readable implement the toAsyncStreamable protocol
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Opus 4.6
1 parent b411f90 commit 0becf76

File tree

7 files changed

+867
-2
lines changed

7 files changed

+867
-2
lines changed

doc/api/errors.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
28822882
A stream method was called that cannot complete because the stream was
28832883
destroyed using `stream.destroy()`.
28842884

2885+
<a id="ERR_STREAM_ITER_MISSING_FLAG"></a>
2886+
2887+
### `ERR_STREAM_ITER_MISSING_FLAG`
2888+
2889+
A stream/iter API was used without the `--experimental-stream-iter` CLI flag
2890+
enabled.
2891+
28852892
<a id="ERR_STREAM_NULL_VALUES"></a>
28862893

28872894
### `ERR_STREAM_NULL_VALUES`

lib/internal/errors.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED',
17701770
Error);
17711771
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
17721772
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
1773+
E('ERR_STREAM_ITER_MISSING_FLAG',
1774+
'The stream/iter API requires the --experimental-stream-iter flag', TypeError);
17731775
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
17741776
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
17751777
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);

lib/internal/streams/iter/from.js

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const {
3737
} = require('internal/util/types');
3838

3939
const {
40+
kTrustedSource,
4041
toStreamable,
4142
toAsyncStreamable,
4243
} = require('internal/streams/iter/types');
@@ -483,6 +484,11 @@ function from(input) {
483484
throw new ERR_INVALID_ARG_TYPE('input', 'a non-null value', input);
484485
}
485486

487+
// Fast path: trusted source already yields valid Uint8Array[] batches
488+
if (input[kTrustedSource]) {
489+
return input;
490+
}
491+
486492
// Check for primitives first (ByteInput)
487493
if (isPrimitiveChunk(input)) {
488494
const chunk = primitiveToUint8Array(input);
@@ -531,11 +537,22 @@ function from(input) {
531537
// Check toAsyncStreamable protocol (takes precedence over toStreamable and
532538
// iteration protocols)
533539
if (typeof input[toAsyncStreamable] === 'function') {
540+
const result = input[toAsyncStreamable]();
541+
// Synchronous trusted source (e.g. Readable batched iterator)
542+
if (result?.[kTrustedSource]) {
543+
return result;
544+
}
534545
return {
535546
__proto__: null,
536547
async *[SymbolAsyncIterator]() {
537-
const result = await input[toAsyncStreamable]();
538-
yield* from(result)[SymbolAsyncIterator]();
548+
// The result may be a Promise. Check trusted on both the Promise
549+
// itself (if tagged) and the resolved value.
550+
const resolved = await result;
551+
if (resolved?.[kTrustedSource]) {
552+
yield* resolved[SymbolAsyncIterator]();
553+
return;
554+
}
555+
yield* from(resolved)[SymbolAsyncIterator]();
539556
},
540557
};
541558
}

lib/internal/streams/iter/types.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,19 @@ const drainableProtocol = SymbolFor('Stream.drainableProtocol');
5555
*/
5656
const kTrustedTransform = Symbol('kTrustedTransform');
5757

58+
/**
59+
* Internal sentinel for trusted sources. An async iterable with
60+
* [kTrustedSource] = true signals that it already yields valid
61+
* Uint8Array[] batches - no normalizeAsyncSource wrapper needed.
62+
* from() will return such sources directly, skipping all normalization.
63+
* This is NOT a public protocol symbol - it uses Symbol() not Symbol.for().
64+
*/
65+
const kTrustedSource = Symbol('kTrustedSource');
66+
5867
module.exports = {
5968
broadcastProtocol,
6069
drainableProtocol,
70+
kTrustedSource,
6171
kTrustedTransform,
6272
shareProtocol,
6373
shareSyncProtocol,

lib/internal/streams/readable.js

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const {
3535
Symbol,
3636
SymbolAsyncDispose,
3737
SymbolAsyncIterator,
38+
SymbolFor,
3839
SymbolSpecies,
3940
TypedArrayPrototypeSet,
4041
} = primordials;
@@ -52,6 +53,8 @@ const {
5253
} = require('internal/streams/add-abort-signal');
5354
const { eos } = require('internal/streams/end-of-stream');
5455

56+
const { getOptionValue } = require('internal/options');
57+
5558
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
5659
debug = fn;
5760
});
@@ -82,6 +85,7 @@ const {
8285
ERR_INVALID_ARG_TYPE,
8386
ERR_METHOD_NOT_IMPLEMENTED,
8487
ERR_OUT_OF_RANGE,
88+
ERR_STREAM_ITER_MISSING_FLAG,
8589
ERR_STREAM_PUSH_AFTER_EOF,
8690
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
8791
ERR_UNKNOWN_ENCODING,
@@ -1796,3 +1800,144 @@ Readable.wrap = function(src, options) {
17961800
},
17971801
}).wrap(src);
17981802
};
1803+
1804+
// Efficient interop with the stream/iter API via toAsyncStreamable protocol.
1805+
// Provides a batched async iterator that drains the internal buffer into
1806+
// Uint8Array[] batches, avoiding the per-chunk Promise overhead of the
1807+
// standard Symbol.asyncIterator path.
1808+
//
1809+
// The flag cannot be checked at module load time (readable.js loads during
1810+
// bootstrap before options are available). Instead, toAsyncStreamable is
1811+
// always defined but lazily initializes on first call - throwing if the
1812+
// flag is not set, or installing the real implementation if it is.
1813+
{
1814+
const toAsyncStreamable = SymbolFor('Stream.toAsyncStreamable');
1815+
let kTrustedSource;
1816+
let normalizeAsyncValue;
1817+
let isU8;
1818+
1819+
// Maximum chunks to drain into a single batch. Bounds peak memory when
1820+
// _read() synchronously pushes many chunks into the buffer.
1821+
const MAX_DRAIN_BATCH = 128;
1822+
1823+
function lazyInit() {
1824+
if (kTrustedSource !== undefined) return;
1825+
if (!getOptionValue('--experimental-stream-iter')) {
1826+
throw new ERR_STREAM_ITER_MISSING_FLAG();
1827+
}
1828+
({ kTrustedSource } = require('internal/streams/iter/types'));
1829+
({ normalizeAsyncValue } = require('internal/streams/iter/from'));
1830+
({ isUint8Array: isU8 } = require('internal/util/types'));
1831+
}
1832+
1833+
// Normalize a batch of raw chunks from an object-mode or encoded
1834+
// Readable into Uint8Array values. Returns the normalized batch,
1835+
// or null if normalization produced no output.
1836+
async function normalizeBatch(raw) {
1837+
const batch = [];
1838+
for (let i = 0; i < raw.length; i++) {
1839+
const value = raw[i];
1840+
if (isU8(value)) {
1841+
batch.push(value);
1842+
} else {
1843+
// normalizeAsyncValue may await for async protocols (e.g.
1844+
// toAsyncStreamable on yielded objects). Stream events during
1845+
// the suspension are queued, not lost - errors will surface
1846+
// on the next loop iteration after this yield completes.
1847+
for await (const normalized of normalizeAsyncValue(value)) {
1848+
batch.push(normalized);
1849+
}
1850+
}
1851+
}
1852+
return batch.length > 0 ? batch : null;
1853+
}
1854+
1855+
// Batched async iterator for Readable streams. Same mechanism as
1856+
// createAsyncIterator (same event setup, same stream.read() to
1857+
// trigger _read(), same teardown) but drains all currently buffered
1858+
// chunks into a single Uint8Array[] batch per yield, amortizing the
1859+
// Promise/microtask cost across multiple chunks.
1860+
//
1861+
// When normalize is provided (object-mode / encoded streams), each
1862+
// drained batch is passed through it to convert chunks to Uint8Array.
1863+
// When normalize is null (byte-mode), chunks are already Buffers
1864+
// (Uint8Array subclass) and are yielded directly.
1865+
async function* createBatchedAsyncIterator(stream, normalize) {
1866+
let callback = nop;
1867+
1868+
function next(resolve) {
1869+
if (this === stream) {
1870+
callback();
1871+
callback = nop;
1872+
} else {
1873+
callback = resolve;
1874+
}
1875+
}
1876+
1877+
stream.on('readable', next);
1878+
1879+
let error;
1880+
const cleanup = eos(stream, { writable: false }, (err) => {
1881+
error = err ? aggregateTwoErrors(error, err) : null;
1882+
callback();
1883+
callback = nop;
1884+
});
1885+
1886+
try {
1887+
while (true) {
1888+
const chunk = stream.destroyed ? null : stream.read();
1889+
if (chunk !== null) {
1890+
// Drain any additional already-buffered chunks into the same
1891+
// batch. The first read() may trigger _read() which
1892+
// synchronously pushes more data into the buffer. We drain
1893+
// that buffered data without issuing unbounded _read() calls -
1894+
// once state.length hits 0 or MAX_DRAIN_BATCH is reached, we
1895+
// stop and yield what we have.
1896+
const batch = [chunk];
1897+
while (batch.length < MAX_DRAIN_BATCH &&
1898+
stream._readableState.length > 0) {
1899+
const c = stream.read();
1900+
if (c === null) break;
1901+
batch.push(c);
1902+
}
1903+
if (normalize !== null) {
1904+
const result = await normalize(batch);
1905+
if (result !== null) {
1906+
yield result;
1907+
}
1908+
} else {
1909+
yield batch;
1910+
}
1911+
} else if (error) {
1912+
throw error;
1913+
} else if (error === null) {
1914+
return;
1915+
} else {
1916+
await new Promise(next);
1917+
}
1918+
}
1919+
} catch (err) {
1920+
error = aggregateTwoErrors(error, err);
1921+
throw error;
1922+
} finally {
1923+
if (error === undefined || stream._readableState.autoDestroy) {
1924+
destroyImpl.destroyer(stream, null);
1925+
} else {
1926+
stream.off('readable', next);
1927+
cleanup();
1928+
}
1929+
}
1930+
}
1931+
1932+
Readable.prototype[toAsyncStreamable] = function() {
1933+
lazyInit();
1934+
const state = this._readableState;
1935+
const normalize = (state.objectMode || state.encoding) ?
1936+
normalizeBatch :
1937+
null;
1938+
const iter = createBatchedAsyncIterator(this, normalize);
1939+
iter[kTrustedSource] = true;
1940+
iter.stream = this;
1941+
return iter;
1942+
};
1943+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
'use strict';
2+
3+
// Tests that toAsyncStreamable throws ERR_STREAM_ITER_MISSING_FLAG
4+
// when --experimental-stream-iter is not enabled.
5+
6+
const common = require('../common');
7+
const assert = require('assert');
8+
const { spawnPromisified } = common;
9+
10+
async function testToAsyncStreamableWithoutFlag() {
11+
const { stderr, code } = await spawnPromisified(process.execPath, [
12+
'-e',
13+
`
14+
const { Readable } = require('stream');
15+
const r = new Readable({ read() {} });
16+
r[Symbol.for('Stream.toAsyncStreamable')]();
17+
`,
18+
]);
19+
assert.notStrictEqual(code, 0);
20+
assert.match(stderr, /ERR_STREAM_ITER_MISSING_FLAG/);
21+
}
22+
23+
async function testToAsyncStreamableWithFlag() {
24+
const { code } = await spawnPromisified(process.execPath, [
25+
'--experimental-stream-iter',
26+
'-e',
27+
`
28+
const { Readable } = require('stream');
29+
const r = new Readable({
30+
read() { this.push(Buffer.from('ok')); this.push(null); }
31+
});
32+
const sym = Symbol.for('Stream.toAsyncStreamable');
33+
const iter = r[sym]();
34+
// Should not throw, and should have stream property
35+
if (!iter.stream) process.exit(1);
36+
`,
37+
]);
38+
assert.strictEqual(code, 0);
39+
}
40+
41+
Promise.all([
42+
testToAsyncStreamableWithoutFlag(),
43+
testToAsyncStreamableWithFlag(),
44+
]).then(common.mustCall());

0 commit comments

Comments
 (0)