Skip to content

Commit 8d3245e

Browse files
authored
stream: avoid duplicate writes in toWritable
PushWriter can return false after accepting a chunk when block backpressure is active. Teach the classic Writable adapter to treat that case as accepted backpressure instead of retrying through the async write path. Fixes: nodejs#63359 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: nodejs#63360 Fixes: nodejs#63359 Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent a7d5446 commit 8d3245e

4 files changed

Lines changed: 119 additions & 6 deletions

File tree

lib/internal/streams/iter/classic.js

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ const {
5757
const {
5858
toAsyncStreamable: kToAsyncStreamable,
5959
kValidatedSource,
60+
kSyncWriteAccepted,
6061
drainableProtocol,
6162
} = require('internal/streams/iter/types');
6263

@@ -764,13 +765,41 @@ function toWritable(writer) {
764765
const hasEndSync = hasEnd &&
765766
typeof writer.endSync === 'function';
766767
const hasFail = typeof writer.fail === 'function';
768+
const hasSyncWriteAccepted =
769+
typeof writer[kSyncWriteAccepted] === 'function';
767770

768-
// Try-sync-first pattern: attempt the synchronous method and
769-
// fall back to the async method if it returns false (indicating
770-
// the sync path was not accepted) or throws. When the sync path
771-
// succeeds, the callback is deferred via queueMicrotask to
772-
// preserve the async resolution contract that Writable internals
773-
// expect from _write/_writev/_final callbacks.
771+
function syncWriteAccepted() {
772+
return hasSyncWriteAccepted && writer[kSyncWriteAccepted]();
773+
}
774+
775+
function finishAfterSyncBackpressure(cb) {
776+
let ondrain;
777+
try {
778+
if (typeof writer[drainableProtocol] === 'function') {
779+
ondrain = writer[drainableProtocol]();
780+
}
781+
} catch (err) {
782+
cb(err);
783+
return;
784+
}
785+
if (ondrain !== null && ondrain !== undefined) {
786+
PromisePrototypeThen(ondrain, (drained) => {
787+
if (drained === false) {
788+
cb(new ERR_INVALID_STATE.TypeError('Stream closed by consumer'));
789+
return;
790+
}
791+
cb();
792+
}, cb);
793+
return;
794+
}
795+
queueMicrotask(cb);
796+
}
797+
798+
// Try-sync-first pattern: attempt the synchronous method and fall back to the
799+
// async method if it returns false without accepting the data, or if it
800+
// throws. When the sync path succeeds, the callback is deferred via
801+
// queueMicrotask to preserve the async resolution contract that Writable
802+
// internals expect from _write/_writev/_final callbacks.
774803

775804
function _write(chunk, encoding, cb) {
776805
const bytes = typeof chunk === 'string' ?
@@ -781,6 +810,11 @@ function toWritable(writer) {
781810
queueMicrotask(cb);
782811
return;
783812
}
813+
if (syncWriteAccepted()) {
814+
// The chunk was accepted; false only signaled backpressure.
815+
finishAfterSyncBackpressure(cb);
816+
return;
817+
}
784818
} catch {
785819
// Sync path threw -- fall through to async.
786820
}
@@ -805,6 +839,11 @@ function toWritable(writer) {
805839
queueMicrotask(cb);
806840
return;
807841
}
842+
if (syncWriteAccepted()) {
843+
// The chunks were accepted; false only signaled backpressure.
844+
finishAfterSyncBackpressure(cb);
845+
return;
846+
}
808847
} catch {
809848
// Sync path threw -- fall through to async.
810849
}

lib/internal/streams/iter/push.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const {
3232

3333
const {
3434
drainableProtocol,
35+
kSyncWriteAccepted,
3536
kSyncWriteAcceptedOnFalse,
3637
} = require('internal/streams/iter/types');
3738

@@ -545,11 +546,16 @@ class PushQueue {
545546

546547
class PushWriter {
547548
#queue;
549+
#syncWriteAccepted = false;
548550

549551
constructor(queue) {
550552
this.#queue = queue;
551553
}
552554

555+
[kSyncWriteAccepted]() {
556+
return this.#syncWriteAccepted;
557+
}
558+
553559
[drainableProtocol]() {
554560
const desired = this.desiredSize;
555561
if (desired === null) return null;
@@ -589,19 +595,23 @@ class PushWriter {
589595
}
590596

591597
writeSync(chunk) {
598+
this.#syncWriteAccepted = false;
592599
const bytes = toUint8Array(chunk);
593600
const result = this.#queue.writeSync([bytes]);
594601
if (!result && this.#queue.backpressurePolicy === 'block' &&
595602
this.#queue.desiredSize === 0) {
596603
// Block policy: force-enqueue and return false as backpressure signal.
597604
// Data IS accepted; false tells caller to slow down.
598605
this.#queue.forceEnqueue([bytes]);
606+
this.#syncWriteAccepted = true;
599607
return false;
600608
}
609+
this.#syncWriteAccepted = result;
601610
return result;
602611
}
603612

604613
writevSync(chunks) {
614+
this.#syncWriteAccepted = false;
605615
if (!ArrayIsArray(chunks)) {
606616
throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks);
607617
}
@@ -610,8 +620,10 @@ class PushWriter {
610620
if (!result && this.#queue.backpressurePolicy === 'block' &&
611621
this.#queue.desiredSize === 0) {
612622
this.#queue.forceEnqueue(bytes);
623+
this.#syncWriteAccepted = true;
613624
return false;
614625
}
626+
this.#syncWriteAccepted = result;
615627
return result;
616628
}
617629

lib/internal/streams/iter/types.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,24 @@ const kValidatedTransform = Symbol('kValidatedTransform');
6464
*/
6565
const kValidatedSource = Symbol('kValidatedSource');
6666

67+
/**
68+
* Internal sentinel for writers whose sync write methods can return false
69+
* after accepting data as a backpressure signal.
70+
*/
71+
const kSyncWriteAccepted = Symbol('kSyncWriteAccepted');
72+
73+
/**
74+
* Internal sentinel for writers whose sync write methods may return false
75+
* after accepting data when backpressure is applied. Such writers must expose
76+
* desiredSize so callers can distinguish accepted backpressure from a sync
77+
* write that was not performed.
78+
*/
6779
const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse');
6880

6981
module.exports = {
7082
broadcastProtocol,
7183
drainableProtocol,
84+
kSyncWriteAccepted,
7285
kSyncWriteAcceptedOnFalse,
7386
kValidatedSource,
7487
kValidatedTransform,

test/parallel/test-stream-iter-writable-from.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,53 @@ async function testRoundTrip() {
335335
assert.strictEqual(result, data);
336336
}
337337

338+
// =============================================================================
339+
// PushWriter writeSync false accepted as backpressure is not retried
340+
// =============================================================================
341+
342+
async function testPushWriterBlockBackpressureNoDuplicate() {
343+
const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' });
344+
const writable = toWritable(writer);
345+
346+
await new Promise((resolve, reject) => {
347+
writable.write('a', (err) => {
348+
if (err) reject(err);
349+
else resolve();
350+
});
351+
});
352+
353+
writable.write('b');
354+
writable.end();
355+
356+
const result = await text(readable);
357+
assert.strictEqual(result, 'ab');
358+
}
359+
360+
// =============================================================================
361+
// PushWriter writevSync false accepted as backpressure is not retried
362+
// =============================================================================
363+
364+
async function testPushWriterBlockBackpressureWritevNoDuplicate() {
365+
const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' });
366+
const writable = toWritable(writer);
367+
368+
await new Promise((resolve, reject) => {
369+
writable.write('a', (err) => {
370+
if (err) reject(err);
371+
else resolve();
372+
});
373+
});
374+
375+
writable.cork();
376+
writable.write('b');
377+
writable.write('c');
378+
writable.uncork();
379+
writable.end();
380+
381+
const result = await text(readable);
382+
assert.strictEqual(result, 'abc');
383+
}
384+
338385
// =============================================================================
339386
// Multiple sequential writes
340387
// =============================================================================
@@ -590,6 +637,8 @@ Promise.all([
590637
testWriteThrowsSyncPropagation(),
591638
testEndThrowsSyncPropagation(),
592639
testRoundTrip(),
640+
testPushWriterBlockBackpressureNoDuplicate(),
641+
testPushWriterBlockBackpressureWritevNoDuplicate(),
593642
testSequentialWrites(),
594643
testSyncCallbackDeferred(),
595644
testMinimalWriter(),

0 commit comments

Comments
 (0)