-
Notifications
You must be signed in to change notification settings - Fork 125
fix: export WebFormData #559
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| import { Transform } from 'node:stream'; | ||
|
|
||
| const BUF_SIZE = 1024 * 1024; | ||
|
|
||
| export class BufferStream extends Transform { | ||
| private buf: Buffer; | ||
| private offset: number; | ||
|
|
||
| constructor(options?: any) { | ||
| super(options); | ||
| this.realloc(); | ||
| } | ||
|
|
||
| realloc() { | ||
| this.buf = Buffer.alloc(BUF_SIZE); | ||
| this.offset = 0; | ||
| } | ||
|
|
||
| _transform(chunk: Buffer, _: any, callback: any) { | ||
| const currentLength = this.offset; | ||
| const chunkSize = chunk.length; | ||
| const newSize = currentLength + chunkSize; | ||
| // 缓冲区未满 | ||
| // - 向缓冲区写入 | ||
| if (newSize < BUF_SIZE) { | ||
| chunk.copy(this.buf, currentLength); | ||
| this.offset += chunkSize; | ||
| return callback(); | ||
| } | ||
|
|
||
| // 缓冲区正好满 | ||
| // - 拷贝到缓冲区以后, 将 chunk 返回 | ||
| // - 刷新缓冲区 | ||
| if (newSize === BUF_SIZE) { | ||
| chunk.copy(this.buf, currentLength); | ||
| const writeChunk = this.buf; | ||
| this.realloc(); | ||
| return callback(null, writeChunk); | ||
| } | ||
|
|
||
| // 超过缓冲区大小 | ||
| // - 拷贝到缓冲区以后, 将 chunk 返回 | ||
| // - 刷新缓冲区 | ||
| // - 将超出的部分拷贝到新的缓冲区中 | ||
| const copyLength = BUF_SIZE - currentLength; | ||
| const remainLength = chunkSize - copyLength; | ||
| chunk.copy(this.buf, currentLength, 0, copyLength); | ||
| const writeChunk = this.buf; | ||
| this.push(writeChunk); | ||
| this.realloc(); | ||
|
|
||
| if (remainLength > BUF_SIZE) { | ||
| // 特殊情况: 给了一个超大 chunk | ||
| // 直接将这个 chunk 返回,没必要缓冲了 | ||
| this.push(chunk.slice(copyLength)); | ||
| } else { | ||
| chunk.copy(this.buf, 0, copyLength); | ||
| this.offset = remainLength; | ||
| } | ||
| return callback(null); | ||
| } | ||
|
|
||
| _flush(callback: any) { | ||
| if (this.offset) { | ||
| const chunk = Buffer.alloc(this.offset); | ||
| this.buf.copy(chunk); | ||
| this.push(chunk); | ||
| this.offset = 0; | ||
| } | ||
| callback(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,43 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { strict as assert } from 'node:assert'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { createReadStream } from 'node:fs'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { basename } from 'node:path'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { describe, it, beforeAll, afterAll } from 'vitest'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { HttpClient, WebFormData } from '../src/index.js'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { startServer } from './fixtures/server.js'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { BufferStream } from './fixtures/BufferStream.js'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| describe('formData-with-BufferStream.test.ts', () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let close: any; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let _url: string; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| beforeAll(async () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { closeServer, url } = await startServer(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| close = closeServer; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _url = url; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| afterAll(async () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await close(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| it('should post with BufferStream', async () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const fileStream = createReadStream(__filename); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const bufferStream = new BufferStream(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| fileStream.pipe(bufferStream); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const formData = new WebFormData(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const fileName = basename(__filename); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| formData.append('fileBufferStream', bufferStream, fileName); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| formData.append('foo', 'bar'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const httpClient = new HttpClient(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const response = await httpClient.request(`${_url}multipart`, { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| method: 'POST', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| content: formData, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| headers: formData.getHeaders(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dataType: 'json', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| assert.equal(response.status, 200); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // console.log(response.data); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| assert.equal(response.data.files.fileBufferStream.filename, 'formData-with-BufferStream.test.ts'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| assert.deepEqual(response.data.form, { foo: 'bar' }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+22
to
+42
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure proper handling of piped streams When piping Consider awaiting the end of the stream before sending the request: -fileStream.pipe(bufferStream);
+await new Promise((resolve, reject) => {
+ fileStream.pipe(bufferStream);
+ bufferStream.on('finish', resolve);
+ bufferStream.on('error', reject);
+});Alternatively, handle backpressure appropriately or redesign the test to account for asynchronous stream processing. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential memory leak due to improper backpressure handling
The
BufferStreamclass does not handle backpressure correctly. When the internal buffer reaches its limit, the stream continues to accept data without pausing the upstream source. This can lead to increased memory usage and potential memory leaks when dealing with large or continuous data streams.To fix this issue, implement proper backpressure handling by checking the
this.push()method's return value. If it returnsfalse, you should pause reading data until the drain event is emitted.Apply this diff to modify the
_transformmethod:Additionally, listen for the
'drain'event to resume processing:And modify the
_transformmethod to handle the pause state: