Skip to content

Commit 500c3a5

Browse files
committed
stream: add stream/iter to classic stream adapters
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Opus 4.6 PR-URL: #62469 Reviewed-By: Robert Nagy <ronagy@icloud.com>
1 parent 4f08c64 commit 500c3a5

17 files changed

+3848
-14
lines changed

doc/api/errors.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
28822882
A stream method was called that cannot complete because the stream was
28832883
destroyed using `stream.destroy()`.
28842884

2885+
<a id="ERR_STREAM_ITER_MISSING_FLAG"></a>
2886+
2887+
### `ERR_STREAM_ITER_MISSING_FLAG`
2888+
2889+
A stream/iter API was used without the `--experimental-stream-iter` CLI flag
2890+
enabled.
2891+
28852892
<a id="ERR_STREAM_NULL_VALUES"></a>
28862893

28872894
### `ERR_STREAM_NULL_VALUES`

doc/api/stream.md

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,6 +1998,61 @@ option. In the code example above, data will be in a single chunk if the file
19981998
has less then 64 KiB of data because no `highWaterMark` option is provided to
19991999
[`fs.createReadStream()`][].
20002000

2001+
##### `readable[Symbol.for('Stream.toAsyncStreamable')]()`
2002+
2003+
<!-- YAML
2004+
added: REPLACEME
2005+
-->
2006+
2007+
> Stability: 1 - Experimental
2008+
2009+
* Returns: {AsyncIterable} An `AsyncIterable<Uint8Array[]>` that yields
2010+
batched chunks from the stream.
2011+
2012+
When the `--experimental-stream-iter` flag is enabled, `Readable` streams
2013+
implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient
2014+
consumption by the [`stream/iter`][] API.
2015+
2016+
This provides a batched async iterator that drains the stream's internal
2017+
buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead
2018+
of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks
2019+
are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses).
2020+
For object-mode or encoded streams, each chunk is normalized to `Uint8Array`
2021+
before batching.
2022+
2023+
The returned iterator is tagged as a validated source, so [`from()`][stream-iter-from]
2024+
passes it through without additional normalization.
2025+
2026+
```mjs
2027+
import { Readable } from 'node:stream';
2028+
import { text, from } from 'node:stream/iter';
2029+
2030+
const readable = new Readable({
2031+
read() { this.push('hello'); this.push(null); },
2032+
});
2033+
2034+
// Readable is automatically consumed via toAsyncStreamable
2035+
console.log(await text(from(readable))); // 'hello'
2036+
```
2037+
2038+
```cjs
2039+
const { Readable } = require('node:stream');
2040+
const { text, from } = require('node:stream/iter');
2041+
2042+
async function run() {
2043+
const readable = new Readable({
2044+
read() { this.push('hello'); this.push(null); },
2045+
});
2046+
2047+
console.log(await text(from(readable))); // 'hello'
2048+
}
2049+
2050+
run().catch(console.error);
2051+
```
2052+
2053+
Without the `--experimental-stream-iter` flag, calling this method throws
2054+
[`ERR_STREAM_ITER_MISSING_FLAG`][].
2055+
20012056
##### `readable[Symbol.asyncDispose]()`
20022057

