Skip to content

Commit e375e22

Browse files
committed
fix: request body reads, response piping
1 parent 535facd commit e375e22

2 files changed

Lines changed: 100 additions & 80 deletions

File tree

src/interceptors/http/index.ts

Lines changed: 59 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import net from 'node:net'
22
import { Readable, Writable } from 'node:stream'
33
import { invariant } from 'outvariant'
4-
import { Interceptor, INTERNAL_REQUEST_ID_HEADER_NAME } from '../../Interceptor'
4+
import { Interceptor } from '../../Interceptor'
55
import { type HttpRequestEventMap } from '../../glossary'
66
import { SocketInterceptor } from '../net'
77
import { FetchResponse } from '../../utils/fetchUtils'
88
import { HttpParser } from './http-parser'
99
import { baseUrlFromConnectionOptions } from '../Socket/utils/baseUrlFromConnectionOptions'
10-
import { MockSocket } from '../net/mock-socket'
1110
import type { NetworkConnectionOptions } from '../net/utils/normalize-net-connect-args'
1211
import { toBuffer } from './utils/to-buffer'
1312
import { createRequestId } from '../../createRequestId'
@@ -81,32 +80,44 @@ export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
8180
controller,
8281
emitter: this.emitter,
8382
onResponse: async (response) => {
83+
if (this.emitter.listenerCount('response') > 0) {
84+
const responseClone = response.clone()
85+
process.nextTick(() => {
86+
emitAsync(this.emitter, 'response', {
87+
requestId,
88+
request,
89+
response: responseClone,
90+
isMockedResponse: true,
91+
})
92+
})
93+
}
94+
8495
await respondWith({
8596
socket,
8697
connectionOptions: options,
8798
request,
8899
response,
89100
})
90-
await emitAsync(this.emitter, 'response', {
91-
requestId,
92-
request,
93-
response,
94-
isMockedResponse: true,
95-
})
96101
},
97102
async onRequestError(response) {
103+
if (this.emitter.listenerCount('response') > 0) {
104+
const responseClone = response.clone()
105+
process.nextTick(() => {
106+
emitAsync(this.emitter, 'response', {
107+
requestId,
108+
request,
109+
response: responseClone,
110+
isMockedResponse: true,
111+
})
112+
})
113+
}
114+
98115
await respondWith({
99116
socket,
100117
connectionOptions: options,
101118
request,
102119
response,
103120
})
104-
await emitAsync(this.emitter, 'response', {
105-
requestId,
106-
request,
107-
response,
108-
isMockedResponse: true,
109-
})
110121
},
111122
onError(error) {
112123
if (error instanceof Error) {
@@ -116,22 +127,6 @@ export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
116127
})
117128

118129
if (!isRequestHandled) {
119-
// If the user didn't register any response listeners, no need to pay the
120-
// price of routing the entire response message through the parser.
121-
if (this.emitter.listenerCount('response') > 0) {
122-
createHttpResponseParserStream({
123-
socket,
124-
onResponse: async (response) => {
125-
await emitAsync(this.emitter, 'response', {
126-
requestId,
127-
request,
128-
response,
129-
isMockedResponse: false,
130-
})
131-
},
132-
})
133-
}
134-
135130
const passthroughSocket = socket.passthrough()
136131

137132
/**
@@ -147,14 +142,39 @@ export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
147142
passthroughSocket.parser = socket.parser
148143
// @ts-expect-error Node.js internals.
149144
passthroughSocket.parser.socket = passthroughSocket
145+
146+
// If the user didn't register any response listeners, no need to pay the
147+
// price of routing the entire response message through the parser.
148+
if (this.emitter.listenerCount('response') > 0) {
149+
const responseStream = createHttpResponseParserStream({
150+
onResponse: async (response) => {
151+
await emitAsync(this.emitter, 'response', {
152+
requestId,
153+
request,
154+
response,
155+
isMockedResponse: false,
156+
})
157+
},
158+
})
159+
160+
passthroughSocket
161+
.on('data', (chunk) => responseStream.write(chunk))
162+
.on('close', () => responseStream.end())
163+
}
150164
}
151165
},
152166
})
153167

154168
// Write the message header to the parser manually because it's already been written
155169
// on the socket so it won't get piped.
156170
requestParser.write(toBuffer(chunk, encoding))
157-
socket.pipe(requestParser)
171+
172+
socket
173+
.on('write', (chunk, encoding, callback) => {
174+
requestParser.write(chunk, encoding, callback)
175+
})
176+
.on('finish', () => requestParser.end())
177+
.on('error', () => requestParser.end())
158178
})
159179
})
160180
})
@@ -211,13 +231,7 @@ function createHttpRequestParserStream(options: {
211231
* @note Provide the `read()` method so a `Readable` could be
212232
* used as the actual request body (the stream calls "read()").
213233
*/
214-
read() {
215-
// If the user attempts to read the request body,
216-
// flush the write buffer to trigger the callbacks.
217-
// This way, if the request stream ends in the write callback,
218-
// it will indeed end correctly.
219-
// flushWriteBuffer()
220-
},
234+
read() {},
221235
})
222236

