Skip to content

Commit 0192df3

Browse files
committed
fix(storage): handle upload errors before pipeline setup
createWriteStream starts the simple or resumable upload before installing the pipeline that normally owns fileWriteStream errors. If startup fails synchronously, fileWriteStream and the public write stream can be destroyed before pipeline() runs. Node then throws ERR_STREAM_UNABLE_TO_PIPE while attaching invalid streams, and the original upload error can escape uncaught. Install a temporary error listener before upload startup and forward any early failure through the existing pipeline callback. If startup has already torn down any stream, skip pipeline setup and destroy the remaining internal streams. Keep the temporary listener until the upload stream closes so a queued terminal error is still handled, then remove it to avoid retaining the callback. Add coverage for a consumer destroying the returned stream during upload startup. The test verifies that the internal upload stream is destroyed and its temporary error listener is removed. Existing synchronous upload-error and pipeline-failure coverage continues to pass.
1 parent b97be01 commit 0192df3

2 files changed

Lines changed: 45 additions & 0 deletions

File tree

handwritten/storage/src/file.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2192,12 +2192,36 @@ class File extends ServiceObject<File, FileMetadata> {
21922192
});
21932193

21942194
writeStream.once('writing', () => {
2195+
const onPrePipelineError = (error: Error) => {
2196+
pipelineCallback(error);
2197+
};
2198+
fileWriteStream.once('error', onPrePipelineError);
2199+
21952200
if (options.resumable === false) {
21962201
this.startSimpleUpload_(fileWriteStream, options);
21972202
} else {
21982203
this.startResumableUpload_(fileWriteStream, options);
21992204
}
22002205

2206+
if (
2207+
fileWriteStream.destroyed ||
2208+
writeStream.destroyed ||
2209+
emitStream.destroyed
2210+
) {
2211+
// Destroying an upload stream can queue its terminal error before
2212+
// close, so keep the temporary listener until teardown completes.
2213+
fileWriteStream.once('close', () => {
2214+
fileWriteStream.removeListener('error', onPrePipelineError);
2215+
});
2216+
if (!fileWriteStream.destroyed) {
2217+
fileWriteStream.destroy();
2218+
}
2219+
emitStream.destroy();
2220+
return;
2221+
}
2222+
2223+
fileWriteStream.removeListener('error', onPrePipelineError);
2224+
22012225
// remove temporary noop listener as we now create a pipeline that handles the errors
22022226
emitStream.removeListener('error', noop);
22032227

handwritten/storage/test/file.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2149,6 +2149,27 @@ describe('File', () => {
21492149
writable.write('data');
21502150
});
21512151

2152+
it('should clean up if the returned stream is destroyed during upload startup', done => {
2153+
const writable = file.createWriteStream();
2154+
let fileWriteStream: duplexify.Duplexify | undefined;
2155+
2156+
file.startResumableUpload_ = (stream: duplexify.Duplexify) => {
2157+
fileWriteStream = stream;
2158+
writable.destroy();
2159+
};
2160+
2161+
writable.on('close', () => {
2162+
setImmediate(() => {
2163+
assert(fileWriteStream);
2164+
assert.strictEqual(fileWriteStream.destroyed, true);
2165+
assert.strictEqual(fileWriteStream.listenerCount('error'), 0);
2166+
done();
2167+
});
2168+
});
2169+
2170+
writable.write('data');
2171+
});
2172+
21522173
it('should alias contentType to metadata object', done => {
21532174
const contentType = 'text/html';
21542175
const writable = file.createWriteStream({contentType});

0 commit comments

Comments
 (0)