Skip to content

Commit dd503d1

Browse files
committed
stream: apply more stream/iter conformance fixes
1 parent dc1542b commit dd503d1

6 files changed

Lines changed: 138 additions & 37 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,31 +151,40 @@ class BroadcastImpl {
151151
const { transforms, options } = parsePushArgs(args);
152152
const rawConsumer = this.#createRawConsumer();
153153

154+
// When transforms are present, delegate to pull() which creates its
155+
// own internal AbortController that follows the external signal.
156+
// When no transforms, return rawConsumer directly (controller elided
157+
// per PULL-02 optimization -- no transforms means no signal recipient).
154158
if (transforms.length > 0) {
159+
const pullArgs = [...transforms];
155160
if (options?.signal) {
156-
return pullWithTransforms(
157-
rawConsumer, ...transforms, { __proto__: null, signal: options.signal });
161+
ArrayPrototypePush(pullArgs,
162+
{ __proto__: null, signal: options.signal });
158163
}
159-
return pullWithTransforms(rawConsumer, ...transforms);
164+
return pullWithTransforms(rawConsumer, ...pullArgs);
160165
}
161166
return rawConsumer;
162167
}
163168

164169
#createRawConsumer() {
165170
const state = {
166171
__proto__: null,
167-
cursor: this.#bufferStart + this.#buffer.length,
172+
// Start at the oldest buffered entry so late-joining consumers
173+
// can read data already in the buffer.
174+
cursor: this.#bufferStart,
168175
resolve: null,
169176
reject: null,
170177
detached: false,
171178
};
172179

173180
this.#consumers.add(state);
174-
// New consumer starts at the latest position; min cursor unchanged
175-
// unless this is the first consumer.
181+
// New consumer starts at buffer start; recalculate min cursor
182+
// since this consumer may now be the slowest.
176183
if (this.#consumers.size === 1) {
177184
this.#cachedMinCursor = state.cursor;
178185
this.#minCursorDirty = false;
186+
} else {
187+
this.#minCursorDirty = true;
179188
}
180189
const self = this;
181190

