diff --git a/src/internal.d.ts b/src/internal.d.ts index b4bbec80..559b2f32 100644 --- a/src/internal.d.ts +++ b/src/internal.d.ts @@ -23,13 +23,12 @@ interface RendererState { start: number; suspended: Suspended[]; abortSignal?: AbortSignal | undefined; - onWrite: (str: string) => void; + onWrite: (str: string) => void | Promise; onError?: RendererErrorHandler; } interface RenderToChunksOptions { context?: any; - onError?: (error: any) => void; - onWrite: (str: string) => void; + onWrite: (str: string) => void | Promise; abortSignal?: AbortSignal; } diff --git a/src/lib/chunked.js b/src/lib/chunked.js index 48aca958..21221319 100644 --- a/src/lib/chunked.js +++ b/src/lib/chunked.js @@ -10,12 +10,18 @@ import { createInitScript, createSubtree } from './client.js'; */ export async function renderToChunks(vnode, { context, onWrite, abortSignal }) { context = context || {}; + const deferredWritesReady = new Deferred(); + let deferredWriteQueue = Promise.resolve(); /** @type {RendererState} */ const renderer = { start: Date.now(), abortSignal, - onWrite, + onWrite(str) { + return (deferredWriteQueue = deferredWriteQueue + .then(() => deferredWritesReady.promise) + .then(() => onWrite(str))); + }, onError: handleError, suspended: [] }; @@ -36,15 +42,16 @@ export async function renderToChunks(vnode, { context, onWrite, abortSignal }) { const initialWrite = docSuffixIndex !== -1 ? shell.slice(0, docSuffixIndex) : shell; const prefix = hasHtmlTag ? '' : ''; - onWrite(prefix + initialWrite); - onWrite(''); + if (docSuffixIndex !== -1) await onWrite(shell.slice(docSuffixIndex)); } else { - onWrite(shell); + await onWrite(shell); } } @@ -88,27 +95,35 @@ function handleError(error, vnode, renderChild) { const race = new Deferred(); const abortSignal = this.abortSignal; + /** @type {(() => void) | undefined} */ + let onAbort; if (abortSignal) { - // @ts-ignore 2554 - implicit undefined arg - if (abortSignal.aborted) race.resolve(); - else abortSignal.addEventListener('abort', race.resolve); + onAbort = () => race.resolve(); + if (abortSignal.aborted) onAbort(); + else abortSignal.addEventListener('abort', onAbort, { once: true }); } const promise = error.then( () => { if (abortSignal && abortSignal.aborted) return; const child = renderChild(vnode.props.children, vnode); - if (child) this.onWrite(createSubtree(id, child)); + if (child) return this.onWrite(createSubtree(id, child)); }, // TODO: Abort and send hydration code snippet to client // to attempt to recover during hydration this.onError ); + const racedPromise = Promise.race([promise, race.promise]).finally(() => { + if (abortSignal && onAbort) { + abortSignal.removeEventListener('abort', onAbort); + } + }); + this.suspended.push({ id, vnode, - promise: Promise.race([promise, race.promise]) + promise: racedPromise }); const fallback = renderChild(vnode.props.fallback); diff --git a/src/stream-node.js b/src/stream-node.js index e39da314..527d72da 100644 --- a/src/stream-node.js +++ b/src/stream-node.js @@ -25,37 +25,100 @@ export function renderToPipeableStream(vnode, options, context) { const controller = new AbortController(); const stream = new PassThrough(); + let waitingForDrain = null; + let aborted = false; + let shellReadyCalled = false; + let allReadyCalled = false; + let errored = false; + let shellReadyScheduled = false; + stream.on('error', () => {}); - renderToChunks(vnode, { - context, - abortSignal: controller.signal, - onError: (error) => { - if (options.onError) { - options.onError(error); - } - controller.abort(error); - }, - onWrite(s) { - stream.write(encoder.encode(s)); + function callOnShellReady() { + if (shellReadyCalled || errored) return; + shellReadyCalled = true; + options.onShellReady && options.onShellReady(); + } + + function callOnAllReady() { + if (allReadyCalled || errored) return; + allReadyCalled = true; + options.onAllReady && options.onAllReady(); + } + + function callOnError(error) { + if (errored) return; + errored = true; + if (options.onError) { + options.onError(error); + } else { + throw error; } - }) + } + + function scheduleOnShellReady() { + if (shellReadyCalled || shellReadyScheduled || errored) return; + shellReadyScheduled = true; + Promise.resolve().then(() => { + shellReadyScheduled = false; + callOnShellReady(); + }); + } + + /** + * @returns {Promise} + */ + function waitForDrain() { + if (waitingForDrain) return waitingForDrain; + waitingForDrain = new Promise((resolve, reject) => { + const cleanup = () => { + stream.off('drain', onDrain); + stream.off('close', onClose); + stream.off('error', onError); + waitingForDrain = null; + }; + const onDrain = () => { + cleanup(); + resolve(); + }; + const onClose = () => { + cleanup(); + resolve(); + }; + const onError = (error) => { + cleanup(); + reject(error); + }; + + stream.on('drain', onDrain); + stream.on('close', onClose); + stream.on('error', onError); + }); + return waitingForDrain; + } + + Promise.resolve() + .then(() => + renderToChunks(vnode, { + context, + abortSignal: controller.signal, + async onWrite(s) { + scheduleOnShellReady(); + if (stream.destroyed || stream.writableEnded) return; + if (!stream.write(encoder.encode(s))) { + await waitForDrain(); + } + } + }) + ) .then(() => { - options.onAllReady && options.onAllReady(); + callOnAllReady(); stream.end(); }) .catch((error) => { stream.destroy(); - if (options.onError) { - options.onError(error); - } else { - throw error; - } + callOnError(error); }); - Promise.resolve().then(() => { - options.onShellReady && options.onShellReady(); - }); - return { /** * @param {unknown} [reason] @@ -66,13 +129,19 @@ export function renderToPipeableStream(vnode, options, context) { ) ) { // Remix/React-Router will always call abort after a timeout, even on success - if (stream.closed) return; - - controller.abort(); - stream.destroy(); - if (options.onError) { - options.onError(reason); + if ( + aborted || + stream.closed || + stream.destroyed || + stream.writableEnded + ) { + return; } + + aborted = true; + controller.abort(reason); + stream.destroy(reason); + callOnError(reason); }, /** * @param {import("stream").Writable} writable diff --git a/src/stream.js b/src/stream.js index 181614f0..cda84dd6 100644 --- a/src/stream.js +++ b/src/stream.js @@ -12,28 +12,51 @@ export function renderToReadableStream(vnode, context) { /** @type {Deferred} */ const allReady = new Deferred(); const encoder = new TextEncoder('utf-8'); + const abortController = new AbortController(); + let canceled = false; + /** @type {Deferred | undefined} */ + let pullReady; /** @type {RenderStream} */ const stream = new ReadableStream({ start(controller) { renderToChunks(vnode, { context, - onError: (error) => { - allReady.reject(error); - controller.abort(error); - }, - onWrite(s) { + abortSignal: abortController.signal, + async onWrite(s) { + while ( + !canceled && + controller.desiredSize != null && + controller.desiredSize <= 0 + ) { + pullReady = pullReady || new Deferred(); + await pullReady.promise; + pullReady = undefined; + } + if (canceled) return; controller.enqueue(encoder.encode(s)); } }) .then(() => { - controller.close(); + if (!canceled) controller.close(); allReady.resolve(); }) .catch((error) => { + if (canceled) { + allReady.resolve(); + return; + } controller.error(error); allReady.reject(error); }); + }, + pull() { + if (pullReady) pullReady.resolve(); + }, + cancel(reason) { + canceled = true; + if (pullReady) pullReady.resolve(); + abortController.abort(reason); } }); diff --git a/test/compat/stream-node.test.jsx b/test/compat/stream-node.test.jsx index b22cb839..a4edae99 100644 --- a/test/compat/stream-node.test.jsx +++ b/test/compat/stream-node.test.jsx @@ -88,4 +88,42 @@ describe('renderToPipeableStream', () => { expect(error).to.be.undefined; }); + + it('should not call onShellReady if shell rendering throws', async () => { + let shellReady = false; + let caught; + + function Thrower() { + throw new Error('shell failed'); + } + + renderToPipeableStream(, { + onShellReady: () => { + shellReady = true; + }, + onError: (error) => { + caught = error; + } + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(shellReady).toBe(false); + expect(caught).toBeInstanceOf(Error); + expect(caught.message).toBe('shell failed'); + }); + + it('should only call onError once for repeated aborts', async () => { + const errors = []; + const { abort } = renderToPipeableStream(
bar
, { + onError: (error) => errors.push(error) + }); + + const first = new Error('first abort'); + abort(first); + abort(new Error('second abort')); + + expect(errors).toHaveLength(1); + expect(errors[0]).toBe(first); + }); }); diff --git a/test/compat/stream.test.jsx b/test/compat/stream.test.jsx index 202c26aa..f3006859 100644 --- a/test/compat/stream.test.jsx +++ b/test/compat/stream.test.jsx @@ -89,4 +89,31 @@ describe('renderToReadableStream', () => { '' ]); }); + + it('should abort pending suspense work when the readable stream is canceled', async () => { + const { Suspender, suspended } = createSuspender(); + const decoder = new TextDecoder('utf-8'); + + const stream = renderToReadableStream( +
+ + + +
+ ); + + const reader = stream.getReader(); + const first = await reader.read(); + expect(first.done).toBe(false); + expect(decoder.decode(first.value)).toBe( + '
loading...
' + ); + + await reader.cancel('client disconnected'); + suspended.resolve(); + await stream.allReady; + + const next = await reader.read(); + expect(next.done).toBe(true); + }); });