Skip to content

Commit 1c50f49

Browse files
committed
stream: prototype for new stream implementation
1 parent 4d0cb65 commit 1c50f49

27 files changed

+9162
-0
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
// Compare FileHandle.createReadStream() vs readableWebStream() vs pull()
2+
// reading a large file through two transforms: uppercase then gzip compress.
3+
'use strict';
4+
5+
const common = require('../common.js');
6+
const fs = require('fs');
7+
const zlib = require('zlib');
8+
const { Transform, Writable, pipeline } = require('stream');
9+
10+
const tmpdir = require('../../test/common/tmpdir');
11+
tmpdir.refresh();
12+
const filename = tmpdir.resolve(`.removeme-benchmark-garbage-${process.pid}`);
13+
14+
const bench = common.createBenchmark(main, {
15+
api: ['classic', 'webstream', 'pull'],
16+
filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
17+
n: [5],
18+
});
19+
20+
/**
 * Benchmark entry point invoked by common.createBenchmark.
 *
 * Writes a fixture file of `filesize` bytes of repeating lowercase ASCII
 * (so the uppercase transform always has work to do), then dispatches to
 * the selected API variant and removes the fixture afterwards.
 *
 * @param {{ api: string, filesize: number, n: number }} config
 */
function main({ api, filesize, n }) {
  // Create the fixture file with repeating lowercase ASCII, writing at
  // most 64 KB per syscall.
  const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = fs.openSync(filename, 'w');
  let remaining = filesize;
  while (remaining > 0) {
    const toWrite = Math.min(remaining, chunk.length);
    fs.writeSync(fd, chunk, 0, toWrite);
    remaining -= toWrite;
  }
  fs.closeSync(fd);

  let run;
  if (api === 'classic') {
    run = benchClassic(n, filesize);
  } else if (api === 'webstream') {
    run = benchWebStream(n, filesize);
  } else {
    run = benchPull(n, filesize);
  }
  // Use finally() rather than then() so the fixture file is removed even
  // when the benchmark rejects; a rejection still surfaces as an unhandled
  // rejection and fails the run visibly.
  run.finally(() => cleanup());
}
// Best-effort removal of the scratch file; a missing or locked file is
// deliberately not treated as an error.
function cleanup() {
  try {
    fs.unlinkSync(filename);
  } catch {
    // ignore -- the fixture may already be gone
  }
}
// ---------------------------------------------------------------------------
// Classic streams path: createReadStream -> Transform (upper) -> createGzip
// ---------------------------------------------------------------------------

// Runs the classic-streams pipeline n times (after one untimed warm-up
// pass) and reports total compressed megabytes to the benchmark harness.
async function benchClassic(n, filesize) {
  // Untimed pass to warm caches and the JIT.
  await runClassic();

  bench.start();
  let processed = 0;
  for (let iter = 0; iter < n; iter++) {
    processed += await runClassic();
  }
  bench.end(processed / (1024 * 1024));
}
// One classic-streams pass over the fixture file. Resolves with the number
// of compressed bytes delivered to the sink; rejects on any pipeline error.
function runClassic() {
  return new Promise((resolve, reject) => {
    const source = fs.createReadStream(filename);

    // Stage 1: byte-wise ASCII uppercase.
    const uppercase = new Transform({
      transform(data, enc, done) {
        const out = Buffer.allocUnsafe(data.length);
        for (let idx = 0; idx < data.length; idx++) {
          const byte = data[idx];
          out[idx] = (byte >= 0x61 && byte <= 0x7a) ? byte - 0x20 : byte;
        }
        done(null, out);
      },
    });

    // Stage 2: gzip compression.
    const gzip = zlib.createGzip();

    // Sink: tally how many compressed bytes arrive.
    let received = 0;
    const counter = new Writable({
      write(data, enc, done) {
        received += data.length;
        done();
      },
    });

    pipeline(source, uppercase, gzip, counter, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve(received);
      }
    });
  });
}
// ---------------------------------------------------------------------------
// WebStream path: readableWebStream -> TransformStream (upper) -> CompressionStream
// ---------------------------------------------------------------------------

