Skip to content

Commit fb49804

Browse files
committed
stream: implement toStreamIterWriter/fromStreamIterWriter
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Opus 4.6
1 parent 8d1adef commit fb49804

File tree

4 files changed

+1866
-0
lines changed

4 files changed

+1866
-0
lines changed

doc/api/stream.md

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,94 @@ added: v12.3.0
979979

980980
Getter for the property `objectMode` of a given `Writable` stream.
981981

982+
##### `writable.toStreamIterWriter([options])`
983+
984+
<!-- YAML
985+
added: REPLACEME
986+
-->
987+
988+
> Stability: 1 - Experimental
989+
990+
* `options` {Object}
991+
* `backpressure` {string} Backpressure policy. One of `'strict'` (default),
992+
`'block'`, or `'drop-newest'`. See below for details.
993+
* Returns: {Object} A [`stream/iter` Writer][stream-iter-writer] adapter.
994+
995+
When the `--experimental-stream-iter` flag is enabled, returns an adapter
996+
object that conforms to the [`stream/iter`][] Writer interface, allowing the
997+
`Writable` to be used as a destination in the iterable streams API.
998+
999+
Since all writes on a classic `stream.Writable` are fundamentally
1000+
asynchronous, the synchronous methods (`writeSync`, `writevSync`, `endSync`)
1001+
always return `false` or `-1`, deferring to the async path. The per-write
1002+
`options.signal` parameter from the Writer interface is also ignored; classic
1003+
`stream.Writable` has no per-write abort signal support, so cancellation
1004+
should be handled at the pipeline level.
1005+
1006+
**Backpressure policies:**
1007+
1008+
* `'strict'` (default) — writes are rejected with `ERR_INVALID_STATE` when
1009+
the buffer is full (`writableLength >= writableHighWaterMark`). This catches
1010+
callers that ignore backpressure.
1011+
* `'block'` — writes wait for the `'drain'` event when the buffer is full.
1012+
This matches classic `stream.Writable` behavior and is the recommended
1013+
policy when using [`pipeTo()`][stream-iter-pipeto].
1014+
* `'drop-newest'` — writes are silently discarded when the buffer is full.
1015+
The data is not written to the underlying resource, but `writer.end()`
1016+
still reports the total bytes (including dropped bytes) for consistency
1017+
with the Writer spec.
1018+
* `'drop-oldest'`**not supported**. Classic `stream.Writable` does not
1019+
provide an API to evict already-buffered data without risking partial
1020+
eviction of atomic `writev()` batches. Passing this value throws
1021+
`ERR_INVALID_ARG_VALUE`.
1022+
1023+
The adapter maps:
1024+
1025+
* `writer.write(chunk)` — calls `writable.write(chunk)`, subject to the
1026+
backpressure policy.
1027+
* `writer.writev(chunks)` — corks the writable, writes all chunks, then
1028+
uncorks. Subject to the backpressure policy.
1029+
* `writer.end()` — calls `writable.end()` and resolves with total bytes
1030+
written when the `'finish'` event fires.
1031+
* `writer.fail(reason)` — calls `writable.destroy(reason)`.
1032+
* `writer.desiredSize` — returns the available buffer space
1033+
(`writableHighWaterMark - writableLength`), or `null` if the stream
1034+
is destroyed or finished.
1035+
1036+
```mjs
1037+
import { Writable } from 'node:stream';
1038+
import { from, pipeTo } from 'node:stream/iter';
1039+
1040+
const chunks = [];
1041+
const writable = new Writable({
1042+
write(chunk, encoding, cb) { chunks.push(chunk); cb(); },
1043+
});
1044+
1045+
// Use 'block' policy with pipeTo for classic backpressure behavior
1046+
await pipeTo(from('hello world'),
1047+
writable.toStreamIterWriter({ backpressure: 'block' }));
1048+
```
1049+
1050+
```cjs
1051+
const { Writable } = require('node:stream');
1052+
const { from, pipeTo } = require('node:stream/iter');
1053+
1054+
async function run() {
1055+
const chunks = [];
1056+
const writable = new Writable({
1057+
write(chunk, encoding, cb) { chunks.push(chunk); cb(); },
1058+
});
1059+
1060+
await pipeTo(from('hello world'),
1061+
writable.toStreamIterWriter({ backpressure: 'block' }));
1062+
}
1063+
1064+
run().catch(console.error);
1065+
```
1066+
1067+
Without the `--experimental-stream-iter` flag, calling this method throws
1068+
[`ERR_STREAM_ITER_MISSING_FLAG`][].
1069+
9821070
##### `writable[Symbol.asyncDispose]()`
9831071

