Skip to content

Commit 2ce6ce8

Browse files
committed
stream: apply more perf improvements to stream/new
1 parent 43c9e17 commit 2ce6ce8

File tree

2 files changed

+72
-20
lines changed

2 files changed

+72
-20
lines changed

lib/internal/streams/new/broadcast.js

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ const {
5656

5757
const encoder = new TextEncoder();
5858

59+
// Cached resolved promise to avoid allocating a new one on every sync fast-path.
60+
const kResolvedPromise = PromiseResolve();
61+
5962
// Non-exported symbol for internal cancel notification from BroadcastImpl
6063
// to BroadcastWriter. Because this symbol is not exported, external code
6164
// cannot call it.
@@ -388,11 +391,37 @@ class BroadcastWriter {
388391
return this._broadcast._getDesiredSize();
389392
}
390393

391-
async write(chunk, options) {
394+
write(chunk, options) {
395+
// Fast path: no signal, writer open, buffer has space
396+
if (!options?.signal && !this._closed && !this._aborted &&
397+
this._broadcast._canWrite()) {
398+
const converted =
399+
typeof chunk === 'string' ? encoder.encode(chunk) : chunk;
400+
this._broadcast._write([converted]);
401+
this._totalBytes += converted.byteLength;
402+
return kResolvedPromise;
403+
}
392404
return this.writev([chunk], options);
393405
}
394406

395-
async writev(chunks, options) {
407+
writev(chunks, options) {
408+
// Fast path: no signal, writer open, buffer has space
409+
if (!options?.signal && !this._closed && !this._aborted &&
410+
this._broadcast._canWrite()) {
411+
const converted = allUint8Array(chunks) ?
412+
ArrayPrototypeSlice(chunks) :
413+
ArrayPrototypeMap(chunks, (c) =>
414+
(typeof c === 'string' ? encoder.encode(c) : c));
415+
this._broadcast._write(converted);
416+
for (let i = 0; i < converted.length; i++) {
417+
this._totalBytes += converted[i].byteLength;
418+
}
419+
return kResolvedPromise;
420+
}
421+
return this._writevSlow(chunks, options);
422+
}
423+
424+
async _writevSlow(chunks, options) {
396425
const signal = options?.signal;
397426

398427
// Check for pre-aborted signal
@@ -404,9 +433,9 @@ class BroadcastWriter {
404433
throw new ERR_INVALID_STATE('Writer is closed');
405434
}
406435

407-
const converted = allUint8Array(chunks)
408-
? ArrayPrototypeSlice(chunks)
409-
: ArrayPrototypeMap(chunks, (c) =>
436+
const converted = allUint8Array(chunks) ?
437+
ArrayPrototypeSlice(chunks) :
438+
ArrayPrototypeMap(chunks, (c) =>
410439
(typeof c === 'string' ? encoder.encode(c) : c));
411440

412441
if (this._broadcast._write(converted)) {
@@ -447,9 +476,9 @@ class BroadcastWriter {
447476
writevSync(chunks) {
448477
if (this._closed || this._aborted) return false;
449478
if (!this._broadcast._canWrite()) return false;
450-
const converted = allUint8Array(chunks)
451-
? ArrayPrototypeSlice(chunks)
452-
: ArrayPrototypeMap(chunks, (c) =>
479+
const converted = allUint8Array(chunks) ?
480+
ArrayPrototypeSlice(chunks) :
481+
ArrayPrototypeMap(chunks, (c) =>
453482
(typeof c === 'string' ? encoder.encode(c) : c));
454483
if (this._broadcast._write(converted)) {
455484
for (let i = 0; i < converted.length; i++) {
@@ -461,12 +490,12 @@ class BroadcastWriter {
461490
}
462491

463492
// end() is synchronous internally - signal accepted for interface compliance.
464-
async end(options) {
465-
if (this._closed) return this._totalBytes;
493+
end(options) {
494+
if (this._closed) return PromiseResolve(this._totalBytes);
466495
this._closed = true;
467496
this._broadcast._end();
468497
this._resolvePendingDrains(false);
469-
return this._totalBytes;
498+
return PromiseResolve(this._totalBytes);
470499
}
471500

472501
endSync() {
@@ -477,14 +506,15 @@ class BroadcastWriter {
477506
return this._totalBytes;
478507
}
479508

480-
async fail(reason) {
481-
if (this._aborted) return;
509+
fail(reason) {
510+
if (this._aborted) return kResolvedPromise;
482511
this._aborted = true;
483512
this._closed = true;
484513
const error = reason ?? new ERR_INVALID_STATE('Failed');
485514
this._rejectPendingWrites(error);
486515
this._rejectPendingDrains(error);
487516
this._broadcast._abort(error);
517+
return kResolvedPromise;
488518
}
489519

490520
failSync(reason) {

lib/internal/streams/new/push.js

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ const {
3838
pull: pullWithTransforms,
3939
} = require('internal/streams/new/pull');
4040

41+
// Cached resolved promise to avoid allocating a new one on every sync fast-path.
42+
const kResolvedPromise = PromiseResolve();
43+
4144
// =============================================================================
4245
// PushQueue - Internal Queue with Chunk-Based Backpressure
4346
// =============================================================================
@@ -432,12 +435,30 @@ class PushWriter {
432435
return this._queue.desiredSize;
433436
}
434437

435-
async write(chunk, options) {
438+
write(chunk, options) {
439+
if (!options?.signal && this._queue.canWriteSync()) {
440+
const bytes = toUint8Array(chunk);
441+
this._queue.writeSync([bytes]);
442+
return kResolvedPromise;
443+
}
436444
const bytes = toUint8Array(chunk);
437-
await this._queue.writeAsync([bytes], options?.signal);
445+
return this._queue.writeAsync([bytes], options?.signal);
438446
}
439447

440-
async writev(chunks, options) {
448+
writev(chunks, options) {
449+
if (!options?.signal && this._queue.canWriteSync()) {
450+
let bytes;
451+
if (allUint8Array(chunks)) {
452+
bytes = ArrayPrototypeSlice(chunks);
453+
} else {
454+
bytes = [];
455+
for (let i = 0; i < chunks.length; i++) {
456+
ArrayPrototypePush(bytes, toUint8Array(chunks[i]));
457+
}
458+
}
459+
this._queue.writeSync(bytes);
460+
return kResolvedPromise;
461+
}
441462
let bytes;
442463
if (allUint8Array(chunks)) {
443464
bytes = ArrayPrototypeSlice(chunks);
@@ -447,7 +468,7 @@ class PushWriter {
447468
ArrayPrototypePush(bytes, toUint8Array(chunks[i]));
448469
}
449470
}
450-
await this._queue.writeAsync(bytes, options?.signal);
471+
return this._queue.writeAsync(bytes, options?.signal);
451472
}
452473

453474
writeSync(chunk) {
@@ -470,18 +491,19 @@ class PushWriter {
470491
return this._queue.writeSync(bytes);
471492
}
472493

473-
async end(options) {
494+
end(options) {
474495
// end() on PushQueue is synchronous (sets state, resolves pending reads).
475496
// Signal accepted for interface compliance but there is nothing to cancel.
476-
return this._queue.end();
497+
return PromiseResolve(this._queue.end());
477498
}
478499

479500
endSync() {
480501
return this._queue.end();
481502
}
482503

483-
async fail(reason) {
504+
fail(reason) {
484505
this._queue.fail(reason);
506+
return kResolvedPromise;
485507
}
486508

487509
failSync(reason) {

0 commit comments

Comments
 (0)