// Runs the WHATWG-streams pipeline n times (after one untimed warm-up
// pass) and reports total compressed megabytes to the benchmark harness.
async function benchWebStream(n, filesize) {
  // Untimed pass to warm caches and the JIT.
  await runWebStream();

  bench.start();
  let processed = 0;
  for (let iter = 0; iter < n; iter++) {
    processed += await runWebStream();
  }
  bench.end(processed / (1024 * 1024));
}
// One WHATWG-streams pass over the fixture file. Returns the number of
// compressed bytes produced; the file handle is always closed on exit.
async function runWebStream() {
  const fh = await fs.promises.open(filename, 'r');
  try {
    const source = fh.readableWebStream();

    // Stage 1: byte-wise ASCII uppercase (a-z 0x61-0x7a -> A-Z 0x41-0x5a).
    const uppercase = new TransformStream({
      transform(data, controller) {
        const out = new Uint8Array(data.length);
        for (let idx = 0; idx < data.length; idx++) {
          const byte = data[idx];
          out[idx] = (byte >= 0x61 && byte <= 0x7a) ? byte - 0x20 : byte;
        }
        controller.enqueue(out);
      },
    });

    // Stage 2: gzip via CompressionStream. Consume through async
    // iteration, counting compressed bytes as they arrive.
    const compressed = source
      .pipeThrough(uppercase)
      .pipeThrough(new CompressionStream('gzip'));

    let received = 0;
    for await (const part of compressed) {
      received += part.byteLength;
    }
    return received;
  } finally {
    await fh.close();
  }
}
// ---------------------------------------------------------------------------
// New streams path: pull() with uppercase transform + gzip transform
// ---------------------------------------------------------------------------

// Runs the experimental pull() pipeline n times (after one untimed warm-up
// pass) and reports total compressed megabytes to the benchmark harness.
async function benchPull(n, filesize) {
  const { pull, compressGzip } = require('stream/new');

  // Untimed pass to warm caches and the JIT.
  await runPull(pull, compressGzip);

  bench.start();
  let processed = 0;
  for (let iter = 0; iter < n; iter++) {
    processed += await runPull(pull, compressGzip);
  }
  bench.end(processed / (1024 * 1024));
}
// One pull()-model pass over the fixture file. Returns the number of
// compressed bytes produced; the file handle is always closed on exit.
async function runPull(pull, compressGzip) {
  const fh = await fs.promises.open(filename, 'r');
  try {
    // Stateless batch transform: uppercase every chunk in the batch.
    // A null batch signals end-of-stream and is passed through unchanged.
    const uppercase = (batch) => {
      if (batch === null) return null;
      const out = new Array(batch.length);
      for (let k = 0; k < batch.length; k++) {
        const src = batch[k];
        const dst = new Uint8Array(src.length);
        for (let idx = 0; idx < src.length; idx++) {
          const byte = src[idx];
          dst[idx] = (byte >= 0x61 && byte <= 0x7a) ? byte - 0x20 : byte;
        }
        out[k] = dst;
      }
      return out;
    };

    const readable = fh.pull(uppercase, compressGzip());

    // Count bytes symmetrically with the classic path (no final
    // concatenation into a single buffer).
    let received = 0;
    for await (const batch of readable) {
      for (const part of batch) {
        received += part.byteLength;
      }
    }
    return received;
  } finally {
    await fh.close();
  }
}

