Skip to content

Commit dccdf31

Browse files
committed
Tests
1 parent f21ef57 commit dccdf31

File tree

2 files changed

+34
-63
lines changed

2 files changed

+34
-63
lines changed

packages/deno/src/utils/streaming.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,10 @@ function monitorStream(
8181
onDone: () => void,
8282
): ReadableStream<Uint8Array<ArrayBufferLike>> {
8383
const reader = stream.getReader();
84-
reader.closed.then(() => onDone(), () => onDone());
84+
reader.closed.then(
85+
() => onDone(),
86+
() => onDone(),
87+
);
8588
return new ReadableStream({
8689
async start(controller) {
8790
let result: ReadableStreamReadResult<Uint8Array<ArrayBufferLike>>;

packages/deno/test/streaming.test.ts

Lines changed: 30 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -2,57 +2,42 @@
22

33
import { assertEquals } from 'https://deno.land/std@0.212.0/assert/mod.ts';
44

5-
/**
6-
* Minimal reproduction of monitorStream to verify that reader.releaseLock()
7-
* after stream completion does not cause an unhandled promise rejection.
8-
*
9-
* Per the WHATWG Streams spec, releaseLock() rejects reader.closed.
10-
* Using .then(onDone, onDone) handles both the fulfilled and rejected cases
11-
* so the rejection is suppressed.
12-
*/
13-
function monitorStream(
14-
stream: ReadableStream<Uint8Array>,
15-
onDone: () => void,
16-
): ReadableStream<Uint8Array> {
17-
const reader = stream.getReader();
18-
reader.closed.then(() => onDone(), () => onDone());
19-
return new ReadableStream({
20-
async start(controller) {
21-
let result: ReadableStreamReadResult<Uint8Array>;
22-
do {
23-
result = await reader.read();
24-
if (result.value) {
25-
try {
26-
controller.enqueue(result.value);
27-
} catch (er) {
28-
controller.error(er);
29-
reader.releaseLock();
30-
return;
31-
}
32-
}
33-
} while (!result.done);
34-
controller.close();
35-
reader.releaseLock();
36-
},
37-
});
38-
}
5+
Deno.test('reader.closed.then(f, f) suppresses rejection when releaseLock is called on an open stream', async () => {
6+
// Reproduces the bug from GitHub issue #20177:
7+
// In monitorStream, reader.releaseLock() is called while the source stream
8+
// is still open (e.g. the error path when controller.enqueue() throws).
9+
// Per WHATWG Streams spec, this rejects reader.closed with a TypeError.
10+
// Using .then(onDone, onDone) handles both cases; .finally() would propagate
11+
// the rejection as unhandled.
3912

40-
Deno.test('monitorStream calls onDone and does not cause unhandled rejection after normal completion', async () => {
41-
let doneCalled = false;
13+
let onDoneCalled = false;
4214

43-
const source = new ReadableStream<Uint8Array>({
15+
const stream = new ReadableStream<Uint8Array>({
4416
start(controller) {
45-
controller.enqueue(new TextEncoder().encode('chunk1'));
46-
controller.enqueue(new TextEncoder().encode('chunk2'));
47-
controller.close();
17+
controller.enqueue(new TextEncoder().encode('data'));
18+
// intentionally not closing — stream stays open
4819
},
4920
});
5021

51-
const monitored = monitorStream(source, () => {
52-
doneCalled = true;
53-
});
22+
const reader = stream.getReader();
23+
24+
// This is the exact pattern from monitorStream (line 84 in streaming.ts).
25+
// With .finally(() => onDone()), this would propagate the rejection.
26+
reader.closed.then(
27+
() => {
28+
onDoneCalled = true;
29+
},
30+
() => {
31+
onDoneCalled = true;
32+
},
33+
);
34+
35+
await reader.read();
36+
37+
// This is what monitorStream does on the error path (line 98) when
38+
// controller.enqueue() throws — releaseLock while the source is still open.
39+
reader.releaseLock();
5440

55-
// Listen for unhandled rejections — the bug caused one here.
5641
let unhandledRejection: PromiseRejectionEvent | undefined;
5742
const handler = (e: PromiseRejectionEvent): void => {
5843
e.preventDefault();
@@ -61,28 +46,11 @@ Deno.test('monitorStream calls onDone and does not cause unhandled rejection aft
6146
globalThis.addEventListener('unhandledrejection', handler);
6247

6348
try {
64-
const reader = monitored.getReader();
65-
const chunks: string[] = [];
66-
const decoder = new TextDecoder();
67-
68-
let result: ReadableStreamReadResult<Uint8Array>;
69-
do {
70-
result = await reader.read();
71-
if (result.value) {
72-
chunks.push(decoder.decode(result.value));
73-
}
74-
} while (!result.done);
75-
reader.releaseLock();
76-
77-
assertEquals(chunks, ['chunk1', 'chunk2']);
78-
79-
// Give microtasks a chance to settle so any unhandled rejection fires.
8049
await new Promise(resolve => setTimeout(resolve, 50));
8150

82-
assertEquals(doneCalled, true, 'onDone callback should have been called');
51+
assertEquals(onDoneCalled, true, 'onDone should have been called via the rejection handler');
8352
assertEquals(unhandledRejection, undefined, 'should not have caused an unhandled promise rejection');
8453
} finally {
8554
globalThis.removeEventListener('unhandledrejection', handler);
8655
}
8756
});
88-

0 commit comments

Comments
 (0)