Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export {
} from './IncomingHttpHeaders.js';
export * from './HttpClientError.js';
export { FetchFactory, fetch } from './fetch.js';
export { FormData as WebFormData } from './FormData.js';

export default {
request,
Expand Down
72 changes: 72 additions & 0 deletions test/fixtures/BufferStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { Transform } from 'node:stream';

const BUF_SIZE = 1024 * 1024;

export class BufferStream extends Transform {
private buf: Buffer;
private offset: number;

constructor(options?: any) {
super(options);
this.realloc();
}

realloc() {
this.buf = Buffer.alloc(BUF_SIZE);
this.offset = 0;
}

_transform(chunk: Buffer, _: any, callback: any) {
const currentLength = this.offset;
const chunkSize = chunk.length;
const newSize = currentLength + chunkSize;
// 缓冲区未满
// - 向缓冲区写入
if (newSize < BUF_SIZE) {
chunk.copy(this.buf, currentLength);
this.offset += chunkSize;
return callback();
}

// 缓冲区正好满
// - 拷贝到缓冲区以后, 将 chunk 返回
// - 刷新缓冲区
if (newSize === BUF_SIZE) {
chunk.copy(this.buf, currentLength);
const writeChunk = this.buf;
this.realloc();
return callback(null, writeChunk);
}

// 超过缓冲区大小
// - 拷贝到缓冲区以后, 将 chunk 返回
// - 刷新缓冲区
// - 将超出的部分拷贝到新的缓冲区中
const copyLength = BUF_SIZE - currentLength;
const remainLength = chunkSize - copyLength;
chunk.copy(this.buf, currentLength, 0, copyLength);
const writeChunk = this.buf;
this.push(writeChunk);
this.realloc();

if (remainLength > BUF_SIZE) {
// 特殊情况: 给了一个超大 chunk
// 直接将这个 chunk 返回,没必要缓冲了
this.push(chunk.slice(copyLength));
} else {
chunk.copy(this.buf, 0, copyLength);
this.offset = remainLength;
}
return callback(null);
}

_flush(callback: any) {
if (this.offset) {
const chunk = Buffer.alloc(this.offset);
this.buf.copy(chunk);
this.push(chunk);
this.offset = 0;
}
callback();
}
}
Comment on lines +5 to +72
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential memory leak due to improper backpressure handling

The BufferStream class does not handle backpressure correctly. When the internal buffer reaches its limit, the stream continues to accept data without pausing the upstream source. This can lead to increased memory usage and potential memory leaks when dealing with large or continuous data streams.

To fix this issue, implement proper backpressure handling by checking the this.push() method's return value. If it returns false, you should pause reading data until the drain event is emitted.

Apply this diff to modify the _transform method:

...
- this.push(writeChunk);
+ const canContinue = this.push(writeChunk);
+ if (!canContinue) {
+   this._inputPaused = true;
+ }
...

Additionally, listen for the 'drain' event to resume processing:

+ constructor(options?: any) {
+   super(options);
+   this.realloc();
+   this._inputPaused = false;
+   this.on('drain', () => {
+     this._inputPaused = false;
+   });
+ }

And modify the _transform method to handle the pause state:

- return callback();
+ if (this._inputPaused) {
+   // Pause the upstream source
+   this.pause();
+ }
+ callback();

Committable suggestion skipped: line range outside the PR's diff.

43 changes: 43 additions & 0 deletions test/formData-with-BufferStream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { strict as assert } from 'node:assert';
import { createReadStream } from 'node:fs';
import { basename } from 'node:path';
import { describe, it, beforeAll, afterAll } from 'vitest';
import { HttpClient, WebFormData } from '../src/index.js';
import { startServer } from './fixtures/server.js';
import { BufferStream } from './fixtures/BufferStream.js';

describe('formData-with-BufferStream.test.ts', () => {
let close: any;
let _url: string;
beforeAll(async () => {
const { closeServer, url } = await startServer();
close = closeServer;
_url = url;
});

afterAll(async () => {
await close();
});

it('should post with BufferStream', async () => {
const fileStream = createReadStream(__filename);
const bufferStream = new BufferStream();
fileStream.pipe(bufferStream);
const formData = new WebFormData();
const fileName = basename(__filename);
formData.append('fileBufferStream', bufferStream, fileName);
formData.append('foo', 'bar');

const httpClient = new HttpClient();
const response = await httpClient.request(`${_url}multipart`, {
method: 'POST',
content: formData,
headers: formData.getHeaders(),
dataType: 'json',
});
assert.equal(response.status, 200);
// console.log(response.data);
assert.equal(response.data.files.fileBufferStream.filename, 'formData-with-BufferStream.test.ts');
assert.deepEqual(response.data.form, { foo: 'bar' });
});
Comment on lines +22 to +42
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure proper handling of piped streams

When piping fileStream into bufferStream, there is no guarantee that all data has been read before making the HTTP request. This could result in incomplete data being sent.

Consider awaiting the end of the stream before sending the request:

-fileStream.pipe(bufferStream);
+await new Promise((resolve, reject) => {
+  fileStream.pipe(bufferStream);
+  bufferStream.on('finish', resolve);
+  bufferStream.on('error', reject);
+});

Alternatively, handle backpressure appropriately or redesign the test to account for asynchronous stream processing.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
it('should post with BufferStream', async () => {
const fileStream = createReadStream(__filename);
const bufferStream = new BufferStream();
fileStream.pipe(bufferStream);
const formData = new WebFormData();
const fileName = basename(__filename);
formData.append('fileBufferStream', bufferStream, fileName);
formData.append('foo', 'bar');
const httpClient = new HttpClient();
const response = await httpClient.request(`${_url}multipart`, {
method: 'POST',
content: formData,
headers: formData.getHeaders(),
dataType: 'json',
});
assert.equal(response.status, 200);
// console.log(response.data);
assert.equal(response.data.files.fileBufferStream.filename, 'formData-with-BufferStream.test.ts');
assert.deepEqual(response.data.form, { foo: 'bar' });
});
it('should post with BufferStream', async () => {
const fileStream = createReadStream(__filename);
const bufferStream = new BufferStream();
await new Promise((resolve, reject) => {
fileStream.pipe(bufferStream);
bufferStream.on('finish', resolve);
bufferStream.on('error', reject);
});
const formData = new WebFormData();
const fileName = basename(__filename);
formData.append('fileBufferStream', bufferStream, fileName);
formData.append('foo', 'bar');
const httpClient = new HttpClient();
const response = await httpClient.request(`${_url}multipart`, {
method: 'POST',
content: formData,
headers: formData.getHeaders(),
dataType: 'json',
});
assert.equal(response.status, 200);
// console.log(response.data);
assert.equal(response.data.files.fileBufferStream.filename, 'formData-with-BufferStream.test.ts');
assert.deepEqual(response.data.form, { foo: 'bar' });
});

});