From e7a5f5f393a2b579589c4c332ce180e194c2b98d Mon Sep 17 00:00:00 2001 From: Jovi De Croock Date: Sun, 22 Feb 2026 07:46:48 +0100 Subject: [PATCH 1/6] stream: remove dead renderToChunks onError option --- src/internal.d.ts | 1 - src/stream-node.js | 6 ------ src/stream.js | 4 ---- 3 files changed, 11 deletions(-) diff --git a/src/internal.d.ts b/src/internal.d.ts index b4bbec80..b9bec3dc 100644 --- a/src/internal.d.ts +++ b/src/internal.d.ts @@ -29,7 +29,6 @@ interface RendererState { interface RenderToChunksOptions { context?: any; - onError?: (error: any) => void; onWrite: (str: string) => void; abortSignal?: AbortSignal; } diff --git a/src/stream-node.js b/src/stream-node.js index e39da314..d8ca2eec 100644 --- a/src/stream-node.js +++ b/src/stream-node.js @@ -29,12 +29,6 @@ export function renderToPipeableStream(vnode, options, context) { renderToChunks(vnode, { context, abortSignal: controller.signal, - onError: (error) => { - if (options.onError) { - options.onError(error); - } - controller.abort(error); - }, onWrite(s) { stream.write(encoder.encode(s)); } diff --git a/src/stream.js b/src/stream.js index 181614f0..6510fcea 100644 --- a/src/stream.js +++ b/src/stream.js @@ -18,10 +18,6 @@ export function renderToReadableStream(vnode, context) { start(controller) { renderToChunks(vnode, { context, - onError: (error) => { - allReady.reject(error); - controller.abort(error); - }, onWrite(s) { controller.enqueue(encoder.encode(s)); } From 3b3dd9361ad6e66603b333ff014c184641168312 Mon Sep 17 00:00:00 2001 From: Jovi De Croock Date: Sun, 22 Feb 2026 07:49:20 +0100 Subject: [PATCH 2/6] stream: abort readable renders when consumers cancel --- src/stream.js | 14 +++++++++++++- test/compat/stream.test.jsx | 27 +++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/stream.js b/src/stream.js index 6510fcea..cef8a153 100644 --- a/src/stream.js +++ b/src/stream.js @@ -12,24 +12,36 @@ export function renderToReadableStream(vnode, context) { /** @type {Deferred} */ const allReady = new Deferred(); const encoder = new TextEncoder('utf-8'); + const abortController = new AbortController(); + let canceled = false; /** @type {RenderStream} */ const stream = new ReadableStream({ start(controller) { renderToChunks(vnode, { context, + abortSignal: abortController.signal, onWrite(s) { + if (canceled) return; controller.enqueue(encoder.encode(s)); } }) .then(() => { - controller.close(); + if (!canceled) controller.close(); allReady.resolve(); }) .catch((error) => { + if (canceled) { + allReady.resolve(); + return; + } controller.error(error); allReady.reject(error); }); + }, + cancel(reason) { + canceled = true; + abortController.abort(reason); } }); diff --git a/test/compat/stream.test.jsx b/test/compat/stream.test.jsx index 202c26aa..f3006859 100644 --- a/test/compat/stream.test.jsx +++ b/test/compat/stream.test.jsx @@ -89,4 +89,31 @@ describe('renderToReadableStream', () => { '' ]); }); + + it('should abort pending suspense work when the readable stream is canceled', async () => { + const { Suspender, suspended } = createSuspender(); + const decoder = new TextDecoder('utf-8'); + + const stream = renderToReadableStream( +
+ + + +
+ ); + + const reader = stream.getReader(); + const first = await reader.read(); + expect(first.done).toBe(false); + expect(decoder.decode(first.value)).toBe( + '
loading...
' + ); + + await reader.cancel('client disconnected'); + suspended.resolve(); + await stream.allReady; + + const next = await reader.read(); + expect(next.done).toBe(true); + }); }); From 41e804d5121cc666bf401bd9bc4c361bd5d23e30 Mon Sep 17 00:00:00 2001 From: Jovi De Croock Date: Sun, 22 Feb 2026 07:54:33 +0100 Subject: [PATCH 3/6] stream: respect backpressure in node and web wrappers --- src/internal.d.ts | 4 ++-- src/lib/chunked.js | 23 +++++++++++++++-------- src/stream-node.js | 40 ++++++++++++++++++++++++++++++++++++++-- src/stream.js | 17 ++++++++++++++++- 4 files changed, 71 insertions(+), 13 deletions(-) diff --git a/src/internal.d.ts b/src/internal.d.ts index b9bec3dc..559b2f32 100644 --- a/src/internal.d.ts +++ b/src/internal.d.ts @@ -23,12 +23,12 @@ interface RendererState { start: number; suspended: Suspended[]; abortSignal?: AbortSignal | undefined; - onWrite: (str: string) => void; + onWrite: (str: string) => void | Promise; onError?: RendererErrorHandler; } interface RenderToChunksOptions { context?: any; - onWrite: (str: string) => void; + onWrite: (str: string) => void | Promise; abortSignal?: AbortSignal; } diff --git a/src/lib/chunked.js b/src/lib/chunked.js index 48aca958..a3fd4bde 100644 --- a/src/lib/chunked.js +++ b/src/lib/chunked.js @@ -10,12 +10,18 @@ import { createInitScript, createSubtree } from './client.js'; */ export async function renderToChunks(vnode, { context, onWrite, abortSignal }) { context = context || {}; + const deferredWritesReady = new Deferred(); + let deferredWriteQueue = Promise.resolve(); /** @type {RendererState} */ const renderer = { start: Date.now(), abortSignal, - onWrite, + onWrite(str) { + return (deferredWriteQueue = deferredWriteQueue + .then(() => deferredWritesReady.promise) + .then(() => onWrite(str))); + }, onError: handleError, suspended: [] }; @@ -36,15 +42,16 @@ export async function renderToChunks(vnode, { context, onWrite, abortSignal }) { const initialWrite = docSuffixIndex !== -1 ? shell.slice(0, docSuffixIndex) : shell; const prefix = hasHtmlTag ? '' : ''; - onWrite(prefix + initialWrite); - onWrite(''); + if (docSuffixIndex !== -1) await onWrite(shell.slice(docSuffixIndex)); } else { - onWrite(shell); + await onWrite(shell); } } @@ -98,7 +105,7 @@ function handleError(error, vnode, renderChild) { () => { if (abortSignal && abortSignal.aborted) return; const child = renderChild(vnode.props.children, vnode); - if (child) this.onWrite(createSubtree(id, child)); + if (child) return this.onWrite(createSubtree(id, child)); }, // TODO: Abort and send hydration code snippet to client // to attempt to recover during hydration diff --git a/src/stream-node.js b/src/stream-node.js index d8ca2eec..4f1e86e2 100644 --- a/src/stream-node.js +++ b/src/stream-node.js @@ -25,12 +25,48 @@ export function renderToPipeableStream(vnode, options, context) { const controller = new AbortController(); const stream = new PassThrough(); + let waitingForDrain = null; + + /** + * @returns {Promise} + */ + function waitForDrain() { + if (waitingForDrain) return waitingForDrain; + waitingForDrain = new Promise((resolve, reject) => { + const cleanup = () => { + stream.off('drain', onDrain); + stream.off('close', onClose); + stream.off('error', onError); + waitingForDrain = null; + }; + const onDrain = () => { + cleanup(); + resolve(); + }; + const onClose = () => { + cleanup(); + resolve(); + }; + const onError = (error) => { + cleanup(); + reject(error); + }; + + stream.on('drain', onDrain); + stream.on('close', onClose); + stream.on('error', onError); + }); + return waitingForDrain; + } renderToChunks(vnode, { context, abortSignal: controller.signal, - onWrite(s) { - stream.write(encoder.encode(s)); + async onWrite(s) { + if (stream.destroyed || stream.writableEnded) return; + if (!stream.write(encoder.encode(s))) { + await waitForDrain(); + } } }) .then(() => { diff --git a/src/stream.js b/src/stream.js index cef8a153..cda84dd6 100644 --- a/src/stream.js +++ b/src/stream.js @@ -14,6 +14,8 @@ export function renderToReadableStream(vnode, context) { const encoder = new TextEncoder('utf-8'); const abortController = new AbortController(); let canceled = false; + /** @type {Deferred | undefined} */ + let pullReady; /** @type {RenderStream} */ const stream = new ReadableStream({ @@ -21,7 +23,16 @@ export function renderToReadableStream(vnode, context) { renderToChunks(vnode, { context, abortSignal: abortController.signal, - onWrite(s) { + async onWrite(s) { + while ( + !canceled && + controller.desiredSize != null && + controller.desiredSize <= 0 + ) { + pullReady = pullReady || new Deferred(); + await pullReady.promise; + pullReady = undefined; + } if (canceled) return; controller.enqueue(encoder.encode(s)); } @@ -39,8 +50,12 @@ export function renderToReadableStream(vnode, context) { allReady.reject(error); }); }, + pull() { + if (pullReady) pullReady.resolve(); + }, cancel(reason) { canceled = true; + if (pullReady) pullReady.resolve(); abortController.abort(reason); } }); From 351279d0080d55cf77f2397225cd9c4a5c2010bb Mon Sep 17 00:00:00 2001 From: Jovi De Croock Date: Sun, 22 Feb 2026 07:55:53 +0100 Subject: [PATCH 4/6] stream-node: fire shell callback only after successful shell write --- src/stream-node.js | 71 +++++++++++++++++++++++--------- test/compat/stream-node.test.jsx | 24 +++++++++++ 2 files changed, 75 insertions(+), 20 deletions(-) diff --git a/src/stream-node.js b/src/stream-node.js index 4f1e86e2..a4fff175 100644 --- a/src/stream-node.js +++ b/src/stream-node.js @@ -26,6 +26,41 @@ export function renderToPipeableStream(vnode, options, context) { const controller = new AbortController(); const stream = new PassThrough(); let waitingForDrain = null; + let shellReadyCalled = false; + let allReadyCalled = false; + let errored = false; + let shellReadyScheduled = false; + + function callOnShellReady() { + if (shellReadyCalled || errored) return; + shellReadyCalled = true; + options.onShellReady && options.onShellReady(); + } + + function callOnAllReady() { + if (allReadyCalled || errored) return; + allReadyCalled = true; + options.onAllReady && options.onAllReady(); + } + + function callOnError(error) { + if (errored) return; + errored = true; + if (options.onError) { + options.onError(error); + } else { + throw error; + } + } + + function scheduleOnShellReady() { + if (shellReadyCalled || shellReadyScheduled || errored) return; + shellReadyScheduled = true; + Promise.resolve().then(() => { + shellReadyScheduled = false; + callOnShellReady(); + }); + } /** * @returns {Promise} @@ -59,33 +94,29 @@ export function renderToPipeableStream(vnode, options, context) { return waitingForDrain; } - renderToChunks(vnode, { - context, - abortSignal: controller.signal, - async onWrite(s) { - if (stream.destroyed || stream.writableEnded) return; - if (!stream.write(encoder.encode(s))) { - await waitForDrain(); - } - } - }) + Promise.resolve() + .then(() => + renderToChunks(vnode, { + context, + abortSignal: controller.signal, + async onWrite(s) { + scheduleOnShellReady(); + if (stream.destroyed || stream.writableEnded) return; + if (!stream.write(encoder.encode(s))) { + await waitForDrain(); + } + } + }) + ) .then(() => { - options.onAllReady && options.onAllReady(); + callOnAllReady(); stream.end(); }) .catch((error) => { stream.destroy(); - if (options.onError) { - options.onError(error); - } else { - throw error; - } + callOnError(error); }); - Promise.resolve().then(() => { - options.onShellReady && options.onShellReady(); - }); - return { /** * @param {unknown} [reason] diff --git a/test/compat/stream-node.test.jsx b/test/compat/stream-node.test.jsx index b22cb839..b2a11f61 100644 --- a/test/compat/stream-node.test.jsx +++ b/test/compat/stream-node.test.jsx @@ -88,4 +88,28 @@ describe('renderToPipeableStream', () => { expect(error).to.be.undefined; }); + + it('should not call onShellReady if shell rendering throws', async () => { + let shellReady = false; + let caught; + + function Thrower() { + throw new Error('shell failed'); + } + + renderToPipeableStream(, { + onShellReady: () => { + shellReady = true; + }, + onError: (error) => { + caught = error; + } + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(shellReady).toBe(false); + expect(caught).toBeInstanceOf(Error); + expect(caught.message).toBe('shell failed'); + }); }); From 5702db55ad921ba572aae6e3981dc355430a664f Mon Sep 17 00:00:00 2001 From: Jovi De Croock Date: Sun, 22 Feb 2026 08:07:50 +0100 Subject: [PATCH 5/6] chunked: clean up abort listeners for suspended boundaries --- src/lib/chunked.js | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/lib/chunked.js b/src/lib/chunked.js index a3fd4bde..21221319 100644 --- a/src/lib/chunked.js +++ b/src/lib/chunked.js @@ -95,10 +95,12 @@ function handleError(error, vnode, renderChild) { const race = new Deferred(); const abortSignal = this.abortSignal; + /** @type {(() => void) | undefined} */ + let onAbort; if (abortSignal) { - // @ts-ignore 2554 - implicit undefined arg - if (abortSignal.aborted) race.resolve(); - else abortSignal.addEventListener('abort', race.resolve); + onAbort = () => race.resolve(); + if (abortSignal.aborted) onAbort(); + else abortSignal.addEventListener('abort', onAbort, { once: true }); } const promise = error.then( @@ -112,10 +114,16 @@ function handleError(error, vnode, renderChild) { this.onError ); + const racedPromise = Promise.race([promise, race.promise]).finally(() => { + if (abortSignal && onAbort) { + abortSignal.removeEventListener('abort', onAbort); + } + }); + this.suspended.push({ id, vnode, - promise: Promise.race([promise, race.promise]) + promise: racedPromise }); const fallback = renderChild(vnode.props.fallback); From 753b08770e32d9f5be53fc815daa48a0fbc82807 Mon Sep 17 00:00:00 2001 From: Jovi De Croock Date: Sun, 22 Feb 2026 08:08:20 +0100 Subject: [PATCH 6/6] stream-node: dedupe abort errors and preserve abort reasons --- src/stream-node.js | 20 ++++++++++++++------ test/compat/stream-node.test.jsx | 14 ++++++++++++++ 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/stream-node.js b/src/stream-node.js index a4fff175..527d72da 100644 --- a/src/stream-node.js +++ b/src/stream-node.js @@ -26,10 +26,12 @@ export function renderToPipeableStream(vnode, options, context) { const controller = new AbortController(); const stream = new PassThrough(); let waitingForDrain = null; + let aborted = false; let shellReadyCalled = false; let allReadyCalled = false; let errored = false; let shellReadyScheduled = false; + stream.on('error', () => {}); function callOnShellReady() { if (shellReadyCalled || errored) return; @@ -127,13 +129,19 @@ export function renderToPipeableStream(vnode, options, context) { ) ) { // Remix/React-Router will always call abort after a timeout, even on success - if (stream.closed) return; - - controller.abort(); - stream.destroy(); - if (options.onError) { - options.onError(reason); + if ( + aborted || + stream.closed || + stream.destroyed || + stream.writableEnded + ) { + return; } + + aborted = true; + controller.abort(reason); + stream.destroy(reason); + callOnError(reason); }, /** * @param {import("stream").Writable} writable diff --git a/test/compat/stream-node.test.jsx b/test/compat/stream-node.test.jsx index b2a11f61..a4edae99 100644 --- a/test/compat/stream-node.test.jsx +++ b/test/compat/stream-node.test.jsx @@ -112,4 +112,18 @@ describe('renderToPipeableStream', () => { expect(caught).toBeInstanceOf(Error); expect(caught.message).toBe('shell failed'); }); + + it('should only call onError once for repeated aborts', async () => { + const errors = []; + const { abort } = renderToPipeableStream(
bar
, { + onError: (error) => errors.push(error) + }); + + const first = new Error('first abort'); + abort(first); + abort(new Error('second abort')); + + expect(errors).toHaveLength(1); + expect(errors[0]).toBe(first); + }); });