From a7d544697ad037e2d0054309851d1af948bb72e8 Mon Sep 17 00:00:00 2001 From: Trivikram Kamat <16024985+trivikr@users.noreply.github.com> Date: Fri, 22 May 2026 18:14:47 -0700 Subject: [PATCH 1/2] stream: propagate abort reason in share and broadcast Pass signal.reason to the multi-consumer cancel paths so signal abort is reported as AbortError instead of clean iterator completion. Also make detached share consumers rethrow a stored source error when they resume after cancellation, preserving the abort reason for pending pulls. Fixes: https://github.com/nodejs/node/issues/63357 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: https://github.com/nodejs/node/pull/63358 Fixes: https://github.com/nodejs/node/issues/63357 Reviewed-By: Antoine du Hamel --- lib/internal/streams/iter/broadcast.js | 6 +- lib/internal/streams/iter/share.js | 7 +- .../test-stream-iter-broadcast-from.js | 22 +++--- test/parallel/test-stream-iter-share-async.js | 76 ++++++++++++++++--- 4 files changed, 87 insertions(+), 24 deletions(-) diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index 7b6fc3525d122f..e6a404729d6e0b 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -686,6 +686,10 @@ function wireBroadcastWriteSignal(entry, signal, resolve, reject, self) { signal.addEventListener('abort', onAbort, { __proto__: null, once: true }); } +function onBroadcastCancel(broadcastImpl, signal) { + onSignalAbort(signal, () => broadcastImpl.cancel(signal.reason)); +} + // ============================================================================= // Public API // ============================================================================= @@ -720,7 +724,7 @@ function broadcast(options = { __proto__: null }) { broadcastImpl.setWriter(writer); if (signal) { - onSignalAbort(signal, () => broadcastImpl.cancel()); + onBroadcastCancel(broadcastImpl, signal); } return { __proto__: null, writer, broadcast: broadcastImpl }; diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index 0160bc7eace009..3cd24409222712 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -144,6 +144,7 @@ class ShareImpl { // cursor must re-pull rather than terminating prematurely. for (;;) { if (state.detached) { + if (self.#sourceError) throw self.#sourceError; return { __proto__: null, done: true, value: undefined }; } @@ -628,6 +629,10 @@ class SyncShareImpl { } } +function onShareCancel(shareImpl, signal) { + onSignalAbort(signal, () => shareImpl.cancel(signal.reason)); +} + // ============================================================================= // Public API // ============================================================================= @@ -657,7 +662,7 @@ function share(source, options = { __proto__: null }) { const shareImpl = new ShareImpl(normalized, opts); if (signal) { - onSignalAbort(signal, () => shareImpl.cancel()); + onShareCancel(shareImpl, signal); } return shareImpl; diff --git a/test/parallel/test-stream-iter-broadcast-from.js b/test/parallel/test-stream-iter-broadcast-from.js index 2f17b1a7de92fa..c7458dee19ad64 100644 --- a/test/parallel/test-stream-iter-broadcast-from.js +++ b/test/parallel/test-stream-iter-broadcast-from.js @@ -70,11 +70,12 @@ async function testAbortSignal() { ac.abort(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } async function testAlreadyAbortedSignal() { @@ -84,11 +85,12 @@ async function testAlreadyAbortedSignal() { const { broadcast: bc } = broadcast({ signal: ac.signal }); const consumer = bc.push(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } // ============================================================================= diff --git a/test/parallel/test-stream-iter-share-async.js b/test/parallel/test-stream-iter-share-async.js index 86b0eb9b273a34..076fe0a4037aa0 100644 --- a/test/parallel/test-stream-iter-share-async.js +++ b/test/parallel/test-stream-iter-share-async.js @@ -134,16 +134,66 @@ async function testShareCancelWithReason() { async function testShareAbortSignal() { const ac = new AbortController(); - const shared = share(from('data'), { signal: ac.signal }); - const consumer = shared.pull(); + const reason = new Error('share aborted'); + const enc = new TextEncoder(); + async function* source() { + yield [enc.encode('a')]; + yield [enc.encode('b')]; + } + const shared = share(source(), { + highWaterMark: 1, + backpressure: 'block', + signal: ac.signal, + }); + const fast = shared.pull()[Symbol.asyncIterator](); + shared.pull(); + + await fast.next(); + const read = fast.next(); + const rejected = assert.rejects(read, (error) => error === reason); + ac.abort(reason); + + await rejected; +} + +async function testShareAbortSignalWhileSourcePullPending() { + const ac = new AbortController(); + const { + promise: resumePromise, + resolve: resume, + } = Promise.withResolvers(); + const { + promise: sourceStartedPromise, + resolve: sourceStarted, + } = Promise.withResolvers(); + + const source = { + __proto__: null, + [Symbol.asyncIterator]() { + return { + __proto__: null, + async next() { + sourceStarted(); + await resumePromise; + return { __proto__: null, done: true, value: undefined }; + }, + }; + }, + }; + + const shared = share(source, { signal: ac.signal }); + const iter1 = shared.pull()[Symbol.asyncIterator](); + const iter2 = shared.pull()[Symbol.asyncIterator](); + const read1 = iter1.next(); + const read2 = iter2.next(); + const rejected1 = assert.rejects(read1, { name: 'AbortError' }); + const rejected2 = assert.rejects(read2, { name: 'AbortError' }); + await sourceStartedPromise; ac.abort(); + resume(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await Promise.all([rejected1, rejected2]); } async function testShareAlreadyAborted() { @@ -153,11 +203,12 @@ async function testShareAlreadyAborted() { const shared = share(from('data'), { signal: ac.signal }); const consumer = shared.pull(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } // ============================================================================= @@ -273,6 +324,7 @@ Promise.all([ testShareCancelMidIteration(), testShareCancelWithReason(), testShareAbortSignal(), + testShareAbortSignalWhileSourcePullPending(), testShareAlreadyAborted(), testShareSourceError(), testShareLateJoiningConsumer(), From 8d3245e551eb98f910119595a7c9103eae7744a4 Mon Sep 17 00:00:00 2001 From: Trivikram Kamat <16024985+trivikr@users.noreply.github.com> Date: Fri, 22 May 2026 20:18:56 -0700 Subject: [PATCH 2/2] stream: avoid duplicate writes in toWritable PushWriter can return false after accepting a chunk when block backpressure is active. Teach the classic Writable adapter to treat that case as accepted backpressure instead of retrying through the async write path. Fixes: https://github.com/nodejs/node/issues/63359 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: https://github.com/nodejs/node/pull/63360 Fixes: https://github.com/nodejs/node/issues/63359 Reviewed-By: James M Snell --- lib/internal/streams/iter/classic.js | 51 ++++++++++++++++--- lib/internal/streams/iter/push.js | 12 +++++ lib/internal/streams/iter/types.js | 13 +++++ .../test-stream-iter-writable-from.js | 49 ++++++++++++++++++ 4 files changed, 119 insertions(+), 6 deletions(-) diff --git a/lib/internal/streams/iter/classic.js b/lib/internal/streams/iter/classic.js index fd5f811ea52d97..73c6bc48b954c8 100644 --- a/lib/internal/streams/iter/classic.js +++ b/lib/internal/streams/iter/classic.js @@ -57,6 +57,7 @@ const { const { toAsyncStreamable: kToAsyncStreamable, kValidatedSource, + kSyncWriteAccepted, drainableProtocol, } = require('internal/streams/iter/types'); @@ -764,13 +765,41 @@ function toWritable(writer) { const hasEndSync = hasEnd && typeof writer.endSync === 'function'; const hasFail = typeof writer.fail === 'function'; + const hasSyncWriteAccepted = + typeof writer[kSyncWriteAccepted] === 'function'; - // Try-sync-first pattern: attempt the synchronous method and - // fall back to the async method if it returns false (indicating - // the sync path was not accepted) or throws. When the sync path - // succeeds, the callback is deferred via queueMicrotask to - // preserve the async resolution contract that Writable internals - // expect from _write/_writev/_final callbacks. + function syncWriteAccepted() { + return hasSyncWriteAccepted && writer[kSyncWriteAccepted](); + } + + function finishAfterSyncBackpressure(cb) { + let ondrain; + try { + if (typeof writer[drainableProtocol] === 'function') { + ondrain = writer[drainableProtocol](); + } + } catch (err) { + cb(err); + return; + } + if (ondrain !== null && ondrain !== undefined) { + PromisePrototypeThen(ondrain, (drained) => { + if (drained === false) { + cb(new ERR_INVALID_STATE.TypeError('Stream closed by consumer')); + return; + } + cb(); + }, cb); + return; + } + queueMicrotask(cb); + } + + // Try-sync-first pattern: attempt the synchronous method and fall back to the + // async method if it returns false without accepting the data, or if it + // throws. When the sync path succeeds, the callback is deferred via + // queueMicrotask to preserve the async resolution contract that Writable + // internals expect from _write/_writev/_final callbacks. function _write(chunk, encoding, cb) { const bytes = typeof chunk === 'string' ? @@ -781,6 +810,11 @@ function toWritable(writer) { queueMicrotask(cb); return; } + if (syncWriteAccepted()) { + // The chunk was accepted; false only signaled backpressure. + finishAfterSyncBackpressure(cb); + return; + } } catch { // Sync path threw -- fall through to async. } @@ -805,6 +839,11 @@ function toWritable(writer) { queueMicrotask(cb); return; } + if (syncWriteAccepted()) { + // The chunks were accepted; false only signaled backpressure. + finishAfterSyncBackpressure(cb); + return; + } } catch { // Sync path threw -- fall through to async. } diff --git a/lib/internal/streams/iter/push.js b/lib/internal/streams/iter/push.js index c5b12663f83c24..1c367ff02bae71 100644 --- a/lib/internal/streams/iter/push.js +++ b/lib/internal/streams/iter/push.js @@ -32,6 +32,7 @@ const { const { drainableProtocol, + kSyncWriteAccepted, kSyncWriteAcceptedOnFalse, } = require('internal/streams/iter/types'); @@ -545,11 +546,16 @@ class PushQueue { class PushWriter { #queue; + #syncWriteAccepted = false; constructor(queue) { this.#queue = queue; } + [kSyncWriteAccepted]() { + return this.#syncWriteAccepted; + } + [drainableProtocol]() { const desired = this.desiredSize; if (desired === null) return null; @@ -589,6 +595,7 @@ class PushWriter { } writeSync(chunk) { + this.#syncWriteAccepted = false; const bytes = toUint8Array(chunk); const result = this.#queue.writeSync([bytes]); if (!result && this.#queue.backpressurePolicy === 'block' && @@ -596,12 +603,15 @@ class PushWriter { // Block policy: force-enqueue and return false as backpressure signal. // Data IS accepted; false tells caller to slow down. this.#queue.forceEnqueue([bytes]); + this.#syncWriteAccepted = true; return false; } + this.#syncWriteAccepted = result; return result; } writevSync(chunks) { + this.#syncWriteAccepted = false; if (!ArrayIsArray(chunks)) { throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks); } @@ -610,8 +620,10 @@ class PushWriter { if (!result && this.#queue.backpressurePolicy === 'block' && this.#queue.desiredSize === 0) { this.#queue.forceEnqueue(bytes); + this.#syncWriteAccepted = true; return false; } + this.#syncWriteAccepted = result; return result; } diff --git a/lib/internal/streams/iter/types.js b/lib/internal/streams/iter/types.js index 71112b1515c081..e72972d91ad243 100644 --- a/lib/internal/streams/iter/types.js +++ b/lib/internal/streams/iter/types.js @@ -64,11 +64,24 @@ const kValidatedTransform = Symbol('kValidatedTransform'); */ const kValidatedSource = Symbol('kValidatedSource'); +/** + * Internal sentinel for writers whose sync write methods can return false + * after accepting data as a backpressure signal. + */ +const kSyncWriteAccepted = Symbol('kSyncWriteAccepted'); + +/** + * Internal sentinel for writers whose sync write methods may return false + * after accepting data when backpressure is applied. Such writers must expose + * desiredSize so callers can distinguish accepted backpressure from a sync + * write that was not performed. + */ const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse'); module.exports = { broadcastProtocol, drainableProtocol, + kSyncWriteAccepted, kSyncWriteAcceptedOnFalse, kValidatedSource, kValidatedTransform, diff --git a/test/parallel/test-stream-iter-writable-from.js b/test/parallel/test-stream-iter-writable-from.js index fd922c5cf99537..51dc465b45d148 100644 --- a/test/parallel/test-stream-iter-writable-from.js +++ b/test/parallel/test-stream-iter-writable-from.js @@ -335,6 +335,53 @@ async function testRoundTrip() { assert.strictEqual(result, data); } +// ============================================================================= +// PushWriter writeSync false accepted as backpressure is not retried +// ============================================================================= + +async function testPushWriterBlockBackpressureNoDuplicate() { + const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' }); + const writable = toWritable(writer); + + await new Promise((resolve, reject) => { + writable.write('a', (err) => { + if (err) reject(err); + else resolve(); + }); + }); + + writable.write('b'); + writable.end(); + + const result = await text(readable); + assert.strictEqual(result, 'ab'); +} + +// ============================================================================= +// PushWriter writevSync false accepted as backpressure is not retried +// ============================================================================= + +async function testPushWriterBlockBackpressureWritevNoDuplicate() { + const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' }); + const writable = toWritable(writer); + + await new Promise((resolve, reject) => { + writable.write('a', (err) => { + if (err) reject(err); + else resolve(); + }); + }); + + writable.cork(); + writable.write('b'); + writable.write('c'); + writable.uncork(); + writable.end(); + + const result = await text(readable); + assert.strictEqual(result, 'abc'); +} + // ============================================================================= // Multiple sequential writes // ============================================================================= @@ -590,6 +637,8 @@ Promise.all([ testWriteThrowsSyncPropagation(), testEndThrowsSyncPropagation(), testRoundTrip(), + testPushWriterBlockBackpressureNoDuplicate(), + testPushWriterBlockBackpressureWritevNoDuplicate(), testSequentialWrites(), testSyncCallbackDeferred(), testMinimalWriter(),