223237
const request = new Request(url, {
@@ -254,25 +268,18 @@ function createHttpRequestParserStream(options: {
254268
},
255269
})
256270

257-
const parserStream = new Writable({
271+
return new Writable({
258272
write(chunk, encoding, callback) {
259273
parser.execute(toBuffer(chunk, encoding))
260274
callback()
261275
},
262276
})
263-
264-
parserStream
265-
.once('finish', () => parser.free())
266-
.once('close', () => parser.free())
267-
268-
return parserStream
269277
}
270278

271279
function createHttpResponseParserStream(options: {
272-
socket: MockSocket
273280
onResponse: (response: Response) => void
274281
}) {
275-
const { socket, onResponse } = options
282+
const { onResponse } = options
276283
const responseRawHeadersBuffer: Array<string> = []
277284
let responseBodyStream: Readable | undefined
278285

@@ -323,11 +330,12 @@ function createHttpResponseParserStream(options: {
323330
},
324331
})
325332

326-
socket
327-
.on('push', (chunk, encoding) => {
333+
return new Writable({
334+
write(chunk, encoding, callback) {
328335
parser.execute(toBuffer(chunk, encoding))
329-
})
330-
.once('close', () => parser.free())
336+
callback()
337+
},
338+
})
331339
}
332340

333341
/**

test/modules/http/compliance/events.test.ts

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ const httpServer = new HttpServer((app) => {
99
app.get('/', (req, res) => {
1010
res.send('original-response')
1111
})
12+
app.post('/', (req, res) => {
13+
res.send()
14+
})
1215
})
1316

1417
const interceptor = new HttpRequestInterceptor()
@@ -18,6 +21,10 @@ beforeAll(async () => {
1821
await httpServer.listen()
1922
})
2023

24+
afterEach(() => {
25+
interceptor.removeAllListeners()
26+
})
27+
2128
afterAll(async () => {
2229
interceptor.dispose()
2330
await httpServer.close()
@@ -48,10 +55,10 @@ it('emits the "request" event for an outgoing request without body', async () =>
4855
expect(request.body).toBe(null)
4956
})
5057

51-
it('emits the "request" event for an outgoing request with a body', async () => {
58+
it('emits the "request" event for a bypassed request with a body', async () => {
5259
const requestListener =
5360
vi.fn<(...args: HttpRequestEventMap['request']) => void>()
54-
interceptor.once('request', requestListener)
61+
interceptor.on('request', requestListener)
5562

5663
const request = http.request(httpServer.http.url('/'), {
5764
method: 'POST',
@@ -62,21 +69,22 @@ it('emits the "request" event for an outgoing request with a body', async () =>
6269
})
6370
request.write('post-payload')
6471
request.end()
72+
6573
await waitForClientRequest(request)
6674

6775
expect(requestListener).toHaveBeenCalledTimes(1)
6876

69-
const { request: requestFromListener } = requestListener.mock.calls[0][0]
70-
expect(requestFromListener).toBeInstanceOf(Request)
71-
expect(requestFromListener.method).toBe('POST')
72-
expect(requestFromListener.url).toBe(httpServer.http.url('/'))
77+
const { request: interceptedRequest } = requestListener.mock.calls[0][0]
78+
expect(interceptedRequest).toBeInstanceOf(Request)
79+
expect(interceptedRequest.method).toBe('POST')
80+
expect(interceptedRequest.url).toBe(httpServer.http.url('/'))
7381
expect(
74-
Object.fromEntries(requestFromListener.headers.entries())
82+
Object.fromEntries(interceptedRequest.headers.entries())
7583
).toMatchObject({
7684
'content-type': 'text/plain',
7785
'x-custom-header': 'yes',
7886
})
79-
expect(await requestFromListener.text()).toBe('post-payload')
87+
await expect(interceptedRequest.text()).resolves.toBe('post-payload')
8088
})
8189

8290
it('emits the "response" event for a mocked response', async () => {
@@ -99,28 +107,28 @@ it('emits the "response" event for a mocked response', async () => {
99107
const {
100108
response,
101109
requestId,
102-
request: requestFromListener,
110+
request: interceptedRequest,
103111
isMockedResponse,
104112
} = responseListener.mock.calls[0][0]
105113
expect(response).toBeInstanceOf(Response)
106114
expect(response.status).toBe(200)
107-
expect(await response.text()).toBe('hello world')
115+
await expect(response.text()).resolves.toBe('hello world')
108116
expect(isMockedResponse).toBe(true)
109117

110118
expect(requestId).toMatch(REQUEST_ID_REGEXP)
111-
expect(requestFromListener).toBeInstanceOf(Request)
112-
expect(requestFromListener.method).toBe('GET')
113-
expect(requestFromListener.url).toBe('http://localhost/')
119+
expect(interceptedRequest).toBeInstanceOf(Request)
120+
expect(interceptedRequest.method).toBe('GET')
121+
expect(interceptedRequest.url).toBe('http://localhost/')
114122
expect(
115-
Object.fromEntries(requestFromListener.headers.entries())
123+
Object.fromEntries(interceptedRequest.headers.entries())
116124
).toMatchObject({
117125
'x-custom-header': 'yes',
118126
})
119-
expect(requestFromListener.body).toBe(null)
127+
expect(interceptedRequest.body).toBe(null)
120128

121129
// Must respond with the mocked response.
122130
expect(res.statusCode).toBe(200)
123-
expect(await text()).toBe('hello world')
131+
await expect(text()).resolves.toBe('hello world')
124132
})
125133

126134
it('emits the "response" event for a bypassed response', async () => {
@@ -135,31 +143,35 @@ it('emits the "response" event for a bypassed response', async () => {
135143
})
136144
const { res, text } = await waitForClientRequest(request)
137145

138-
// Must emit the "response" interceptor event.
139-
expect(responseListener).toHaveBeenCalledTimes(1)
146+
// Must respond with the mocked response.
147+
expect.soft(res.statusCode).toBe(200)
148+
await expect.soft(text()).resolves.toBe('original-response')
149+
150+
expect(
151+
responseListener,
152+
'Must emit the "response" event'
153+
).toHaveBeenCalledTimes(1)
154+
140155
const {
141156
response,
142157
requestId,
143-
request: requestFromListener,
158+
request: interceptedRequest,
144159
isMockedResponse,
145160
} = responseListener.mock.calls[0][0]
161+
146162
expect(response).toBeInstanceOf(Response)
147163
expect(response.status).toBe(200)
148-
expect(await response.text()).toBe('original-response')
164+
await expect(response.text()).resolves.toBe('original-response')
149165
expect(isMockedResponse).toBe(false)
150166

151167
expect(requestId).toMatch(REQUEST_ID_REGEXP)
152-
expect(requestFromListener).toBeInstanceOf(Request)
153-
expect(requestFromListener.method).toBe('GET')
154-
expect(requestFromListener.url).toBe(httpServer.http.url('/'))
168+
expect(interceptedRequest).toBeInstanceOf(Request)
169+
expect(interceptedRequest.method).toBe('GET')
170+
expect(interceptedRequest.url).toBe(httpServer.http.url('/'))
155171
expect(
156-
Object.fromEntries(requestFromListener.headers.entries())
172+
Object.fromEntries(interceptedRequest.headers.entries())
157173
).toMatchObject({
158174
'x-custom-header': 'yes',
159175
})
160-
expect(requestFromListener.body).toBe(null)
161-
162-
// Must respond with the mocked response.
163-
expect(res.statusCode).toBe(200)
164-
expect(await text()).toBe('original-response')
176+
expect(interceptedRequest.body).toBe(null)
165177
})

0 commit comments

Comments
 (0)