Skip to content

Commit d3faabe

Browse files
committed
fix: request body reads, response piping
1 parent 535facd commit d3faabe

2 files changed

Lines changed: 100 additions & 79 deletions

File tree

src/interceptors/http/index.ts

Lines changed: 59 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import net from 'node:net'
22
import { Readable, Writable } from 'node:stream'
33
import { invariant } from 'outvariant'
4-
import { Interceptor, INTERNAL_REQUEST_ID_HEADER_NAME } from '../../Interceptor'
4+
import { Interceptor } from '../../Interceptor'
55
import { type HttpRequestEventMap } from '../../glossary'
66
import { SocketInterceptor } from '../net'
77
import { FetchResponse } from '../../utils/fetchUtils'
@@ -81,32 +81,44 @@ export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
8181
controller,
8282
emitter: this.emitter,
8383
onResponse: async (response) => {
84+
if (this.emitter.listenerCount('response') > 0) {
85+
const responseClone = response.clone()
86+
process.nextTick(() => {
87+
emitAsync(this.emitter, 'response', {
88+
requestId,
89+
request,
90+
response: responseClone,
91+
isMockedResponse: true,
92+
})
93+
})
94+
}
95+
8496
await respondWith({
8597
socket,
8698
connectionOptions: options,
8799
request,
88100
response,
89101
})
90-
await emitAsync(this.emitter, 'response', {
91-
requestId,
92-
request,
93-
response,
94-
isMockedResponse: true,
95-
})
96102
},
97103
async onRequestError(response) {
104+
if (this.emitter.listenerCount('response') > 0) {
105+
const responseClone = response.clone()
106+
process.nextTick(() => {
107+
emitAsync(this.emitter, 'response', {
108+
requestId,
109+
request,
110+
response: responseClone,
111+
isMockedResponse: true,
112+
})
113+
})
114+
}
115+
98116
await respondWith({
99117
socket,
100118
connectionOptions: options,
101119
request,
102120
response,
103121
})
104-
await emitAsync(this.emitter, 'response', {
105-
requestId,
106-
request,
107-
response,
108-
isMockedResponse: true,
109-
})
110122
},
111123
onError(error) {
112124
if (error instanceof Error) {
@@ -116,22 +128,6 @@ export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
116128
})
117129

118130
if (!isRequestHandled) {
119-
// If the user didn't register any response listeners, no need to pay the
120-
// price of routing the entire response message through the parser.
121-
if (this.emitter.listenerCount('response') > 0) {
122-
createHttpResponseParserStream({
123-
socket,
124-
onResponse: async (response) => {
125-
await emitAsync(this.emitter, 'response', {
126-
requestId,
127-
request,
128-
response,
129-
isMockedResponse: false,
130-
})
131-
},
132-
})
133-
}
134-
135131
const passthroughSocket = socket.passthrough()
136132

137133
/**
@@ -147,14 +143,39 @@ export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
147143
passthroughSocket.parser = socket.parser
148144
// @ts-expect-error Node.js internals.
149145
passthroughSocket.parser.socket = passthroughSocket
146+
147+
// If the user didn't register any response listeners, no need to pay the
148+
// price of routing the entire response message through the parser.
149+
if (this.emitter.listenerCount('response') > 0) {
150+
const responseStream = createHttpResponseParserStream({
151+
onResponse: async (response) => {
152+
await emitAsync(this.emitter, 'response', {
153+
requestId,
154+
request,
155+
response,
156+
isMockedResponse: false,
157+
})
158+
},
159+
})
160+
161+
passthroughSocket
162+
.on('data', (chunk) => responseStream.write(chunk))
163+
.on('close', () => responseStream.end())
164+
}
150165
}
151166
},
152167
})
153168

154169
// Write the message header to the parser manually because it's already been written
155170
// on the socket so it won't get piped.
156171
requestParser.write(toBuffer(chunk, encoding))
157-
socket.pipe(requestParser)
172+
173+
socket
174+
.on('write', (chunk, encoding, callback) => {
175+
requestParser.write(chunk, encoding, callback)
176+
})
177+
.on('finish', () => requestParser.end())
178+
.on('error', () => requestParser.end())
158179
})
159180
})
160181
})
@@ -211,13 +232,7 @@ function createHttpRequestParserStream(options: {
211232
* @note Provide the `read()` method so a `Readable` could be
212233
* used as the actual request body (the stream calls "read()").
213234
*/
214-
read() {
215-
// If the user attempts to read the request body,
216-
// flush the write buffer to trigger the callbacks.
217-
// This way, if the request stream ends in the write callback,
218-
// it will indeed end correctly.
219-
// flushWriteBuffer()
220-
},
235+
read() {},
221236
})
222237

