diff --git a/lib/index.js b/lib/index.js index 20277f2..47e26e8 100755 --- a/lib/index.js +++ b/lib/index.js @@ -483,6 +483,7 @@ function pipeReadable(xs, onFinish, stream) { xs.pipe(stream); // TODO: Replace with onDestroy in v3. + // ...although it appears `_destructors` are called `onEnd`, not just `onDestroy`? stream._destructors.push(unbind); function streamEndCb(error) { @@ -495,12 +496,12 @@ function pipeReadable(xs, onFinish, stream) { } if (error == null || endOnError) { - unbind(); + unbind(error); stream.end(); } } - function unbind() { + function unbind(error) { if (unbound) { return; } @@ -514,6 +515,16 @@ function pipeReadable(xs, onFinish, stream) { if (xs.unpipe) { xs.unpipe(stream); } + + // Destroy the wrapped `Readable` stream if it's not yet ended. + // i.e. this was ended externally, perhaps by a consumer. + // TODO: Use something other than `readableEnded` as it was introduced recently (Node v12.9.0) + if (!xs.readableEnded) { + // NOTE: Not sure whether `error` is necessary here as it'd only ever come from + // the unbind call in `streamEndCb` which originates from the `xs` in the + // first place, so it'd prob be able to handle itself? + xs.destroy(error); + } } } @@ -1327,6 +1338,8 @@ Stream.prototype.pipe = function (dest, options) { */ Stream.prototype.destroy = function () { + var source = this.source; + if (this.ended) { return; } @@ -1336,6 +1349,19 @@ Stream.prototype.destroy = function () { } this._onEnd(); + + if (!this._is_observer && source) { + // TODO: Should we only destroy the source if nothing else is consuming it, i.e. all sibling `_consumers` are already + // destroyed / ended? + // i.e. + // var s = _() + // var s1 = _().fork() + // var s2 = _().fork() + // s1.destroy() // s still alive + // s2.destroy() // s now destroyed + // and ignore `_observers`? + source.destroy(); + } }; /** @@ -1618,7 +1644,7 @@ Stream.prototype.pull = function (f) { * * Only call this function on streams that were constructed with no source * (i.e., with `_()`). - + * @id write * @section Stream Objects * @name Stream.write(x) @@ -2426,7 +2452,7 @@ var objectOnly = _.curry(function(strategy, x) { * {breed: 'labrador', name: 'Rocky', age: 3}, * {breed: 'german-shepherd', name: 'Waffles', age: 9} * ]; - + * _(dogs).pickBy(function (key, value) { * return value > 4; * }).toArray(function (xs) {