Skip to content

Commit b97be01

Browse files
fix(storage): handle early read stream destruction
File#createReadStream emits the response event before attaching the raw response pipeline. A consumer can destroy the returned stream from that event, leaving pipeline() to receive a destroyed destination and throw ERR_STREAM_UNABLE_TO_PIPE synchronously. Check the public stream after the response arrives and discard the raw response body when the consumer has already closed it. Reuse the existing request-agent cleanup so the abandoned response does not retain its underlying socket resources. Add a regression test using the real stream event ordering. The test destroys the returned stream from its response handler and verifies that the delayed raw response and request agent are cleaned up. This adapts the core approach from PR #7604 by Michael Latman, which was closed before merge. Co-authored-by: Michael Latman <mlatman@gmail.com>
1 parent 4436002 commit b97be01

2 files changed

Lines changed: 60 additions & 3 deletions

File tree

handwritten/storage/src/file.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1574,6 +1574,17 @@ class File extends ServiceObject<File, FileMetadata> {
15741574

15751575
const shouldRunValidation = !rangeRequest && (crc32c || md5);
15761576

1577+
const cleanupRequest = () => {
1578+
if (request?.agent) {
1579+
request.agent.destroy();
1580+
}
1581+
};
1582+
1583+
const cleanupRawResponse = (rawResponseStream: Readable) => {
1584+
rawResponseStream.destroy();
1585+
cleanupRequest();
1586+
};
1587+
15771588
if (rangeRequest) {
15781589
if (
15791590
typeof options.validation === 'string' ||
@@ -1590,9 +1601,7 @@ class File extends ServiceObject<File, FileMetadata> {
15901601
if (err) {
15911602
// There is an issue with node-fetch 2.x that if the stream errors the underlying socket connection is not closed.
15921603
// This causes a memory leak, so cleanup the sockets manually here by destroying the agent.
1593-
if (request?.agent) {
1594-
request.agent.destroy();
1595-
}
1604+
cleanupRequest();
15961605
throughStream.destroy(err);
15971606
}
15981607
};
@@ -1622,6 +1631,11 @@ class File extends ServiceObject<File, FileMetadata> {
16221631
}
16231632

16241633
request = (rawResponseStream as r.Response).request;
1634+
if (throughStream.destroyed) {
1635+
cleanupRawResponse(rawResponseStream as Readable);
1636+
return;
1637+
}
1638+
16251639
const headers = (rawResponseStream as ResponseBody).toJSON().headers;
16261640
const isCompressed = headers['content-encoding'] === 'gzip';
16271641
const hashes: {crc32c?: string; md5?: string} = {};

handwritten/storage/test/file.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,49 @@ describe('File', () => {
11651165
file.createReadStream().resume();
11661166
});
11671167

1168+
it('should clean up if the returned stream is destroyed before the response is piped', done => {
1169+
const rawResponseStream = new PassThrough();
1170+
const agentDestroy = sinon.spy();
1171+
Object.assign(rawResponseStream, {
1172+
request: {
1173+
agent: {
1174+
destroy: agentDestroy,
1175+
},
1176+
},
1177+
toJSON() {
1178+
return {headers: {}};
1179+
},
1180+
});
1181+
1182+
handleRespOverride = (
1183+
_err: Error,
1184+
_res: {},
1185+
_body: {},
1186+
callback: Function
1187+
) => {
1188+
callback(null, null, rawResponseStream);
1189+
};
1190+
1191+
file.requestStream = () => {
1192+
const requestStream = new PassThrough();
1193+
setImmediate(() => {
1194+
requestStream.emit('response', rawResponseStream);
1195+
});
1196+
return requestStream;
1197+
};
1198+
1199+
const readStream = file.createReadStream({validation: false});
1200+
readStream.once('response', () => {
1201+
readStream.destroy();
1202+
});
1203+
readStream.once('close', () => {
1204+
assert.strictEqual(rawResponseStream.destroyed, true);
1205+
assert.strictEqual(agentDestroy.calledOnce, true);
1206+
done();
1207+
});
1208+
readStream.resume();
1209+
});
1210+
11681211
describe('errors', () => {
11691212
const ERROR = new Error('Error.');
11701213

0 commit comments

Comments
 (0)