Skip to content

Commit 061f1ff

Browse files
committed
stream: expand stream/iter test coverage
1 parent ba2974e commit 061f1ff

8 files changed

+1034
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
// Coverage tests for broadcast.js: signal abort on pending write,
5+
// sync iterable from, ringbuffer grow.
6+
7+
const common = require('../common');
8+
const assert = require('assert');
9+
const {
10+
broadcast,
11+
Broadcast,
12+
text,
13+
} = require('stream/iter');
14+
15+
// Aborting the signal attached to a blocked write() must reject the
// pending promise with AbortError (covers wireBroadcastWriteSignal +
// removeAt); data already buffered must remain drainable afterwards.
async function testBroadcastWriteAbort() {
  const { writer, broadcast: bc } = broadcast({
    highWaterMark: 1,
    backpressure: 'block',
  });
  const reader = bc.push();

  // A single sync write fills the buffer to its high-water mark.
  writer.writeSync(new Uint8Array([1]));

  // The follow-up async write blocks on backpressure; attach a signal
  // and abort while the write is still pending.
  const controller = new AbortController();
  const pending = writer.write(new Uint8Array([2]),
                               { signal: controller.signal });
  controller.abort();

  await assert.rejects(pending, { name: 'AbortError' });

  // Close the writer, then drain whatever made it into the buffer.
  writer.endSync();
  const drained = [];
  for await (const batch of reader) {
    drained.push(...batch);
  }
  assert.ok(drained.length >= 1);
}
45+
46+
// Broadcast.from must accept a plain sync iterable (a generator yielding
// Uint8Array batches) and deliver its data to a pushed consumer.
async function testBroadcastFromSyncIterable() {
  function* source() {
    yield [new Uint8Array([10, 20])];
    yield [new Uint8Array([30, 40])];
  }

  const { broadcast: bc } = Broadcast.from(source());
  const reader = bc.push();
  // Completing without error and seeing a nonzero chunk count suffices.
  let total = 0;
  for await (const batch of reader) {
    total += batch.length;
  }
  assert.ok(total > 0);
}
62+
63+
// Broadcast.from with a sync iterable of string chunks should round-trip
// through text() unchanged.
async function testBroadcastFromSyncIterableStrings() {
  function* source() {
    yield 'hello';
    yield ' world';
  }
  const { broadcast: bc } = Broadcast.from(source());
  const decoded = await text(bc.push());
  assert.strictEqual(decoded, 'hello world');
}
74+
75+
// Writing more items than the default ringbuffer capacity (16) without
// the consumer draining forces the internal buffer to grow; FIFO order
// must survive the grow.
async function testRingbufferGrow() {
  const { writer, broadcast: bc } = broadcast({ highWaterMark: 32 });
  const reader = bc.push();

  const total = 20; // exceeds the default ringbuffer capacity of 16
  for (let i = 0; i < total; i++) {
    writer.writeSync(new Uint8Array([i]));
  }
  writer.endSync();

  // Drain everything back and confirm ordering.
  const seen = [];
  for await (const batch of reader) {
    for (const chunk of batch) {
      seen.push(chunk[0]);
    }
  }
  assert.strictEqual(seen.length, total);
  seen.forEach((value, i) => assert.strictEqual(value, i));
}
98+
99+
// Once the writer has been closed, invoking its drainableProtocol method
// must report null (no desired size remaining).
async function testDrainableAfterClose() {
  const { drainableProtocol } = require('stream/iter');
  const { writer } = broadcast();
  writer.endSync();
  assert.strictEqual(writer[drainableProtocol](), null);
}
108+
109+
// Run every test concurrently; common.mustCall() asserts the whole chain
// settled successfully.
const tests = [
  testBroadcastWriteAbort,
  testBroadcastFromSyncIterable,
  testBroadcastFromSyncIterableStrings,
  testRingbufferGrow,
  testDrainableAfterClose,
];
Promise.all(tests.map((fn) => fn())).then(common.mustCall());
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
// Coverage tests for from.js: sub-batching >128, DataView in generator,
5+
// non-Uint8Array TypedArray normalization.
6+
7+
const common = require('../common');
8+
const assert = require('assert');
9+
const {
10+
from,
11+
fromSync,
12+
bytes,
13+
bytesSync,
14+
} = require('stream/iter');
15+
16+
// fromSync() must split a Uint8Array[] input of more than 128 elements
// into sub-batches of at most 128, without dropping any chunk.
async function testFromSyncSubBatching() {
  const input = Array.from({ length: 200 },
                           (_, i) => new Uint8Array([i & 0xFF]));
  const batches = [...fromSync(input)];
  // 200 elements split as 128 + 72.
  assert.strictEqual(batches.length, 2);
  assert.strictEqual(batches[0].length, 128);
  assert.strictEqual(batches[1].length, 72);
  // No data loss across the split.
  const total = batches.reduce((n, b) => n + b.length, 0);
  assert.strictEqual(total, 200);
}
33+
34+
// Async from() applies the same >128-element sub-batching as fromSync().
async function testFromAsyncSubBatching() {
  const input = Array.from({ length: 200 },
                           (_, i) => new Uint8Array([i & 0xFF]));
  const sizes = [];
  for await (const batch of from(input)) {
    sizes.push(batch.length);
  }
  // Two sub-batches: 128 then the 72-element remainder.
  assert.deepStrictEqual(sizes, [128, 72]);
}
46+
47+
// Exactly 128 elements is the boundary case: one batch, no split.
async function testFromSubBatchingBoundary() {
  const input = Array.from({ length: 128 }, (_, i) => new Uint8Array([i]));
  const batches = [...fromSync(input)];
  assert.strictEqual(batches.length, 1);
  assert.strictEqual(batches[0].length, 128);
}
58+
59+
// One past the boundary: 129 elements yield two batches of 128 and 1.
async function testFromSubBatchingBoundaryPlus1() {
  const input = Array.from({ length: 129 },
                           (_, i) => new Uint8Array([i & 0xFF]));
  const sizes = [];
  for await (const batch of from(input)) {
    sizes.push(batch.length);
  }
  assert.deepStrictEqual(sizes, [128, 1]);
}
71+
72+
// A DataView yielded by a sync generator exercises the normalizeSyncValue
// conversion path; the bytes must survive the round trip.
async function testFromSyncDataViewInGenerator() {
  function* gen() {
    const view = new DataView(new ArrayBuffer(3));
    [65, 66, 67].forEach((byte, i) => view.setUint8(i, byte));
    yield view;
  }
  assert.deepStrictEqual(bytesSync(fromSync(gen())),
                         new Uint8Array([65, 66, 67]));
}
85+
86+
// A DataView yielded by an async generator exercises the
// normalizeAsyncValue conversion path.
async function testFromAsyncDataViewInGenerator() {
  async function* gen() {
    const view = new DataView(new ArrayBuffer(3));
    [68, 69, 70].forEach((byte, i) => view.setUint8(i, byte));
    yield view;
  }
  assert.deepStrictEqual(await bytes(from(gen())),
                         new Uint8Array([68, 69, 70]));
}
99+
100+
// An Int16Array yielded from a generator goes through the
// primitiveToUint8Array fallback; only the byte length is asserted.
async function testFromSyncInt16ArrayInGenerator() {
  function* gen() {
    yield new Int16Array([0x0102, 0x0304]);
  }
  const out = bytesSync(fromSync(gen()));
  assert.strictEqual(out.byteLength, 4); // 2 int16 = 4 bytes
}
108+
109+
// A Float64Array handed directly to from() is normalized into one batch
// of raw bytes.
async function testFromFloat64Array() {
  const input = new Float64Array([1.0]);
  const batches = [];
  for await (const batch of from(input)) {
    batches.push(batch);
  }
  assert.strictEqual(batches.length, 1);
  assert.strictEqual(batches[0][0].byteLength, 8); // 1 float64 = 8 bytes
}
119+
120+
// Yielding a bare number (not a valid stream value) must raise
// ERR_INVALID_ARG_TYPE when the iterator is consumed.
async function testFromSyncInvalidYield() {
  function* gen() {
    yield 42; // Not a valid stream value
  }
  const consume = () => {
    // eslint-disable-next-line no-unused-vars
    for (const batch of fromSync(gen())) { /* consume */ }
  };
  assert.throws(consume, { code: 'ERR_INVALID_ARG_TYPE' });
}
133+
134+
// Run every test concurrently; common.mustCall() asserts the whole chain
// settled successfully.
const tests = [
  testFromSyncSubBatching,
  testFromAsyncSubBatching,
  testFromSubBatchingBoundary,
  testFromSubBatchingBoundaryPlus1,
  testFromSyncDataViewInGenerator,
  testFromAsyncDataViewInGenerator,
  testFromSyncInt16ArrayInGenerator,
  testFromFloat64Array,
  testFromSyncInvalidYield,
];
Promise.all(tests.map((fn) => fn())).then(common.mustCall());
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
// Edge case tests for pipeToSync: endSync fallback, preventFail.
5+
6+
const common = require('../common');
7+
const assert = require('assert');
8+
const { pipeToSync, fromSync } = require('stream/iter');
9+
10+
// When the destination's endSync() reports a negative value, pipeToSync
// must fall back to calling end().
async function testPipeToSyncEndSyncFallback() {
  let endCalled = false;
  const dest = {
    writeSync() { return true; },
    endSync() { return -1; }, // Negative → triggers end() fallback
    end() { endCalled = true; },
  };
  pipeToSync(fromSync('data'), dest);
  assert.strictEqual(endCalled, true);
}
21+
22+
// A destination with no endSync() at all must also fall back to end().
async function testPipeToSyncNoEndSync() {
  let endCalled = false;
  const dest = {
    writeSync() { return true; },
    end() { endCalled = true; },
  };
  pipeToSync(fromSync('data'), dest);
  assert.strictEqual(endCalled, true);
}
32+
33+
// With preventFail: true, a throwing source propagates its error to the
// caller but must NOT invoke the destination's fail().
async function testPipeToSyncPreventFail() {
  let failCalled = false;
  const dest = {
    writeSync() { return true; },
    endSync() { return 0; },
    fail() { failCalled = true; },
  };
  function* badSource() {
    yield [new Uint8Array([1])];
    throw new Error('source error');
  }
  assert.throws(
    () => pipeToSync(badSource(), dest, { preventFail: true }),
    { message: 'source error' },
  );
  assert.strictEqual(failCalled, false);
}
51+
52+
// With preventClose: true, a successful pipe must leave the destination
// open — neither end() nor endSync() is invoked.
async function testPipeToSyncPreventClose() {
  let endCalled = false;
  const dest = {
    writeSync() { return true; },
    endSync() { endCalled = true; return 0; },
  };
  pipeToSync(fromSync('data'), dest, { preventClose: true });
  assert.strictEqual(endCalled, false);
}
62+
63+
// Run every test concurrently; common.mustCall() asserts the whole chain
// settled successfully.
const tests = [
  testPipeToSyncEndSyncFallback,
  testPipeToSyncNoEndSync,
  testPipeToSyncPreventFail,
  testPipeToSyncPreventClose,
];
Promise.all(tests.map((fn) => fn())).then(common.mustCall());

0 commit comments

Comments
 (0)