Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,29 @@ class Parser {

const { llhttp } = this

// The peer closed the connection. If the body parser was paused by
// backpressure we must finish parsing before signalling EOF, otherwise
// llhttp_finish() would crash (it used to assert !paused) or report a
// half-parsed message. Backpressure is advisory here: onData keeps buffering
// delivered bytes into the response stream, so resume across pauses and
// drain whatever is still buffered on the socket. A Content-Length/chunked
// body reaches on_message_complete during execute(); an EOF-delimited body
// stays paused (its length is unknown) and is completed by llhttp_finish().
if (this.paused) {
let data
do {
llhttp.llhttp_resume(this.ptr)
this.paused = false
data = this.socket.read() || EMPTY_BUF
this.execute(data)
} while (this.paused && data.length > 0)

if (this.paused) {
llhttp.llhttp_resume(this.ptr)
this.paused = false
}
}

let ret

try {
Expand Down
85 changes: 85 additions & 0 deletions test/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const { tspl } = require('@matteo.collina/tspl')
const { readFileSync, createReadStream } = require('node:fs')
const { createServer } = require('node:http')
const { createServer: createNetServer } = require('node:net')
const { Readable, PassThrough } = require('node:stream')
const { test, after } = require('node:test')
const { Client, errors } = require('..')
Expand Down Expand Up @@ -1322,6 +1323,90 @@ test('multiple destroy callback', async (t) => {
await t.completed
})

// Regression tests for https://github.com/nodejs/undici/issues/5360: when a
// response body applies backpressure the llhttp parser is left paused; if the
// peer then FINs, onHttpSocketEnd -> parser.finish() used to assert !paused and
// crash the process. finish() must instead complete the paused parser. The body
// is only consumed *after* the FIN (attaching just 'end'/'error' does not switch
// the stream to flowing mode), so the parser is genuinely paused at FIN. The
// 64 KiB payload matches the default highWaterMark: large enough to pause, small
// enough to arrive in a single socket read.
function pausedAtFin (t, writeResponse, onBody) {
const payload = Buffer.alloc(64 * 1024, 0x61)

const server = createNetServer((socket) => {
socket.once('data', () => {
writeResponse(socket, payload)
socket.end() // FIN while the body is still unconsumed -> parser paused.
})
})
after(() => server.close())

server.listen(0, '127.0.0.1', () => {
const client = new Client(`http://127.0.0.1:${server.address().port}`)
after(() => client.close())

client.request({ path: '/', method: 'GET' }, (err, data) => {
t.ifError(err)

const chunks = []
data.body.on('end', () => onBody(t, payload, Buffer.concat(chunks), null))
data.body.on('error', (err) => onBody(t, payload, Buffer.concat(chunks), err))

// Defer consuming until after FIN has been handled while paused.
setTimeout(() => data.body.on('data', (chunk) => chunks.push(chunk)), 100)
})
})
}

const expectComplete = (t, payload, body, err) => {
t.ifError(err)
t.strictEqual(body.length, payload.length)
}

const expectError = (t, payload, body, err) => {
// A truncated body must surface as an error, never a clean completion.
t.ok(err instanceof Error, `expected an error, got ${err}`)
}

test('socket end completes Content-Length response paused by backpressure', async (t) => {
t = tspl(t, { plan: 3 })
pausedAtFin(t, (socket, payload) => {
socket.write(`HTTP/1.1 200 OK\r\nContent-Length: ${payload.length}\r\nConnection: close\r\n\r\n`)
socket.write(payload)
}, expectComplete)
await t.completed
})

test('socket end completes EOF-delimited response paused by backpressure', async (t) => {
t = tspl(t, { plan: 3 })
pausedAtFin(t, (socket, payload) => {
socket.write('HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n')
socket.write(payload)
}, expectComplete)
await t.completed
})

test('socket end errors on truncated Content-Length response paused by backpressure', async (t) => {
t = tspl(t, { plan: 2 })
pausedAtFin(t, (socket, payload) => {
// Declare twice the body we actually send, then FIN -> truncated.
socket.write(`HTTP/1.1 200 OK\r\nContent-Length: ${payload.length * 2}\r\nConnection: close\r\n\r\n`)
socket.write(payload)
}, expectError)
await t.completed
})

test('socket end errors on truncated chunked response paused by backpressure', async (t) => {
t = tspl(t, { plan: 2 })
pausedAtFin(t, (socket, payload) => {
socket.write('HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nConnection: close\r\n\r\n')
socket.write(`${payload.length.toString(16)}\r\n`)
socket.write(payload) // no trailing CRLF / terminating 0-chunk -> truncated
}, expectError)
await t.completed
})

test('only one streaming req at a time', async (t) => {
t = tspl(t, { plan: 7 })

Expand Down
Loading