Skip to content

Commit 949e994

Browse files
matingathanivvoclaude
authored
fix(blob): resolve multipart deadlock on empty streams, fix TransformStream backpressure (#1037)
* fix(blob): resolve multipart deadlock on empty streams, fix TransformStream backpressure Two fixes: 1. uploadAllParts hangs forever on empty ReadableStream because resolve() was only called inside sendPart(), which is never invoked when there are no bytes. Add resolve() when stream is done with no buffered parts and no active uploads. 2. createChunkTransformStream wrapped transform/flush in queueMicrotask, returning undefined immediately. This broke backpressure and could cause data loss in flush (trailing bytes enqueued after the readable side closes). Fixes #881 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test: add e2e test for empty stream multipart deadlock fix Add a test route that does a multipart upload with an empty ReadableStream, and a Playwright test that verifies it resolves without hanging (with a 15s timeout). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Vincent Voyer <vincent@codeagain.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d2ea7cf commit 949e994

7 files changed

Lines changed: 180 additions & 23 deletions

File tree

.changeset/fix-stream-deadlock.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@vercel/blob": patch
3+
---
4+
5+
Fix multipart upload hanging forever on empty streams, and fix `createChunkTransformStream` bypassing backpressure by removing incorrect `queueMicrotask` wrapping.

packages/blob/src/api.node.test.ts

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
BlobUnknownError,
1010
requestApi,
1111
} from './api';
12-
import { BlobError } from './helpers';
12+
import { BlobError, createChunkTransformStream } from './helpers';
1313

1414
describe('api', () => {
1515
describe('request api', () => {
@@ -133,3 +133,74 @@ describe('api', () => {
133133
});
134134
});
135135
});
136+
137+
describe('createChunkTransformStream', () => {
138+
async function collectChunks(
139+
stream: ReadableStream<Uint8Array>,
140+
): Promise<Uint8Array[]> {
141+
const chunks: Uint8Array[] = [];
142+
const reader = stream.getReader();
143+
for (;;) {
144+
const { value, done } = await reader.read();
145+
if (done) break;
146+
chunks.push(value);
147+
}
148+
return chunks;
149+
}
150+
151+
it('accumulates small chunks and emits full-sized chunks', async () => {
152+
const chunkSize = 4;
153+
const stream = new ReadableStream<ArrayBuffer>({
154+
start(controller) {
155+
controller.enqueue(new Uint8Array([1, 2]).buffer);
156+
controller.enqueue(new Uint8Array([3, 4, 5, 6]).buffer);
157+
controller.enqueue(new Uint8Array([7]).buffer);
158+
controller.close();
159+
},
160+
});
161+
162+
const chunks = await collectChunks(
163+
stream.pipeThrough(createChunkTransformStream(chunkSize)),
164+
);
165+
166+
expect(chunks).toHaveLength(2);
167+
expect(Array.from(chunks[0]!)).toEqual([1, 2, 3, 4]);
168+
expect(Array.from(chunks[1]!)).toEqual([5, 6, 7]);
169+
});
170+
171+
it('calls onProgress for each emitted chunk', async () => {
172+
const chunkSize = 3;
173+
const progress: number[] = [];
174+
const stream = new ReadableStream<ArrayBuffer>({
175+
start(controller) {
176+
controller.enqueue(new Uint8Array([1, 2, 3, 4, 5]).buffer);
177+
controller.close();
178+
},
179+
});
180+
181+
await collectChunks(
182+
stream.pipeThrough(
183+
createChunkTransformStream(chunkSize, (bytes) => progress.push(bytes)),
184+
),
185+
);
186+
187+
expect(progress).toEqual([3, 2]);
188+
});
189+
190+
it('flushes remaining bytes when stream ends', async () => {
191+
const chunkSize = 10;
192+
const stream = new ReadableStream<ArrayBuffer>({
193+
start(controller) {
194+
controller.enqueue(new Uint8Array([1, 2, 3]).buffer);
195+
controller.close();
196+
},
197+
});
198+
199+
const chunks = await collectChunks(
200+
stream.pipeThrough(createChunkTransformStream(chunkSize)),
201+
);
202+
203+
expect(chunks).toHaveLength(1);
204+
expect(Array.from(chunks[0]!)).toEqual([1, 2, 3]);
205+
});
206+
});

