Skip to content

Commit ef1f82b

Browse files
committed
stream: add sab support to stream/iter
1 parent ff9bd33 commit ef1f82b

File tree

5 files changed

+226
-14
lines changed

5 files changed

+226
-14
lines changed

lib/internal/streams/iter/consumers.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ const {
5252
drainableProtocol,
5353
} = require('internal/streams/iter/types');
5454

55+
const {
56+
isSharedArrayBuffer,
57+
} = require('internal/util/types');
58+
5559
// =============================================================================
5660
// Type Guards
5761
// =============================================================================
@@ -143,13 +147,22 @@ async function collectAsync(source, signal, limit) {
143147

144148
/**
145149
* Convert a Uint8Array to its backing ArrayBuffer, slicing if necessary.
150+
* Handles both ArrayBuffer and SharedArrayBuffer backing stores.
146151
* @param {Uint8Array} data
147-
* @returns {ArrayBuffer}
152+
* @returns {ArrayBuffer|SharedArrayBuffer}
148153
*/
149154
function toArrayBuffer(data) {
150155
const byteOffset = TypedArrayPrototypeGetByteOffset(data);
151156
const byteLength = TypedArrayPrototypeGetByteLength(data);
152157
const buffer = TypedArrayPrototypeGetBuffer(data);
158+
// SharedArrayBuffer is not available in primordials, so use
159+
// direct property access for its byteLength and slice.
160+
if (isSharedArrayBuffer(buffer)) {
161+
if (byteOffset === 0 && byteLength === buffer.byteLength) {
162+
return buffer;
163+
}
164+
return buffer.slice(byteOffset, byteOffset + byteLength);
165+
}
153166
if (byteOffset === 0 &&
154167
byteLength === ArrayBufferPrototypeGetByteLength(buffer)) {
155168
return buffer;

lib/internal/streams/iter/from.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const {
3030
} = require('internal/errors');
3131

3232
const {
33-
isArrayBuffer,
33+
isAnyArrayBuffer,
3434
isDataView,
3535
isPromise,
3636
isUint8Array,
@@ -60,7 +60,7 @@ const FROM_BATCH_SIZE = 128;
6060
* @returns {boolean}
6161
*/
6262
function isPrimitiveChunk(value) {
63-
return typeof value === 'string' || isArrayBuffer(value) || ArrayBufferIsView(value);
63+
return typeof value === 'string' || isAnyArrayBuffer(value) || ArrayBufferIsView(value);
6464
}
6565

6666
/**
@@ -98,7 +98,7 @@ function primitiveToUint8Array(chunk) {
9898
if (typeof chunk === 'string') {
9999
return toUint8Array(chunk);
100100
}
101-
if (isArrayBuffer(chunk)) {
101+
if (isAnyArrayBuffer(chunk)) {
102102
return new Uint8Array(chunk);
103103
}
104104
if (isUint8Array(chunk)) {

lib/internal/streams/iter/pull.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const {
2525
const { lazyDOMException } = require('internal/util');
2626
const { validateAbortSignal } = require('internal/validators');
2727
const {
28-
isArrayBuffer,
28+
isAnyArrayBuffer,
2929
isPromise,
3030
isUint8Array,
3131
} = require('internal/util/types');
@@ -126,7 +126,7 @@ function* flattenTransformYieldSync(value) {
126126
yield toUint8Array(value);
127127
return;
128128
}
129-
if (isArrayBuffer(value)) {
129+
if (isAnyArrayBuffer(value)) {
130130
yield new Uint8Array(value);
131131
return;
132132
}
@@ -160,7 +160,7 @@ async function* flattenTransformYieldAsync(value) {
160160
yield toUint8Array(value);
161161
return;
162162
}
163-
if (isArrayBuffer(value)) {
163+
if (isAnyArrayBuffer(value)) {
164164
yield new Uint8Array(value);
165165
return;
166166
}
@@ -208,7 +208,7 @@ function* processTransformResultSync(result) {
208208
return;
209209
}
210210
// ArrayBuffer / ArrayBufferView -> convert and wrap
211-
if (isArrayBuffer(result)) {
211+
if (isAnyArrayBuffer(result)) {
212212
yield [new Uint8Array(result)];
213213
return;
214214
}
@@ -268,7 +268,7 @@ async function* processTransformResultAsync(result) {
268268
return;
269269
}
270270
// ArrayBuffer / ArrayBufferView -> convert and wrap
271-
if (isArrayBuffer(result)) {
271+
if (isAnyArrayBuffer(result)) {
272272
yield [new Uint8Array(result)];
273273
return;
274274
}
@@ -469,7 +469,7 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) {
469469
yield [current];
470470
} else if (typeof current === 'string') {
471471
yield [toUint8Array(current)];
472-
} else if (isArrayBuffer(current)) {
472+
} else if (isAnyArrayBuffer(current)) {
473473
yield [new Uint8Array(current)];
474474
} else if (ArrayBufferIsView(current)) {
475475
yield [primitiveToUint8Array(current)];

lib/internal/streams/iter/utils.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const { isError } = require('internal/util');
2626

2727
const { Buffer } = require('buffer');
2828

29-
const { isUint8Array } = require('internal/util/types');
29+
const { isSharedArrayBuffer, isUint8Array } = require('internal/util/types');
3030

3131
const { validateOneOf } = require('internal/validators');
3232

@@ -127,13 +127,17 @@ function concatBytes(chunks) {
127127
if (chunks.length === 0) {
128128
return new Uint8Array(0);
129129
}
130-
// Single chunk: return directly if buffer is not shared
130+
// Single chunk: return directly if it covers the entire backing buffer
131131
if (chunks.length === 1) {
132132
const chunk = chunks[0];
133133
const buf = TypedArrayPrototypeGetBuffer(chunk);
134+
// SharedArrayBuffer is not available in primordials, so use
135+
// direct property access for its byteLength.
136+
const bufByteLength = isSharedArrayBuffer(buf) ?
137+
buf.byteLength :
138+
ArrayBufferPrototypeGetByteLength(buf);
134139
if (TypedArrayPrototypeGetByteOffset(chunk) === 0 &&
135-
TypedArrayPrototypeGetByteLength(chunk) ===
136-
ArrayBufferPrototypeGetByteLength(buf)) {
140+
TypedArrayPrototypeGetByteLength(chunk) === bufByteLength) {
137141
return chunk;
138142
}
139143
}
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const {
7+
from,
8+
fromSync,
9+
bytes,
10+
bytesSync,
11+
text,
12+
textSync,
13+
arrayBuffer,
14+
arrayBufferSync,
15+
pipeTo,
16+
pipeToSync,
17+
pull,
18+
pullSync,
19+
} = require('stream/iter');
20+
21+
// =============================================================================
22+
// from() / fromSync() with SharedArrayBuffer
23+
// =============================================================================
24+
25+
function testFromSyncSAB() {
26+
const sab = new SharedArrayBuffer(4);
27+
new Uint8Array(sab).set([10, 20, 30, 40]);
28+
const batches = [];
29+
for (const batch of fromSync(sab)) {
30+
batches.push(batch);
31+
}
32+
assert.strictEqual(batches.length, 1);
33+
assert.deepStrictEqual(batches[0][0], new Uint8Array([10, 20, 30, 40]));
34+
}
35+
36+
async function testFromAsyncSAB() {
37+
const sab = new SharedArrayBuffer(4);
38+
new Uint8Array(sab).set([10, 20, 30, 40]);
39+
const batches = [];
40+
for await (const batch of from(sab)) {
41+
batches.push(batch);
42+
}
43+
assert.strictEqual(batches.length, 1);
44+
assert.deepStrictEqual(batches[0][0], new Uint8Array([10, 20, 30, 40]));
45+
}
46+
47+
// =============================================================================
48+
// Consumers with SAB-backed source
49+
// =============================================================================
50+
51+
function testBytesSyncSAB() {
52+
const sab = new SharedArrayBuffer(5);
53+
new Uint8Array(sab).set([104, 101, 108, 108, 111]); // 'hello'
54+
const data = bytesSync(fromSync(sab));
55+
assert.deepStrictEqual(data, new Uint8Array([104, 101, 108, 108, 111]));
56+
}
57+
58+
async function testBytesAsyncSAB() {
59+
const sab = new SharedArrayBuffer(5);
60+
new Uint8Array(sab).set([104, 101, 108, 108, 111]); // 'hello'
61+
const data = await bytes(from(sab));
62+
assert.deepStrictEqual(data, new Uint8Array([104, 101, 108, 108, 111]));
63+
}
64+
65+
function testTextSyncSAB() {
66+
const sab = new SharedArrayBuffer(5);
67+
new Uint8Array(sab).set([104, 101, 108, 108, 111]); // 'hello'
68+
const result = textSync(fromSync(sab));
69+
assert.strictEqual(result, 'hello');
70+
}
71+
72+
async function testTextAsyncSAB() {
73+
const sab = new SharedArrayBuffer(5);
74+
new Uint8Array(sab).set([104, 101, 108, 108, 111]); // 'hello'
75+
const result = await text(from(sab));
76+
assert.strictEqual(result, 'hello');
77+
}
78+
79+
function testArrayBufferSyncSAB() {
80+
const sab = new SharedArrayBuffer(4);
81+
new Uint8Array(sab).set([1, 2, 3, 4]);
82+
const result = arrayBufferSync(fromSync(sab));
83+
assert.ok(result instanceof SharedArrayBuffer || result instanceof ArrayBuffer);
84+
assert.deepStrictEqual(new Uint8Array(result), new Uint8Array([1, 2, 3, 4]));
85+
}
86+
87+
async function testArrayBufferAsyncSAB() {
88+
const sab = new SharedArrayBuffer(4);
89+
new Uint8Array(sab).set([1, 2, 3, 4]);
90+
const result = await arrayBuffer(from(sab));
91+
assert.ok(result instanceof SharedArrayBuffer || result instanceof ArrayBuffer);
92+
assert.deepStrictEqual(new Uint8Array(result), new Uint8Array([1, 2, 3, 4]));
93+
}
94+
95+
// =============================================================================
96+
// pipeTo / pipeToSync with SAB source
97+
// =============================================================================
98+
99+
function testPipeToSyncSAB() {
100+
const sab = new SharedArrayBuffer(3);
101+
new Uint8Array(sab).set([65, 66, 67]); // 'ABC'
102+
const written = [];
103+
const writer = {
104+
writeSync(chunk) { written.push(chunk); return true; },
105+
endSync() { return written.length; },
106+
};
107+
const totalBytes = pipeToSync(fromSync(sab), writer);
108+
assert.strictEqual(totalBytes, 3);
109+
assert.deepStrictEqual(written[0], new Uint8Array([65, 66, 67]));
110+
}
111+
112+
async function testPipeToAsyncSAB() {
113+
const sab = new SharedArrayBuffer(3);
114+
new Uint8Array(sab).set([65, 66, 67]); // 'ABC'
115+
const written = [];
116+
const writer = {
117+
async write(chunk) { written.push(chunk); },
118+
async end() { return written.length; },
119+
};
120+
await pipeTo(from(sab), writer);
121+
assert.deepStrictEqual(written[0], new Uint8Array([65, 66, 67]));
122+
}
123+
124+
// =============================================================================
125+
// pull / pullSync with SAB-yielding generator
126+
// =============================================================================
127+
128+
function testPullSyncSABChunks() {
129+
function* gen() {
130+
const sab = new SharedArrayBuffer(2);
131+
new Uint8Array(sab).set([1, 2]);
132+
yield [new Uint8Array(sab)];
133+
}
134+
const batches = [];
135+
for (const batch of pullSync(gen())) {
136+
batches.push(batch);
137+
}
138+
assert.strictEqual(batches.length, 1);
139+
assert.deepStrictEqual(batches[0][0], new Uint8Array([1, 2]));
140+
}
141+
142+
async function testPullAsyncSABChunks() {
143+
async function* gen() {
144+
const sab = new SharedArrayBuffer(2);
145+
new Uint8Array(sab).set([3, 4]);
146+
yield [new Uint8Array(sab)];
147+
}
148+
const batches = [];
149+
for await (const batch of pull(gen())) {
150+
batches.push(batch);
151+
}
152+
assert.strictEqual(batches.length, 1);
153+
assert.deepStrictEqual(batches[0][0], new Uint8Array([3, 4]));
154+
}
155+
156+
// =============================================================================
157+
// Transform returning SAB
158+
// =============================================================================
159+
160+
async function testTransformReturningSAB() {
161+
function* source() {
162+
yield [new Uint8Array([1, 2, 3])];
163+
}
164+
const transform = (chunks) => {
165+
if (chunks === null) return null;
166+
// Transform returns a Uint8Array backed by a SharedArrayBuffer
167+
const sab = new SharedArrayBuffer(chunks[0].length);
168+
new Uint8Array(sab).set(chunks[0]);
169+
return new Uint8Array(sab);
170+
};
171+
const batches = [];
172+
for await (const batch of pull(source(), transform)) {
173+
batches.push(batch);
174+
}
175+
assert.strictEqual(batches.length, 1);
176+
assert.deepStrictEqual(batches[0][0], new Uint8Array([1, 2, 3]));
177+
}
178+
179+
// =============================================================================
180+
181+
Promise.all([
182+
testFromSyncSAB(),
183+
testFromAsyncSAB(),
184+
testBytesSyncSAB(),
185+
testBytesAsyncSAB(),
186+
testTextSyncSAB(),
187+
testTextAsyncSAB(),
188+
testArrayBufferSyncSAB(),
189+
testArrayBufferAsyncSAB(),
190+
testPipeToSyncSAB(),
191+
testPipeToAsyncSAB(),
192+
testPullSyncSABChunks(),
193+
testPullAsyncSABChunks(),
194+
testTransformReturningSAB(),
195+
]).then(common.mustCall());

0 commit comments

Comments
 (0)