223238
const request = new Request(url, {
@@ -254,25 +269,18 @@ function createHttpRequestParserStream(options: {
254269
},
255270
})
256271

257-
const parserStream = new Writable({
272+
return new Writable({
258273
write(chunk, encoding, callback) {
259274
parser.execute(toBuffer(chunk, encoding))
260275
callback()
261276
},
262277
})
263-
264-
parserStream
265-
.once('finish', () => parser.free())
266-
.once('close', () => parser.free())
267-
268-
return parserStream
269278
}
270279

271280
function createHttpResponseParserStream(options: {
272-
socket: MockSocket
273281
onResponse: (response: Response) => void
274282
}) {
275-
const { socket, onResponse } = options
283+
const { onResponse } = options
276284
const responseRawHeadersBuffer: Array<string> = []
277285
let responseBodyStream: Readable | undefined
278286

@@ -323,11 +331,12 @@ function createHttpResponseParserStream(options: {
323331
},
324332
})
325333

326-
socket
327-
.on('push', (chunk, encoding) => {
334+
return new Writable({
335+
write(chunk, encoding, callback) {
328336
parser.execute(toBuffer(chunk, encoding))
329-
})
330-
.once('close', () => parser.free())
337+
callback()
338+
},
339+
})
331340
}
332341

333342
/**

test/modules/http/compliance/events.test.ts

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ const httpServer = new HttpServer((app) => {
99
app.get('/', (req, res) => {
1010
res.send('original-response')
1111
})
12+
app.post('/', (req, res) => {
13+
res.send()
14+
})
1215
})
1316

1417
const interceptor = new HttpRequestInterceptor()
@@ -18,6 +21,10 @@ beforeAll(async () => {
1821
await httpServer.listen()
1922
})
2023

24+
afterEach(() => {
25+
interceptor.removeAllListeners()
26+
})
27+
2128
afterAll(async () => {
2229
interceptor.dispose()
2330
await httpServer.close()
@@ -48,10 +55,10 @@ it('emits the "request" event for an outgoing request without body', async () =>
4855
expect(request.body).toBe(null)
4956
})
5057

51-
it('emits the "request" event for an outgoing request with a body', async () => {
58+
it('emits the "request" event for a bypassed request with a body', async () => {
5259
const requestListener =
5360
vi.fn<(...args: HttpRequestEventMap['request']) => void>()
54-
interceptor.once('request', requestListener)
61+
interceptor.on('request', requestListener)
5562

5663
const request = http.request(httpServer.http.url('/'), {
5764
method: 'POST',
@@ -62,21 +69,22 @@ it('emits the "request" event for an outgoing request with a body', async () =>
6269
})
6370
request.write('post-payload')
6471
request.end()
72+
6573
await waitForClientRequest(request)
6674

6775
expect(requestListener).toHaveBeenCalledTimes(1)
6876

69-
const { request: requestFromListener } = requestListener.mock.calls[0][0]
70-
expect(requestFromListener).toBeInstanceOf(Request)
71-
expect(requestFromListener.method).toBe('POST')
72-
expect(requestFromListener.url).toBe(httpServer.http.url('/'))
77+
const { request: interceptedRequest } = requestListener.mock.calls[0][0]
78+
expect(interceptedRequest).toBeInstanceOf(Request)
79+
expect(interceptedRequest.method).toBe('POST')
80+
expect(interceptedRequest.url).toBe(httpServer.http.url('/'))
7381
expect(
74-
Object.fromEntries(requestFromListener.headers.entries())
82+
Object.fromEntries(interceptedRequest.headers.entries())
7583
).toMatchObject({
7684
'content-type': 'text/plain',
7785
'x-custom-header': 'yes',
7886
})
79-
expect(await requestFromListener.text()).toBe('post-payload')
87+
await expect(interceptedRequest.text()).resolves.toBe('post-payload')
8088
})
8189

8290
it('emits the "response" event for a mocked response', async () => {
@@ -99,28 +107,28 @@ it('emits the "response" event for a mocked response', async () => {
99107
const {
100108
response,
101109
requestId,
102-
request: requestFromListener,
110+
request: interceptedRequest,
103111
isMockedResponse,
104112
} = responseListener.mock.calls[0][0]
105113
expect(response).toBeInstanceOf(Response)
106114
expect(response.status).toBe(200)
107-
expect(await response.text()).toBe('hello world')
115+
await expect(response.text()).resolves.toBe('hello world')
108116
expect(isMockedResponse).toBe(true)
109117

110118
expect(requestId).toMatch(REQUEST_ID_REGEXP)
111-
expect(requestFromListener).toBeInstanceOf(Request)
112-
expect(requestFromListener.method).toBe('GET')
113-
expect(requestFromListener.url).toBe('http://localhost/')
119+
expect(interceptedRequest).toBeInstanceOf(Request)
120+
expect(interceptedRequest.method).toBe('GET')
121+
expect(interceptedRequest.url).toBe('http://localhost/')
114122
expect(
115-
Object.fromEntries(requestFromListener.headers.entries())
123+
Object.fromEntries(interceptedRequest.headers.entries())
116124
).toMatchObject({
117125
'x-custom-header': 'yes',
118126
})
119-
expect(requestFromListener.body).toBe(null)
127+
expect(interceptedRequest.body).toBe(null)
120128

121129
// Must respond with the mocked response.
122130
expect(res.statusCode).toBe(200)
123-
expect(await text()).toBe('hello world')
131+
await expect(text()).resolves.toBe('hello world')
124132
})
125133

126134
it('emits the "response" event for a bypassed response', async () => {
@@ -135,31 +143,35 @@ it('emits the "response" event for a bypassed response', async () => {
135143
})
136144
const { res, text } = await waitForClientRequest(request)
137145

138-
// Must emit the "response" interceptor event.
139-
expect(responseListener).toHaveBeenCalledTimes(1)
146+
// Must respond with the mocked response.
147+
expect.soft(res.statusCode).toBe(200)
148+
await expect.soft(text()).resolves.toBe('original-response')
149+
150+
expect(
151+
responseListener,
152+
'Must emit the "response" event'
153+
).toHaveBeenCalledTimes(1)
154+
140155
const {
141156
response,
142157
requestId,
143-
request: requestFromListener,
158+
request: interceptedRequest,
144159
isMockedResponse,
145160
} = responseListener.mock.calls[0][0]
161+
146162
expect(response).toBeInstanceOf(Response)
147163
expect(response.status).toBe(200)
148-
expect(await response.text()).toBe('original-response')
164+
await expect(response.text()).resolves.toBe('original-response')
149165
expect(isMockedResponse).toBe(false)
150166

151167
expect(requestId).toMatch(REQUEST_ID_REGEXP)
152-
expect(requestFromListener).toBeInstanceOf(Request)
153-
expect(requestFromListener.method).toBe('GET')
154-
expect(requestFromListener.url).toBe(httpServer.http.url('/'))
168+
expect(interceptedRequest).toBeInstanceOf(Request)
169+
expect(interceptedRequest.method).toBe('GET')
170+
expect(interceptedRequest.url).toBe(httpServer.http.url('/'))
155171
expect(
156-
Object.fromEntries(requestFromListener.headers.entries())
172+
Object.fromEntries(interceptedRequest.headers.entries())
157173
).toMatchObject({
158174
'x-custom-header': 'yes',
159175
})
160-
expect(requestFromListener.body).toBe(null)
161-
162-
// Must respond with the mocked response.
163-
expect(res.statusCode).toBe(200)
164-
expect(await text()).toBe('original-response')
176+
expect(interceptedRequest.body).toBe(null)
165177
})

0 commit comments

Comments
 (0)