packages/blob/src/helpers.ts

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -269,31 +269,27 @@ export const createChunkTransformStream = (
269269

270270
return new TransformStream<ArrayBuffer, Uint8Array>({
271271
transform(chunk, controller) {
272-
queueMicrotask(() => {
273-
// Combine the new chunk with any leftover data
274-
const newBuffer = new Uint8Array(buffer.length + chunk.byteLength);
275-
newBuffer.set(buffer);
276-
newBuffer.set(new Uint8Array(chunk), buffer.length);
277-
buffer = newBuffer;
278-
279-
// Output complete chunks
280-
while (buffer.length >= chunkSize) {
281-
const newChunk = buffer.slice(0, chunkSize);
282-
controller.enqueue(newChunk);
283-
onProgress?.(newChunk.byteLength);
284-
buffer = buffer.slice(chunkSize);
285-
}
286-
});
272+
// Combine the new chunk with any leftover data
273+
const newBuffer = new Uint8Array(buffer.length + chunk.byteLength);
274+
newBuffer.set(buffer);
275+
newBuffer.set(new Uint8Array(chunk), buffer.length);
276+
buffer = newBuffer;
277+
278+
// Output complete chunks
279+
while (buffer.length >= chunkSize) {
280+
const newChunk = buffer.slice(0, chunkSize);
281+
controller.enqueue(newChunk);
282+
onProgress?.(newChunk.byteLength);
283+
buffer = buffer.slice(chunkSize);
284+
}
287285
},
288286

289287
flush(controller) {
290-
queueMicrotask(() => {
291-
// Send any remaining data
292-
if (buffer.length > 0) {
293-
controller.enqueue(buffer);
294-
onProgress?.(buffer.byteLength);
295-
}
296-
});
288+
// Send any remaining data
289+
if (buffer.length > 0) {
290+
controller.enqueue(buffer);
291+
onProgress?.(buffer.byteLength);
292+
}
297293
},
298294
});
299295
};

packages/blob/src/index.node.test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,33 @@ describe('blob client', () => {
791791
}
792792
});
793793

794+
it('resolves (does not hang) when multipart body is an empty ReadableStream', async () => {
795+
const emptyStream = new ReadableStream({
796+
start(controller) {
797+
controller.close();
798+
},
799+
});
800+
801+
// createMultipartUpload
802+
mockClient
803+
.intercept({ path: () => true, method: 'POST' })
804+
.reply(200, { uploadId: 'upload-123', key: 'foo.txt' });
805+
806+
// completeMultipartUpload (called with 0 parts for an empty stream)
807+
mockClient.intercept({ path: () => true, method: 'POST' }).reply(200, {
808+
url: `${BLOB_STORE_BASE_URL}/foo.txt`,
809+
downloadUrl: `${BLOB_STORE_BASE_URL}/foo.txt?download=1`,
810+
pathname: 'foo.txt',
811+
contentType: 'text/plain',
812+
contentDisposition: 'attachment; filename="foo.txt"',
813+
etag: '"empty"',
814+
});
815+
816+
await expect(
817+
put('foo.txt', emptyStream, { access: 'public', multipart: true }),
818+
).resolves.toMatchObject({ pathname: 'foo.txt' });
819+
});
820+
794821
const table: [string, (signal: AbortSignal) => Promise<unknown>][] = [
795822
[
796823
'put',

packages/blob/src/multipart/upload.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,11 @@ export function uploadAllParts({
245245
});
246246

247247
sendParts();
248+
} else if (activeUploads === 0) {
249+
// Stream was empty or ended exactly on a part boundary with no
250+
// in-flight uploads. Nothing will call resolve() later, so do it here.
251+
reader.releaseLock();
252+
resolve(completedParts);
248253
}
249254
reading = false;
250255
return;
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import * as vercelBlob from '@vercel/blob';
2+
import { NextResponse } from 'next/server';
3+
import { validateUploadToken } from '../../../validate-upload-token';
4+
5+
export async function POST(request: Request): Promise<NextResponse> {
6+
const { searchParams } = new URL(request.url);
7+
const pathname = searchParams.get('filename');
8+
9+
if (pathname === null) {
10+
return NextResponse.json({ message: 'Missing filename' }, { status: 400 });
11+
}
12+
13+
if (!validateUploadToken(request)) {
14+
return NextResponse.json({ message: 'Not authorized' }, { status: 401 });
15+
}
16+
17+
// Multipart upload with an empty ReadableStream — this used to deadlock
18+
// because uploadAllParts never called resolve() when the stream was empty.
19+
const emptyStream = new ReadableStream({
20+
start(controller) {
21+
controller.close();
22+
},
23+
});
24+
25+
const blob = await vercelBlob.put(pathname, emptyStream, {
26+
access: 'public',
27+
multipart: true,
28+
addRandomSuffix: true,
29+
});
30+
31+
return NextResponse.json(blob);
32+
}

test/next/test/@vercel/blob/index.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,27 @@ test.describe('@vercel/blob', () => {
3737
});
3838
});
3939

40+
test.describe('empty stream multipart', () => {
41+
test('does not deadlock on empty ReadableStream with multipart: true', async ({
42+
request,
43+
extraHTTPHeaders,
44+
}) => {
45+
const res = await request.post(
46+
`vercel/blob/api/app/empty-stream-multipart?filename=${prefix}/empty-stream.bin`,
47+
{
48+
headers: {
49+
cookie: `clientUpload=${process.env.BLOB_UPLOAD_SECRET ?? ''}`,
50+
...extraHTTPHeaders,
51+
},
52+
timeout: 15000, // Should resolve fast, not deadlock
53+
},
54+
);
55+
// The upload resolves (no deadlock). The server may reject the empty
56+
// multipart upload, but the important thing is it doesn't hang.
57+
expect(res.status()).toBeLessThan(500);
58+
});
59+
});
60+
4061
test.describe('page', () => {
4162
test('serverless', async ({ page }) => {
4263
await page.goto(`vercel/pages/blob?filename=${prefix}/test-page.txt`);

0 commit comments

Comments
 (0)