diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index dfedeb49c7b..6d89dd09b5e 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -377,6 +377,29 @@ class Parser { const { llhttp } = this + // The peer closed the connection. If the body parser was paused by + // backpressure we must finish parsing before signalling EOF, otherwise + // llhttp_finish() would crash (it used to assert !paused) or report a + // half-parsed message. Backpressure is advisory here: onData keeps buffering + // delivered bytes into the response stream, so resume across pauses and + // drain whatever is still buffered on the socket. A Content-Length/chunked + // body reaches on_message_complete during execute(); an EOF-delimited body + // stays paused (its length is unknown) and is completed by llhttp_finish(). + if (this.paused) { + let data + do { + llhttp.llhttp_resume(this.ptr) + this.paused = false + data = this.socket.read() || EMPTY_BUF + this.execute(data) + } while (this.paused && data.length > 0) + + if (this.paused) { + llhttp.llhttp_resume(this.ptr) + this.paused = false + } + } + let ret try { diff --git a/test/client.js b/test/client.js index 2b8b0517d28..dbe13202ea1 100644 --- a/test/client.js +++ b/test/client.js @@ -3,6 +3,7 @@ const { tspl } = require('@matteo.collina/tspl') const { readFileSync, createReadStream } = require('node:fs') const { createServer } = require('node:http') +const { createServer: createNetServer } = require('node:net') const { Readable, PassThrough } = require('node:stream') const { test, after } = require('node:test') const { Client, errors } = require('..') @@ -1322,6 +1323,90 @@ test('multiple destroy callback', async (t) => { await t.completed }) +// Regression tests for https://github.com/nodejs/undici/issues/5360: when a +// response body applies backpressure the llhttp parser is left paused; if the +// peer then FINs, onHttpSocketEnd -> parser.finish() used to assert !paused and +// crash the process. finish() must instead complete the paused parser. The body +// is only consumed *after* the FIN (attaching just 'end'/'error' does not switch +// the stream to flowing mode), so the parser is genuinely paused at FIN. The +// 64 KiB payload matches the default highWaterMark: large enough to pause, small +// enough to arrive in a single socket read. +function pausedAtFin (t, writeResponse, onBody) { + const payload = Buffer.alloc(64 * 1024, 0x61) + + const server = createNetServer((socket) => { + socket.once('data', () => { + writeResponse(socket, payload) + socket.end() // FIN while the body is still unconsumed -> parser paused. + }) + }) + after(() => server.close()) + + server.listen(0, '127.0.0.1', () => { + const client = new Client(`http://127.0.0.1:${server.address().port}`) + after(() => client.close()) + + client.request({ path: '/', method: 'GET' }, (err, data) => { + t.ifError(err) + + const chunks = [] + data.body.on('end', () => onBody(t, payload, Buffer.concat(chunks), null)) + data.body.on('error', (err) => onBody(t, payload, Buffer.concat(chunks), err)) + + // Defer consuming until after FIN has been handled while paused. + setTimeout(() => data.body.on('data', (chunk) => chunks.push(chunk)), 100) + }) + }) +} + +const expectComplete = (t, payload, body, err) => { + t.ifError(err) + t.strictEqual(body.length, payload.length) +} + +const expectError = (t, payload, body, err) => { + // A truncated body must surface as an error, never a clean completion. + t.ok(err instanceof Error, `expected an error, got ${err}`) +} + +test('socket end completes Content-Length response paused by backpressure', async (t) => { + t = tspl(t, { plan: 3 }) + pausedAtFin(t, (socket, payload) => { + socket.write(`HTTP/1.1 200 OK\r\nContent-Length: ${payload.length}\r\nConnection: close\r\n\r\n`) + socket.write(payload) + }, expectComplete) + await t.completed +}) + +test('socket end completes EOF-delimited response paused by backpressure', async (t) => { + t = tspl(t, { plan: 3 }) + pausedAtFin(t, (socket, payload) => { + socket.write('HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n') + socket.write(payload) + }, expectComplete) + await t.completed +}) + +test('socket end errors on truncated Content-Length response paused by backpressure', async (t) => { + t = tspl(t, { plan: 2 }) + pausedAtFin(t, (socket, payload) => { + // Declare twice the body we actually send, then FIN -> truncated. + socket.write(`HTTP/1.1 200 OK\r\nContent-Length: ${payload.length * 2}\r\nConnection: close\r\n\r\n`) + socket.write(payload) + }, expectError) + await t.completed +}) + +test('socket end errors on truncated chunked response paused by backpressure', async (t) => { + t = tspl(t, { plan: 2 }) + pausedAtFin(t, (socket, payload) => { + socket.write('HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n\r\n') + socket.write(`${payload.length.toString(16)}\r\n`) + socket.write(payload) // no trailing CRLF / terminating 0-chunk -> truncated + }, expectError) + await t.completed +}) + test('only one streaming req at a time', async (t) => { t = tspl(t, { plan: 7 })