Skip to content

Commit d2cfa69

Browse files
committed
Stream passthrough bodies directly without wrapping where possible
This provides a significant body streaming throughput boost. It won't take effect either by default (recordTraffic: true) or in HTTP Toolkit (which currently uses 'request' events) but it will for most cases that disable recording and HTTP Toolkit in future (once we start streaming body chunks instead of buffering via those new event APIs).
1 parent fe0ad90 commit d2cfa69

File tree

5 files changed

+69
-17
lines changed

5 files changed

+69
-17
lines changed

src/rules/requests/request-rule.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,16 @@ export class RequestRule implements RequestRule {
8181
record?: boolean,
8282
debug: boolean,
8383
keyLogStream?: Writable,
84-
emitEventCallback?: (type: string, event: unknown) => void
84+
emitEventCallback?: (type: string, event: unknown) => void,
85+
reqBodyObserved?: boolean
8586
}): Promise<void> {
8687
let stepsPromise = (async () => {
8788
for (let step of this.steps) {
8889
const result = await step.handle(req, res, {
8990
emitEventCallback: options.emitEventCallback,
9091
keyLogStream: options.keyLogStream,
91-
debug: options.debug
92+
debug: options.debug,
93+
reqBodyObserved: options.reqBodyObserved
9294
});
9395

9496
if (!result || result.continue === false) break;

src/rules/requests/request-step-impls.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Buffer } from 'buffer';
2-
import { Writable } from 'stream';
2+
import { Readable, Writable } from 'stream';
33
import * as url from 'url';
44
import type * as dns from 'dns';
55
import * as net from 'net';
@@ -171,6 +171,7 @@ export interface RequestStepOptions {
171171
emitEventCallback?: (type: string, event: unknown) => void;
172172
keyLogStream?: Writable;
173173
debug: boolean;
174+
reqBodyObserved?: boolean; // True if the request body will be buffered for inspection (e.g. recordTraffic)
174175
}
175176

176177
export class FixedResponseStepImpl extends FixedResponseStep {
@@ -488,9 +489,13 @@ export class PassThroughStepImpl extends PassThroughStep {
488489
`);
489490
}
490491

491-
// We have to capture the request stream immediately, to make sure nothing is lost if it
492-
// goes past its max length (truncating the data) before we start sending upstream.
493-
const clientReqBody = clientReq.body.asStream();
492+
// When transforms or callbacks need the body, we capture it immediately via asStream()
493+
// (which buffers data and provides replay). When no transforms are configured, we defer
494+
// to pipe the raw request stream directly, avoiding the intermediate PassThrough.
495+
const needsBufferedBody = !!(this.transformRequest || this.beforeRequest);
496+
const clientReqBody: Readable = needsBufferedBody
497+
? clientReq.body.asStream()
498+
: clientReq as any as Readable;
494499

495500
const isH2Downstream = isHttp2(clientReq);
496501

@@ -1170,6 +1175,12 @@ export class PassThroughStepImpl extends PassThroughStep {
11701175
if (reqBodyOverride.length > 0) serverReq.end(reqBodyOverride);
11711176
else serverReq.end(); // http2-wrapper fails given an empty buffer for methods that aren't allowed a body
11721177
} else {
1178+
if (!needsBufferedBody && options.reqBodyObserved) {
1179+
// Start body buffering now (same tick as pipe) so both the buffer
1180+
// and the pipe receive all data via Node's multi-listener support.
1181+
// This is needed for recordTraffic and async event emission.
1182+
clientReq.body.asBuffer().catch(() => {});
1183+
}
11731184
// asStream includes all content, including the body before this call
11741185
clientReqBody.pipe(serverReq);
11751186
clientReqBody.on('error', () => serverReq.abort());

src/server/mockttp-server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,11 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
753753
keyLogStream: this.keyLogStream,
754754
emitEventCallback: (requestEmitter.listenerCount('rule-event') !== 0)
755755
? (type, event) => this.announceRuleEventAsync(requestEmitter, request.id, nextRule!.id, type, event)
756-
: undefined
756+
: undefined,
757+
// When something will inspect the request body (traffic recording,
758+
// request event listeners) it must be buffered in memory. Otherwise
759+
// it can be streamed directly to the upstream target.
760+
reqBodyObserved: this.recordTraffic || requestEmitter.listenerCount('request') > 0
757761
});
758762
} else {
759763
await this.sendUnmatchedRequestError(request, response);

src/util/buffer-utils.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { Buffer } from 'buffer';
2-
import { EventEmitter } from 'events';
32
import * as stream from 'stream';
43

54
import { isNode } from './util';
@@ -19,7 +18,7 @@ export const asBuffer = (input: Buffer | Uint8Array | string) =>
1918
export type BufferInProgress = Promise<Buffer> & {
2019
currentChunks: Buffer[]; // Stores the body chunks as they arrive
2120
failedWith?: Error; // Stores the error that killed the stream, if one did
22-
events: EventEmitter; // Emits events - notably 'truncate' if data is truncated
21+
onTruncate?: (chunks: Buffer[]) => void; // Called if data is truncated (maxSize exceeded)
2322
};
2423

2524
// Takes a buffer and a stream, returns a simple stream that outputs the buffer then the stream. The stream
@@ -66,11 +65,11 @@ export const bufferThenStream = (buffer: BufferInProgress, inputStream: stream.R
6665
}
6766
});
6867

69-
buffer.events.on('truncate', (chunks) => {
68+
buffer.onTruncate = () => {
7069
// If the stream hasn't started yet, start it now, so it grabs the buffer
7170
// data before it gets truncated:
7271
if (!active) outputStream.read(0);
73-
});
72+
};
7473

7574
return outputStream;
7675
};
@@ -106,7 +105,7 @@ export const streamToBuffer = (input: stream.Readable, maxSize = MAX_BUFFER_SIZE
106105
if (currentSize > maxSize) {
107106
// Announce truncation, so that other mechanisms (asStream) can
108107
// capture this data if they're interested in it.
109-
bufferPromise.events.emit('truncate', chunks);
108+
bufferPromise.onTruncate?.(chunks);
110109

111110
// Drop all the data so far & stop reading
112111
bufferPromise.currentChunks = chunks = [];
@@ -139,7 +138,6 @@ export const streamToBuffer = (input: stream.Readable, maxSize = MAX_BUFFER_SIZE
139138
}
140139
);
141140
bufferPromise.currentChunks = chunks;
142-
bufferPromise.events = new EventEmitter();
143141
return bufferPromise;
144142
};
145143

test/performance/performance-tests.perf.ts

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ nodeOnly(() => {
1111
this.timeout(120000);
1212

1313
describe("Static HTTP mocking", () => {
14-
const server = getLocal();
14+
const server = getLocal({ recordTraffic: false });
1515

1616
before(async () => {
1717
await server.start();
@@ -90,6 +90,7 @@ nodeOnly(() => {
9090
describe("HTTPS traffic", () => {
9191

9292
const server = getLocal({
93+
recordTraffic: false,
9394
https: {
9495
keyPath: './test/fixtures/test-ca.key',
9596
certPath: './test/fixtures/test-ca.pem'
@@ -125,8 +126,8 @@ nodeOnly(() => {
125126
});
126127

127128
describe("HTTP proxying", () => {
128-
const proxyServer = getLocal();
129-
const targetServer = getLocal();
129+
const proxyServer = getLocal({ recordTraffic: false });
130+
const targetServer = getLocal({ recordTraffic: false });
130131

131132
before(async () => {
132133
await targetServer.start();
@@ -150,7 +151,7 @@ nodeOnly(() => {
150151

151152
printResults("HTTP proxy passthrough", result);
152153

153-
// Seeing ~2500k req/sec locally for proxy + static server
154+
// Seeing ~2500 req/sec locally for proxy + static server
154155
assertPerformance(result, {
155156
minThroughput: 500,
156157
maxP99Latency: 50,
@@ -196,13 +197,15 @@ nodeOnly(() => {
196197
describe("HTTPS proxying", () => {
197198

198199
const proxyServer = getLocal({
200+
recordTraffic: false,
199201
https: {
200202
keyPath: './test/fixtures/test-ca.key',
201203
certPath: './test/fixtures/test-ca.pem'
202204
}
203205
});
204206

205207
const targetServer = getLocal({
208+
recordTraffic: false,
206209
https: {
207210
keyPath: './test/fixtures/test-ca.key',
208211
certPath: './test/fixtures/test-ca.pem'
@@ -240,5 +243,39 @@ nodeOnly(() => {
240243
});
241244
});
242245

246+
describe("HTTP proxying with traffic recording", () => {
247+
const proxyServer = getLocal({ recordTraffic: true });
248+
const targetServer = getLocal({ recordTraffic: false });
249+
250+
before(async () => {
251+
await targetServer.start();
252+
await proxyServer.start();
253+
});
254+
255+
after(async () => {
256+
await proxyServer.stop();
257+
await targetServer.stop();
258+
});
259+
260+
it("for Mockttp proxy with recordTraffic enabled", async () => {
261+
await targetServer.forGet("/target").thenReply(200, "Target response");
262+
await proxyServer.forGet("/proxy").thenForwardTo(targetServer.url);
263+
264+
const result = await runPerformanceTest({
265+
url: proxyServer.urlFor("/proxy"),
266+
duration: 10,
267+
connections: 10
268+
});
269+
270+
printResults("HTTP proxy passthrough (recordTraffic)", result);
271+
272+
assertPerformance(result, {
273+
minThroughput: 500,
274+
maxP99Latency: 50,
275+
maxErrors: 0
276+
});
277+
});
278+
});
279+
243280
});
244281
});

0 commit comments

Comments
 (0)