Skip to content

Commit aeaa1e7

Browse files
committed
stream: apply more perf improvements to stream/new
1 parent 2ce6ce8 commit aeaa1e7

File tree

4 files changed

+205
-60
lines changed

4 files changed

+205
-60
lines changed

lib/internal/streams/new/broadcast.js

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,9 @@
88

99
const {
1010
ArrayIsArray,
11-
ArrayPrototypeIndexOf,
1211
ArrayPrototypeMap,
1312
ArrayPrototypePush,
14-
ArrayPrototypeShift,
1513
ArrayPrototypeSlice,
16-
ArrayPrototypeSplice,
1714
Error,
1815
MathMax,
1916
Promise,
@@ -54,6 +51,10 @@ const {
5451
allUint8Array,
5552
} = require('internal/streams/new/utils');
5653

54+
const {
55+
RingBuffer,
56+
} = require('internal/streams/new/ringbuffer');
57+
5758
const encoder = new TextEncoder();
5859

5960
// Cached resolved promise to avoid allocating a new one on every sync fast-path.
@@ -97,7 +98,7 @@ function parsePushArgs(args) {
9798

9899
class BroadcastImpl {
99100
constructor(options) {
100-
this._buffer = [];
101+
this._buffer = new RingBuffer();
101102
this._bufferStart = 0;
102103
this._consumers = new SafeSet();
103104
this._ended = false;
@@ -157,7 +158,7 @@ class BroadcastImpl {
157158

158159
const bufferIndex = state.cursor - self._bufferStart;
159160
if (bufferIndex < self._buffer.length) {
160-
const chunk = self._buffer[bufferIndex];
161+
const chunk = self._buffer.get(bufferIndex);
161162
state.cursor++;
162163
self._tryTrimBuffer();
163164
return { __proto__: null, done: false, value: chunk };
@@ -245,7 +246,7 @@ class BroadcastImpl {
245246
case 'block':
246247
return false;
247248
case 'drop-oldest':
248-
ArrayPrototypeShift(this._buffer);
249+
this._buffer.shift();
249250
this._bufferStart++;
250251
for (const consumer of this._consumers) {
251252
if (consumer.cursor < this._bufferStart) {
@@ -258,7 +259,7 @@ class BroadcastImpl {
258259
}
259260
}
260261

261-
ArrayPrototypePush(this._buffer, chunk);
262+
this._buffer.push(chunk);
262263
this._notifyConsumers();
263264
return true;
264265
}
@@ -271,7 +272,7 @@ class BroadcastImpl {
271272
if (consumer.resolve) {
272273
const bufferIndex = consumer.cursor - this._bufferStart;
273274
if (bufferIndex < this._buffer.length) {
274-
const chunk = this._buffer[bufferIndex];
275+
const chunk = this._buffer.get(bufferIndex);
275276
consumer.cursor++;
276277
consumer.resolve({ done: false, value: chunk });
277278
} else {
@@ -330,7 +331,7 @@ class BroadcastImpl {
330331
const minCursor = this._getMinCursor();
331332
const trimCount = minCursor - this._bufferStart;
332333
if (trimCount > 0) {
333-
ArrayPrototypeSplice(this._buffer, 0, trimCount);
334+
this._buffer.trimFront(trimCount);
334335
this._bufferStart = minCursor;
335336

336337
if (this._onBufferDrained &&
@@ -345,7 +346,7 @@ class BroadcastImpl {
345346
if (consumer.resolve) {
346347
const bufferIndex = consumer.cursor - this._bufferStart;
347348
if (bufferIndex < this._buffer.length) {
348-
const chunk = this._buffer[bufferIndex];
349+
const chunk = this._buffer.get(bufferIndex);
349350
consumer.cursor++;
350351
const resolve = consumer.resolve;
351352
consumer.resolve = null;
@@ -368,7 +369,7 @@ class BroadcastWriter {
368369
this._totalBytes = 0;
369370
this._closed = false;
370371
this._aborted = false;
371-
this._pendingWrites = [];
372+
this._pendingWrites = new RingBuffer();
372373
this._pendingDrains = [];
373374

374375
this._broadcast._onBufferDrained = () => {
@@ -544,14 +545,14 @@ class BroadcastWriter {
544545
_createPendingWrite(chunk, signal) {
545546
return new Promise((resolve, reject) => {
546547
const entry = { chunk, resolve, reject };
547-
ArrayPrototypePush(this._pendingWrites, entry);
548+
this._pendingWrites.push(entry);
548549

549550
if (!signal) return;
550551

551552
const onAbort = () => {
552553
// Remove from queue so it doesn't occupy a slot
553-
const idx = ArrayPrototypeIndexOf(this._pendingWrites, entry);
554-
if (idx !== -1) ArrayPrototypeSplice(this._pendingWrites, idx, 1);
554+
const idx = this._pendingWrites.indexOf(entry);
555+
if (idx !== -1) this._pendingWrites.removeAt(idx);
555556
reject(signal.reason ?? lazyDOMException('Aborted', 'AbortError'));
556557
};
557558

@@ -573,7 +574,7 @@ class BroadcastWriter {
573574

574575
_resolvePendingWrites() {
575576
while (this._pendingWrites.length > 0 && this._broadcast._canWrite()) {
576-
const pending = ArrayPrototypeShift(this._pendingWrites);
577+
const pending = this._pendingWrites.shift();
577578
if (this._broadcast._write(pending.chunk)) {
578579
for (let i = 0; i < pending.chunk.length; i++) {
579580
this._totalBytes += pending.chunk[i].byteLength;
@@ -587,10 +588,8 @@ class BroadcastWriter {
587588
}
588589

589590
_rejectPendingWrites(error) {
590-
const writes = this._pendingWrites;
591-
this._pendingWrites = [];
592-
for (let i = 0; i < writes.length; i++) {
593-
writes[i].reject(error);
591+
while (this._pendingWrites.length > 0) {
592+
this._pendingWrites.shift().reject(error);
594593
}
595594
}
596595

lib/internal/streams/new/push.js

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,8 @@
66
// with built-in backpressure.
77

88
const {
9-
ArrayPrototypeIndexOf,
109
ArrayPrototypePush,
11-
ArrayPrototypeShift,
1210
ArrayPrototypeSlice,
13-
ArrayPrototypeSplice,
1411
Error,
1512
MathMax,
1613
Promise,
@@ -38,6 +35,10 @@ const {
3835
pull: pullWithTransforms,
3936
} = require('internal/streams/new/pull');
4037

38+
const {
39+
RingBuffer,
40+
} = require('internal/streams/new/ringbuffer');
41+
4142
// Cached resolved promise to avoid allocating a new one on every sync fast-path.
4243
const kResolvedPromise = PromiseResolve();
4344

@@ -48,11 +49,11 @@ const kResolvedPromise = PromiseResolve();
4849
class PushQueue {
4950
constructor(options = {}) {
5051
/** Buffered chunks (each slot is from one write/writev call) */
51-
this._slots = [];
52+
this._slots = new RingBuffer();
5253
/** Pending writes waiting for buffer space */
53-
this._pendingWrites = [];
54+
this._pendingWrites = new RingBuffer();
5455
/** Pending reads waiting for data */
55-
this._pendingReads = [];
56+
this._pendingReads = new RingBuffer();
5657
/** Pending drains waiting for backpressure to clear */
5758
this._pendingDrains = [];
5859
/** Writer state: 'open' | 'closed' | 'errored' */
@@ -134,7 +135,7 @@ class PushQueue {
134135
return false;
135136
case 'drop-oldest':
136137
if (this._slots.length > 0) {
137-
ArrayPrototypeShift(this._slots);
138+
this._slots.shift();
138139
}
139140
break;
140141
case 'drop-newest':
@@ -146,7 +147,7 @@ class PushQueue {
146147
}
147148
}
148149

149-
ArrayPrototypePush(this._slots, chunks);
150+
this._slots.push(chunks);
150151
for (let i = 0; i < chunks.length; i++) {
151152
this._bytesWritten += chunks[i].byteLength;
152153
}
@@ -209,14 +210,14 @@ class PushQueue {
209210
_createPendingWrite(chunks, signal) {
210211
return new Promise((resolve, reject) => {
211212
const entry = { chunks, resolve, reject };
212-
ArrayPrototypePush(this._pendingWrites, entry);
213+
this._pendingWrites.push(entry);
213214

214215
if (!signal) return;
215216

216217
const onAbort = () => {
217218
// Remove from queue so it doesn't occupy a slot
218-
const idx = ArrayPrototypeIndexOf(this._pendingWrites, entry);
219-
if (idx !== -1) ArrayPrototypeSplice(this._pendingWrites, idx, 1);
219+
const idx = this._pendingWrites.indexOf(entry);
220+
if (idx !== -1) this._pendingWrites.removeAt(idx);
220221
reject(signal.reason ?? lazyDOMException('Aborted', 'AbortError'));
221222
};
222223

@@ -301,7 +302,7 @@ class PushQueue {
301302
}
302303

303304
return new Promise((resolve, reject) => {
304-
ArrayPrototypePush(this._pendingReads, { resolve, reject });
305+
this._pendingReads.push({ resolve, reject });
305306
});
306307
}
307308

@@ -331,27 +332,27 @@ class PushQueue {
331332
_drain() {
332333
const result = [];
333334
for (let i = 0; i < this._slots.length; i++) {
334-
const slot = this._slots[i];
335+
const slot = this._slots.get(i);
335336
for (let j = 0; j < slot.length; j++) {
336337
ArrayPrototypePush(result, slot[j]);
337338
}
338339
}
339-
this._slots = [];
340+
this._slots.clear();
340341
return result;
341342
}
342343

343344
_resolvePendingReads() {
344345
while (this._pendingReads.length > 0) {
345346
if (this._slots.length > 0) {
346-
const pending = ArrayPrototypeShift(this._pendingReads);
347+
const pending = this._pendingReads.shift();
347348
const result = this._drain();
348349
this._resolvePendingWrites();
349350
pending.resolve({ value: result, done: false });
350351
} else if (this._writerState === 'closed') {
351-
const pending = ArrayPrototypeShift(this._pendingReads);
352+
const pending = this._pendingReads.shift();
352353
pending.resolve({ value: undefined, done: true });
353354
} else if (this._writerState === 'errored' && this._error) {
354-
const pending = ArrayPrototypeShift(this._pendingReads);
355+
const pending = this._pendingReads.shift();
355356
pending.reject(this._error);
356357
} else {
357358
break;
@@ -362,8 +363,8 @@ class PushQueue {
362363
_resolvePendingWrites() {
363364
while (this._pendingWrites.length > 0 &&
364365
this._slots.length < this._highWaterMark) {
365-
const pending = ArrayPrototypeShift(this._pendingWrites);
366-
ArrayPrototypePush(this._slots, pending.chunks);
366+
const pending = this._pendingWrites.shift();
367+
this._slots.push(pending.chunks);
367368
for (let i = 0; i < pending.chunks.length; i++) {
368369
this._bytesWritten += pending.chunks[i].byteLength;
369370
}
@@ -392,18 +393,14 @@ class PushQueue {
392393
}
393394

394395
_rejectPendingReads(error) {
395-
const reads = this._pendingReads;
396-
this._pendingReads = [];
397-
for (let i = 0; i < reads.length; i++) {
398-
reads[i].reject(error);
396+
while (this._pendingReads.length > 0) {
397+
this._pendingReads.shift().reject(error);
399398
}
400399
}
401400

402401
_rejectPendingWrites(error) {
403-
const writes = this._pendingWrites;
404-
this._pendingWrites = [];
405-
for (let i = 0; i < writes.length; i++) {
406-
writes[i].reject(error);
402+
while (this._pendingWrites.length > 0) {
403+
this._pendingWrites.shift().reject(error);
407404
}
408405
}
409406

0 commit comments

Comments
 (0)