9841072
<!-- YAML
@@ -3375,6 +3463,62 @@ changes:
33753463
`'bytes'` or undefined.
33763464
* Returns: {ReadableStream}
33773465

3466+
### `stream.Writable.fromStreamIter(writer)`
3467+
3468+
<!-- YAML
3469+
added: REPLACEME
3470+
-->
3471+
3472+
> Stability: 1 - Experimental
3473+
3474+
* `writer` {Object} A [`stream/iter`][] Writer. Only the `write()` method is
3475+
required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`,
3476+
and `writev()` are optional.
3477+
* Returns: {stream.Writable}
3478+
3479+
When the `--experimental-stream-iter` flag is enabled, creates a classic
3480+
`stream.Writable` backed by a [`stream/iter` Writer][stream-iter-writer].
3481+
3482+
Each `_write()` / `_writev()` call attempts the Writer's synchronous method
3483+
first (`writeSync` / `writevSync`), falling back to the async method if the
3484+
sync path returns `false`. Similarly, `_final()` tries `endSync()` before
3485+
`end()`. When the sync path succeeds, the callback is deferred via
3486+
`queueMicrotask` to preserve the async resolution contract that Writable
3487+
internals expect.
3488+
3489+
* `_write(chunk, encoding, cb)` — tries `writer.writeSync(bytes)`, falls
3490+
back to `await writer.write(bytes)`.
3491+
* `_writev(entries, cb)` — tries `writer.writevSync(chunks)`, falls
3492+
back to `await writer.writev(chunks)`. Only defined if `writer.writev`
3493+
exists.
3494+
* `_final(cb)` — tries `writer.endSync()`, falls back to
3495+
`await writer.end()`.
3496+
* `_destroy(err, cb)` — calls `writer.fail(err)`.
3497+
3498+
```mjs
3499+
import { Writable } from 'node:stream';
3500+
import { push, from, pipeTo } from 'node:stream/iter';
3501+
3502+
const { writer, readable } = push();
3503+
const writable = Writable.fromStreamIter(writer);
3504+
3505+
writable.write('hello');
3506+
writable.end();
3507+
```
3508+
3509+
```cjs
3510+
const { Writable } = require('node:stream');
3511+
const { push, from, pipeTo } = require('node:stream/iter');
3512+
3513+
const { writer, readable } = push();
3514+
const writable = Writable.fromStreamIter(writer);
3515+
3516+
writable.write('hello');
3517+
writable.end();
3518+
```
3519+
3520+
This method requires the `--experimental-stream-iter` CLI flag.
3521+
33783522
### `stream.Writable.fromWeb(writableStream[, options])`
33793523

33803524
<!-- YAML
@@ -5209,6 +5353,8 @@ contain multi-byte characters.
52095353
[stream-finished]: #streamfinishedstream-options-callback
52105354
[stream-finished-promise]: #streamfinishedstream-options
52115355
[stream-iter-from]: stream_iter.md#frominput
5356+
[stream-iter-pipeto]: stream_iter.md#pipetosource-transforms-writer
5357+
[stream-iter-writer]: stream_iter.md#writer-interface
52125358
[stream-pause]: #readablepause
52135359
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
52145360
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options

0 commit comments

Comments
 (0)