Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/compressed-streams-drain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@qwik.dev/router': patch
---

fix(router): Node SSR no longer hangs when using `compression` (or other middleware that wraps `res.write` / `res.end`).
46 changes: 42 additions & 4 deletions packages/qwik-router/src/middleware/node/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,21 @@ function createNodeResponseSink(res: ServerResponse) {

return new Promise<void>((resolve, reject) => {
let settled = false;
let onDrain: (() => void) | undefined;

const cleanupDrain = () => {
if (onDrain) {
res.off('drain', onDrain);
onDrain = undefined;
}
};

const finish = () => {
if (settled) {
return;
}
settled = true;
cleanupDrain();

// Let Node process the completed write before SSR keeps emitting more chunks.
setImmediate(resolve);
Expand All @@ -119,6 +128,7 @@ function createNodeResponseSink(res: ServerResponse) {
}
settled = true;
closed = true;
cleanupDrain();
reject(error);
};

Expand All @@ -130,8 +140,9 @@ function createNodeResponseSink(res: ServerResponse) {
finish();
});

let ok: boolean;
try {
res.write(chunk, (error) => {
ok = res.write(chunk, (error) => {
if (error) {
if (isClientAbortWriteError(error)) {
finishClosed();
Expand All @@ -148,6 +159,28 @@ function createNodeResponseSink(res: ServerResponse) {
return;
}
fail(error);
return;
}

// Some response wrappers (notably `compression`) replace res.write
// with a 2-arg signature and silently drop the per-write callback,
// which would otherwise hang the SSR walker forever. Resolve via
// Node's standard backpressure protocol instead:
// - res.write() returned true → no backpressure, resolve now.
// - res.write() returned false → wait for 'drain'.
// The per-write callback (if it ever fires) still surfaces errors,
// and becomes a no-op for resolution once we've already settled.
if (settled) {
return;
}
if (ok) {
finish();
} else {
onDrain = () => {
onDrain = undefined;
finish();
};
res.once('drain', onDrain);
}
});
};
Expand Down Expand Up @@ -189,10 +222,15 @@ function createNodeResponseSink(res: ServerResponse) {
finish();
});

// Resolve on 'finish' (data fully flushed). We do NOT pass a callback
// to res.end because some response wrappers replace it with a 2-arg
// signature `(chunk, encoding)` (notably `compression@1.x`) and would
// treat the callback as a chunk, then throw inside Buffer.from(...) —
// leaving the underlying stream un-ended and the socket hanging.
res.once('finish', finish);

try {
res.end(() => {
finish();
});
res.end();
} catch (error) {
if (isClientAbortWriteError(error)) {
finish();
Expand Down
102 changes: 84 additions & 18 deletions packages/qwik-router/src/middleware/node/http.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,57 @@ import { fromNodeHttp, normalizeUrl } from './http';
});

describe('fromNodeHttp()', () => {
test('should resolve writes from the node write callback without waiting for drain', async () => {
test('should resolve writes immediately when res.write returns true (no backpressure)', async () => {
const req = new EventEmitter() as IncomingMessage & EventEmitter;
req.method = 'GET';
req.url = '/';
req.headers = { host: 'localhost' };
(req as any).socket = {};

let writeCallback: ((error?: Error | null) => void) | undefined;
const res = new EventEmitter() as ServerResponse & EventEmitter;
Object.defineProperty(res, 'closed', { value: false, configurable: true });
Object.defineProperty(res, 'destroyed', { value: false, configurable: true });
res.setHeader = vi.fn();
res.write = vi.fn((_chunk: Uint8Array, cb?: (error?: Error | null) => void) => {
writeCallback = cb;
// Returning false simulates backpressure. This adapter does not attach
// drain listeners because ServerResponse.write() provides a per-write callback.
return false;
res.write = vi.fn(() => true) as any;
// Mimic vanilla Node: 'finish' fires once res.end() has flushed.
res.end = vi.fn(() => {
queueMicrotask(() => res.emit('finish'));
return res;
}) as any;
res.end = vi.fn((cb?: () => void) => {
cb?.();

const requestEv = await fromNodeHttp(new URL('http://localhost/'), req, res, 'server');
const writableStream = requestEv.getWritableStream(
200,
new Headers([['Content-Type', 'text/html; charset=utf-8']]),
{ headers: () => [] } as any,
() => {},
undefined as any
);
const writer = writableStream.getWriter();

await writer.write(new Uint8Array([1, 2, 3]));
// No drain listener should be attached when there's no backpressure.
expect(res.listenerCount('drain')).toBe(0);

await writer.close();
expect(res.write).toHaveBeenCalledTimes(1);
expect(res.end).toHaveBeenCalledTimes(1);
});

test('should resolve writes via the drain event when res.write returns false', async () => {
const req = new EventEmitter() as IncomingMessage & EventEmitter;
req.method = 'GET';
req.url = '/';
req.headers = { host: 'localhost' };
(req as any).socket = {};

const res = new EventEmitter() as ServerResponse & EventEmitter;
Object.defineProperty(res, 'closed', { value: false, configurable: true });
Object.defineProperty(res, 'destroyed', { value: false, configurable: true });
res.setHeader = vi.fn();
res.write = vi.fn(() => false) as any; // simulate backpressure
res.end = vi.fn(() => {
queueMicrotask(() => res.emit('finish'));
return res;
}) as any;

Expand All @@ -90,17 +121,52 @@ describe('fromNodeHttp()', () => {

const writePromise = writer.write(new Uint8Array([1, 2, 3]));
await Promise.resolve();
// No drain listener should be attached in this path; Node calls the write
// callback once this chunk has flushed, even when write() returned false.
expect(res.listenerCount('drain')).toBe(0);
expect(writeCallback).toBeDefined();
// A drain listener should be attached because res.write() returned false.
expect(res.listenerCount('drain')).toBe(1);

writeCallback?.(null);
res.emit('drain');
await writePromise;
expect(res.listenerCount('drain')).toBe(0);

await writer.close();
});

expect(res.write).toHaveBeenCalledTimes(1);
expect(res.end).toHaveBeenCalledTimes(1);
test('should not hang when wrapper middleware drops the per-write callback (e.g. compression)', async () => {
// Reproduces the regression from https://github.com/QwikDev/qwik/pull/8557:
// `compression` middleware wraps res.write with a 2-arg signature
// (chunk, encoding) and never invokes the optional callback.
const req = new EventEmitter() as IncomingMessage & EventEmitter;
req.method = 'GET';
req.url = '/';
req.headers = { host: 'localhost' };
(req as any).socket = {};

const res = new EventEmitter() as ServerResponse & EventEmitter;
Object.defineProperty(res, 'closed', { value: false, configurable: true });
Object.defineProperty(res, 'destroyed', { value: false, configurable: true });
res.setHeader = vi.fn();
// Wrapper that mimics `compression`: callback is dropped.
res.write = vi.fn((_chunk: Uint8Array, _encoding?: any) => false) as any;
res.end = vi.fn(() => res) as any; // also drops callback

const requestEv = await fromNodeHttp(new URL('http://localhost/'), req, res, 'server');
const writableStream = requestEv.getWritableStream(
200,
new Headers([['Content-Type', 'text/html; charset=utf-8']]),
{ headers: () => [] } as any,
() => {},
undefined as any
);
const writer = writableStream.getWriter();

const writePromise = writer.write(new Uint8Array([1, 2, 3]));
await Promise.resolve();
res.emit('drain');
await writePromise; // would hang forever before the fix

const closePromise = writer.close();
res.emit('finish'); // close() must resolve even though res.end's cb is dropped
await closePromise;
});

test('should resolve the response when the writable stream is created', async () => {
Expand All @@ -118,8 +184,8 @@ describe('fromNodeHttp()', () => {
cb?.(null);
return true;
}) as any;
res.end = vi.fn((cb?: () => void) => {
cb?.();
res.end = vi.fn(() => {
queueMicrotask(() => res.emit('finish'));
return res;
}) as any;

Expand Down
Loading