Skip to content

Commit 68e5f87

Browse files
authored
stream: propagate destruction in duplexPair
Ensure destroying one side of a duplexPair triggers destruction of the other side via process.nextTick(). Only the destruction signal is sent to avoid breaking changes. Fixes: #61015 PR-URL: #61098 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Gürgün Dayıoğlu <hey@gurgun.day> Reviewed-By: René <contact.9a5d6388@renegade334.me.uk>
1 parent 511a57a commit 68e5f87

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

lib/internal/streams/duplexpair.js

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,37 @@ class DuplexSide extends Duplex {
5050
this.#otherSide.on('end', callback);
5151
this.#otherSide.push(null);
5252
}
53+
54+
55+
_destroy(err, callback) {
56+
const otherSide = this.#otherSide;
57+
58+
if (otherSide !== null && !otherSide.destroyed) {
59+
// Use nextTick to avoid crashing the current execution stack (like HTTP parser)
60+
process.nextTick(() => {
61+
if (otherSide.destroyed) return;
62+
63+
if (err) {
64+
// Destroy the other side, without passing the 'err' object.
65+
// This closes the other side gracefully so it doesn't hang,
66+
// but prevents the "Unhandled error" crash.
67+
otherSide.destroy();
68+
} else {
69+
// Standard graceful close
70+
otherSide.push(null);
71+
}
72+
});
73+
}
74+
75+
callback(err);
76+
}
5377
}
5478

5579
function duplexPair(options) {
5680
const side0 = new DuplexSide(options);
5781
const side1 = new DuplexSide(options);
5882
side0[kInitOtherSide](side1);
5983
side1[kInitOtherSide](side0);
60-
return [ side0, side1 ];
84+
return [side0, side1];
6185
}
6286
module.exports = duplexPair;

test/parallel/test-duplex-error.js

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { duplexPair } = require('stream');
6+
7+
const [sideA, sideB] = duplexPair();
8+
9+
// Side A should receive the error because we called .destroy(err) on it.
10+
sideA.on('error', common.mustCall((err) => {
11+
assert.strictEqual(err.message, 'Simulated error');
12+
}));
13+
14+
// Side B should NOT necessarily emit an error (to avoid crashing
15+
// existing code), but it MUST be destroyed.
16+
sideB.on('error', common.mustNotCall('Side B should not emit an error event'));
17+
18+
sideB.on('close', common.mustCall(() => {
19+
assert.strictEqual(sideB.destroyed, true);
20+
}));
21+
22+
sideA.resume();
23+
sideB.resume();
24+
25+
// Trigger the destruction
26+
sideA.destroy(new Error('Simulated error'));
27+
28+
// Check the state in the next tick to allow nextTick/microtasks to run
29+
setImmediate(common.mustCall(() => {
30+
assert.strictEqual(sideA.destroyed, true);
31+
assert.strictEqual(sideB.destroyed, true);
32+
}));

0 commit comments

Comments
 (0)