Skip to content

Commit 2b6ce13

Browse files
authored
stream: minor stream/iter implementation edits
Signed-off-by: Renegade334 <contact.9a5d6388@renegade334.me.uk> PR-URL: #63132 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 11740ef commit 2b6ce13

5 files changed

Lines changed: 63 additions & 47 deletions

File tree

lib/internal/streams/iter/classic.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
const {
1515
ArrayIsArray,
16+
ArrayPrototypePush,
1617
MathMax,
1718
NumberMAX_SAFE_INTEGER,
1819
Promise,
@@ -107,14 +108,14 @@ async function normalizeBatch(raw) {
107108
for (let i = 0; i < raw.length; i++) {
108109
const value = raw[i];
109110
if (isUint8Array(value)) {
110-
batch.push(value);
111+
ArrayPrototypePush(batch, value);
111112
} else {
112113
// normalizeAsyncValue may await for async protocols (e.g.
113114
// toAsyncStreamable on yielded objects). Stream events during
114115
// the suspension are queued, not lost -- errors will surface
115116
// on the next loop iteration after this yield completes.
116117
for await (const normalized of normalizeAsyncValue(value)) {
117-
batch.push(normalized);
118+
ArrayPrototypePush(batch, normalized);
118119
}
119120
}
120121
}
@@ -163,7 +164,7 @@ async function* createBatchedAsyncIterator(stream, normalize) {
163164
stream._readableState?.length > 0) {
164165
const c = stream.read();
165166
if (c === null) break;
166-
batch.push(c);
167+
ArrayPrototypePush(batch, c);
167168
}
168169
if (normalize !== null) {
169170
const result = await normalize(batch);
@@ -495,7 +496,7 @@ function fromWritable(writable, options = kNullPrototype) {
495496

496497
function waitForDrain() {
497498
const { promise, resolve, reject } = PromiseWithResolvers();
498-
waiters.push({ __proto__: null, resolve, reject });
499+
ArrayPrototypePush(waiters, { __proto__: null, resolve, reject });
499500
installListeners();
500501
return promise;
501502
}
@@ -686,7 +687,7 @@ function fromWritable(writable, options = kNullPrototype) {
686687
return PromiseResolve(true);
687688
}
688689
const { promise, resolve } = PromiseWithResolvers();
689-
waiters.push({
690+
ArrayPrototypePush(waiters, {
690691
__proto__: null,
691692
resolve() { resolve(true); },
692693
reject() { resolve(false); },

lib/internal/streams/iter/consumers.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const {
1212
ArrayBufferPrototypeSlice,
1313
ArrayPrototypeMap,
1414
ArrayPrototypePush,
15+
ArrayPrototypeShift,
1516
ArrayPrototypeSlice,
1617
Promise,
1718
PromisePrototypeThen,
@@ -477,7 +478,7 @@ function merge(...args) {
477478

478479
// Drain ready queue synchronously
479480
while (ready.length > 0) {
480-
const item = ready.shift();
481+
const item = ArrayPrototypeShift(ready);
481482
if (item?.error) {
482483
throw item.error;
483484
}

lib/internal/streams/iter/from.js

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ const {
3131

3232
const {
3333
isAnyArrayBuffer,
34-
isDataView,
3534
isPromise,
35+
isTypedArray,
3636
isUint8Array,
3737
} = require('internal/util/types');
3838

@@ -106,17 +106,21 @@ function primitiveToUint8Array(chunk) {
106106
return chunk;
107107
}
108108
// Other ArrayBufferView types (Int8Array, DataView, etc.)
109-
if (isDataView(chunk)) {
109+
return arrayBufferViewToUint8Array(chunk);
110+
}
111+
112+
function arrayBufferViewToUint8Array(chunk) {
113+
if (isTypedArray(chunk)) {
110114
return new Uint8Array(
111-
DataViewPrototypeGetBuffer(chunk),
112-
DataViewPrototypeGetByteOffset(chunk),
113-
DataViewPrototypeGetByteLength(chunk),
115+
TypedArrayPrototypeGetBuffer(chunk),
116+
TypedArrayPrototypeGetByteOffset(chunk),
117+
TypedArrayPrototypeGetByteLength(chunk),
114118
);
115119
}
116120
return new Uint8Array(
117-
TypedArrayPrototypeGetBuffer(chunk),
118-
TypedArrayPrototypeGetByteOffset(chunk),
119-
TypedArrayPrototypeGetByteLength(chunk),
121+
DataViewPrototypeGetBuffer(chunk),
122+
DataViewPrototypeGetByteOffset(chunk),
123+
DataViewPrototypeGetByteLength(chunk),
120124
);
121125
}
122126

@@ -580,6 +584,7 @@ function from(input) {
580584
// =============================================================================
581585

582586
module.exports = {
587+
arrayBufferViewToUint8Array,
583588
from,
584589
fromSync,
585590
isAsyncIterable,

lib/internal/streams/iter/pull.js

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ const {
3333
const { AbortController } = require('internal/abort_controller');
3434

3535
const {
36+
arrayBufferViewToUint8Array,
3637
from,
3738
fromSync,
38-
primitiveToUint8Array,
3939
isSyncIterable,
4040
isAsyncIterable,
4141
isUint8ArrayBatch,
@@ -136,7 +136,7 @@ function* flattenTransformYieldSync(value) {
136136
return;
137137
}
138138
if (ArrayBufferIsView(value)) {
139-
yield primitiveToUint8Array(value);
139+
yield arrayBufferViewToUint8Array(value);
140140
return;
141141
}
142142
// Must be Iterable<TransformYield>
@@ -170,7 +170,7 @@ async function* flattenTransformYieldAsync(value) {
170170
return;
171171
}
172172
if (ArrayBufferIsView(value)) {
173-
yield primitiveToUint8Array(value);
173+
yield arrayBufferViewToUint8Array(value);
174174
return;
175175
}
176176
// Check for async iterable first
@@ -180,10 +180,10 @@ async function* flattenTransformYieldAsync(value) {
180180
}
181181
return;
182182
}
183-
// Must be sync Iterable<TransformYield>
183+
// Must be sync Iterable<TransformYield>, no nested async iterables
184184
if (isSyncIterable(value)) {
185185
for (const item of value) {
186-
yield* flattenTransformYieldAsync(item);
186+
yield* flattenTransformYieldSync(item);
187187
}
188188
return;
189189
}
@@ -218,7 +218,7 @@ function* processTransformResultSync(result) {
218218
return;
219219
}
220220
if (ArrayBufferIsView(result)) {
221-
yield [primitiveToUint8Array(result)];
221+
yield [arrayBufferViewToUint8Array(result)];
222222
return;
223223
}
224224
// Uint8Array[] batch
@@ -278,7 +278,7 @@ async function* processTransformResultAsync(result) {
278278
return;
279279
}
280280
if (ArrayBufferIsView(result)) {
281-
yield [primitiveToUint8Array(result)];
281+
yield [arrayBufferViewToUint8Array(result)];
282282
return;
283283
}
284284
// Uint8Array[] batch
@@ -313,7 +313,9 @@ async function* processTransformResultAsync(result) {
313313
ArrayPrototypePush(batch, item);
314314
continue;
315315
}
316-
for await (const chunk of flattenTransformYieldAsync(item)) {
316+
// Note: This iteration is synchronous, since async iterables
317+
// may not be nested within sync iterables.
318+
for (const chunk of flattenTransformYieldSync(item)) {
317319
ArrayPrototypePush(batch, chunk);
318320
}
319321
}
@@ -366,7 +368,7 @@ function* applyFusedStatelessSyncTransforms(source, run) {
366368
} else if (isAnyArrayBuffer(current)) {
367369
yield [new Uint8Array(current)];
368370
} else if (ArrayBufferIsView(current)) {
369-
yield [primitiveToUint8Array(current)];
371+
yield [arrayBufferViewToUint8Array(current)];
370372
} else {
371373
yield* processTransformResultSync(current);
372374
}
@@ -428,7 +430,7 @@ function* createSyncPipeline(source, transforms) {
428430
}
429431
current = applyStatefulSyncTransform(current, transform.transform);
430432
} else {
431-
statelessRun.push(transform);
433+
ArrayPrototypePush(statelessRun, transform);
432434
}
433435
}
434436
if (statelessRun.length > 0) {
@@ -490,7 +492,7 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) {
490492
} else if (isAnyArrayBuffer(current)) {
491493
yield [new Uint8Array(current)];
492494
} else if (ArrayBufferIsView(current)) {
493-
yield [primitiveToUint8Array(current)];
495+
yield [arrayBufferViewToUint8Array(current)];
494496
} else {
495497
yield* processTransformResultAsync(current);
496498
}
@@ -531,9 +533,7 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) {
531533
* @yields {Uint8Array[]}
532534
*/
533535
async function* withFlushAsync(source) {
534-
for await (const batch of source) {
535-
yield batch;
536-
}
536+
yield* source;
537537
yield null;
538538
}
539539

@@ -647,7 +647,7 @@ async function* createAsyncPipeline(source, transforms, signal) {
647647
current, transform.transform, opts);
648648
}
649649
} else {
650-
statelessRun.push(transform);
650+
ArrayPrototypePush(statelessRun, transform);
651651
}
652652
}
653653
// Flush remaining stateless run

lib/internal/streams/iter/utils.js

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const {
1212
TypedArrayPrototypeGetBuffer,
1313
TypedArrayPrototypeGetByteLength,
1414
TypedArrayPrototypeGetByteOffset,
15+
TypedArrayPrototypeSet,
1516
Uint8Array,
1617
} = primordials;
1718

@@ -24,8 +25,6 @@ const {
2425
} = require('internal/errors');
2526
const { isError } = require('internal/util');
2627

27-
const { Buffer } = require('buffer');
28-
2928
const { isSharedArrayBuffer, isUint8Array } = require('internal/util/types');
3029

3130
const { validateOneOf } = require('internal/validators');
@@ -127,26 +126,36 @@ function concatBytes(chunks) {
127126
if (chunks.length === 0) {
128127
return new Uint8Array(0);
129128
}
130-
// Single chunk: return directly if it covers the entire backing buffer
129+
// Single chunk: return directly if it covers the entire backing buffer,
130+
// otherwise return a copy
131131
if (chunks.length === 1) {
132132
const chunk = chunks[0];
133-
const buf = TypedArrayPrototypeGetBuffer(chunk);
134-
// SharedArrayBuffer is not available in primordials, so use
135-
// direct property access for its byteLength.
136-
const bufByteLength = isSharedArrayBuffer(buf) ?
137-
buf.byteLength :
138-
ArrayBufferPrototypeGetByteLength(buf);
139-
if (TypedArrayPrototypeGetByteOffset(chunk) === 0 &&
140-
TypedArrayPrototypeGetByteLength(chunk) === bufByteLength) {
141-
return chunk;
133+
// If non-zero offset, skip the remaining buffer checks.
134+
if (TypedArrayPrototypeGetByteOffset(chunk) === 0) {
135+
const buf = TypedArrayPrototypeGetBuffer(chunk);
136+
// SharedArrayBuffer is not available in primordials, so use
137+
// direct property access for its byteLength.
138+
const bufByteLength = isSharedArrayBuffer(buf) ?
139+
buf.byteLength :
140+
ArrayBufferPrototypeGetByteLength(buf);
141+
if (TypedArrayPrototypeGetByteLength(chunk) === bufByteLength) {
142+
return chunk;
143+
}
142144
}
145+
return new Uint8Array(chunk);
146+
}
147+
// Multiple chunks: concatenate
148+
let totalByteLength = 0;
149+
for (let i = 0; i < chunks.length; i++) {
150+
totalByteLength += TypedArrayPrototypeGetByteLength(chunks[i]);
151+
}
152+
const concatenated = new Uint8Array(totalByteLength);
153+
let offset = 0;
154+
for (let i = 0; i < chunks.length; i++) {
155+
TypedArrayPrototypeSet(concatenated, chunks[i], offset);
156+
offset += TypedArrayPrototypeGetByteLength(chunks[i]);
143157
}
144-
// Multiple chunks or shared buffer: concatenate
145-
const buf = Buffer.concat(chunks);
146-
return new Uint8Array(
147-
TypedArrayPrototypeGetBuffer(buf),
148-
TypedArrayPrototypeGetByteOffset(buf),
149-
TypedArrayPrototypeGetByteLength(buf));
158+
return concatenated;
150159
}
151160

152161
/**

0 commit comments

Comments
 (0)