Skip to content

Commit 8e44dd5

Browse files
committed
fs: fill out more of the FileHandle pull/writer impl
1 parent 061f1ff commit 8e44dd5

15 files changed

+3173
-486
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Flags: --experimental-stream-iter
2+
// Benchmark: pipeToSync with sync compression transforms.
3+
// Measures fully synchronous file-to-file pipeline (no threadpool, no promises).
4+
'use strict';
5+
6+
const common = require('../common.js');
7+
const fs = require('fs');
8+
const { openSync, closeSync, writeSync, readFileSync, unlinkSync } = fs;
9+
10+
const tmpdir = require('../../test/common/tmpdir');
11+
tmpdir.refresh();
12+
const srcFile = tmpdir.resolve(`.removeme-sync-bench-src-${process.pid}`);
13+
const dstFile = tmpdir.resolve(`.removeme-sync-bench-dst-${process.pid}`);
14+
15+
const bench = common.createBenchmark(main, {
16+
compression: ['gzip', 'deflate', 'brotli', 'zstd'],
17+
filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
18+
n: [5],
19+
});
20+
21+
function main({ compression, filesize, n }) {
  // Create the fixture file with repeating lowercase ASCII.
  const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = openSync(srcFile, 'w');
  let remaining = filesize;
  while (remaining > 0) {
    const toWrite = Math.min(remaining, chunk.length);
    writeSync(fd, chunk, 0, toWrite);
    remaining -= toWrite;
  }
  closeSync(fd);

  const {
    pipeToSync,
    compressGzipSync,
    compressDeflateSync,
    compressBrotliSync,
    compressZstdSync,
  } = require('stream/iter');
  const { open } = fs.promises;

  const compressFactory = {
    gzip: compressGzipSync,
    deflate: compressDeflateSync,
    brotli: compressBrotliSync,
    zstd: compressZstdSync,
  }[compression];

  // Stateless uppercase transform (sync). A batch is an array of byte
  // buffers; a null batch signals end-of-stream and must be passed through.
  const upper = (chunks) => {
    if (chunks === null) return null;
    const out = new Array(chunks.length);
    for (let j = 0; j < chunks.length; j++) {
      const src = chunks[j];
      const buf = Buffer.allocUnsafe(src.length);
      for (let i = 0; i < src.length; i++) {
        const b = src[i];
        buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
      }
      out[j] = buf;
    }
    return out;
  };

  // Use a synchronous wrapper since pipeToSync is fully sync.
  // We need FileHandle for pullSync/writer, so open async then run sync.
  (async () => {
    // Warm up once outside the measured region; try/finally guarantees the
    // handles are released even if the pipeline throws.
    const srcFh = await open(srcFile, 'r');
    const dstFh = await open(dstFile, 'w');
    try {
      runSync(srcFh, dstFh, upper, compressFactory, pipeToSync);
    } finally {
      await srcFh.close();
      await dstFh.close();
    }

    bench.start();
    let totalBytes = 0;
    for (let i = 0; i < n; i++) {
      // Re-open fresh handles so each iteration starts at position 0.
      const src = await open(srcFile, 'r');
      const dst = await open(dstFile, 'w');
      try {
        totalBytes += runSync(src, dst, upper, compressFactory, pipeToSync);
      } finally {
        await src.close();
        await dst.close();
      }
    }
    bench.end(totalBytes / (1024 * 1024));
  })().then(cleanup, (err) => {
    // Don't leave the IIFE as a floating promise: on failure remove the
    // temp fixture files, report the error, and exit non-zero instead of
    // dying with an unhandled rejection (which would skip cleanup()).
    cleanup();
    console.error(err);
    process.exitCode = 1;
  });
}
92+
93+
// Drive one fully synchronous pipeline pass: srcFh -> upper -> compress ->
// dstFh, then report the compressed output size (the benchmark's unit of
// work) by reading the destination file back.
function runSync(srcFh, dstFh, upper, compressFactory, pipeToSync) {
  const writer = dstFh.writer();
  const source = srcFh.pullSync(upper, compressFactory());
  pipeToSync(source, writer);

  return readFileSync(dstFile).length;
}
100+
101+
// Best-effort removal of both temp fixture files; a file may never have
// been created (e.g. on early failure), so unlink errors are ignored.
function cleanup() {
  for (const file of [srcFile, dstFile]) {
    try {
      unlinkSync(file);
    } catch {
      // Ignore
    }
  }
}

benchmark/fs/bench-filehandle-pull-vs-webstream.js

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Flags: --experimental-stream-iter
22
// Compare FileHandle.createReadStream() vs readableWebStream() vs pull()
3-
// reading a large file through two transforms: uppercase then gzip compress.
3+
// reading a large file through two transforms: uppercase then compress.
44
'use strict';
55

66
const common = require('../common.js');
@@ -14,11 +14,20 @@ const filename = tmpdir.resolve(`.removeme-benchmark-garbage-${process.pid}`);
1414

1515
const bench = common.createBenchmark(main, {
1616
api: ['classic', 'webstream', 'pull'],
17+
compression: ['gzip', 'deflate', 'brotli', 'zstd'],
1718
filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
1819
n: [5],
20+
}, {
21+
// Classic and webstream only support gzip (native zlib / CompressionStream).
22+
// Brotli, deflate, zstd are pull-only via stream/iter transforms.
23+
combinationFilter({ api, compression }) {
24+
if (api === 'classic' && compression !== 'gzip') return false;
25+
if (api === 'webstream' && compression !== 'gzip') return false;
26+
return true;
27+
},
1928
});
2029

