Skip to content

Commit a7d5446

Browse files
authored
stream: propagate abort reason in share and broadcast
Pass signal.reason to the multi-consumer cancel paths so signal abort is reported as AbortError instead of clean iterator completion. Also make detached share consumers rethrow a stored source error when they resume after cancellation, preserving the abort reason for pending pulls. Fixes: nodejs#63357 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: nodejs#63358 Fixes: nodejs#63357 Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
1 parent fa0aadc commit a7d5446

4 files changed

Lines changed: 87 additions & 24 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,10 @@ function wireBroadcastWriteSignal(entry, signal, resolve, reject, self) {
686686
signal.addEventListener('abort', onAbort, { __proto__: null, once: true });
687687
}
688688

689+
function onBroadcastCancel(broadcastImpl, signal) {
690+
onSignalAbort(signal, () => broadcastImpl.cancel(signal.reason));
691+
}
692+
689693
// =============================================================================
690694
// Public API
691695
// =============================================================================
@@ -720,7 +724,7 @@ function broadcast(options = { __proto__: null }) {
720724
broadcastImpl.setWriter(writer);
721725

722726
if (signal) {
723-
onSignalAbort(signal, () => broadcastImpl.cancel());
727+
onBroadcastCancel(broadcastImpl, signal);
724728
}
725729

726730
return { __proto__: null, writer, broadcast: broadcastImpl };

lib/internal/streams/iter/share.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ class ShareImpl {
144144
// cursor must re-pull rather than terminating prematurely.
145145
for (;;) {
146146
if (state.detached) {
147+
if (self.#sourceError) throw self.#sourceError;
147148
return { __proto__: null, done: true, value: undefined };
148149
}
149150

@@ -628,6 +629,10 @@ class SyncShareImpl {
628629
}
629630
}
630631

632+
function onShareCancel(shareImpl, signal) {
633+
onSignalAbort(signal, () => shareImpl.cancel(signal.reason));
634+
}
635+
631636
// =============================================================================
632637
// Public API
633638
// =============================================================================
@@ -657,7 +662,7 @@ function share(source, options = { __proto__: null }) {
657662
const shareImpl = new ShareImpl(normalized, opts);
658663

659664
if (signal) {
660-
onSignalAbort(signal, () => shareImpl.cancel());
665+
onShareCancel(shareImpl, signal);
661666
}
662667

663668
return shareImpl;

test/parallel/test-stream-iter-broadcast-from.js

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,12 @@ async function testAbortSignal() {
7070

7171
ac.abort();
7272

73-
const batches = [];
74-
for await (const batch of consumer) {
75-
batches.push(batch);
76-
}
77-
assert.strictEqual(batches.length, 0);
73+
await assert.rejects(async () => {
74+
// eslint-disable-next-line no-unused-vars
75+
for await (const _ of consumer) {
76+
assert.fail('Should not reach here');
77+
}
78+
}, { name: 'AbortError' });
7879
}
7980

8081
async function testAlreadyAbortedSignal() {
@@ -84,11 +85,12 @@ async function testAlreadyAbortedSignal() {
8485
const { broadcast: bc } = broadcast({ signal: ac.signal });
8586
const consumer = bc.push();
8687

87-
const batches = [];
88-
for await (const batch of consumer) {
89-
batches.push(batch);
90-
}
91-
assert.strictEqual(batches.length, 0);
88+
await assert.rejects(async () => {
89+
// eslint-disable-next-line no-unused-vars
90+
for await (const _ of consumer) {
91+
assert.fail('Should not reach here');
92+
}
93+
}, { name: 'AbortError' });
9294
}
9395

9496
// =============================================================================

test/parallel/test-stream-iter-share-async.js

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,66 @@ async function testShareCancelWithReason() {
134134

135135
async function testShareAbortSignal() {
136136
const ac = new AbortController();
137-
const shared = share(from('data'), { signal: ac.signal });
138-
const consumer = shared.pull();
137+
const reason = new Error('share aborted');
138+
const enc = new TextEncoder();
139+
async function* source() {
140+
yield [enc.encode('a')];
141+
yield [enc.encode('b')];
142+
}
143+
const shared = share(source(), {
144+
highWaterMark: 1,
145+
backpressure: 'block',
146+
signal: ac.signal,
147+
});
148+
const fast = shared.pull()[Symbol.asyncIterator]();
149+
shared.pull();
150+
151+
await fast.next();
152+
const read = fast.next();
153+
const rejected = assert.rejects(read, (error) => error === reason);
154+
ac.abort(reason);
155+
156+
await rejected;
157+
}
158+
159+
async function testShareAbortSignalWhileSourcePullPending() {
160+
const ac = new AbortController();
161+
const {
162+
promise: resumePromise,
163+
resolve: resume,
164+
} = Promise.withResolvers();
165+
const {
166+
promise: sourceStartedPromise,
167+
resolve: sourceStarted,
168+
} = Promise.withResolvers();
169+
170+
const source = {
171+
__proto__: null,
172+
[Symbol.asyncIterator]() {
173+
return {
174+
__proto__: null,
175+
async next() {
176+
sourceStarted();
177+
await resumePromise;
178+
return { __proto__: null, done: true, value: undefined };
179+
},
180+
};
181+
},
182+
};
183+
184+
const shared = share(source, { signal: ac.signal });
185+
const iter1 = shared.pull()[Symbol.asyncIterator]();
186+
const iter2 = shared.pull()[Symbol.asyncIterator]();
187+
const read1 = iter1.next();
188+
const read2 = iter2.next();
189+
const rejected1 = assert.rejects(read1, { name: 'AbortError' });
190+
const rejected2 = assert.rejects(read2, { name: 'AbortError' });
139191

192+
await sourceStartedPromise;
140193
ac.abort();
194+
resume();
141195

142-
const batches = [];
143-
for await (const batch of consumer) {
144-
batches.push(batch);
145-
}
146-
assert.strictEqual(batches.length, 0);
196+
await Promise.all([rejected1, rejected2]);
147197
}
148198

149199
async function testShareAlreadyAborted() {
@@ -153,11 +203,12 @@ async function testShareAlreadyAborted() {
153203
const shared = share(from('data'), { signal: ac.signal });
154204
const consumer = shared.pull();
155205

156-
const batches = [];
157-
for await (const batch of consumer) {
158-
batches.push(batch);
159-
}
160-
assert.strictEqual(batches.length, 0);
206+
await assert.rejects(async () => {
207+
// eslint-disable-next-line no-unused-vars
208+
for await (const _ of consumer) {
209+
assert.fail('Should not reach here');
210+
}
211+
}, { name: 'AbortError' });
161212
}
162213

163214
// =============================================================================
@@ -273,6 +324,7 @@ Promise.all([
273324
testShareCancelMidIteration(),
274325
testShareCancelWithReason(),
275326
testShareAbortSignal(),
327+
testShareAbortSignalWhileSourcePullPending(),
276328
testShareAlreadyAborted(),
277329
testShareSourceError(),
278330
testShareLateJoiningConsumer(),

0 commit comments

Comments
 (0)