Skip to content

Commit 41e804d

Browse files
committed
stream: respect backpressure in node and web wrappers
1 parent 3b3dd93 commit 41e804d

4 files changed

Lines changed: 71 additions & 13 deletions

File tree

src/internal.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ interface RendererState {
2323
start: number;
2424
suspended: Suspended[];
2525
abortSignal?: AbortSignal | undefined;
26-
onWrite: (str: string) => void;
26+
onWrite: (str: string) => void | Promise<void>;
2727
onError?: RendererErrorHandler;
2828
}
2929

3030
interface RenderToChunksOptions {
3131
context?: any;
32-
onWrite: (str: string) => void;
32+
onWrite: (str: string) => void | Promise<void>;
3333
abortSignal?: AbortSignal;
3434
}

src/lib/chunked.js

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,18 @@ import { createInitScript, createSubtree } from './client.js';
1010
*/
1111
export async function renderToChunks(vnode, { context, onWrite, abortSignal }) {
1212
context = context || {};
13+
const deferredWritesReady = new Deferred();
14+
let deferredWriteQueue = Promise.resolve();
1315

1416
/** @type {RendererState} */
1517
const renderer = {
1618
start: Date.now(),
1719
abortSignal,
18-
onWrite,
20+
onWrite(str) {
21+
return (deferredWriteQueue = deferredWriteQueue
22+
.then(() => deferredWritesReady.promise)
23+
.then(() => onWrite(str)));
24+
},
1925
onError: handleError,
2026
suspended: []
2127
};
@@ -36,15 +42,16 @@ export async function renderToChunks(vnode, { context, onWrite, abortSignal }) {
3642
const initialWrite =
3743
docSuffixIndex !== -1 ? shell.slice(0, docSuffixIndex) : shell;
3844
const prefix = hasHtmlTag ? '<!DOCTYPE html>' : '';
39-
onWrite(prefix + initialWrite);
40-
onWrite('<div hidden>');
41-
onWrite(createInitScript(len));
45+
await onWrite(prefix + initialWrite);
46+
await onWrite('<div hidden>');
47+
await onWrite(createInitScript(len));
48+
deferredWritesReady.resolve();
4249
// We should keep checking all promises
4350
await forkPromises(renderer);
44-
onWrite('</div>');
45-
if (docSuffixIndex !== -1) onWrite(shell.slice(docSuffixIndex));
51+
await onWrite('</div>');
52+
if (docSuffixIndex !== -1) await onWrite(shell.slice(docSuffixIndex));
4653
} else {
47-
onWrite(shell);
54+
await onWrite(shell);
4855
}
4956
}
5057

@@ -98,7 +105,7 @@ function handleError(error, vnode, renderChild) {
98105
() => {
99106
if (abortSignal && abortSignal.aborted) return;
100107
const child = renderChild(vnode.props.children, vnode);
101-
if (child) this.onWrite(createSubtree(id, child));
108+
if (child) return this.onWrite(createSubtree(id, child));
102109
},
103110
// TODO: Abort and send hydration code snippet to client
104111
// to attempt to recover during hydration

src/stream-node.js

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,48 @@ export function renderToPipeableStream(vnode, options, context) {
2525

2626
const controller = new AbortController();
2727
const stream = new PassThrough();
28+
let waitingForDrain = null;
29+
30+
/**
31+
* @returns {Promise<void>}
32+
*/
33+
function waitForDrain() {
34+
if (waitingForDrain) return waitingForDrain;
35+
waitingForDrain = new Promise((resolve, reject) => {
36+
const cleanup = () => {
37+
stream.off('drain', onDrain);
38+
stream.off('close', onClose);
39+
stream.off('error', onError);
40+
waitingForDrain = null;
41+
};
42+
const onDrain = () => {
43+
cleanup();
44+
resolve();
45+
};
46+
const onClose = () => {
47+
cleanup();
48+
resolve();
49+
};
50+
const onError = (error) => {
51+
cleanup();
52+
reject(error);
53+
};
54+
55+
stream.on('drain', onDrain);
56+
stream.on('close', onClose);
57+
stream.on('error', onError);
58+
});
59+
return waitingForDrain;
60+
}
2861

2962
renderToChunks(vnode, {
3063
context,
3164
abortSignal: controller.signal,
32-
onWrite(s) {
33-
stream.write(encoder.encode(s));
65+
async onWrite(s) {
66+
if (stream.destroyed || stream.writableEnded) return;
67+
if (!stream.write(encoder.encode(s))) {
68+
await waitForDrain();
69+
}
3470
}
3571
})
3672
.then(() => {

src/stream.js

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,25 @@ export function renderToReadableStream(vnode, context) {
1414
const encoder = new TextEncoder('utf-8');
1515
const abortController = new AbortController();
1616
let canceled = false;
17+
/** @type {Deferred<void> | undefined} */
18+
let pullReady;
1719

1820
/** @type {RenderStream} */
1921
const stream = new ReadableStream({
2022
start(controller) {
2123
renderToChunks(vnode, {
2224
context,
2325
abortSignal: abortController.signal,
24-
onWrite(s) {
26+
async onWrite(s) {
27+
while (
28+
!canceled &&
29+
controller.desiredSize != null &&
30+
controller.desiredSize <= 0
31+
) {
32+
pullReady = pullReady || new Deferred();
33+
await pullReady.promise;
34+
pullReady = undefined;
35+
}
2536
if (canceled) return;
2637
controller.enqueue(encoder.encode(s));
2738
}
@@ -39,8 +50,12 @@ export function renderToReadableStream(vnode, context) {
3950
allReady.reject(error);
4051
});
4152
},
53+
pull() {
54+
if (pullReady) pullReady.resolve();
55+
},
4256
cancel(reason) {
4357
canceled = true;
58+
if (pullReady) pullReady.resolve();
4459
abortController.abort(reason);
4560
}
4661
});

0 commit comments

Comments
 (0)