Skip to content

Commit f21ef57

Browse files
committed
fix(deno): Handle reader.closed rejection from releaseLock() in streaming
Replace `reader.closed.finally(() => onDone())` with `reader.closed.then(() => onDone(), () => onDone())` in `monitorStream`. Per the WHATWG Streams spec, `reader.releaseLock()` rejects `reader.closed` when the promise is still pending. `.finally()` propagates that rejection as an unhandled promise rejection, while `.then(f, f)` suppresses it by handling both the fulfilled and rejected cases. Closes: #20177
1 parent 86dc30a commit f21ef57

File tree

2 files changed

+89
-2
lines changed

2 files changed

+89
-2
lines changed

packages/deno/src/utils/streaming.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@ function monitorStream(
8181
onDone: () => void,
8282
): ReadableStream<Uint8Array<ArrayBufferLike>> {
8383
const reader = stream.getReader();
84-
// oxlint-disable-next-line typescript/no-floating-promises
85-
reader.closed.finally(() => onDone());
84+
reader.closed.then(() => onDone(), () => onDone());
8685
return new ReadableStream({
8786
async start(controller) {
8887
let result: ReadableStreamReadResult<Uint8Array<ArrayBufferLike>>;
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// <reference lib="deno.ns" />
2+
3+
import { assertEquals } from 'https://deno.land/std@0.212.0/assert/mod.ts';
4+
5+
/**
6+
* Minimal reproduction of monitorStream to verify that reader.releaseLock()
7+
* after stream completion does not cause an unhandled promise rejection.
8+
*
9+
* Per the WHATWG Streams spec, releaseLock() rejects reader.closed.
10+
* Using .then(onDone, onDone) handles both the fulfilled and rejected cases
11+
* so the rejection is suppressed.
12+
*/
13+
function monitorStream(
14+
stream: ReadableStream<Uint8Array>,
15+
onDone: () => void,
16+
): ReadableStream<Uint8Array> {
17+
const reader = stream.getReader();
18+
reader.closed.then(() => onDone(), () => onDone());
19+
return new ReadableStream({
20+
async start(controller) {
21+
let result: ReadableStreamReadResult<Uint8Array>;
22+
do {
23+
result = await reader.read();
24+
if (result.value) {
25+
try {
26+
controller.enqueue(result.value);
27+
} catch (er) {
28+
controller.error(er);
29+
reader.releaseLock();
30+
return;
31+
}
32+
}
33+
} while (!result.done);
34+
controller.close();
35+
reader.releaseLock();
36+
},
37+
});
38+
}
39+
40+
Deno.test('monitorStream calls onDone and does not cause unhandled rejection after normal completion', async () => {
41+
let doneCalled = false;
42+
43+
const source = new ReadableStream<Uint8Array>({
44+
start(controller) {
45+
controller.enqueue(new TextEncoder().encode('chunk1'));
46+
controller.enqueue(new TextEncoder().encode('chunk2'));
47+
controller.close();
48+
},
49+
});
50+
51+
const monitored = monitorStream(source, () => {
52+
doneCalled = true;
53+
});
54+
55+
// Listen for unhandled rejections — the bug caused one here.
56+
let unhandledRejection: PromiseRejectionEvent | undefined;
57+
const handler = (e: PromiseRejectionEvent): void => {
58+
e.preventDefault();
59+
unhandledRejection = e;
60+
};
61+
globalThis.addEventListener('unhandledrejection', handler);
62+
63+
try {
64+
const reader = monitored.getReader();
65+
const chunks: string[] = [];
66+
const decoder = new TextDecoder();
67+
68+
let result: ReadableStreamReadResult<Uint8Array>;
69+
do {
70+
result = await reader.read();
71+
if (result.value) {
72+
chunks.push(decoder.decode(result.value));
73+
}
74+
} while (!result.done);
75+
reader.releaseLock();
76+
77+
assertEquals(chunks, ['chunk1', 'chunk2']);
78+
79+
// Give microtasks a chance to settle so any unhandled rejection fires.
80+
await new Promise(resolve => setTimeout(resolve, 50));
81+
82+
assertEquals(doneCalled, true, 'onDone callback should have been called');
83+
assertEquals(unhandledRejection, undefined, 'should not have caused an unhandled promise rejection');
84+
} finally {
85+
globalThis.removeEventListener('unhandledrejection', handler);
86+
}
87+
});
88+

0 commit comments

Comments
 (0)