Skip to content

Commit 2898820

Browse files
author
LIzhifeng
committed
Fix: blocking PATCH requests and implement true streaming for chunked uploads
1 parent c631cea commit 2898820

5 files changed

Lines changed: 125 additions & 33 deletions

File tree

src/chunk.ts

Lines changed: 22 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -43,26 +43,34 @@ export async function getChunkBlob(env: Env, chunk: Chunk): Promise<Blob | null>
4343
*/
4444
export function limit(streamInput: ReadableStream, limitBytes: number): ReadableStream {
4545
if (streamInput instanceof FixedLengthStream) return streamInput;
46-
const stream = new FixedLengthStream(limitBytes, {});
46+
47+
// Use standard TransformStream to limit length, ensuring backpressure is correctly handled
48+
const { readable, writable } = new TransformStream();
4749

4850
(async () => {
49-
const w = stream.writable.getWriter();
50-
const r = streamInput.getReader();
51+
const reader = streamInput.getReader();
52+
const writer = writable.getWriter();
5153
let written = 0;
52-
while (true) {
53-
const { done, value } = await r.read();
54-
if (done) break;
55-
await w.write(value);
56-
written += value.length;
57-
if (written >= limitBytes) break;
54+
try {
55+
while (written < limitBytes) {
56+
const { done, value } = await reader.read();
57+
if (done) break;
58+
const toWrite = value.length + written > limitBytes
59+
? value.slice(0, limitBytes - written)
60+
: value;
61+
await writer.write(toWrite);
62+
written += toWrite.length;
63+
if (written >= limitBytes) break;
64+
}
65+
} catch (e) {
66+
writer.abort(e);
67+
} finally {
68+
reader.releaseLock();
69+
try { writer.close(); } catch {}
5870
}
59-
60-
r.releaseLock();
61-
w.releaseLock();
62-
await stream.writable.close();
6371
})();
6472

65-
return stream.readable;
73+
return readable;
6674
}
6775

6876
export async function* split(

src/registry/r2.ts

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -652,21 +652,21 @@ export class R2Registry implements Registry {
652652
if (env.PUSH_COMPATIBILITY_MODE === "full") {
653653
const [stream1, stream2] = limit(stream, size).tee();
654654
const partTask = upload.uploadPart(state.parts.length + 1, stream1);
655-
// We can totally disable this, however we are risking that the client sends another small chunk.
656-
// Maybe instead we can throw range error
655+
657656
const dateInOneHour = new Date();
658657
dateInOneHour.setTime(dateInOneHour.getTime() + 60 * 60 * 1000);
659658
const headers = {
660-
// https://www.rfc-editor.org/rfc/rfc1123 date format
661-
// Objects will typically be removed from a bucket within 24 hours of the x-amz-expiration value.
662659
"x-amz-expiration": dateInOneHour.toUTCString(),
663660
} as const;
661+
664662
const r2RegistryObjectTask = env.REGISTRY.put(path, stream2, {
665663
httpMetadata: new Headers(headers),
666664
customMetadata: headers,
667665
});
668-
state.parts.push(await partTask);
669-
await r2RegistryObjectTask;
666+
667+
// Run both in parallel and wait for both to complete
668+
const [part] = await Promise.all([partTask, r2RegistryObjectTask]);
669+
state.parts.push(part);
670670
return;
671671
}
672672

@@ -694,7 +694,7 @@ export class R2Registry implements Registry {
694694
};
695695

696696
if (length === undefined) {
697-
console.error("Length needs to be defined");
697+
console.error("Length needs to be defined for streaming upload to R2. Ensure Content-Length or Content-Range is provided.");
698698
return {
699699
response: new InternalError(),
700700
};

src/router.ts

Lines changed: 7 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
import { Router } from "itty-router";
22
import { BlobUnknownError, ManifestUnknownError } from "./v2-errors";
33
import { InternalError, ServerError } from "./errors";
4-
import { errorString, jsonHeaders, wrap } from "./utils";
4+
import { errorString, getStreamSize, jsonHeaders, wrap } from "./utils";
55
import { hexToDigest } from "./user";
66
import { ManifestTagsListTooBigError } from "./v2-responses";
77
import { Env } from "..";
@@ -418,19 +418,15 @@ v2Router.get("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
418418
v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
419419
const { name, uuid } = req.params;
420420
const contentRange = req.headers.get("Content-Range");
421-
const [start, end] = contentRange?.split("-") ?? [undefined, undefined];
421+
const rangeMatch = contentRange?.match(/(?:bytes\s+)?(\d+)-(\d+)/);
422+
const [start, end] = rangeMatch ? [rangeMatch[1], rangeMatch[2]] : [undefined, undefined];
422423

423424
if (req.body == null) {
424425
return new Response(null, { status: 400 });
425426
}
426427

427-
let contentLengthString = req.headers.get("Content-Length");
428-
let stream = req.body;
429-
if (!contentLengthString) {
430-
const blob = await req.blob();
431-
contentLengthString = `${blob.size}`;
432-
stream = blob.stream();
433-
}
428+
const streamSize = getStreamSize(req.headers);
429+
const stream = req.body;
434430

435431
const url = new URL(req.url);
436432
const [res, err] = await wrap<UploadObject | RegistryError, Error>(
@@ -439,7 +435,7 @@ v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
439435
uuid,
440436
url.pathname + "?" + url.searchParams.toString(),
441437
stream,
442-
+contentLengthString,
438+
streamSize,
443439
end !== undefined && start !== undefined ? [+start, +end] : undefined,
444440
),
445441
);
@@ -457,8 +453,7 @@ v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
457453
status: 202,
458454
headers: {
459455
"Location": res.location,
460-
// Note that the HTTP Range header byte ranges are inclusive and that will be honored, even in non-standard use cases.
461-
"Range": `${res.range.join("-")}`,
456+
"Range": `0-${res.range[1]}`, // Ensure correct Range format (0-N)
462457
"Docker-Upload-UUID": res.id,
463458
},
464459
});

src/utils.ts

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,3 +67,25 @@ export async function wrap<T, E = unknown>(fn: Promise<T>): Promise<[T, null] |
6767
export function jsonHeaders(): { "content-type": "application/json" } {
6868
return { "content-type": "application/json" };
6969
}
70+
71+
/**
72+
* Get the estimated size of the stream (if possible).
73+
* Does not wait for the entire stream, only checks if known length information is available.
74+
*/
75+
export function getStreamSize(headers: Headers): number | undefined {
76+
const contentLength = headers.get("Content-Length");
77+
if (contentLength) {
78+
return +contentLength;
79+
}
80+
81+
const contentRange = headers.get("Content-Range");
82+
if (contentRange) {
83+
// Supported formats: 'bytes 0-123/456', 'bytes 0-123/*', '0-123'
84+
const match = contentRange.match(/(?:bytes\s+)?(\d+)-(\d+)/);
85+
if (match) {
86+
return parseInt(match[2], 10) - parseInt(match[1], 10) + 1;
87+
}
88+
}
89+
90+
return undefined;
91+
}

test/streaming.test.ts

Lines changed: 67 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,67 @@
1+
import { describe, expect, test } from "vitest";
2+
import { getStreamSize } from "../src/utils";
3+
import { limit } from "../src/chunk";
4+
5+
describe("Streaming Utilities", () => {
6+
test("getStreamSize extracts size correctly from Content-Length", () => {
7+
const headers = new Headers({
8+
"Content-Length": "1024",
9+
});
10+
expect(getStreamSize(headers)).toBe(1024);
11+
});
12+
13+
test("getStreamSize extracts size correctly from Content-Range (standard)", () => {
14+
const headers = new Headers({
15+
"Content-Range": "bytes 0-52428799/104857600",
16+
});
17+
expect(getStreamSize(headers)).toBe(52428800);
18+
});
19+
20+
test("getStreamSize extracts size correctly from Content-Range (without total)", () => {
21+
const headers = new Headers({
22+
"Content-Range": "bytes 500-999",
23+
});
24+
expect(getStreamSize(headers)).toBe(500);
25+
});
26+
27+
test("getStreamSize returns undefined when no size headers are present", () => {
28+
const headers = new Headers();
29+
expect(getStreamSize(headers)).toBeUndefined();
30+
});
31+
});
32+
33+
describe("Stream limit function", () => {
34+
test("limit correctly truncates a stream", async () => {
35+
const data = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
36+
const stream = new Blob([data]).stream();
37+
const limitedStream = limit(stream, 5);
38+
39+
const reader = limitedStream.getReader();
40+
const chunks = [];
41+
while (true) {
42+
const { done, value } = await reader.read();
43+
if (done) break;
44+
chunks.push(...value);
45+
}
46+
47+
expect(chunks).toHaveLength(5);
48+
expect(new Uint8Array(chunks)).toEqual(new Uint8Array([1, 2, 3, 4, 5]));
49+
});
50+
51+
test("limit handles stream smaller than limit", async () => {
52+
const data = new Uint8Array([1, 2, 3]);
53+
const stream = new Blob([data]).stream();
54+
const limitedStream = limit(stream, 10);
55+
56+
const reader = limitedStream.getReader();
57+
const chunks = [];
58+
while (true) {
59+
const { done, value } = await reader.read();
60+
if (done) break;
61+
chunks.push(...value);
62+
}
63+
64+
expect(chunks).toHaveLength(3);
65+
expect(new Uint8Array(chunks)).toEqual(new Uint8Array([1, 2, 3]));
66+
});
67+
});

0 commit comments

Comments
 (0)