Skip to content

Commit 4d4f851

Browse files
committed
fix: add response stream handling
1 parent b54b39b commit 4d4f851

2 files changed

Lines changed: 146 additions & 50 deletions

File tree

src/interceptors/http/http-parser.ts

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,41 @@
11
import {
2-
HeadersCallback,
32
HTTPParser,
4-
RequestHeadersCompleteCallback,
3+
type HeadersCallback,
4+
type RequestHeadersCompleteCallback,
5+
type ResponseHeadersCompleteCallback,
56
} from '_http_common'
67

7-
export class HttpRequestParser {
8-
#parser: HTTPParser<typeof HTTPParser.REQUEST>
8+
type HttpParserKind = typeof HTTPParser.REQUEST | typeof HTTPParser.RESPONSE
99

10-
constructor(options: {
11-
onMessageBegin?: () => void
12-
onHeaders?: HeadersCallback
13-
onHeadersComplete?: RequestHeadersCompleteCallback
14-
onBody?: (chunk: Buffer) => void
15-
onMessageComplete?: () => void
16-
onExecute?: () => void
17-
onTimeout?: () => void
18-
}) {
10+
export class HttpParser<ParserKind extends HttpParserKind> {
11+
static REQUEST = HTTPParser.REQUEST
12+
static RESPONSE = HTTPParser.RESPONSE
13+
14+
#parser: HTTPParser<HttpParserKind>
15+
16+
constructor(
17+
kind: ParserKind,
18+
hooks: {
19+
onMessageBegin?: () => void
20+
onHeaders?: HeadersCallback
21+
onHeadersComplete?: ParserKind extends typeof HTTPParser.REQUEST
22+
? RequestHeadersCompleteCallback
23+
: ResponseHeadersCompleteCallback
24+
onBody?: (chunk: Buffer) => void
25+
onMessageComplete?: () => void
26+
onExecute?: () => void
27+
onTimeout?: () => void
28+
}
29+
) {
1930
this.#parser = new HTTPParser()
20-
this.#parser.initialize(HTTPParser.REQUEST, {})
21-
this.#parser[HTTPParser.kOnMessageBegin] = options.onMessageBegin
22-
this.#parser[HTTPParser.kOnHeaders] = options.onHeaders
23-
this.#parser[HTTPParser.kOnHeadersComplete] = options.onHeadersComplete
24-
this.#parser[HTTPParser.kOnBody] = options.onBody
25-
this.#parser[HTTPParser.kOnMessageComplete] = options.onMessageComplete
26-
this.#parser[HTTPParser.kOnExecute] = options.onExecute
27-
this.#parser[HTTPParser.kOnTimeout] = options.onTimeout
31+
this.#parser.initialize(kind, {})
32+
this.#parser[HTTPParser.kOnMessageBegin] = hooks.onMessageBegin
33+
this.#parser[HTTPParser.kOnHeaders] = hooks.onHeaders
34+
this.#parser[HTTPParser.kOnHeadersComplete] = hooks.onHeadersComplete
35+
this.#parser[HTTPParser.kOnBody] = hooks.onBody
36+
this.#parser[HTTPParser.kOnMessageComplete] = hooks.onMessageComplete
37+
this.#parser[HTTPParser.kOnExecute] = hooks.onExecute
38+
this.#parser[HTTPParser.kOnTimeout] = hooks.onTimeout
2839
}
2940

3041
public execute(buffer: Buffer): void {

src/interceptors/http/index.ts

Lines changed: 114 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ import { Interceptor, INTERNAL_REQUEST_ID_HEADER_NAME } from '../../Interceptor'
55
import { type HttpRequestEventMap } from '../../glossary'
66
import { SocketInterceptor } from '../net'
77
import { FetchResponse } from '../../utils/fetchUtils'
8-
import { HttpRequestParser } from './http-parser'
8+
import { HttpParser } from './http-parser'
99
import { baseUrlFromConnectionOptions } from '../Socket/utils/baseUrlFromConnectionOptions'
10+
import { MockSocket } from '../net/mock-socket'
1011
import type { NetworkConnectionOptions } from '../net/utils/normalize-net-connect-args'
1112
import { toBuffer } from './utils/to-buffer'
1213
import { createRequestId } from '../../createRequestId'
@@ -18,7 +19,7 @@ import {
1819
restoreHeadersPrototype,
1920
} from '../ClientRequest/utils/recordRawHeaders'
2021
import { isResponseError } from '../../utils/responseUtils'
21-
import { MockSocket } from '../Socket/MockSocket'
22+
import { emitAsync } from '../../utils/emitAsync'
2223

2324
/**
2425
* @fixme Can we use the socket interceptor as a singleton?
@@ -79,13 +80,19 @@ export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
7980
requestId,
8081
controller,
8182
emitter: this.emitter,
82-
async onResponse(response) {
83+
onResponse: async (response) => {
8384
await respondWith({
8485
socket,
8586
connectionOptions: options,
8687
request,
8788
response,
8889
})
90+
await emitAsync(this.emitter, 'response', {
91+
requestId,
92+
request,
93+
response,
94+
isMockedResponse: true,
95+
})
8996
},
9097
async onRequestError(response) {
9198
await respondWith({
@@ -94,6 +101,12 @@ export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
94101
request,
95102
response,
96103
})
104+
await emitAsync(this.emitter, 'response', {
105+
requestId,
106+
request,
107+
response,
108+
isMockedResponse: true,
109+
})
97110
},
98111
onError(error) {
99112
if (error instanceof Error) {
@@ -103,35 +116,46 @@ export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
103116
})
104117

105118
if (!isRequestHandled) {
119+
// If the user didn't register any response listeners, no need to pay the
120+
// price of routing the entire response message through the parser.
121+
if (this.emitter.listenerCount('response') > 0) {
122+
createHttpResponseParserStream({
123+
socket,
124+
onResponse: async (response) => {
125+
await emitAsync(this.emitter, 'response', {
126+
requestId,
127+
request,
128+
response,
129+
isMockedResponse: false,
130+
})
131+
},
132+
})
133+
}
134+
106135
const passthroughSocket = socket.passthrough()
107136

108137
/**
109-
* @note Creating a passthroughsocket does NOT trigger the "socket" event
110-
* from `http.ClientRequest` where the request, parser, and socket get
111-
* associated. Recreate that association on the passthrough socket manually.
138+
* @note Creating a passthrough socket does NOT trigger the "onSocket" callback
139+
* of `ClientRequest` because that callback is invoked manually in the request's constructor.
140+
* Promote the parser-request-parser association manually from the mocked onto the passthrough socket.
141+
* @see https://github.com/nodejs/node/blob/134625d76139b4b3630d5baaf2efccae01ede564/lib/_http_client.js#L422
112142
* @see https://github.com/nodejs/node/blob/134625d76139b4b3630d5baaf2efccae01ede564/lib/_http_client.js#L890
113143
*/
114-
// @ts-expect-error Internal Node.js property.
144+
// @ts-expect-error Node.js internals.
115145
passthroughSocket._httpMessage = socket._httpMessage
116-
// @ts-expect-error Internal Node.js property.c
146+
// @ts-expect-error Node.js internals.
117147
passthroughSocket.parser = socket.parser
118-
// @ts-expect-error Internal Node.js property.
148+
// @ts-expect-error Node.js internals.
119149
passthroughSocket.parser.socket = passthroughSocket
120150
}
121151
},
122152
})
123153

