Skip to content

Commit 3fadeba

Browse files
committed
stream: pipeline should only destroy un-finished streams.
This PR logically reverts nodejs#31940 which has caused lots of unnecessary breakage in the ecosystem. This PR also aligns better with the actual documented behavior: `stream.pipeline()` will call `stream.destroy(err)` on all streams except: * `Readable` streams which have emitted `'end'` or `'close'`. * `Writable` streams which have emitted `'finish'` or `'close'`. The behavior introduced in nodejs#31940 was much more aggressive in terms of destroying streams. This was good for avoiding potential resources leaks however breaks some common assumputions in legacy streams. Furthermore, it makes the code simpler and removes some hacks. Fixes: nodejs#32954 Fixes: nodejs#32955
1 parent 8a3fa32 commit 3fadeba

File tree

2 files changed

+19
-39
lines changed

2 files changed

+19
-39
lines changed

lib/internal/streams/pipeline.js

Lines changed: 18 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,43 +25,18 @@ let EE;
2525
let PassThrough;
2626
let createReadableStreamAsyncIterator;
2727

28-
function isIncoming(stream) {
29-
return (
30-
stream.socket &&
31-
typeof stream.complete === 'boolean' &&
32-
ArrayIsArray(stream.rawTrailers) &&
33-
ArrayIsArray(stream.rawHeaders)
34-
);
35-
}
36-
37-
function isOutgoing(stream) {
38-
return (
39-
stream.socket &&
40-
typeof stream.setHeader === 'function'
41-
);
42-
}
43-
44-
function destroyer(stream, reading, writing, final, callback) {
45-
const _destroy = once((err) => {
46-
if (!err && (isIncoming(stream) || isOutgoing(stream))) {
47-
// http/1 request objects have a coupling to their response and should
48-
// not be prematurely destroyed. Assume they will handle their own
49-
// lifecycle.
50-
return callback();
51-
}
52-
53-
if (!err && reading && !writing && stream.writable) {
54-
return callback();
55-
}
28+
function destroyer(stream, reading, writing, callback) {
29+
callback = once(callback);
5630

57-
if (err || !final || !stream.readable) {
58-
destroyImpl.destroyer(stream, err);
59-
}
60-
callback(err);
31+
let finished = false;
32+
stream.on('close', () => {
33+
finished = true;
6134
});
6235

6336
if (eos === undefined) eos = require('internal/streams/end-of-stream');
6437
eos(stream, { readable: reading, writable: writing }, (err) => {
38+
finished = !err;
39+
6540
const rState = stream._readableState;
6641
if (
6742
err &&
@@ -78,14 +53,19 @@ function destroyer(stream, reading, writing, final, callback) {
7853
// eos will only fail with premature close on the reading side for
7954
// duplex streams.
8055
stream
81-
.once('end', _destroy)
82-
.once('error', _destroy);
56+
.once('end', callback)
57+
.once('error', callback);
8358
} else {
84-
_destroy(err);
59+
callback(err);
8560
}
8661
});
8762

88-
return (err) => _destroy(err || new ERR_STREAM_DESTROYED('pipe'));
63+
return once((err) => {
64+
if (!finished) {
65+
destroyImpl.destroyer(stream, err);
66+
}
67+
callback(err || new ERR_STREAM_DESTROYED('pipe'));
68+
});
8969
}
9070

9171
function popCallback(streams) {
@@ -204,7 +184,7 @@ function pipeline(...streams) {
204184

205185
if (isStream(stream)) {
206186
finishCount++;
207-
destroys.push(destroyer(stream, reading, writing, !reading, finish));
187+
destroys.push(destroyer(stream, reading, writing, finish));
208188
}
209189

210190
if (i === 0) {
@@ -262,7 +242,7 @@ function pipeline(...streams) {
262242
ret = pt;
263243

264244
finishCount++;
265-
destroys.push(destroyer(ret, false, true, true, finish));
245+
destroys.push(destroyer(ret, false, true, finish));
266246
}
267247
} else if (isStream(stream)) {
268248
if (isReadable(ret)) {

test/parallel/test-stream-pipeline.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ const { promisify } = require('util');
916916
const src = new PassThrough({ autoDestroy: false });
917917
const dst = new PassThrough({ autoDestroy: false });
918918
pipeline(src, dst, common.mustCall(() => {
919-
assert.strictEqual(src.destroyed, true);
919+
assert.strictEqual(src.destroyed, false);
920920
assert.strictEqual(dst.destroyed, false);
921921
}));
922922
src.end();

0 commit comments

Comments
 (0)