doc/api/fs.md

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,61 @@ added: v10.0.0
377377
378378
* Type: {number} The numeric file descriptor managed by the {FileHandle} object.
379379
380+
#### `filehandle.pull([...transforms][, options])`
381+
382+
<!-- YAML
383+
added: REPLACEME
384+
-->
385+
386+
> Stability: 1 - Experimental
387+
388+
* `...transforms` {Function|Object} Optional transforms to apply via
389+
[`stream/new pull()`][].
390+
* `options` {Object}
391+
* `signal` {AbortSignal}
392+
* `autoClose` {boolean} Close the file handle when the stream ends.
393+
**Default:** `false`.
394+
* Returns: {AsyncIterable\<Uint8Array\[]>}
395+
396+
Returns the file contents as an async iterable using the
397+
[`node:stream/new`][] pull model. Reads are performed in 64 KB chunks.
398+
If transforms are provided, they are applied via [`stream/new pull()`][].
399+
400+
The file handle is locked while the iterable is being consumed and unlocked
401+
when iteration completes.
402+
403+
```mjs
404+
import { open } from 'node:fs/promises';
405+
import { text, compressGzip } from 'node:stream/new';
406+
407+
const fh = await open('input.txt', 'r');
408+
409+
// Read as text
410+
console.log(await text(fh.pull({ autoClose: true })));
411+
412+
// Read with compression
413+
const fh2 = await open('input.txt', 'r');
414+
const compressed = fh2.pull(compressGzip(), { autoClose: true });
415+
```
416+
417+
```cjs
418+
const { open } = require('node:fs/promises');
419+
const { text, compressGzip } = require('node:stream/new');
420+
421+
async function run() {
422+
const fh = await open('input.txt', 'r');
423+
424+
// Read as text
425+
console.log(await text(fh.pull({ autoClose: true })));
426+
427+
// Read with compression
428+
const fh2 = await open('input.txt', 'r');
429+
const compressed = fh2.pull(compressGzip(), { autoClose: true });
430+
}
431+
432+
run().catch(console.error);
433+
```
434+
380435
#### `filehandle.read(buffer, offset, length, position)`
381436
382437
<!-- YAML
@@ -859,6 +914,55 @@ On Linux, positional writes don't work when the file is opened in append mode.
859914
The kernel ignores the position argument and always appends the data to
860915
the end of the file.
861916
917+
#### `filehandle.writer([options])`
918+
919+
<!-- YAML
920+
added: REPLACEME
921+
-->
922+
923+
> Stability: 1 - Experimental
924+
925+
* `options` {Object}
926+
* `autoClose` {boolean} Close the file handle when the writer ends.
927+
**Default:** `false`.
928+
* `start` {number} Byte offset to start writing at. **Default:** current
929+
position (append).
930+
* Returns: {Object}
931+
* `write(chunk)` {Function} Returns {Promise\<void>}.
932+
* `writev(chunks)` {Function} Returns {Promise\<void>}. Uses scatter/gather
933+
I/O via a single `writev()` syscall.
934+
* `end()` {Function} Returns {Promise\<number>} total bytes written.
935+
* `abort(reason)` {Function} Returns {Promise\<void>}.
936+
937+
Returns a [`node:stream/new`][] writer backed by this file handle.
938+
939+
The writer supports `Symbol.asyncDispose`, so it can be used with
940+
`await using`.
941+
942+
```mjs
943+
import { open } from 'node:fs/promises';
944+
import { from, pipeTo, compressGzip } from 'node:stream/new';
945+
946+
const fh = await open('output.gz', 'w');
947+
const w = fh.writer({ autoClose: true });
948+
await pipeTo(from('Hello!'), compressGzip(), w);
949+
await w.end();
950+
```
951+
952+
```cjs
953+
const { open } = require('node:fs/promises');
954+
const { from, pipeTo, compressGzip } = require('node:stream/new');
955+
956+
async function run() {
957+
const fh = await open('output.gz', 'w');
958+
const w = fh.writer({ autoClose: true });
959+
await pipeTo(from('Hello!'), compressGzip(), w);
960+
await w.end();
961+
}
962+
963+
run().catch(console.error);
964+
```
965+
862966
#### `filehandle[Symbol.asyncDispose]()`
863967
864968
<!-- YAML
@@ -8779,6 +8883,8 @@ the file contents.
87798883
[`inotify(7)`]: https://man7.org/linux/man-pages/man7/inotify.7.html
87808884
[`kqueue(2)`]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
87818885
[`minimatch`]: https://github.com/isaacs/minimatch
8886+
[`node:stream/new`]: stream_new.md
8887+
[`stream/new pull()`]: stream_new.md#pullsource-transforms-options
87828888
[`util.promisify()`]: util.md#utilpromisifyoriginal
87838889
[bigints]: https://tc39.github.io/proposal-bigint
87848890
[caveats]: #caveats

doc/api/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
* [Modules: Packages](packages.md)
4444
* [Modules: TypeScript](typescript.md)
4545
* [Net](net.md)
46+
* [New Streams API](stream_new.md)
4647
* [OS](os.md)
4748
* [Path](path.md)
4849
* [Performance hooks](perf_hooks.md)

0 commit comments

Comments
 (0)