Skip to content

Commit 8d1adef

Browse files
committed
stream: add fromStreamIter/fromStreamIterSync to stream.Readable
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Opus 4.6
1 parent 0becf76 commit 8d1adef

File tree

4 files changed

+940
-0
lines changed

4 files changed

+940
-0
lines changed

doc/api/stream.md

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,6 +1998,61 @@ option. In the code example above, data will be in a single chunk if the file
19981998
has less then 64 KiB of data because no `highWaterMark` option is provided to
19991999
[`fs.createReadStream()`][].
20002000

2001+
##### `readable[Symbol.for('Stream.toAsyncStreamable')]()`
2002+
2003+
<!-- YAML
2004+
added: REPLACEME
2005+
-->
2006+
2007+
> Stability: 1 - Experimental
2008+
2009+
* Returns: {AsyncIterable} An `AsyncIterable<Uint8Array[]>` that yields
2010+
batched chunks from the stream.
2011+
2012+
When the `--experimental-stream-iter` flag is enabled, `Readable` streams
2013+
implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient
2014+
consumption by the [`stream/iter`][] API.
2015+
2016+
This provides a batched async iterator that drains the stream's internal
2017+
buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead
2018+
of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks
2019+
are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses).
2020+
For object-mode or encoded streams, each chunk is normalized to `Uint8Array`
2021+
before batching.
2022+
2023+
The returned iterator is tagged as a trusted source, so [`from()`][stream-iter-from]
2024+
passes it through without additional normalization.
2025+
2026+
```mjs
2027+
import { Readable } from 'node:stream';
2028+
import { text, from } from 'node:stream/iter';
2029+
2030+
const readable = new Readable({
2031+
read() { this.push('hello'); this.push(null); },
2032+
});
2033+
2034+
// Readable is automatically consumed via toAsyncStreamable
2035+
console.log(await text(from(readable))); // 'hello'
2036+
```
2037+
2038+
```cjs
2039+
const { Readable } = require('node:stream');
2040+
const { text, from } = require('node:stream/iter');
2041+
2042+
async function run() {
2043+
const readable = new Readable({
2044+
read() { this.push('hello'); this.push(null); },
2045+
});
2046+
2047+
console.log(await text(from(readable))); // 'hello'
2048+
}
2049+
2050+
run().catch(console.error);
2051+
```
2052+
2053+
Without the `--experimental-stream-iter` flag, calling this method throws
2054+
[`ERR_STREAM_ITER_MISSING_FLAG`][].
2055+
20012056
##### `readable[Symbol.asyncDispose]()`
20022057

