Skip to content

Commit 9cff872

Browse files
committed
stream: preserve terminated Readable adapter state
1 parent 01c0fe8 commit 9cff872

2 files changed

Lines changed: 96 additions & 28 deletions

File tree

lib/internal/webstreams/adapters.js

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,15 @@ const {
4545
} = require('stream');
4646

4747
const {
48+
isClosed,
4849
isDestroyed,
4950
isReadable,
51+
isReadableErrored,
52+
isReadableFinished,
53+
isReadableNodeStream,
5054
isWritable,
5155
isWritableEnded,
56+
isWritableErrored,
5257
} = require('internal/streams/utils');
5358

5459
const {
@@ -476,34 +481,27 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
476481

477482
const isBYOB = options.type === 'bytes';
478483

479-
if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
480-
const readable = new ReadableStream();
481-
readable.cancel();
482-
return readable;
484+
if (!isReadable(streamReadable)) {
485+
const error = isWritableErrored(streamReadable) ?? isReadableErrored(streamReadable) ?? (
486+
isClosed(streamReadable) &&
487+
isReadableNodeStream(streamReadable, true) &&
488+
!isReadableFinished(streamReadable, false) ? new ERR_STREAM_PREMATURE_CLOSE() : null
489+
);
490+
return new ReadableStream({
491+
type: isBYOB ? 'bytes' : undefined,
492+
start(controller) {
493+
if (error != null) {
494+
controller.error(handleKnownInternalErrors(error));
495+
} else {
496+
controller.close();
497+
}
498+
},
499+
});
483500
}
484501

485502
const objectMode = streamReadable.readableObjectMode;
486503
const highWaterMark = streamReadable.readableHighWaterMark;
487504

488-
const evaluateStrategyOrFallback = (strategy) => {
489-
// If the stream is BYOB, we only use highWaterMark
490-
if (isBYOB)
491-
return { highWaterMark };
492-
// If there is a strategy available, use it
493-
if (strategy)
494-
return strategy;
495-
496-
if (objectMode) {
497-
// When running in objectMode explicitly but no strategy, we just fall
498-
// back to CountQueuingStrategy
499-
return new CountQueuingStrategy({ highWaterMark });
500-
}
501-
502-
return new ByteLengthQueuingStrategy({ highWaterMark });
503-
};
504-
505-
const strategy = evaluateStrategyOrFallback(options?.strategy);
506-
507505
let controller;
508506
let wasCanceled = false;
509507

@@ -540,6 +538,10 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
540538

541539
streamReadable.on('data', onData);
542540

541+
const strategy =
542+
isBYOB ? { highWaterMark } :
543+
options.strategy ?? new (objectMode ? CountQueuingStrategy : ByteLengthQueuingStrategy)({ highWaterMark });
544+
543545
return new ReadableStream({
544546
type: isBYOB ? 'bytes' : undefined,
545547
start(c) { controller = c; },

test/parallel/test-whatwg-webstreams-adapters-to-readablestream.js

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
const common = require('../common');
55

66
const assert = require('assert');
7+
const { once } = require('events');
78

89
const {
910
newReadableStreamFromStreamReadable,
1011
} = require('internal/webstreams/adapters');
1112

1213
const {
1314
Duplex,
15+
PassThrough,
1416
Readable,
1517
} = require('stream');
1618

@@ -188,11 +190,75 @@ const {
188190
}
189191

190192
{
191-
const readable = new Readable();
192-
readable.destroy();
193-
const readableStream = newReadableStreamFromStreamReadable(readable);
194-
const reader = readableStream.getReader();
195-
reader.closed.then(common.mustCall());
193+
/**
194+
* Runs the same assertion across finalize-before/after and
195+
* default/BYOB adapter creation orders.
196+
* @param {(readable: Readable) => void | Promise<void>} finalize
197+
* Finalizes the source stream before or after adaptation.
198+
* @param {(readAndAssert: () => Promise<void>, reader: ReadableStreamReader) => Promise<void>} postAssert
199+
* Asserts the resulting web stream state for the current case.
200+
*/
201+
function testConfluence(finalize, postAssert) {
202+
const cases = [false, true].flatMap((finalizeFirst) => [false, true].map((isBYOB) => ({ finalizeFirst, isBYOB })));
203+
Promise.all(cases.map(async (case_) => {
204+
try {
205+
const { isBYOB } = case_;
206+
const readable = new PassThrough();
207+
if (case_.finalizeFirst) {
208+
await finalize(readable);
209+
}
210+
/** @type {ReadableStream} */
211+
const readableStream = newReadableStreamFromStreamReadable(readable, { type: isBYOB ? 'bytes' : undefined });
212+
const reader = readableStream.getReader({ mode: isBYOB ? 'byob' : undefined });
213+
if (!case_.finalizeFirst) {
214+
await finalize(readable);
215+
}
216+
217+
const readAndAssert = common.mustCall(() => reader.read(isBYOB ? new Uint8Array(1) : undefined).then((result) => {
218+
assert.deepStrictEqual(result, { value: isBYOB ? new Uint8Array(0) : undefined, done: true });
219+
}));
220+
await postAssert(readAndAssert, reader);
221+
} catch (cause) {
222+
throw new Error(`Case failed: ${JSON.stringify(case_)}`, { cause });
223+
}
224+
})).then(common.mustCall());
225+
}
226+
const error = new Error('boom');
227+
// Ending the readable without an error => closes the readableStream without an error
228+
testConfluence(
229+
async (readable) => {
230+
readable.resume();
231+
readable.end();
232+
await once(readable, 'end');
233+
},
234+
common.mustCall(async (readAndAssert, reader) => {
235+
await readAndAssert();
236+
await reader.closed;
237+
}, 4)
238+
);
239+
// Prematurely destroying the stream.Readable without an error
240+
// => errors the ReadableStream with a premature close error
241+
testConfluence(
242+
(readable) => readable.destroy(),
243+
common.mustCall(async (readAndAssert, reader) => {
244+
const errorPredicate = { code: 'ABORT_ERR' };
245+
await assert.rejects(readAndAssert(), errorPredicate);
246+
await assert.rejects(reader.closed, errorPredicate);
247+
}, 4)
248+
);
249+
// Destroying the readable with an error => errors the readableStream
250+
testConfluence(
251+
common.mustCall((readable) => {
252+
readable.on('error', common.mustCall((reason) => {
253+
assert.strictEqual(reason, error);
254+
}));
255+
readable.destroy(error);
256+
}, 4),
257+
common.mustCall(async (readAndAssert, reader) => {
258+
await assert.rejects(readAndAssert(), error);
259+
await assert.rejects(reader.closed, error);
260+
}, 4)
261+
);
196262
}
197263

198264
{

0 commit comments

Comments
 (0)