Skip to content

Commit cd881bb

Browse files
authored
Merge pull request #8557 from QwikDev/v2-fix-node-streaming
fix: honor node streaming backpressure
2 parents 9834878 + 8732ef8 commit cd881bb

13 files changed

Lines changed: 615 additions & 50 deletions

File tree

.changeset/spicy-onions-jog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@qwik.dev/router': patch
3+
'@qwik.dev/core': patch
4+
---
5+
6+
fix: Node SSR streaming to honor downstream backpressure

packages/qwik-router/src/middleware/node/http.ts

Lines changed: 85 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,88 @@ export function normalizeUrl(url: string, base: string) {
4444
return normalizeRequestUrl(url, base);
4545
}
4646

47+
function createNodeResponseSink(res: ServerResponse) {
48+
let closed = res.closed || res.destroyed;
49+
const closedPromise = closed
50+
? Promise.resolve()
51+
: new Promise<void>((resolve) => {
52+
res.once('close', () => {
53+
closed = true;
54+
resolve();
55+
});
56+
});
57+
58+
const write = (chunk: Uint8Array) => {
59+
if (closed || res.closed || res.destroyed) {
60+
// If the response has already been closed or destroyed (for example the client has disconnected)
61+
// then writing into it will cause an error. So just stop writing since no one
62+
// is listening.
63+
return;
64+
}
65+
66+
return new Promise<void>((resolve, reject) => {
67+
let settled = false;
68+
69+
const finish = () => {
70+
if (settled) {
71+
return;
72+
}
73+
settled = true;
74+
75+
// Let Node process the completed write before SSR keeps emitting more chunks.
76+
setImmediate(resolve);
77+
};
78+
79+
const fail = (error: unknown) => {
80+
if (settled) {
81+
return;
82+
}
83+
settled = true;
84+
reject(error);
85+
};
86+
87+
closedPromise.then(finish);
88+
89+
try {
90+
res.write(chunk, (error) => {
91+
if (error) {
92+
if (isIgnoredError(error.message)) {
93+
finish();
94+
return;
95+
}
96+
fail(error);
97+
return;
98+
}
99+
finish();
100+
});
101+
} catch (error) {
102+
if (error instanceof Error && isIgnoredError(error.message)) {
103+
finish();
104+
return;
105+
}
106+
fail(error);
107+
}
108+
});
109+
};
110+
111+
const close = () => {
112+
if (closed || res.closed || res.destroyed) {
113+
return;
114+
}
115+
116+
return new Promise<void>((resolve) => {
117+
res.end(() => {
118+
resolve();
119+
});
120+
});
121+
};
122+
123+
return {
124+
write,
125+
close,
126+
};
127+
}
128+
47129
export async function fromNodeHttp(
48130
url: URL,
49131
req: IncomingMessage | Http2ServerRequest,
@@ -100,6 +182,7 @@ export async function fromNodeHttp(
100182
},
101183
getWritableStream: (status, headers, cookies) => {
102184
res.statusCode = status;
185+
const sink = createNodeResponseSink(res);
103186

104187
try {
105188
for (const [key, value] of headers) {
@@ -118,20 +201,10 @@ export async function fromNodeHttp(
118201

119202
return new WritableStream<Uint8Array>({
120203
write(chunk) {
121-
if (res.closed || res.destroyed) {
122-
// If the response has already been closed or destroyed (for example the client has disconnected)
123-
// then writing into it will cause an error. So just stop writing since no one
124-
// is listening.
125-
return;
126-
}
127-
res.write(chunk, (error) => {
128-
if (error && !isIgnoredError(error.message)) {
129-
console.error(error);
130-
}
131-
});
204+
return sink.write(chunk);
132205
},
133206
close() {
134-
res.end();
207+
return sink.close();
135208
},
136209
});
137210
},

packages/qwik-router/src/middleware/node/http.unit.ts

Lines changed: 170 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
1-
import { assert, test } from 'vitest';
2-
import { normalizeUrl } from './http';
1+
import {
2+
getPlatform,
3+
setPlatform,
4+
type JSXOutput,
5+
type StreamWriter,
6+
} from '@qwik.dev/core/internal';
7+
import { renderToStream } from '@qwik.dev/core/server';
8+
import { EventEmitter } from 'node:events';
9+
import { createServer, type IncomingMessage, type ServerResponse } from 'node:http';
10+
import { connect } from 'node:net';
11+
import { performance } from 'node:perf_hooks';
12+
import { assert, describe, expect, test, vi } from 'vitest';
13+
import { fromNodeHttp, normalizeUrl } from './http';
314

415
[
516
{
@@ -42,3 +53,160 @@ import { normalizeUrl } from './http';
4253
assert.equal(normalizeUrl(t.url, t.base).href, t.expect);
4354
});
4455
});
56+
57+
describe('fromNodeHttp()', () => {
58+
test('should resolve writes from the node write callback without waiting for drain', async () => {
59+
const req = new EventEmitter() as IncomingMessage & EventEmitter;
60+
req.method = 'GET';
61+
req.url = '/';
62+
req.headers = { host: 'localhost' };
63+
(req as any).socket = {};
64+
65+
let writeCallback: ((error?: Error | null) => void) | undefined;
66+
const res = new EventEmitter() as ServerResponse & EventEmitter;
67+
Object.defineProperty(res, 'closed', { value: false, configurable: true });
68+
Object.defineProperty(res, 'destroyed', { value: false, configurable: true });
69+
res.setHeader = vi.fn();
70+
res.write = vi.fn((_chunk: Uint8Array, cb?: (error?: Error | null) => void) => {
71+
writeCallback = cb;
72+
// Returning false simulates backpressure. This adapter does not attach
73+
// drain listeners because ServerResponse.write() provides a per-write callback.
74+
return false;
75+
}) as any;
76+
res.end = vi.fn((cb?: () => void) => {
77+
cb?.();
78+
return res;
79+
}) as any;
80+
81+
const requestEv = await fromNodeHttp(new URL('http://localhost/'), req, res, 'server');
82+
const writableStream = requestEv.getWritableStream(
83+
200,
84+
new Headers([['Content-Type', 'text/html; charset=utf-8']]),
85+
{ headers: () => [] } as any,
86+
() => {},
87+
undefined as any
88+
);
89+
const writer = writableStream.getWriter();
90+
91+
const writePromise = writer.write(new Uint8Array([1, 2, 3]));
92+
await Promise.resolve();
93+
// No drain listener should be attached in this path; Node calls the write
94+
// callback once this chunk has flushed, even when write() returned false.
95+
expect(res.listenerCount('drain')).toBe(0);
96+
expect(writeCallback).toBeDefined();
97+
98+
writeCallback?.(null);
99+
await writePromise;
100+
await writer.close();
101+
102+
expect(res.write).toHaveBeenCalledTimes(1);
103+
expect(res.end).toHaveBeenCalledTimes(1);
104+
});
105+
106+
// Verifies the Node adapter starts making response-write progress before a large SSR render fully completes.
107+
test('should make Node response progress before render completes', async () => {
108+
const timings = {
109+
renderStarted: 0,
110+
renderDone: 0,
111+
firstSocketWrite: 0,
112+
};
113+
const jsx: JSXOutput = Array.from({ length: 4_000 }, (_, i) => `chunk-${i}-${'x'.repeat(96)}`);
114+
const cookies = {
115+
headers: () => [],
116+
};
117+
118+
const server = createServer(async (req, res) => {
119+
res.socket?.setNoDelay(true);
120+
const socket = res.socket;
121+
if (socket) {
122+
const originalSocketWrite = socket.write.bind(socket);
123+
socket.write = ((chunk: any, ...args: any[]) => {
124+
if (timings.firstSocketWrite === 0) {
125+
timings.firstSocketWrite = performance.now();
126+
}
127+
return originalSocketWrite(chunk, ...args);
128+
}) as typeof socket.write;
129+
}
130+
131+
const requestEv = await fromNodeHttp(
132+
new URL(req.url || '/', 'http://127.0.0.1'),
133+
req,
134+
res,
135+
'server'
136+
);
137+
const writableStream = requestEv.getWritableStream(
138+
200,
139+
new Headers([['Content-Type', 'text/html; charset=utf-8']]),
140+
cookies as any,
141+
() => {},
142+
undefined as any
143+
);
144+
const writer = writableStream.getWriter();
145+
const encoder = new TextEncoder();
146+
const stream: StreamWriter = {
147+
write(chunk: string) {
148+
return writer.write(encoder.encode(chunk));
149+
},
150+
};
151+
const platform = getPlatform();
152+
153+
try {
154+
timings.renderStarted = performance.now();
155+
await renderToStream(jsx, {
156+
containerTagName: 'div',
157+
qwikLoader: 'never',
158+
stream,
159+
streaming: {
160+
inOrder: {
161+
strategy: 'auto',
162+
maximumInitialChunk: 128,
163+
maximumChunk: 64,
164+
},
165+
},
166+
});
167+
timings.renderDone = performance.now();
168+
} finally {
169+
setPlatform(platform);
170+
await writer.close();
171+
}
172+
});
173+
174+
await new Promise<void>((resolve) => {
175+
server.listen(0, '127.0.0.1', resolve);
176+
});
177+
178+
const address = server.address();
179+
if (!address || typeof address === 'string') {
180+
server.close();
181+
throw new Error('Failed to bind test server');
182+
}
183+
184+
try {
185+
await new Promise<void>((resolve, reject) => {
186+
const socket = connect(address.port, '127.0.0.1', () => {
187+
socket.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n');
188+
});
189+
190+
socket.on('data', () => {
191+
// Intentionally empty: the request must be consumed to let the server drain.
192+
});
193+
socket.on('end', resolve);
194+
socket.on('error', reject);
195+
});
196+
} finally {
197+
await new Promise<void>((resolve, reject) => {
198+
server.close((error) => {
199+
if (error) {
200+
reject(error);
201+
} else {
202+
resolve();
203+
}
204+
});
205+
});
206+
}
207+
208+
expect(timings.firstSocketWrite).toBeGreaterThan(0);
209+
expect(timings.renderDone).toBeGreaterThan(timings.renderStarted);
210+
expect(timings.firstSocketWrite).toBeLessThan(timings.renderDone);
211+
});
212+
});

packages/qwik-router/src/middleware/node/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,12 @@ export function createQwikRouter(
6363
const handled = await requestHandler(serverRequestEv, opts);
6464
if (handled) {
6565
const err = await handled.completion;
66-
if (err) {
67-
throw err;
68-
}
6966
if (handled.requestEv.headersSent) {
7067
return;
7168
}
69+
if (err) {
70+
throw err;
71+
}
7272
}
7373
next();
7474
} catch (e) {

0 commit comments

Comments
 (0)