Skip to content

Commit 8994d30

Browse files
authored
Merge pull request #8620 from QwikDev/v2-fix-node-streaming-compression
fix(router): honor SSR backpressure via drain event for compression compatibility
2 parents 8c226d5 + 5d2fe55 commit 8994d30

3 files changed

Lines changed: 131 additions & 22 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@qwik.dev/router': patch
3+
---
4+
5+
fix(router): Node SSR no longer hangs when using `compression` (or other middleware that wraps `res.write` / `res.end`).

packages/qwik-router/src/middleware/node/http.ts

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,21 @@ function createNodeResponseSink(res: ServerResponse) {
9797

9898
return new Promise<void>((resolve, reject) => {
9999
let settled = false;
100+
let onDrain: (() => void) | undefined;
101+
102+
const cleanupDrain = () => {
103+
if (onDrain) {
104+
res.off('drain', onDrain);
105+
onDrain = undefined;
106+
}
107+
};
100108

101109
const finish = () => {
102110
if (settled) {
103111
return;
104112
}
105113
settled = true;
114+
cleanupDrain();
106115

107116
// Let Node process the completed write before SSR keeps emitting more chunks.
108117
setImmediate(resolve);
@@ -119,6 +128,7 @@ function createNodeResponseSink(res: ServerResponse) {
119128
}
120129
settled = true;
121130
closed = true;
131+
cleanupDrain();
122132
reject(error);
123133
};
124134

@@ -130,8 +140,9 @@ function createNodeResponseSink(res: ServerResponse) {
130140
finish();
131141
});
132142

143+
let ok: boolean;
133144
try {
134-
res.write(chunk, (error) => {
145+
ok = res.write(chunk, (error) => {
135146
if (error) {
136147
if (isClientAbortWriteError(error)) {
137148
finishClosed();
@@ -148,6 +159,28 @@ function createNodeResponseSink(res: ServerResponse) {
148159
return;
149160
}
150161
fail(error);
162+
return;
163+
}
164+
165+
// Some response wrappers (notably `compression`) replace res.write
166+
// with a 2-arg signature and silently drop the per-write callback,
167+
// which would otherwise hang the SSR walker forever. Resolve via
168+
// Node's standard backpressure protocol instead:
169+
// - res.write() returned true → no backpressure, resolve now.
170+
// - res.write() returned false → wait for 'drain'.
171+
// The per-write callback (if it ever fires) still surfaces errors,
172+
// and becomes a no-op for resolution once we've already settled.
173+
if (settled) {
174+
return;
175+
}
176+
if (ok) {
177+
finish();
178+
} else {
179+
onDrain = () => {
180+
onDrain = undefined;
181+
finish();
182+
};
183+
res.once('drain', onDrain);
151184
}
152185
});
153186
};
@@ -189,10 +222,15 @@ function createNodeResponseSink(res: ServerResponse) {
189222
finish();
190223
});
191224

225+
// Resolve on 'finish' (data fully flushed). We do NOT pass a callback
226+
// to res.end because some response wrappers replace it with a 2-arg
227+
// signature `(chunk, encoding)` (notably `compression@1.x`) and would
228+
// treat the callback as a chunk, then throw inside Buffer.from(...) —
229+
// leaving the underlying stream un-ended and the socket hanging.
230+
res.once('finish', finish);
231+
192232
try {
193-
res.end(() => {
194-
finish();
195-
});
233+
res.end();
196234
} catch (error) {
197235
if (isClientAbortWriteError(error)) {
198236
finish();

packages/qwik-router/src/middleware/node/http.unit.ts

Lines changed: 84 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,57 @@ import { fromNodeHttp, normalizeUrl } from './http';
5555
});
5656

5757
describe('fromNodeHttp()', () => {
58-
test('should resolve writes from the node write callback without waiting for drain', async () => {
58+
test('should resolve writes immediately when res.write returns true (no backpressure)', async () => {
5959
const req = new EventEmitter() as IncomingMessage & EventEmitter;
6060
req.method = 'GET';
6161
req.url = '/';
6262
req.headers = { host: 'localhost' };
6363
(req as any).socket = {};
6464

65-
let writeCallback: ((error?: Error | null) => void) | undefined;
6665
const res = new EventEmitter() as ServerResponse & EventEmitter;
6766
Object.defineProperty(res, 'closed', { value: false, configurable: true });
6867
Object.defineProperty(res, 'destroyed', { value: false, configurable: true });
6968
res.setHeader = vi.fn();
70-
res.write = vi.fn((_chunk: Uint8Array, cb?: (error?: Error | null) => void) => {
71-
writeCallback = cb;
72-
// Returning false simulates backpressure. This adapter does not attach
73-
// drain listeners because ServerResponse.write() provides a per-write callback.
74-
return false;
69+
res.write = vi.fn(() => true) as any;
70+
// Mimic vanilla Node: 'finish' fires once res.end() has flushed.
71+
res.end = vi.fn(() => {
72+
queueMicrotask(() => res.emit('finish'));
73+
return res;
7574
}) as any;
76-
res.end = vi.fn((cb?: () => void) => {
77-
cb?.();
75+
76+
const requestEv = await fromNodeHttp(new URL('http://localhost/'), req, res, 'server');
77+
const writableStream = requestEv.getWritableStream(
78+
200,
79+
new Headers([['Content-Type', 'text/html; charset=utf-8']]),
80+
{ headers: () => [] } as any,
81+
() => {},
82+
undefined as any
83+
);
84+
const writer = writableStream.getWriter();
85+
86+
await writer.write(new Uint8Array([1, 2, 3]));
87+
// No drain listener should be attached when there's no backpressure.
88+
expect(res.listenerCount('drain')).toBe(0);
89+
90+
await writer.close();
91+
expect(res.write).toHaveBeenCalledTimes(1);
92+
expect(res.end).toHaveBeenCalledTimes(1);
93+
});
94+
95+
test('should resolve writes via the drain event when res.write returns false', async () => {
96+
const req = new EventEmitter() as IncomingMessage & EventEmitter;
97+
req.method = 'GET';
98+
req.url = '/';
99+
req.headers = { host: 'localhost' };
100+
(req as any).socket = {};
101+
102+
const res = new EventEmitter() as ServerResponse & EventEmitter;
103+
Object.defineProperty(res, 'closed', { value: false, configurable: true });
104+
Object.defineProperty(res, 'destroyed', { value: false, configurable: true });
105+
res.setHeader = vi.fn();
106+
res.write = vi.fn(() => false) as any; // simulate backpressure
107+
res.end = vi.fn(() => {
108+
queueMicrotask(() => res.emit('finish'));
78109
return res;
79110
}) as any;
80111

@@ -90,17 +121,52 @@ describe('fromNodeHttp()', () => {
90121

91122
const writePromise = writer.write(new Uint8Array([1, 2, 3]));
92123
await Promise.resolve();
93-
// No drain listener should be attached in this path; Node calls the write
94-
// callback once this chunk has flushed, even when write() returned false.
95-
expect(res.listenerCount('drain')).toBe(0);
96-
expect(writeCallback).toBeDefined();
124+
// A drain listener should be attached because res.write() returned false.
125+
expect(res.listenerCount('drain')).toBe(1);
97126

98-
writeCallback?.(null);
127+
res.emit('drain');
99128
await writePromise;
129+
expect(res.listenerCount('drain')).toBe(0);
130+
100131
await writer.close();
132+
});
101133

102-
expect(res.write).toHaveBeenCalledTimes(1);
103-
expect(res.end).toHaveBeenCalledTimes(1);
134+
test('should not hang when wrapper middleware drops the per-write callback (e.g. compression)', async () => {
135+
// Reproduces the regression from https://github.com/QwikDev/qwik/pull/8557:
136+
// `compression` middleware wraps res.write with a 2-arg signature
137+
// (chunk, encoding) and never invokes the optional callback.
138+
const req = new EventEmitter() as IncomingMessage & EventEmitter;
139+
req.method = 'GET';
140+
req.url = '/';
141+
req.headers = { host: 'localhost' };
142+
(req as any).socket = {};
143+
144+
const res = new EventEmitter() as ServerResponse & EventEmitter;
145+
Object.defineProperty(res, 'closed', { value: false, configurable: true });
146+
Object.defineProperty(res, 'destroyed', { value: false, configurable: true });
147+
res.setHeader = vi.fn();
148+
// Wrapper that mimics `compression`: callback is dropped.
149+
res.write = vi.fn((_chunk: Uint8Array, _encoding?: any) => false) as any;
150+
res.end = vi.fn(() => res) as any; // also drops callback
151+
152+
const requestEv = await fromNodeHttp(new URL('http://localhost/'), req, res, 'server');
153+
const writableStream = requestEv.getWritableStream(
154+
200,
155+
new Headers([['Content-Type', 'text/html; charset=utf-8']]),
156+
{ headers: () => [] } as any,
157+
() => {},
158+
undefined as any
159+
);
160+
const writer = writableStream.getWriter();
161+
162+
const writePromise = writer.write(new Uint8Array([1, 2, 3]));
163+
await Promise.resolve();
164+
res.emit('drain');
165+
await writePromise; // would hang forever before the fix
166+
167+
const closePromise = writer.close();
168+
res.emit('finish'); // close() must resolve even though res.end's cb is dropped
169+
await closePromise;
104170
});
105171

106172
test('should resolve the response when the writable stream is created', async () => {
@@ -118,8 +184,8 @@ describe('fromNodeHttp()', () => {
118184
cb?.(null);
119185
return true;
120186
}) as any;
121-
res.end = vi.fn((cb?: () => void) => {
122-
cb?.();
187+
res.end = vi.fn(() => {
188+
queueMicrotask(() => res.emit('finish'));
123189
return res;
124190
}) as any;
125191

0 commit comments

Comments
 (0)