@@ -252,7 +261,7 @@ class BroadcastImpl {
252261
this.#cancelled = true;
253262
this.#ended = true; // Prevents [kAbort]() from redundantly iterating consumers
254263

255-
if (reason) {
264+
if (reason !== undefined) {
256265
this.#error = reason;
257266
}
258267

@@ -261,7 +270,7 @@ class BroadcastImpl {
261270

262271
for (const consumer of this.#consumers) {
263272
if (consumer.resolve) {
264-
if (reason) {
273+
if (reason !== undefined) {
265274
consumer.reject?.(reason);
266275
} else {
267276
consumer.resolve({ __proto__: null, done: true, value: undefined });

lib/internal/streams/iter/duplex.js

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77

88
const {
99
SymbolAsyncDispose,
10+
SymbolAsyncIterator,
1011
} = primordials;
1112

1213
const {
1314
push,
1415
} = require('internal/streams/iter/push');
1516
const {
17+
validateAbortSignal,
1618
validateObject,
1719
} = require('internal/validators');
1820

@@ -31,34 +33,54 @@ function duplex(options = { __proto__: null }) {
3133
if (b !== undefined) {
3234
validateObject(b, 'options.b');
3335
}
36+
if (signal !== undefined) {
37+
validateAbortSignal(signal, 'options.signal');
38+
}
3439

35-
// Channel A writes to B's readable (A->B direction)
40+
// Channel A writes to B's readable (A->B direction).
41+
// Signal is NOT passed to push() -- we handle abort via close() below.
3642
const { writer: aWriter, readable: bReadable } = push({
3743
highWaterMark: a?.highWaterMark ?? highWaterMark,
3844
backpressure: a?.backpressure ?? backpressure,
39-
signal,
4045
});
4146

4247
// Channel B writes to A's readable (B->A direction)
4348
const { writer: bWriter, readable: aReadable } = push({
4449
highWaterMark: b?.highWaterMark ?? highWaterMark,
4550
backpressure: b?.backpressure ?? backpressure,
46-
signal,
4751
});
4852

49-
let aWriterRef = aWriter;
50-
let bWriterRef = bWriter;
53+
let aClosed = false;
54+
let bClosed = false;
55+
// Track active iterators so close() can call .return() on them
56+
let aReadableIterator = null;
57+
let bReadableIterator = null;
5158

5259
const channelA = {
5360
__proto__: null,
5461
get writer() { return aWriter; },
55-
readable: aReadable,
62+
// Wrap readable to track the iterator for cleanup on close()
63+
get readable() {
64+
return {
65+
__proto__: null,
66+
[SymbolAsyncIterator]() {
67+
const iter = aReadable[SymbolAsyncIterator]();
68+
aReadableIterator = iter;
69+
return iter;
70+
},
71+
};
72+
},
5673
async close() {
57-
if (aWriterRef === null) return;
58-
const writer = aWriterRef;
59-
aWriterRef = null;
60-
if (writer.endSync() < 0) {
61-
await writer.end();
74+
if (aClosed) return;
75+
aClosed = true;
76+
// End the writer (signals end-of-stream to B's readable)
77+
if (aWriter.endSync() < 0) {
78+
await aWriter.end();
79+
}
80+
// Stop iteration of this channel's readable
81+
if (aReadableIterator?.return) {
82+
await aReadableIterator.return();
83+
aReadableIterator = null;
6284
}
6385
},
6486
[SymbolAsyncDispose]() {
@@ -69,20 +91,48 @@ function duplex(options = { __proto__: null }) {
6991
const channelB = {
7092
__proto__: null,
7193
get writer() { return bWriter; },
72-
readable: bReadable,
94+
get readable() {
95+
return {
96+
__proto__: null,
97+
[SymbolAsyncIterator]() {
98+
const iter = bReadable[SymbolAsyncIterator]();
99+
bReadableIterator = iter;
100+
return iter;
101+
},
102+
};
103+
},
73104
async close() {
74-
if (bWriterRef === null) return;
75-
const writer = bWriterRef;
76-
bWriterRef = null;
77-
if (writer.endSync() < 0) {
78-
await writer.end();
105+
if (bClosed) return;
106+
bClosed = true;
107+
if (bWriter.endSync() < 0) {
108+
await bWriter.end();
109+
}
110+
if (bReadableIterator?.return) {
111+
await bReadableIterator.return();
112+
bReadableIterator = null;
79113
}
80114
},
81115
[SymbolAsyncDispose]() {
82116
return this.close();
83117
},
84118
};
85119

120+
// Signal handler: fail both writers with the abort reason so consumers
121+
// see the error. This is an error-path shutdown, not a clean close.
122+
if (signal) {
123+
const abortBoth = () => {
124+
const reason = signal.reason;
125+
aWriter.fail(reason);
126+
bWriter.fail(reason);
127+
};
128+
if (signal.aborted) {
129+
abortBoth();
130+
} else {
131+
signal.addEventListener('abort', abortBoth,
132+
{ __proto__: null, once: true });
133+
}
134+
}
135+
86136
return [channelA, channelB];
87137
}
88138

lib/internal/streams/iter/share.js

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ const {
2828
} = require('internal/streams/iter/types');
2929

3030
const {
31+
from,
32+
fromSync,
3133
isAsyncIterable,
3234
isSyncIterable,
3335
} = require('internal/streams/iter/from');
@@ -198,7 +200,7 @@ class ShareImpl {
198200
if (this.#cancelled) return;
199201
this.#cancelled = true;
200202

201-
if (reason) {
203+
if (reason !== undefined) {
202204
this.#sourceError = reason;
203205
}
204206

@@ -208,7 +210,7 @@ class ShareImpl {
208210

209211
for (const consumer of this.#consumers) {
210212
if (consumer.resolve) {
211-
if (reason) {
213+
if (reason !== undefined) {
212214
consumer.reject?.(reason);
213215
} else {
214216
consumer.resolve({ __proto__: null, done: true, value: undefined });
@@ -505,7 +507,7 @@ class SyncShareImpl {
505507
if (this.#cancelled) return;
506508
this.#cancelled = true;
507509

508-
if (reason) {
510+
if (reason !== undefined) {
509511
this.#sourceError = reason;
510512
}
511513

@@ -569,10 +571,8 @@ class SyncShareImpl {
569571
// =============================================================================
570572

571573
function share(source, options = { __proto__: null }) {
572-
if (!isAsyncIterable(source) && !isSyncIterable(source)) {
573-
throw new ERR_INVALID_ARG_TYPE(
574-
'source', ['AsyncIterable', 'Iterable'], source);
575-
}
574+
// Normalize source via from() - accepts strings, ArrayBuffers, protocols, etc.
575+
const normalized = from(source);
576576
validateObject(options, 'options');
577577
const {
578578
highWaterMark = kMultiConsumerDefaultHWM,
@@ -592,7 +592,7 @@ function share(source, options = { __proto__: null }) {
592592
signal,
593593
};
594594

595-
const shareImpl = new ShareImpl(source, opts);
595+
const shareImpl = new ShareImpl(normalized, opts);
596596

597597
if (signal) {
598598
if (signal.aborted) {
@@ -608,9 +608,8 @@ function share(source, options = { __proto__: null }) {
608608
}
609609

610610
function shareSync(source, options = { __proto__: null }) {
611-
if (!isSyncIterable(source)) {
612-
throw new ERR_INVALID_ARG_TYPE('source', 'Iterable', source);
613-
}
611+
// Normalize source via fromSync() - accepts strings, ArrayBuffers, protocols, etc.
612+
const normalized = fromSync(source);
614613
validateObject(options, 'options');
615614
const {
616615
highWaterMark = kMultiConsumerDefaultHWM,
@@ -625,7 +624,7 @@ function shareSync(source, options = { __proto__: null }) {
625624
backpressure,
626625
};
627626

628-
return new SyncShareImpl(source, opts);
627+
return new SyncShareImpl(normalized, opts);
629628
}
630629

631630
function isShareable(value) {

test/parallel/test-stream-iter-broadcast-basic.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,17 @@ async function testWriterFailIdempotent() {
218218
}, { message: 'fail!' });
219219
}
220220

221+
// cancel() with falsy reason (0, "", false) should still treat as error
222+
async function testCancelWithFalsyReason() {
223+
const { broadcast: bc } = broadcast();
224+
const consumer = bc.push();
225+
const resultPromise = text(consumer).catch((err) => err);
226+
await new Promise((resolve) => setImmediate(resolve));
227+
bc.cancel(0);
228+
const result = await resultPromise;
229+
assert.strictEqual(result, 0);
230+
}
231+
221232
Promise.all([
222233
testBasicBroadcast(),
223234
testMultipleWrites(),
@@ -228,6 +239,22 @@ Promise.all([
228239
testWriterFail(),
229240
testCancelWithoutReason(),
230241
testCancelWithReason(),
242+
testCancelWithFalsyReason(),
231243
testFailDetachesConsumers(),
232244
testWriterFailIdempotent(),
245+
testLateJoinerSeesBufferedData(),
233246
]).then(common.mustCall());
247+
248+
// Late-joining consumer should read from oldest buffered entry
249+
async function testLateJoinerSeesBufferedData() {
250+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 16 });
251+
252+
// Write data before any consumer joins
253+
writer.writeSync('before-join');
254+
writer.endSync();
255+
256+
// Consumer joins after data is written
257+
const consumer = bc.push();
258+
const result = await text(consumer);
259+
assert.strictEqual(result, 'before-join');
260+
}

test/parallel/test-stream-iter-share-async.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,4 +271,12 @@ Promise.all([
271271
testShareLateJoiningConsumer(),
272272
testShareConsumerBreak(),
273273
testShareMultipleConsumersConcurrentPull(),
274+
testShareStringSource(),
274275
]).then(common.mustCall());
276+
277+
// share() accepts string source directly (normalized via from())
278+
async function testShareStringSource() {
279+
const shared = share('hello-share');
280+
const result = await text(shared.pull());
281+
assert.strictEqual(result, 'hello-share');
282+
}

test/parallel/test-stream-iter-share-sync.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,12 @@ Promise.all([
151151
testShareSyncCancelMidIteration(),
152152
testShareSyncCancelWithReason(),
153153
testShareSyncSourceError(),
154+
testShareSyncStringSource(),
154155
]).then(common.mustCall());
156+
157+
// shareSync() accepts string source directly (normalized via fromSync())
158+
function testShareSyncStringSource() {
159+
const shared = shareSync('hello-sync-share');
160+
const result = textSync(shared.pull());
161+
assert.strictEqual(result, 'hello-sync-share');
162+
}

0 commit comments

Comments
 (0)