From 8f6698f69708269faa4535e67ea21356e96e6817 Mon Sep 17 00:00:00 2001 From: Richard Scarrott Date: Sun, 12 Jan 2020 10:53:15 +0000 Subject: [PATCH 1/3] [#691] Propagate destroy from consumers to sources and the wrapped node Readable stream --- .vscode/settings.json | 3 +++ lib/index.js | 26 ++++++++++++++++++++++---- 2 files changed, 25 insertions(+), 4 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7c2feb7 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "editor.formatOnSave": false +} diff --git a/lib/index.js b/lib/index.js index 20277f2..7097863 100755 --- a/lib/index.js +++ b/lib/index.js @@ -483,6 +483,7 @@ function pipeReadable(xs, onFinish, stream) { xs.pipe(stream); // TODO: Replace with onDestroy in v3. + // ...although it appears `_destructors` are called `onEnd`, not just `onDestroy`? stream._destructors.push(unbind); function streamEndCb(error) { @@ -495,12 +496,12 @@ function pipeReadable(xs, onFinish, stream) { } if (error == null || endOnError) { - unbind(); + unbind(error); stream.end(); } } - function unbind() { + function unbind(error) { if (unbound) { return; } @@ -514,6 +515,16 @@ function pipeReadable(xs, onFinish, stream) { if (xs.unpipe) { xs.unpipe(stream); } + + // Destroy the wrapped `Readable` stream if it's not yet ended. + // i.e. this was ended externally, perhaps by a consumer. + // TODO: Use something other than `readableEnded` as it was introduced recently (Node v12.9.0) + if (!xs.readableEnded) { + // NOTE: Not sure whether `error` is necessary here as it'd only ever come from + // the unbind call in `streamEndCb` which originates from the `xs` in the + // first place, so it'd prob be able to handle itself? + xs.destroy(error); + } } } @@ -1327,6 +1338,8 @@ Stream.prototype.pipe = function (dest, options) { */ Stream.prototype.destroy = function () { + var source = this.source; + if (this.ended) { return; } @@ -1336,6 +1349,10 @@ Stream.prototype.destroy = function () { } this._onEnd(); + + if (!this._is_observer && source) { + source.destroy(); + } }; /** @@ -1618,7 +1635,7 @@ Stream.prototype.pull = function (f) { * * Only call this function on streams that were constructed with no source * (i.e., with `_()`). - + * @id write * @section Stream Objects * @name Stream.write(x) @@ -1748,6 +1765,7 @@ Stream.prototype.fork = function () { var s = new Stream(); s.id = 'fork:' + s.id; + // TODO: Could this just call into `this._addConsumer`? s.source = this; this._consumers.push(s); this._checkBackPressure(); @@ -2426,7 +2444,7 @@ var objectOnly = _.curry(function(strategy, x) { * {breed: 'labrador', name: 'Rocky', age: 3}, * {breed: 'german-shepherd', name: 'Waffles', age: 9} * ]; - + * _(dogs).pickBy(function (key, value) { * return value > 4; * }).toArray(function (xs) { From 41818c96b435ac22ba0df036fdd896455e82ca09 Mon Sep 17 00:00:00 2001 From: Richard Scarrott Date: Sun, 12 Jan 2020 11:42:37 +0000 Subject: [PATCH 2/3] Fix up --- lib/index.js | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/index.js b/lib/index.js index 7097863..47e26e8 100755 --- a/lib/index.js +++ b/lib/index.js @@ -1351,6 +1351,15 @@ Stream.prototype.destroy = function () { this._onEnd(); if (!this._is_observer && source) { + // TODO: Should we only destroy the source if nothing else is consuming it, i.e. all sibling `_consumers` are already + // destroyed / ended? + // i.e. + // var s = _() + // var s1 = _().fork() + // var s2 = _().fork() + // s1.destroy() // s still alive + // s2.destroy() // s now destroyed + // and ignore `_observers`? source.destroy(); } }; @@ -1765,7 +1774,6 @@ Stream.prototype.fork = function () { var s = new Stream(); s.id = 'fork:' + s.id; - // TODO: Could this just call into `this._addConsumer`? s.source = this; this._consumers.push(s); this._checkBackPressure(); From fb6815684279954b714ede9dfc18fe680deb4488 Mon Sep 17 00:00:00 2001 From: Richard Scarrott Date: Sun, 12 Jan 2020 11:43:13 +0000 Subject: [PATCH 3/3] Fix up --- .vscode/settings.json | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 7c2feb7..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "editor.formatOnSave": false -}