Skip to content

Commit 3c1376e

Browse files
committed
stream: propagate destruction in duplexPair
Ensure destroying one side of a duplexPair triggers destruction of the other side via process.nextTick(). To avoid a breaking change and unhandled 'error' events (e.g., in HTTP tests), the error object is not propagated; only the destruction signal is sent.
1 parent 02c47ad commit 3c1376e

File tree

2 files changed

+33
-10
lines changed

2 files changed

+33
-10
lines changed

lib/internal/streams/duplexpair.js

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,25 @@ class DuplexSide extends Duplex {
5353

5454

5555
_destroy(err, callback) {
56-
if (err) {
57-
// Error case: tell the other side to also destroy with that error.
58-
this.#otherSide.destroy(err);
59-
} else if (this.#otherSide && !this.#otherSide.destroyed) {
60-
// Graceful close case (destroy() without error):
61-
// send an EOF to the other side's readable end if it hasn't already closed.
62-
this.#otherSide.push(null);
56+
const otherSide = this.#otherSide;
57+
58+
if (otherSide !== null && !otherSide.destroyed) {
59+
// Use nextTick to avoid crashing the current execution stack (like HTTP parser)
60+
process.nextTick(() => {
61+
if (otherSide.destroyed) return;
62+
63+
if (err) {
64+
// Destroy the other side, without passing the 'err' object.
65+
// This closes the other side gracefully so it doesn't hang,
66+
// but prevents the "Unhandled error" crash.
67+
otherSide.destroy();
68+
} else {
69+
// Standard graceful close
70+
otherSide.push(null);
71+
}
72+
});
6373
}
74+
6475
callback(err);
6576
}
6677
}

test/parallel/test-duplex-error.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,27 @@ const { duplexPair } = require('stream');
66

77
const [sideA, sideB] = duplexPair();
88

9+
// Side A should receive the error because we called .destroy(err) on it.
910
sideA.on('error', common.mustCall((err) => {
1011
assert.strictEqual(err.message, 'Simulated error');
1112
}));
1213

13-
sideB.on('error', common.mustCall((err) => {
14-
assert.strictEqual(err.message, 'Simulated error');
14+
// Side B should NOT necessarily emit an error (to avoid crashing
15+
// existing code), but it MUST be destroyed.
16+
sideB.on('error', common.mustNotCall('Side B should not emit an error event'));
17+
18+
sideB.on('close', common.mustCall(() => {
19+
assert.strictEqual(sideB.destroyed, true);
1520
}));
1621

1722
sideA.resume();
1823
sideB.resume();
1924

20-
sideB.destroy(new Error('Simulated error'));
25+
// Trigger the destruction
26+
sideA.destroy(new Error('Simulated error'));
27+
28+
// Check the state in the next tick to allow nextTick/microtasks to run
29+
setImmediate(common.mustCall(() => {
30+
assert.strictEqual(sideA.destroyed, true);
31+
assert.strictEqual(sideB.destroyed, true);
32+
}));

0 commit comments

Comments
 (0)