Skip to content

Commit 25ae47b

Browse files
committed
stream: fix nested compose error propagation
1 parent 449a93a commit 25ae47b

File tree

3 files changed

+50
-9
lines changed

3 files changed

+50
-9
lines changed

lib/internal/streams/duplexify.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,17 @@ function fromAsyncGen(fn) {
239239
_resolve({ done: true, cb });
240240
},
241241
destroy(err, cb) {
242-
ac.abort();
242+
ac.abort(err);
243+
244+
// If the source async iterator is waiting for the next write/final
245+
// signal, unblock it so the readable side can observe the abort and
246+
// finish destroying.
247+
if (resolve !== null) {
248+
const _resolve = resolve;
249+
resolve = null;
250+
_resolve({ done: true, cb() {} });
251+
}
252+
243253
cb(err);
244254
},
245255
};

lib/internal/streams/from.js

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ const {
88
const { Buffer } = require('buffer');
99

1010
const {
11-
ERR_INVALID_ARG_TYPE,
12-
ERR_STREAM_NULL_VALUES,
13-
} = require('internal/errors').codes;
11+
aggregateTwoErrors,
12+
codes: {
13+
ERR_INVALID_ARG_TYPE,
14+
ERR_STREAM_NULL_VALUES,
15+
},
16+
} = require('internal/errors');
1417

1518
function from(Readable, iterable, opts) {
1619
let iterator;
@@ -43,6 +46,7 @@ function from(Readable, iterable, opts) {
4346
// TODO(ronag): What options should be allowed?
4447
...opts,
4548
});
49+
const originalDestroy = readable._destroy;
4650

4751
// Flag to protect against _read
4852
// being called before last iteration completion.
@@ -64,11 +68,18 @@ function from(Readable, iterable, opts) {
6468
};
6569

6670
readable._destroy = function(error, cb) {
67-
PromisePrototypeThen(
68-
close(error),
69-
() => process.nextTick(cb, error), // nextTick is here in case cb throws
70-
(e) => process.nextTick(cb, e || error),
71-
);
71+
originalDestroy.call(this, error, (destroyError) => {
72+
const combinedError = destroyError || error;
73+
PromisePrototypeThen(
74+
close(combinedError),
75+
// nextTick is here in case cb throws
76+
() => process.nextTick(cb, combinedError),
77+
(closeError) => process.nextTick(
78+
cb,
79+
aggregateTwoErrors(combinedError, closeError),
80+
),
81+
);
82+
});
7283
};
7384

7485
async function close(error) {

test/parallel/test-stream-readable-compose.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,26 @@ const assert = require('assert');
116116
).then(common.mustCall());
117117
}
118118

119+
{
120+
// Errors from nested `.compose()` calls should propagate instead of hanging.
121+
const stream = Readable.from(['hello'])
122+
.compose(async function *(source) {
123+
for await (const chunk of source) {
124+
throw new Error(`boom: ${chunk}`);
125+
}
126+
})
127+
.compose(async function *(source) {
128+
for await (const chunk of source) {
129+
yield chunk;
130+
}
131+
});
132+
133+
assert.rejects(
134+
stream.toArray(),
135+
/boom: hello/,
136+
).then(common.mustCall());
137+
}
138+
119139
{
120140
// AbortSignal
121141
const ac = new AbortController();

0 commit comments

Comments
 (0)