diff --git a/.changeset/compressed-streams-drain.md b/.changeset/compressed-streams-drain.md new file mode 100644 index 00000000000..5350cc49d53 --- /dev/null +++ b/.changeset/compressed-streams-drain.md @@ -0,0 +1,5 @@ +--- +'@qwik.dev/router': patch +--- + +fix(router): Node SSR no longer hangs when using `compression` (or other middleware that wraps `res.write` / `res.end`). diff --git a/packages/qwik-router/src/middleware/node/http.ts b/packages/qwik-router/src/middleware/node/http.ts index 6627bfdb2a8..62dbaad7ea8 100644 --- a/packages/qwik-router/src/middleware/node/http.ts +++ b/packages/qwik-router/src/middleware/node/http.ts @@ -97,12 +97,21 @@ function createNodeResponseSink(res: ServerResponse) { return new Promise((resolve, reject) => { let settled = false; + let onDrain: (() => void) | undefined; + + const cleanupDrain = () => { + if (onDrain) { + res.off('drain', onDrain); + onDrain = undefined; + } + }; const finish = () => { if (settled) { return; } settled = true; + cleanupDrain(); // Let Node process the completed write before SSR keeps emitting more chunks. setImmediate(resolve); @@ -119,6 +128,7 @@ function createNodeResponseSink(res: ServerResponse) { } settled = true; closed = true; + cleanupDrain(); reject(error); }; @@ -130,8 +140,9 @@ function createNodeResponseSink(res: ServerResponse) { finish(); }); + let ok: boolean; try { - res.write(chunk, (error) => { + ok = res.write(chunk, (error) => { if (error) { if (isClientAbortWriteError(error)) { finishClosed(); @@ -148,6 +159,28 @@ function createNodeResponseSink(res: ServerResponse) { return; } fail(error); + return; + } + + // Some response wrappers (notably `compression`) replace res.write + // with a 2-arg signature and silently drop the per-write callback, + // which would otherwise hang the SSR walker forever. Resolve via + // Node's standard backpressure protocol instead: + // - res.write() returned true → no backpressure, resolve now. + // - res.write() returned false → wait for 'drain'. + // The per-write callback (if it ever fires) still surfaces errors, + // and becomes a no-op for resolution once we've already settled. + if (settled) { + return; + } + if (ok) { + finish(); + } else { + onDrain = () => { + onDrain = undefined; + finish(); + }; + res.once('drain', onDrain); } }); }; @@ -189,10 +222,15 @@ function createNodeResponseSink(res: ServerResponse) { finish(); }); + // Resolve on 'finish' (data fully flushed). We do NOT pass a callback + // to res.end because some response wrappers replace it with a 2-arg + // signature `(chunk, encoding)` (notably `compression@1.x`) and would + // treat the callback as a chunk, then throw inside Buffer.from(...) — + // leaving the underlying stream un-ended and the socket hanging. + res.once('finish', finish); + try { - res.end(() => { - finish(); - }); + res.end(); } catch (error) { if (isClientAbortWriteError(error)) { finish(); diff --git a/packages/qwik-router/src/middleware/node/http.unit.ts b/packages/qwik-router/src/middleware/node/http.unit.ts index 045c39b11c5..ac2796deb0a 100644 --- a/packages/qwik-router/src/middleware/node/http.unit.ts +++ b/packages/qwik-router/src/middleware/node/http.unit.ts @@ -55,26 +55,57 @@ import { fromNodeHttp, normalizeUrl } from './http'; }); describe('fromNodeHttp()', () => { - test('should resolve writes from the node write callback without waiting for drain', async () => { + test('should resolve writes immediately when res.write returns true (no backpressure)', async () => { const req = new EventEmitter() as IncomingMessage & EventEmitter; req.method = 'GET'; req.url = '/'; req.headers = { host: 'localhost' }; (req as any).socket = {}; - let writeCallback: ((error?: Error | null) => void) | undefined; const res = new EventEmitter() as ServerResponse & EventEmitter; Object.defineProperty(res, 'closed', { value: false, configurable: true }); Object.defineProperty(res, 'destroyed', { value: false, configurable: true }); res.setHeader = vi.fn(); - res.write = vi.fn((_chunk: Uint8Array, cb?: (error?: Error | null) => void) => { - writeCallback = cb; - // Returning false simulates backpressure. This adapter does not attach - // drain listeners because ServerResponse.write() provides a per-write callback. - return false; + res.write = vi.fn(() => true) as any; + // Mimic vanilla Node: 'finish' fires once res.end() has flushed. + res.end = vi.fn(() => { + queueMicrotask(() => res.emit('finish')); + return res; }) as any; - res.end = vi.fn((cb?: () => void) => { - cb?.(); + + const requestEv = await fromNodeHttp(new URL('http://localhost/'), req, res, 'server'); + const writableStream = requestEv.getWritableStream( + 200, + new Headers([['Content-Type', 'text/html; charset=utf-8']]), + { headers: () => [] } as any, + () => {}, + undefined as any + ); + const writer = writableStream.getWriter(); + + await writer.write(new Uint8Array([1, 2, 3])); + // No drain listener should be attached when there's no backpressure. + expect(res.listenerCount('drain')).toBe(0); + + await writer.close(); + expect(res.write).toHaveBeenCalledTimes(1); + expect(res.end).toHaveBeenCalledTimes(1); + }); + + test('should resolve writes via the drain event when res.write returns false', async () => { + const req = new EventEmitter() as IncomingMessage & EventEmitter; + req.method = 'GET'; + req.url = '/'; + req.headers = { host: 'localhost' }; + (req as any).socket = {}; + + const res = new EventEmitter() as ServerResponse & EventEmitter; + Object.defineProperty(res, 'closed', { value: false, configurable: true }); + Object.defineProperty(res, 'destroyed', { value: false, configurable: true }); + res.setHeader = vi.fn(); + res.write = vi.fn(() => false) as any; // simulate backpressure + res.end = vi.fn(() => { + queueMicrotask(() => res.emit('finish')); return res; }) as any; @@ -90,17 +121,52 @@ describe('fromNodeHttp()', () => { const writePromise = writer.write(new Uint8Array([1, 2, 3])); await Promise.resolve(); - // No drain listener should be attached in this path; Node calls the write - // callback once this chunk has flushed, even when write() returned false. - expect(res.listenerCount('drain')).toBe(0); - expect(writeCallback).toBeDefined(); + // A drain listener should be attached because res.write() returned false. + expect(res.listenerCount('drain')).toBe(1); - writeCallback?.(null); + res.emit('drain'); await writePromise; + expect(res.listenerCount('drain')).toBe(0); + await writer.close(); + }); - expect(res.write).toHaveBeenCalledTimes(1); - expect(res.end).toHaveBeenCalledTimes(1); + test('should not hang when wrapper middleware drops the per-write callback (e.g. compression)', async () => { + // Reproduces the regression from https://github.com/QwikDev/qwik/pull/8557: + // `compression` middleware wraps res.write with a 2-arg signature + // (chunk, encoding) and never invokes the optional callback. + const req = new EventEmitter() as IncomingMessage & EventEmitter; + req.method = 'GET'; + req.url = '/'; + req.headers = { host: 'localhost' }; + (req as any).socket = {}; + + const res = new EventEmitter() as ServerResponse & EventEmitter; + Object.defineProperty(res, 'closed', { value: false, configurable: true }); + Object.defineProperty(res, 'destroyed', { value: false, configurable: true }); + res.setHeader = vi.fn(); + // Wrapper that mimics `compression`: callback is dropped. + res.write = vi.fn((_chunk: Uint8Array, _encoding?: any) => false) as any; + res.end = vi.fn(() => res) as any; // also drops callback + + const requestEv = await fromNodeHttp(new URL('http://localhost/'), req, res, 'server'); + const writableStream = requestEv.getWritableStream( + 200, + new Headers([['Content-Type', 'text/html; charset=utf-8']]), + { headers: () => [] } as any, + () => {}, + undefined as any + ); + const writer = writableStream.getWriter(); + + const writePromise = writer.write(new Uint8Array([1, 2, 3])); + await Promise.resolve(); + res.emit('drain'); + await writePromise; // would hang forever before the fix + + const closePromise = writer.close(); + res.emit('finish'); // close() must resolve even though res.end's cb is dropped + await closePromise; }); test('should resolve the response when the writable stream is created', async () => { @@ -118,8 +184,8 @@ describe('fromNodeHttp()', () => { cb?.(null); return true; }) as any; - res.end = vi.fn((cb?: () => void) => { - cb?.(); + res.end = vi.fn(() => { + queueMicrotask(() => res.emit('finish')); return res; }) as any;