Skip to content

Commit 01c0fe8

Browse files
committed
stream: end Readable.toWeb on half-open duplex
1 parent 8199f9c commit 01c0fe8

2 files changed

Lines changed: 40 additions & 15 deletions

File tree

lib/internal/webstreams/adapters.js

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,9 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
518518

519519
streamReadable.pause();
520520

521-
const cleanup = eos(streamReadable, (error) => {
521+
// When adapting a Duplex as a ReadableStream, readable completion should not
522+
// wait for a half-open writable side to finish as well.
523+
const cleanup = eos(streamReadable, { writable: false }, (error) => {
522524
error = handleKnownInternalErrors(error);
523525

524526
cleanup();
@@ -532,24 +534,15 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
532534
return;
533535
}
534536
controller.close();
537+
if (isBYOB)
538+
controller.byobRequest?.respond(0);
535539
});
536540

537541
streamReadable.on('data', onData);
538542

539543
return new ReadableStream({
540544
type: isBYOB ? 'bytes' : undefined,
541-
start(c) {
542-
controller = c;
543-
if (isBYOB) {
544-
streamReadable.once('end', () => {
545-
// close the controller
546-
controller.close();
547-
// And unlock the last BYOB read request
548-
controller.byobRequest?.respond(0);
549-
wasCanceled = true;
550-
});
551-
}
552-
},
545+
start(c) { controller = c; },
553546

554547
pull() { streamReadable.resume(); },
555548

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
'use strict';
2-
require('../common');
3-
const { Readable } = require('stream');
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const { Duplex, Readable } = require('stream');
5+
const { setTimeout: delay } = require('timers/promises');
46

57
{
68
const r = Readable.from([]);
@@ -10,3 +12,33 @@ const { Readable } = require('stream');
1012
const reader = Readable.toWeb(r).getReader();
1113
reader.read();
1214
}
15+
16+
{
17+
const duplex = new Duplex({
18+
read() {
19+
this.push(Buffer.from('x'));
20+
this.push(null);
21+
},
22+
write(_chunk, _encoding, callback) {
23+
callback();
24+
},
25+
});
26+
27+
const reader = Readable.toWeb(duplex).getReader();
28+
29+
(async () => {
30+
const result = await reader.read();
31+
assert.deepStrictEqual(result, {
32+
value: new Uint8Array(Buffer.from('x')),
33+
done: false,
34+
});
35+
36+
const closeResult = await Promise.race([
37+
reader.read(),
38+
delay(common.platformTimeout(100)).then(() => 'timeout'),
39+
]);
40+
41+
assert.notStrictEqual(closeResult, 'timeout');
42+
assert.deepStrictEqual(closeResult, { value: undefined, done: true });
43+
})().then(common.mustCall());
44+
}

0 commit comments

Comments
 (0)