20032058
<!-- YAML
@@ -4997,8 +5052,10 @@ contain multi-byte characters.
49975052
[`'finish'`]: #event-finish
49985053
[`'readable'`]: #event-readable
49995054
[`Duplex`]: #class-streamduplex
5055+
[`ERR_STREAM_ITER_MISSING_FLAG`]: errors.md#err_stream_iter_missing_flag
50005056
[`EventEmitter`]: events.md#class-eventemitter
50015057
[`Readable`]: #class-streamreadable
5058+
[`Stream.toAsyncStreamable`]: stream_iter.md#streamtoasyncstreamable
50025059
[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance
50035060
[`Transform`]: #class-streamtransform
50045061
[`Writable`]: #class-streamwritable
@@ -5024,6 +5081,7 @@ contain multi-byte characters.
50245081
[`stream.uncork()`]: #writableuncork
50255082
[`stream.unpipe()`]: #readableunpipedestination
50265083
[`stream.wrap()`]: #readablewrapstream
5084+
[`stream/iter`]: stream_iter.md
50275085
[`writable._final()`]: #writable_finalcallback
50285086
[`writable._write()`]: #writable_writechunk-encoding-callback
50295087
[`writable._writev()`]: #writable_writevchunks-callback
@@ -5052,6 +5110,7 @@ contain multi-byte characters.
50525110
[stream-end]: #writableendchunk-encoding-callback
50535111
[stream-finished]: #streamfinishedstream-options-callback
50545112
[stream-finished-promise]: #streamfinishedstream-options
5113+
[stream-iter-from]: stream_iter.md#frominput
50555114
[stream-pause]: #readablepause
50565115
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
50575116
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options

doc/api/stream_iter.md

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,6 +1424,258 @@ Compression and decompression transforms for use with `pull()`, `pullSync()`,
14241424
`pipeTo()`, and `pipeToSync()` are available via the [`node:zlib/iter`][]
14251425
module. See the [`node:zlib/iter` documentation][] for details.
14261426

1427+
## Classic stream interop
1428+
1429+
These utility functions bridge between classic
1430+
[`stream.Readable`][]/[`stream.Writable`][] streams and the `stream/iter`
1431+
API.
1432+
1433+
Both `fromReadable()` and `fromWritable()` accept duck-typed objects -- they
1434+
do not require the input to extend `stream.Readable` or `stream.Writable`
1435+
directly. The minimum contract is described below for each function.
1436+
1437+
### `fromReadable(readable)`
1438+
1439+
<!-- YAML
1440+
added: REPLACEME
1441+
-->
1442+
1443+
> Stability: 1 - Experimental
1444+
1445+
* `readable` {stream.Readable|Object} A classic Readable stream or any object
1446+
with `read()` and `on()` methods.
1447+
* Returns: {AsyncIterable\<Uint8Array\[]>} A stream/iter async iterable source.
1448+
1449+
Converts a classic Readable stream (or duck-typed equivalent) into a
1450+
stream/iter async iterable source that can be passed to [`from()`][],
1451+
[`pull()`][], [`text()`][], etc.
1452+
1453+
If the object implements the [`toAsyncStreamable`][] protocol (as
1454+
`stream.Readable` does), that protocol is used. Otherwise, the function
1455+
duck-types on `read()` and `on()` (EventEmitter) and wraps the stream with
1456+
a batched async iterator.
1457+
1458+
The result is cached per instance -- calling `fromReadable()` twice with the
1459+
same stream returns the same iterable.
1460+
1461+
For object-mode or encoded Readable streams, chunks are automatically
1462+
normalized to `Uint8Array`.
1463+
1464+
```mjs
1465+
import { Readable } from 'node:stream';
1466+
import { fromReadable, text } from 'node:stream/iter';
1467+
1468+
const readable = new Readable({
1469+
read() { this.push('hello world'); this.push(null); },
1470+
});
1471+
1472+
const result = await text(fromReadable(readable));
1473+
console.log(result); // 'hello world'
1474+
```
1475+
1476+
```cjs
1477+
const { Readable } = require('node:stream');
1478+
const { fromReadable, text } = require('node:stream/iter');
1479+
1480+
const readable = new Readable({
1481+
read() { this.push('hello world'); this.push(null); },
1482+
});
1483+
1484+
async function run() {
1485+
const result = await text(fromReadable(readable));
1486+
console.log(result); // 'hello world'
1487+
}
1488+
run();
1489+
```
1490+
1491+
### `fromWritable(writable[, options])`
1492+
1493+
<!-- YAML
1494+
added: REPLACEME
1495+
-->
1496+
1497+
> Stability: 1 - Experimental
1498+
1499+
* `writable` {stream.Writable|Object} A classic Writable stream or any object
1500+
with `write()` and `on()` methods.
1501+
* `options` {Object}
1502+
* `backpressure` {string} Backpressure policy. **Default:** `'strict'`.
1503+
* `'strict'` -- writes are rejected when the buffer is full. Catches
1504+
callers that ignore backpressure.
1505+
* `'block'` -- writes wait for drain when the buffer is full. Recommended
1506+
for use with [`pipeTo()`][].
1507+
* `'drop-newest'` -- writes are silently discarded when the buffer is full.
1508+
* `'drop-oldest'` -- **not supported**. Throws `ERR_INVALID_ARG_VALUE`.
1509+
* Returns: {Object} A stream/iter Writer adapter.
1510+
1511+
Creates a stream/iter Writer adapter from a classic Writable stream (or
1512+
duck-typed equivalent). The adapter can be passed to [`pipeTo()`][] as a
1513+
destination.
1514+
1515+
Since all writes on a classic Writable are fundamentally asynchronous,
1516+
the synchronous Writer methods (`writeSync`, `writevSync`, `endSync`) always
1517+
return `false` or `-1`, deferring to the async path. The per-write
1518+
`options.signal` parameter from the Writer interface is also ignored.
1519+
1520+
The result is cached per instance -- calling `fromWritable()` twice with the
1521+
same stream returns the same Writer.
1522+
1523+
For duck-typed streams that do not expose `writableHighWaterMark`,
1524+
`writableLength`, or similar properties, sensible defaults are used.
1525+
Object-mode writables (if detectable) are rejected since the Writer
1526+
interface is bytes-only.
1527+
1528+
```mjs
1529+
import { Writable } from 'node:stream';
1530+
import { from, fromWritable, pipeTo } from 'node:stream/iter';
1531+
1532+
const writable = new Writable({
1533+
write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); },
1534+
});
1535+
1536+
await pipeTo(from('hello world'),
1537+
fromWritable(writable, { backpressure: 'block' }));
1538+
```
1539+
1540+
```cjs
1541+
const { Writable } = require('node:stream');
1542+
const { from, fromWritable, pipeTo } = require('node:stream/iter');
1543+
1544+
async function run() {
1545+
const writable = new Writable({
1546+
write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); },
1547+
});
1548+
1549+
await pipeTo(from('hello world'),
1550+
fromWritable(writable, { backpressure: 'block' }));
1551+
}
1552+
run();
1553+
```
1554+
1555+
### `toReadable(source[, options])`
1556+
1557+
<!-- YAML
1558+
added: REPLACEME
1559+
-->
1560+
1561+
> Stability: 1 - Experimental
1562+
1563+
* `source` {AsyncIterable} An `AsyncIterable<Uint8Array[]>` source, such as
1564+
the return value of [`pull()`][] or [`from()`][].
1565+
* `options` {Object}
1566+
* `highWaterMark` {number} The internal buffer size in bytes before
1567+
backpressure is applied. **Default:** `65536` (64 KB).
1568+
* `signal` {AbortSignal} An optional signal to abort the readable.
1569+
* Returns: {stream.Readable}
1570+
1571+
Creates a byte-mode [`stream.Readable`][] from an `AsyncIterable<Uint8Array[]>`
1572+
(the native batch format used by the stream/iter API). Each `Uint8Array` in a
1573+
yielded batch is pushed as a separate chunk into the Readable.
1574+
1575+
```mjs
1576+
import { createWriteStream } from 'node:fs';
1577+
import { from, pull, toReadable } from 'node:stream/iter';
1578+
import { compressGzip } from 'node:zlib/iter';
1579+
1580+
const source = pull(from('hello world'), compressGzip());
1581+
const readable = toReadable(source);
1582+
1583+
readable.pipe(createWriteStream('output.gz'));
1584+
```
1585+
1586+
```cjs
1587+
const { createWriteStream } = require('node:fs');
1588+
const { from, pull, toReadable } = require('node:stream/iter');
1589+
const { compressGzip } = require('node:zlib/iter');
1590+
1591+
const source = pull(from('hello world'), compressGzip());
1592+
const readable = toReadable(source);
1593+
1594+
readable.pipe(createWriteStream('output.gz'));
1595+
```
1596+
1597+
### `toReadableSync(source[, options])`
1598+
1599+
<!-- YAML
1600+
added: REPLACEME
1601+
-->
1602+
1603+
> Stability: 1 - Experimental
1604+
1605+
* `source` {Iterable} An `Iterable<Uint8Array[]>` source, such as the
1606+
return value of [`pullSync()`][] or [`fromSync()`][].
1607+
* `options` {Object}
1608+
* `highWaterMark` {number} The internal buffer size in bytes before
1609+
backpressure is applied. **Default:** `65536` (64 KB).
1610+
* Returns: {stream.Readable}
1611+
1612+
Creates a byte-mode [`stream.Readable`][] from a synchronous
1613+
`Iterable<Uint8Array[]>`. The `_read()` method pulls from the iterator
1614+
synchronously, so data is available immediately via `readable.read()`.
1615+
1616+
```mjs
1617+
import { fromSync, toReadableSync } from 'node:stream/iter';
1618+
1619+
const source = fromSync('hello world');
1620+
const readable = toReadableSync(source);
1621+
1622+
console.log(readable.read().toString()); // 'hello world'
1623+
```
1624+
1625+
```cjs
1626+
const { fromSync, toReadableSync } = require('node:stream/iter');
1627+
1628+
const source = fromSync('hello world');
1629+
const readable = toReadableSync(source);
1630+
1631+
console.log(readable.read().toString()); // 'hello world'
1632+
```
1633+
1634+
### `toWritable(writer)`
1635+
1636+
<!-- YAML
1637+
added: REPLACEME
1638+
-->
1639+
1640+
> Stability: 1 - Experimental
1641+
1642+
* `writer` {Object} A stream/iter Writer. Only the `write()` method is
1643+
required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`,
1644+
and `writev()` are optional.
1645+
* Returns: {stream.Writable}
1646+
1647+
Creates a classic [`stream.Writable`][] backed by a stream/iter Writer.
1648+
1649+
Each `_write()` / `_writev()` call attempts the Writer's synchronous method
1650+
first (`writeSync` / `writevSync`), falling back to the async method if the
1651+
sync path returns `false` or throws. Similarly, `_final()` tries `endSync()`
1652+
before `end()`. When the sync path succeeds, the callback is deferred via
1653+
`queueMicrotask` to preserve the async resolution contract.
1654+
1655+
The Writable's `highWaterMark` is set to `Number.MAX_SAFE_INTEGER` to
1656+
effectively disable its internal buffering, allowing the underlying Writer
1657+
to manage backpressure directly.
1658+
1659+
```mjs
1660+
import { push, toWritable } from 'node:stream/iter';
1661+
1662+
const { writer, readable } = push();
1663+
const writable = toWritable(writer);
1664+
1665+
writable.write('hello');
1666+
writable.end();
1667+
```
1668+
1669+
```cjs
1670+
const { push, toWritable } = require('node:stream/iter');
1671+
1672+
const { writer, readable } = push();
1673+
const writable = toWritable(writer);
1674+
1675+
writable.write('hello');
1676+
writable.end();
1677+
```
1678+
14271679
## Protocol symbols
14281680

14291681
These well-known symbols allow third-party objects to participate in the
@@ -1816,10 +2068,15 @@ console.log(textSync(stream)); // 'hello world'
18162068
[`arrayBuffer()`]: #arraybuffersource-options
18172069
[`bytes()`]: #bytessource-options
18182070
[`from()`]: #frominput
2071+
[`fromSync()`]: #fromsyncinput
18192072
[`node:zlib/iter`]: zlib_iter.md
18202073
[`node:zlib/iter` documentation]: zlib_iter.md
18212074
[`pipeTo()`]: #pipetosource-transforms-writer-options
18222075
[`pull()`]: #pullsource-transforms-options
2076+
[`pullSync()`]: #pullsyncsource-transforms-options
18232077
[`share()`]: #sharesource-options
2078+
[`stream.Readable`]: stream.md#class-streamreadable
2079+
[`stream.Writable`]: stream.md#class-streamwritable
18242080
[`tap()`]: #tapcallback
18252081
[`text()`]: #textsource-options
2082+
[`toAsyncStreamable`]: #streamtoasyncstreamable

lib/internal/errors.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED',
17701770
Error);
17711771
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
17721772
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
1773+
E('ERR_STREAM_ITER_MISSING_FLAG',
1774+
'The stream/iter API requires the --experimental-stream-iter flag', TypeError);
17731775
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
17741776
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
17751777
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);

0 commit comments

Comments
 (0)