20032058
<!-- YAML
@@ -3152,6 +3207,101 @@ Readable.from([
31523207
]);
31533208
```
31543209

3210+
### `stream.Readable.fromStreamIter(source[, options])`
3211+
3212+
<!-- YAML
3213+
added: REPLACEME
3214+
-->
3215+
3216+
> Stability: 1 - Experimental
3217+
3218+
* `source` {AsyncIterable} An `AsyncIterable<Uint8Array[]>` source, such as
3219+
the return value of [`pull()`][] or [`from()`][stream-iter-from].
3220+
* `options` {Object}
3221+
* `highWaterMark` {number} The internal buffer size in bytes before
3222+
backpressure is applied. **Default:** `65536` (64 KB).
3223+
* `signal` {AbortSignal} An optional signal that can be used to abort
3224+
the readable, destroying the stream and cleaning up the source iterator.
3225+
* Returns: {stream.Readable}
3226+
3227+
Creates a byte-mode {stream.Readable} from an `AsyncIterable<Uint8Array[]>`
3228+
(the native batch format used by the [`stream/iter`][] API). Each
3229+
`Uint8Array` in a yielded batch is pushed as a separate chunk into the
3230+
Readable.
3231+
3232+
This method requires the `--experimental-stream-iter` CLI flag.
3233+
3234+
```mjs
3235+
import { Readable } from 'node:stream';
3236+
import { createWriteStream } from 'node:fs';
3237+
import { from, pull } from 'node:stream/iter';
3238+
import { compressGzip } from 'node:zlib/iter';
3239+
3240+
// Bridge a stream/iter pipeline to a classic Readable
3241+
const source = pull(from('hello world'), compressGzip());
3242+
const readable = Readable.fromStreamIter(source);
3243+
3244+
readable.pipe(createWriteStream('output.gz'));
3245+
```
3246+
3247+
```cjs
3248+
const { Readable } = require('node:stream');
3249+
const { createWriteStream } = require('node:fs');
3250+
const { from, pull } = require('node:stream/iter');
3251+
const { compressGzip } = require('node:zlib/iter');
3252+
3253+
const source = pull(from('hello world'), compressGzip());
3254+
const readable = Readable.fromStreamIter(source);
3255+
3256+
readable.pipe(createWriteStream('output.gz'));
3257+
```
3258+
3259+
### `stream.Readable.fromStreamIterSync(source[, options])`
3260+
3261+
<!-- YAML
3262+
added: REPLACEME
3263+
-->
3264+
3265+
> Stability: 1 - Experimental
3266+
3267+
* `source` {Iterable} An `Iterable<Uint8Array[]>` source, such as the
3268+
return value of [`pullSync()`][] or [`fromSync()`][].
3269+
* `options` {Object}
3270+
* `highWaterMark` {number} The internal buffer size in bytes before
3271+
backpressure is applied. **Default:** `65536` (64 KB).
3272+
* Returns: {stream.Readable}
3273+
3274+
Creates a byte-mode {stream.Readable} from a synchronous
3275+
`Iterable<Uint8Array[]>` (the native batch format used by the
3276+
[`stream/iter`][] sync API). Each `Uint8Array` in a yielded batch is
3277+
pushed as a separate chunk into the Readable.
3278+
3279+
The `_read()` method pulls from the iterator synchronously, so data is
3280+
available immediately via `readable.read()` without waiting for async
3281+
callbacks.
3282+
3283+
This method requires the `--experimental-stream-iter` CLI flag.
3284+
3285+
```mjs
3286+
import { Readable } from 'node:stream';
3287+
import { fromSync } from 'node:stream/iter';
3288+
3289+
const source = fromSync('hello world');
3290+
const readable = Readable.fromStreamIterSync(source);
3291+
3292+
console.log(readable.read().toString()); // 'hello world'
3293+
```
3294+
3295+
```cjs
3296+
const { Readable } = require('node:stream');
3297+
const { fromSync } = require('node:stream/iter');
3298+
3299+
const source = fromSync('hello world');
3300+
const readable = Readable.fromStreamIterSync(source);
3301+
3302+
console.log(readable.read().toString()); // 'hello world'
3303+
```
3304+
31553305
### `stream.Readable.fromWeb(readableStream[, options])`
31563306

31573307
<!-- YAML
@@ -4997,17 +5147,22 @@ contain multi-byte characters.
49975147
[`'finish'`]: #event-finish
49985148
[`'readable'`]: #event-readable
49995149
[`Duplex`]: #class-streamduplex
5150+
[`ERR_STREAM_ITER_MISSING_FLAG`]: errors.md#err_stream_iter_missing_flag
50005151
[`EventEmitter`]: events.md#class-eventemitter
50015152
[`Readable`]: #class-streamreadable
5153+
[`Stream.toAsyncStreamable`]: stream_iter.md#streamtoasyncstreamable
50025154
[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance
50035155
[`Transform`]: #class-streamtransform
50045156
[`Writable`]: #class-streamwritable
5157+
[`fromSync()`]: stream_iter.md#fromsyncinput
50055158
[`fs.createReadStream()`]: fs.md#fscreatereadstreampath-options
50065159
[`fs.createWriteStream()`]: fs.md#fscreatewritestreampath-options
50075160
[`net.Socket`]: net.md#class-netsocket
50085161
[`process.stderr`]: process.md#processstderr
50095162
[`process.stdin`]: process.md#processstdin
50105163
[`process.stdout`]: process.md#processstdout
5164+
[`pull()`]: stream_iter.md#pullsource-transforms-options
5165+
[`pullSync()`]: stream_iter.md#pullsyncsource-transforms-options
50115166
[`readable._read()`]: #readable_readsize
50125167
[`readable.compose(stream)`]: #readablecomposestream-options
50135168
[`readable.map`]: #readablemapfn-options
@@ -5024,6 +5179,7 @@ contain multi-byte characters.
50245179
[`stream.uncork()`]: #writableuncork
50255180
[`stream.unpipe()`]: #readableunpipedestination
50265181
[`stream.wrap()`]: #readablewrapstream
5182+
[`stream/iter`]: stream_iter.md
50275183
[`writable._final()`]: #writable_finalcallback
50285184
[`writable._write()`]: #writable_writechunk-encoding-callback
50295185
[`writable._writev()`]: #writable_writevchunks-callback
@@ -5052,6 +5208,7 @@ contain multi-byte characters.
50525208
[stream-end]: #writableendchunk-encoding-callback
50535209
[stream-finished]: #streamfinishedstream-options-callback
50545210
[stream-finished-promise]: #streamfinishedstream-options
5211+
[stream-iter-from]: stream_iter.md#frominput
50555212
[stream-pause]: #readablepause
50565213
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
50575214
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options

lib/internal/streams/readable.js

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,15 @@ const {
3030
ObjectKeys,
3131
ObjectSetPrototypeOf,
3232
Promise,
33+
PromisePrototypeThen,
34+
PromiseWithResolvers,
3335
ReflectApply,
3436
SafeSet,
3537
Symbol,
3638
SymbolAsyncDispose,
3739
SymbolAsyncIterator,
3840
SymbolFor,
41+
SymbolIterator,
3942
SymbolSpecies,
4043
TypedArrayPrototypeSet,
4144
} = primordials;
@@ -93,6 +96,7 @@ const {
9396
} = require('internal/errors');
9497
const {
9598
validateAbortSignal,
99+
validateInteger,
96100
validateObject,
97101
} = require('internal/validators');
98102

@@ -1811,6 +1815,7 @@ Readable.wrap = function(src, options) {
18111815
// always defined but lazily initializes on first call - throwing if the
18121816
// flag is not set, or installing the real implementation if it is.
18131817
{
1818+
const kNullPrototype = { __proto__: null };
18141819
const toAsyncStreamable = SymbolFor('Stream.toAsyncStreamable');
18151820
let kTrustedSource;
18161821
let normalizeAsyncValue;
@@ -1940,4 +1945,136 @@ Readable.wrap = function(src, options) {
19401945
iter.stream = this;
19411946
return iter;
19421947
};
1948+
1949+
// Create a byte-mode Readable from an AsyncIterable<Uint8Array[]>.
1950+
// The source must yield Uint8Array[] batches (the stream/iter native
1951+
// format). Each Uint8Array in a batch is pushed as a separate chunk.
1952+
Readable.fromStreamIter = function fromStreamIter(source, options = kNullPrototype) {
1953+
lazyInit();
1954+
if (typeof source?.[SymbolAsyncIterator] !== 'function') {
1955+
throw new ERR_INVALID_ARG_TYPE('source', 'AsyncIterable', source);
1956+
}
1957+
1958+
validateObject(options, 'options');
1959+
const {
1960+
highWaterMark = 64 * 1024,
1961+
signal,
1962+
} = options;
1963+
validateInteger(highWaterMark, 'options.highWaterMark', 0);
1964+
if (signal !== undefined) {
1965+
validateAbortSignal(signal, 'options.signal');
1966+
}
1967+
1968+
const iterator = source[SymbolAsyncIterator]();
1969+
let backpressure;
1970+
let pumping = false;
1971+
let done = false;
1972+
1973+
const readable = new Readable({
1974+
__proto__: null,
1975+
highWaterMark,
1976+
read() {
1977+
if (backpressure) {
1978+
const { resolve } = backpressure;
1979+
backpressure = null;
1980+
resolve();
1981+
} else if (!pumping && !done) {
1982+
pumping = true;
1983+
pump();
1984+
}
1985+
},
1986+
destroy(err, cb) {
1987+
done = true;
1988+
// Wake up the pump if it's waiting on backpressure so it
1989+
// can see done === true and exit cleanly.
1990+
if (backpressure) {
1991+
backpressure.resolve();
1992+
backpressure = null;
1993+
}
1994+
if (typeof iterator.return === 'function') {
1995+
PromisePrototypeThen(iterator.return(),
1996+
() => cb(err), (e) => cb(e || err));
1997+
} else {
1998+
cb(err);
1999+
}
2000+
},
2001+
});
2002+
2003+
if (signal) {
2004+
addAbortSignalNoValidate(signal, readable);
2005+
}
2006+
2007+
async function pump() {
2008+
try {
2009+
while (!done) {
2010+
const { value: batch, done: iterDone } = await iterator.next();
2011+
if (iterDone) {
2012+
done = true;
2013+
readable.push(null);
2014+
return;
2015+
}
2016+
// Individual chunks are not validated as Uint8Array here.
2017+
// The caller is responsible for providing a well-formed
2018+
// AsyncIterable<Uint8Array[]> source. If a non-Buffer chunk
2019+
// is pushed, the byte-mode Readable will throw internally.
2020+
for (let i = 0; i < batch.length; i++) {
2021+
if (!readable.push(batch[i])) {
2022+
// Backpressure: wait for next _read() call
2023+
backpressure = PromiseWithResolvers();
2024+
await backpressure.promise;
2025+
if (done) return; // Destroyed while waiting
2026+
}
2027+
}
2028+
}
2029+
} catch (err) {
2030+
done = true;
2031+
readable.destroy(err);
2032+
}
2033+
}
2034+
2035+
return readable;
2036+
};
2037+
2038+
// Create a byte-mode Readable from an Iterable<Uint8Array[]>.
2039+
// Fully synchronous - _read() pulls from the iterator directly.
2040+
// The source must yield Uint8Array[] batches.
2041+
Readable.fromStreamIterSync = function fromStreamIterSync(source, options = kNullPrototype) {
2042+
lazyInit();
2043+
if (typeof source?.[SymbolIterator] !== 'function') {
2044+
throw new ERR_INVALID_ARG_TYPE('source', 'Iterable', source);
2045+
}
2046+
2047+
validateObject(options, 'options');
2048+
const {
2049+
highWaterMark = 64 * 1024,
2050+
} = options;
2051+
validateInteger(highWaterMark, 'options.highWaterMark', 0);
2052+
2053+
const iterator = source[SymbolIterator]();
2054+
2055+
return new Readable({
2056+
__proto__: null,
2057+
highWaterMark,
2058+
read() {
2059+
for (;;) {
2060+
const { value: batch, done } = iterator.next();
2061+
if (done) {
2062+
this.push(null);
2063+
return;
2064+
}
2065+
// Individual chunks are not validated as Uint8Array here.
2066+
// The caller is responsible for providing a well-formed
2067+
// Iterable<Uint8Array[]> source. If a non-Buffer chunk
2068+
// is pushed, the byte-mode Readable will throw internally.
2069+
for (let i = 0; i < batch.length; i++) {
2070+
if (!this.push(batch[i])) return;
2071+
}
2072+
}
2073+
},
2074+
destroy(err, cb) {
2075+
if (typeof iterator.return === 'function') iterator.return();
2076+
cb(err);
2077+
},
2078+
});
2079+
};
19432080
}

0 commit comments

Comments
 (0)