124-
// Write the header again because at this point it's already been written.
154+
// Write the message header to the parser manually because it's already been written
155+
// on the socket so it won't get piped.
125156
requestParser.write(toBuffer(chunk, encoding))
126157
socket.pipe(requestParser)
127158
})
128-
129-
socket.on('push', (chunk, encoding) => {
130-
/**
131-
* @todo Route this through a response parser so both mocked
132-
* and passthrough responses emit the "response" event for the user.
133-
*/
134-
})
135159
})
136160
})
137161
}
@@ -146,7 +170,7 @@ function createHttpRequestParserStream(options: {
146170
const requestRawHeadersBuffer: Array<string> = []
147171
let requestBodyStream: Readable | undefined
148172

149-
const parser = new HttpRequestParser({
173+
const parser = new HttpParser(HttpParser.REQUEST, {
150174
onHeaders(rawHeaders) {
151175
requestRawHeadersBuffer.push(...rawHeaders)
152176
},
@@ -210,7 +234,7 @@ function createHttpRequestParserStream(options: {
210234
onBody(chunk) {
211235
invariant(
212236
requestBodyStream,
213-
'Failed to write to a request stream: stream does not exist'
237+
'Failed to write to a request stream: stream does not exist. This is likely an issue with the library. Please report it on GitHub.'
214238
)
215239

216240
requestBodyStream.push(chunk)
@@ -227,13 +251,75 @@ function createHttpRequestParserStream(options: {
227251
},
228252
})
229253

230-
parserStream.once('finish', () => {
231-
parser.free()
232-
})
254+
parserStream
255+
.once('finish', () => parser.free())
256+
.once('error', () => parser.free())
233257

234258
return parserStream
235259
}
236260

261+
function createHttpResponseParserStream(options: {
262+
socket: MockSocket
263+
onResponse: (response: Response) => void
264+
}) {
265+
const { socket, onResponse } = options
266+
const responseRawHeadersBuffer: Array<string> = []
267+
let responseBodyStream: Readable | undefined
268+
269+
const parser = new HttpParser(HttpParser.RESPONSE, {
270+
onHeaders(rawHeaders) {
271+
responseRawHeadersBuffer.push(...rawHeaders)
272+
},
273+
onHeadersComplete(
274+
versionMajor,
275+
versionMinor,
276+
rawHeaders,
277+
method,
278+
url,
279+
status,
280+
statusText
281+
) {
282+
const headers = FetchResponse.parseRawHeaders([
283+
...responseRawHeadersBuffer,
284+
...(rawHeaders || []),
285+
])
286+
287+
const response = new FetchResponse(
288+
FetchResponse.isResponseWithBody(status)
289+
? (Readable.toWeb(
290+
(responseBodyStream = new Readable({ read() {} }))
291+
) as any)
292+
: null,
293+
{
294+
url,
295+
status,
296+
statusText,
297+
headers,
298+
}
299+
)
300+
301+
onResponse(response)
302+
},
303+
onBody(chunk) {
304+
invariant(
305+
responseBodyStream,
306+
'Failed to read from a response stream: stream does not exist. This is likely an issue with the library. Please report it on GitHub.'
307+
)
308+
309+
responseBodyStream.push(chunk)
310+
},
311+
onMessageComplete() {
312+
responseBodyStream?.push(null)
313+
},
314+
})
315+
316+
socket
317+
.on('push', (chunk, encoding) => {
318+
parser.execute(toBuffer(chunk, encoding))
319+
})
320+
.once('close', () => parser.free())
321+
}
322+
237323
/**
238324
* Mocks a successful socket connection.
239325
*/
@@ -314,20 +400,19 @@ async function respondWith(args: {
314400

315401
// Construct a regular server response to delegate body parsing to Node.js.
316402
const serverResponse = new ServerResponse(new IncomingMessage(socket))
403+
const responseSocket = new MockSocket({} as any)
404+
responseSocket.on('write', (chunk, encoding, callback) => {
405+
socket.push(chunk, encoding)
406+
callback?.()
407+
})
317408

318409
serverResponse.assignSocket(
319410
/**
320411
* @note Provide a dummy socket to the server response to translate all its writes
321412
* into pushes to the underlying mocked socket. This is only needed because we
322413
* use `ServerResponse` instead of pushing to mock socket directly (skip parsing).
323414
*/
324-
new MockSocket({
325-
write(chunk, encoding, callback) {
326-
socket.push(chunk, encoding)
327-
callback?.()
328-
},
329-
read() {},
330-
})
415+
responseSocket
331416
)
332417

333418
/**

0 commit comments

Comments
 (0)