Skip to content

Commit 83ba74f

Browse files
committed
feat(wip): net interceptor
1 parent 643c2fd commit 83ba74f

7 files changed

Lines changed: 413 additions & 11 deletions

File tree

_http_common.d.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,23 @@ declare var HTTPParser: {
1818
export interface HTTPParser<ParserType extends number> {
1919
new (): HTTPParser<ParserType>
2020

21-
[HTTPParser.kOnMessageBegin]: () => void
22-
[HTTPParser.kOnHeaders]: HeadersCallback
23-
[HTTPParser.kOnHeadersComplete]: ParserType extends 0
21+
[HTTPParser.kOnMessageBegin]?: () => void
22+
[HTTPParser.kOnHeaders]?: HeadersCallback
23+
[HTTPParser.kOnHeadersComplete]?: ParserType extends 0
2424
? RequestHeadersCompleteCallback
2525
: ResponseHeadersCompleteCallback
26-
[HTTPParser.kOnBody]: (chunk: Buffer) => void
27-
[HTTPParser.kOnMessageComplete]: () => void
28-
[HTTPParser.kOnExecute]: () => void
29-
[HTTPParser.kOnTimeout]: () => void
26+
[HTTPParser.kOnBody]?: (chunk: Buffer) => void
27+
[HTTPParser.kOnMessageComplete]?: () => void
28+
[HTTPParser.kOnExecute]?: () => void
29+
[HTTPParser.kOnTimeout]?: () => void
3030

3131
initialize(type: ParserType, asyncResource: object): void
3232
execute(buffer: Buffer): void
3333
finish(): void
3434
free(): void
3535
}
3636

37-
export type HeadersCallback = (
38-
rawHeaders: Array<string>,
39-
url: string
40-
) => void
37+
export type HeadersCallback = (rawHeaders: Array<string>, url: string) => void
4138

4239
export type RequestHeadersCompleteCallback = (
4340
versionMajor: number,

src/interceptors/net/demo.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// @vitest-environment node
2+
import { beforeAll, afterAll, it, expect } from 'vitest'
3+
import { HttpRequestInterceptor } from './http-interceptor'
4+
import { waitForClientRequest } from '../../../test/helpers'
5+
6+
const interceptor = new HttpRequestInterceptor()
7+
8+
beforeAll(() => {
9+
interceptor.apply()
10+
})
11+
12+
afterAll(() => {
13+
interceptor.dispose()
14+
})
15+
16+
it('???', async () => {
17+
interceptor.on('request', ({ request, controller }) => {
18+
console.log('REQUEST LISTENER!')
19+
controller.respondWith(new Response('hello world'))
20+
})
21+
22+
const http = await import('http')
23+
const request = http.get('http://localhost/resource')
24+
25+
await waitForClientRequest(request)
26+
})
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
import { Readable, Writable } from 'node:stream'
2+
import { invariant } from 'outvariant'
3+
import { Interceptor, INTERNAL_REQUEST_ID_HEADER_NAME } from '../../Interceptor'
4+
import { type HttpRequestEventMap } from '../../glossary'
5+
import { FetchResponse } from '../../utils/fetchUtils'
6+
import { SocketInterceptor } from './index'
7+
import { HttpRequestParser } from './parsers'
8+
import { baseUrlFromConnectionOptions } from '../Socket/utils/baseUrlFromConnectionOptions'
9+
import { NetworkConnectionOptions } from './utils/normalize-net-connect-args'
10+
import { createRequestId } from '../../createRequestId'
11+
import { emitAsync } from 'src/utils/emitAsync'
12+
import { RequestController } from '../../RequestController'
13+
import { handleRequest } from '../../utils/handleRequest'
14+
15+
function toBuffer(data: any, encoding?: BufferEncoding): Buffer {
16+
return Buffer.isBuffer(data) ? data : Buffer.from(data, encoding)
17+
}
18+
19+
export class HttpRequestInterceptor extends Interceptor<HttpRequestEventMap> {
20+
static symbol = Symbol('HttpRequestInterceptor')
21+
22+
constructor() {
23+
super(HttpRequestInterceptor.symbol)
24+
}
25+
26+
public setup() {
27+
/** @fixme Use interceptor as a singleton? */
28+
const interceptor = new SocketInterceptor()
29+
interceptor.apply()
30+
31+
interceptor.on('socket', ({ options, socket }) => {
32+
socket.once('write', (chunk, encoding) => {
33+
const firstFrame = chunk.toString()
34+
35+
/**
36+
* @fixme This is obviously rather naive
37+
*/
38+
if (firstFrame.includes('HTTP/1.1')) {
39+
const method = firstFrame.split(' ')[0]
40+
41+
const requestParser = createHttpRequestParserStream({
42+
requestOptions: {
43+
method,
44+
...options,
45+
},
46+
onRequest: async ({ request }) => {
47+
console.log(1)
48+
49+
const requestId = createRequestId()
50+
const controller = new RequestController(request)
51+
52+
const isRequestHandled = await handleRequest({
53+
request,
54+
requestId,
55+
controller,
56+
emitter: this.emitter,
57+
onResponse(response) {
58+
console.log('MOCKED!', response.status)
59+
},
60+
onRequestError(response) {
61+
//
62+
},
63+
onError(error) {},
64+
})
65+
66+
console.log('REQ! handled?', isRequestHandled)
67+
68+
if (!isRequestHandled) {
69+
// return socket.passthrough()
70+
}
71+
},
72+
})
73+
requestParser.write(toBuffer(chunk, encoding))
74+
socket.pipe(requestParser)
75+
}
76+
})
77+
})
78+
}
79+
}
80+
81+
function createHttpRequestParserStream(options: {
82+
requestOptions: NetworkConnectionOptions & {
83+
method: string
84+
}
85+
onRequest: (args: { request: Request }) => void
86+
}) {
87+
const requestRawHeadersBuffer: Array<string> = []
88+
const requestWriteBuffer: Array<Buffer> = []
89+
let requestBodyStream: Readable | undefined
90+
91+
const parser = new HttpRequestParser({
92+
onHeaders(rawHeaders) {
93+
requestRawHeadersBuffer.push(...rawHeaders)
94+
},
95+
onHeadersComplete(
96+
versionMajor,
97+
versionMinor,
98+
rawHeaders,
99+
_,
100+
path,
101+
__,
102+
___,
103+
____,
104+
shouldKeepAlive
105+
) {
106+
const method = options.requestOptions.method?.toUpperCase() || 'GET'
107+
const baseUrl = baseUrlFromConnectionOptions(options.requestOptions)
108+
const url = new URL(path || '', baseUrl)
109+
110+
// const headers = FetchResponse.parseRawHeaders([
111+
// ...requestRawHeadersBuffer,
112+
// ...(rawHeaders || []),
113+
// ])
114+
115+
const canHaveBody = method !== 'GET' && method !== 'HEAD'
116+
117+
// Translate the basic authorization to request headers.
118+
// Constructing a Request instance with a URL containing auth is no-op.
119+
if (url.username || url.password) {
120+
if (!headers.has('authorization')) {
121+
headers.set('authorization', `Basic ${url.username}:${url.password}`)
122+
}
123+
url.username = ''
124+
url.password = ''
125+
}
126+
127+
requestBodyStream = new Readable({
128+
/**
129+
* @note Provide the `read()` method so a `Readable` could be
130+
* used as the actual request body (the stream calls "read()").
131+
*/
132+
read() {
133+
// If the user attempts to read the request body,
134+
// flush the write buffer to trigger the callbacks.
135+
// This way, if the request stream ends in the write callback,
136+
// it will indeed end correctly.
137+
// flushWriteBuffer()
138+
},
139+
})
140+
141+
const request = new Request(url, {
142+
method,
143+
// headers,
144+
credentials: 'same-origin',
145+
// @ts-expect-error Undocumented Fetch property.
146+
duplex: canHaveBody ? 'half' : undefined,
147+
body: canHaveBody ? (Readable.toWeb(requestBodyStream) as any) : null,
148+
})
149+
150+
options.onRequest({
151+
request,
152+
})
153+
},
154+
onBody(chunk) {
155+
invariant(
156+
requestBodyStream,
157+
'Failed to write to a request stream: stream does not exist'
158+
)
159+
160+
requestBodyStream.push(chunk)
161+
},
162+
onMessageComplete() {
163+
requestBodyStream?.push(null)
164+
},
165+
})
166+
167+
const parserStream = new Writable({
168+
write(chunk, encoding, callback) {
169+
const data = toBuffer(chunk, encoding)
170+
requestWriteBuffer.push(data)
171+
parser.execute(data)
172+
callback()
173+
},
174+
})
175+
parserStream.once('finish', () => parser.free())
176+
177+
return parserStream
178+
}

src/interceptors/net/index.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import net from 'node:net'
2+
import { Interceptor } from '../../Interceptor'
3+
import { MockSocket } from './mock-socket'
4+
import {
5+
NetConnectArgs,
6+
type NetworkConnectionOptions,
7+
normalizeNetConnectArgs,
8+
} from './utils/normalize-net-connect-args'
9+
10+
export interface SocketConnectionEventMap {
11+
/**
12+
* Outgoing socket connection.
13+
*/
14+
socket: [
15+
args: {
16+
socket: MockSocket
17+
options: NetworkConnectionOptions
18+
}
19+
]
20+
}
21+
22+
export class SocketInterceptor extends Interceptor<SocketConnectionEventMap> {
23+
static symbol = Symbol('SocketInterceptor')
24+
25+
constructor() {
26+
super(SocketInterceptor.symbol)
27+
}
28+
29+
protected setup(): void {
30+
const {
31+
connect: originalNetConnect,
32+
createConnection: originalCreateConnection,
33+
} = net
34+
35+
net.createConnection = (...args: Array<unknown>) => {
36+
const [options, connectionListener] = normalizeNetConnectArgs(
37+
args as NetConnectArgs
38+
)
39+
const socket = new MockSocket()
40+
41+
this.emitter.emit('socket', {
42+
options,
43+
socket,
44+
})
45+
46+
return socket
47+
}
48+
49+
this.subscriptions.push(() => {
50+
net.connect = originalNetConnect
51+
net.createConnection = originalCreateConnection
52+
})
53+
}
54+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import net from 'node:net'
2+
import {
3+
normalizeSocketWriteArgs,
4+
WriteArgs,
5+
} from '../Socket/utils/normalizeSocketWriteArgs'
6+
7+
export class MockSocket extends net.Socket {
8+
public connecting: boolean
9+
10+
constructor() {
11+
super()
12+
this.connecting = false
13+
this.connect()
14+
15+
this._final = (callback) => callback(null)
16+
}
17+
18+
public connect() {
19+
this.connecting = true
20+
return this
21+
}
22+
23+
public write(...args: Array<unknown>): boolean {
24+
const [chunk, encoding, callback] = normalizeSocketWriteArgs(
25+
args as WriteArgs
26+
)
27+
this.emit('write', chunk, encoding, callback)
28+
return true
29+
}
30+
31+
public push(chunk: any, encoding?: BufferEncoding): boolean {
32+
this.emit('push', chunk, encoding)
33+
return super.push(chunk, encoding)
34+
}
35+
36+
public end(...args: Array<unknown>) {
37+
const [chunk, encoding, callback] = normalizeSocketWriteArgs(
38+
args as WriteArgs
39+
)
40+
this.emit('write', chunk, encoding, callback)
41+
return super.end.apply(this, args as any)
42+
}
43+
}

src/interceptors/net/parsers.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import {
2+
HeadersCallback,
3+
HTTPParser,
4+
RequestHeadersCompleteCallback,
5+
} from '_http_common'
6+
7+
export class HttpRequestParser {
8+
#parser: HTTPParser<typeof HTTPParser.REQUEST>
9+
10+
constructor(options: {
11+
onMessageBegin?: () => void
12+
onHeaders?: HeadersCallback
13+
onHeadersComplete?: RequestHeadersCompleteCallback
14+
onBody?: (chunk: Buffer) => void
15+
onMessageComplete?: () => void
16+
onExecute?: () => void
17+
onTimeout?: () => void
18+
}) {
19+
this.#parser = new HTTPParser()
20+
this.#parser.initialize(HTTPParser.REQUEST, {})
21+
this.#parser[HTTPParser.kOnMessageBegin] = options.onMessageBegin
22+
this.#parser[HTTPParser.kOnHeaders] = options.onHeaders
23+
this.#parser[HTTPParser.kOnHeadersComplete] = options.onHeadersComplete
24+
this.#parser[HTTPParser.kOnBody] = options.onBody
25+
this.#parser[HTTPParser.kOnMessageComplete] = options.onMessageComplete
26+
this.#parser[HTTPParser.kOnExecute] = options.onExecute
27+
this.#parser[HTTPParser.kOnTimeout] = options.onTimeout
28+
}
29+
30+
public execute(buffer: Buffer): void {
31+
this.#parser.execute(buffer)
32+
}
33+
34+
public free(): void {
35+
this.#parser.free()
36+
}
37+
}

0 commit comments

Comments
 (0)