21-
function main({ api, filesize, n }) {
30+
function main({ api, compression, filesize, n }) {
2231
// Create the fixture file with repeating lowercase ASCII
2332
const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
2433
const fd = fs.openSync(filename, 'w');
@@ -35,19 +44,28 @@ function main({ api, filesize, n }) {
3544
} else if (api === 'webstream') {
3645
benchWebStream(n, filesize).then(() => cleanup());
3746
} else {
38-
benchPull(n, filesize).then(() => cleanup());
47+
benchPull(n, filesize, compression).then(() => cleanup());
3948
}
4049
}
4150

4251
// Remove the benchmark fixture; ignore errors (it may not exist).
function cleanup() {
  try {
    fs.unlinkSync(filename);
  } catch {
    /* ignore */
  }
}
4554

55+
// Stateless uppercase transform (shared by all paths)
56+
// Stateless uppercase transform (shared by all paths): maps ASCII a-z to
// A-Z into a fresh buffer; every other byte is copied through unchanged.
function uppercaseChunk(chunk) {
  const out = Buffer.allocUnsafe(chunk.length);
  let i = 0;
  for (const byte of chunk) {
    // Clearing bit 5 (0x20) uppercases a lowercase ASCII letter.
    out[i++] = byte >= 0x61 && byte <= 0x7a ? byte & ~0x20 : byte;
  }
  return out;
}
64+
4665
// ---------------------------------------------------------------------------
4766
// Classic streams path: createReadStream -> Transform (upper) -> createGzip
4867
// ---------------------------------------------------------------------------
4968
async function benchClassic(n, filesize) {
50-
// Warm up
5169
await runClassic();
5270

5371
bench.start();
@@ -62,22 +80,14 @@ function runClassic() {
6280
return new Promise((resolve, reject) => {
6381
const rs = fs.createReadStream(filename);
6482

65-
// Transform 1: uppercase
6683
const upper = new Transform({
6784
transform(chunk, encoding, callback) {
68-
const buf = Buffer.allocUnsafe(chunk.length);
69-
for (let i = 0; i < chunk.length; i++) {
70-
const b = chunk[i];
71-
buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
72-
}
73-
callback(null, buf);
85+
callback(null, uppercaseChunk(chunk));
7486
},
7587
});
7688

77-
// Transform 2: gzip
7889
const gz = zlib.createGzip();
7990

80-
// Sink: count compressed bytes
8191
let totalBytes = 0;
8292
const sink = new Writable({
8393
write(chunk, encoding, callback) {
@@ -97,7 +107,6 @@ function runClassic() {
97107
// WebStream path: readableWebStream -> TransformStream (upper) -> CompressionStream
98108
// ---------------------------------------------------------------------------
99109
async function benchWebStream(n, filesize) {
100-
// Warm up
101110
await runWebStream();
102111

103112
bench.start();
@@ -113,22 +122,18 @@ async function runWebStream() {
113122
try {
114123
const rs = fh.readableWebStream();
115124

116-
// Transform 1: uppercase
117125
const upper = new TransformStream({
118126
transform(chunk, controller) {
119127
const buf = new Uint8Array(chunk.length);
120128
for (let i = 0; i < chunk.length; i++) {
121129
const b = chunk[i];
122-
// a-z (0x61-0x7a) -> A-Z (0x41-0x5a)
123130
buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
124131
}
125132
controller.enqueue(buf);
126133
},
127134
});
128135

129-
// Transform 2: gzip via CompressionStream
130136
const compress = new CompressionStream('gzip');
131-
132137
const output = rs.pipeThrough(upper).pipeThrough(compress);
133138
const reader = output.getReader();
134139

@@ -145,45 +150,44 @@ async function runWebStream() {
145150
}
146151

147152
// ---------------------------------------------------------------------------
148-
// New streams path: pull() with uppercase transform + gzip transform
153+
// Pull/iter path: pull() with uppercase transform + selected compression
149154
// ---------------------------------------------------------------------------
150-
async function benchPull(n, filesize) {
151-
const { pull, compressGzip } = require('stream/iter');
155+
// Pull/iter path: pull() with the uppercase transform plus the compression
// factory selected by the `compression` benchmark parameter.
async function benchPull(n, filesize, compression) {
  const iter = require('stream/iter');

  const factories = {
    gzip: iter.compressGzip,
    deflate: iter.compressDeflate,
    brotli: iter.compressBrotli,
    zstd: iter.compressZstd,
  };
  const compressFactory = factories[compression];

  // Warm up
  await runPull(compressFactory);

  bench.start();
  let totalBytes = 0;
  for (let i = 0; i < n; i++) {
    totalBytes += await runPull(compressFactory);
  }
  bench.end(totalBytes / (1024 * 1024));
}
163175

164-
async function runPull(pull, compressGzip) {
176+
async function runPull(compressFactory) {
165177
const fh = await fs.promises.open(filename, 'r');
166178
try {
167179
// Stateless transform: uppercase each chunk in the batch
168180
const upper = (chunks) => {
169181
if (chunks === null) return null;
170182
const out = new Array(chunks.length);
171183
for (let j = 0; j < chunks.length; j++) {
172-
const src = chunks[j];
173-
const buf = new Uint8Array(src.length);
174-
for (let i = 0; i < src.length; i++) {
175-
const b = src[i];
176-
buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
177-
}
178-
out[j] = buf;
184+
out[j] = uppercaseChunk(chunks[j]);
179185
}
180186
return out;
181187
};
182188

183-
const readable = fh.pull(upper, compressGzip());
189+
const readable = fh.pull(upper, compressFactory());
184190

185-
// Count bytes symmetrically with the classic path (no final
186-
// concatenation into a single buffer).
187191
let totalBytes = 0;
188192
for await (const chunks of readable) {
189193
for (let i = 0; i < chunks.length; i++) {

0 commit comments

Comments
 (0)