From 28988206da83c0c8a05c4473bdaa1f6aeaa7f7ea Mon Sep 17 00:00:00 2001
From: LIzhifeng
Date: Fri, 6 Feb 2026 10:26:48 +0800
Subject: [PATCH 1/3] Fix: blocking PATCH requests and implement true streaming for chunked uploads

---
 src/chunk.ts           | 36 ++++++++++++++---
 src/registry/r2.ts     | 14 ++++-----
 src/router.ts          | 19 +++++-------
 src/utils.ts           | 22 ++++++++++++++
 test/streaming.test.ts | 67 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 125 insertions(+), 33 deletions(-)
 create mode 100644 test/streaming.test.ts

diff --git a/src/chunk.ts b/src/chunk.ts
index a0ecfd6..297348a 100644
--- a/src/chunk.ts
+++ b/src/chunk.ts
@@ -43,26 +43,34 @@ export async function getChunkBlob(env: Env, chunk: Chunk): Promise
  */
 export function limit(streamInput: ReadableStream, limitBytes: number): ReadableStream {
   if (streamInput instanceof FixedLengthStream) return streamInput;
-  const stream = new FixedLengthStream(limitBytes, {});
+
+  // Use standard TransformStream to limit length, ensuring backpressure is correctly handled
+  const { readable, writable } = new TransformStream();
 
   (async () => {
-    const w = stream.writable.getWriter();
-    const r = streamInput.getReader();
+    const reader = streamInput.getReader();
+    const writer = writable.getWriter();
     let written = 0;
-    while (true) {
-      const { done, value } = await r.read();
-      if (done) break;
-      await w.write(value);
-      written += value.length;
-      if (written >= limitBytes) break;
+    try {
+      while (written < limitBytes) {
+        const { done, value } = await reader.read();
+        if (done) break;
+        const toWrite = value.length + written > limitBytes
+          ? value.slice(0, limitBytes - written)
+          : value;
+        await writer.write(toWrite);
+        written += toWrite.length;
+        if (written >= limitBytes) break;
+      }
+    } catch (e) {
+      writer.abort(e);
+    } finally {
+      reader.releaseLock();
+      try { writer.close(); } catch {}
     }
-
-    r.releaseLock();
-    w.releaseLock();
-    await stream.writable.close();
   })();
 
-  return stream.readable;
+  return readable;
 }
 
 export async function* split(
diff --git a/src/registry/r2.ts b/src/registry/r2.ts
index 91e8fcf..47d827d 100644
--- a/src/registry/r2.ts
+++ b/src/registry/r2.ts
@@ -652,21 +652,21 @@ export class R2Registry implements Registry {
     if (env.PUSH_COMPATIBILITY_MODE === "full") {
       const [stream1, stream2] = limit(stream, size).tee();
       const partTask = upload.uploadPart(state.parts.length + 1, stream1);
-      // We can totally disable this, however we are risking that the client sends another small chunk.
-      // Maybe instead we can throw range error
+
       const dateInOneHour = new Date();
       dateInOneHour.setTime(dateInOneHour.getTime() + 60 * 60 * 1000);
       const headers = {
-        // https://www.rfc-editor.org/rfc/rfc1123 date format
-        // Objects will typically be removed from a bucket within 24 hours of the x-amz-expiration value.
         "x-amz-expiration": dateInOneHour.toUTCString(),
       } as const;
+
       const r2RegistryObjectTask = env.REGISTRY.put(path, stream2, {
         httpMetadata: new Headers(headers),
         customMetadata: headers,
       });
-      state.parts.push(await partTask);
-      await r2RegistryObjectTask;
+
+      // Run both in parallel and wait for both to complete
+      const [part] = await Promise.all([partTask, r2RegistryObjectTask]);
+      state.parts.push(part);
       return;
     }
 
@@ -694,7 +694,7 @@ export class R2Registry implements Registry {
       };
 
       if (length === undefined) {
-        console.error("Length needs to be defined");
+        console.error("Length needs to be defined for streaming upload to R2. Ensure Content-Length or Content-Range is provided.");
         return {
           response: new InternalError(),
         };
diff --git a/src/router.ts b/src/router.ts
index aa000ca..0f9f7f9 100644
--- a/src/router.ts
+++ b/src/router.ts
@@ -1,7 +1,7 @@
 import { Router } from "itty-router";
 import { BlobUnknownError, ManifestUnknownError } from "./v2-errors";
 import { InternalError, ServerError } from "./errors";
-import { errorString, jsonHeaders, wrap } from "./utils";
+import { errorString, getStreamSize, jsonHeaders, wrap } from "./utils";
 import { hexToDigest } from "./user";
 import { ManifestTagsListTooBigError } from "./v2-responses";
 import { Env } from "..";
@@ -418,19 +418,15 @@ v2Router.get("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
 v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
   const { name, uuid } = req.params;
   const contentRange = req.headers.get("Content-Range");
-  const [start, end] = contentRange?.split("-") ?? [undefined, undefined];
+  const rangeMatch = contentRange?.match(/(?:bytes\s+)?(\d+)-(\d+)/);
+  const [start, end] = rangeMatch ? [rangeMatch[1], rangeMatch[2]] : [undefined, undefined];
   if (req.body == null) {
     return new Response(null, { status: 400 });
   }
 
-  let contentLengthString = req.headers.get("Content-Length");
-  let stream = req.body;
-  if (!contentLengthString) {
-    const blob = await req.blob();
-    contentLengthString = `${blob.size}`;
-    stream = blob.stream();
-  }
+  const streamSize = getStreamSize(req.headers);
+  const stream = req.body;
 
   const url = new URL(req.url);
   const [res, err] = await wrap(
@@ -439,7 +435,7 @@ v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
       uuid,
       url.pathname + "?" + url.searchParams.toString(),
       stream,
-      +contentLengthString,
+      streamSize,
       end !== undefined && start !== undefined ? [+start, +end] : undefined,
     ),
   );
@@ -457,8 +453,7 @@ v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
     status: 202,
     headers: {
       "Location": res.location,
-      // Note that the HTTP Range header byte ranges are inclusive and that will be honored, even in non-standard use cases.
-      "Range": `${res.range.join("-")}`,
+      "Range": `0-${res.range[1]}`, // Ensure correct Range format (0-N)
       "Docker-Upload-UUID": res.id,
     },
   });
diff --git a/src/utils.ts b/src/utils.ts
index 2b3e198..874cfb4 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -67,3 +67,25 @@ export async function wrap(fn: Promise): Promise<[T, null] |
 export function jsonHeaders(): { "content-type": "application/json" } {
   return { "content-type": "application/json" };
 }
+
+/**
+ * Get the estimated size of the stream (if possible).
+ * Does not wait for the entire stream, only checks if known length information is available.
+ */
+export function getStreamSize(headers: Headers): number | undefined {
+  const contentLength = headers.get("Content-Length");
+  if (contentLength) {
+    return +contentLength;
+  }
+
+  const contentRange = headers.get("Content-Range");
+  if (contentRange) {
+    // Supported formats: 'bytes 0-123/456', 'bytes 0-123/*', '0-123'
+    const match = contentRange.match(/(?:bytes\s+)?(\d+)-(\d+)/);
+    if (match) {
+      return parseInt(match[2], 10) - parseInt(match[1], 10) + 1;
+    }
+  }
+
+  return undefined;
+}
diff --git a/test/streaming.test.ts b/test/streaming.test.ts
new file mode 100644
index 0000000..6cf4e2a
--- /dev/null
+++ b/test/streaming.test.ts
@@ -0,0 +1,67 @@
+import { describe, expect, test } from "vitest";
+import { getStreamSize } from "../src/utils";
+import { limit } from "../src/chunk";
+
+describe("Streaming Utilities", () => {
+  test("getStreamSize extracts size correctly from Content-Length", () => {
+    const headers = new Headers({
+      "Content-Length": "1024",
+    });
+    expect(getStreamSize(headers)).toBe(1024);
+  });
+
+  test("getStreamSize extracts size correctly from Content-Range (standard)", () => {
+    const headers = new Headers({
+      "Content-Range": "bytes 0-52428799/104857600",
+    });
+    expect(getStreamSize(headers)).toBe(52428800);
+  });
+
+  test("getStreamSize extracts size correctly from Content-Range (without total)", () => {
+    const headers = new Headers({
+      "Content-Range": "bytes 500-999",
+    });
+    expect(getStreamSize(headers)).toBe(500);
+  });
+
+  test("getStreamSize returns undefined when no size headers are present", () => {
+    const headers = new Headers();
+    expect(getStreamSize(headers)).toBeUndefined();
+  });
+});
+
+describe("Stream limit function", () => {
+  test("limit correctly truncates a stream", async () => {
+    const data = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+    const stream = new Blob([data]).stream();
+    const limitedStream = limit(stream, 5);
+
+    const reader = limitedStream.getReader();
+    const chunks = [];
+    while (true) {
+      const { done, value } = await reader.read();
+      if (done) break;
+      chunks.push(...value);
+    }
+
+    expect(chunks).toHaveLength(5);
+    expect(new Uint8Array(chunks)).toEqual(new Uint8Array([1, 2, 3, 4, 5]));
+  });
+
+  test("limit handles stream smaller than limit", async () => {
+    const data = new Uint8Array([1, 2, 3]);
+    const stream = new Blob([data]).stream();
+    const limitedStream = limit(stream, 10);
+
+    const reader = limitedStream.getReader();
+    const chunks = [];
+    while (true) {
+      const { done, value } = await reader.read();
+      if (done) break;
+      chunks.push(...value);
+    }
+
+    expect(chunks).toHaveLength(3);
+    expect(new Uint8Array(chunks)).toEqual(new Uint8Array([1, 2, 3]));
+  });
+});
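
A short worked example of the size arithmetic patch 1 relies on (illustrative only, not part of the series; the import path follows the test file above). HTTP byte ranges are inclusive, so the byte count is upper bound minus lower bound plus one, and the Range header returned after a PATCH uses the inclusive "0-N" form:

  import { getStreamSize } from "../src/utils";

  // "bytes 500-999" covers bytes 500 through 999 inclusive: 999 - 500 + 1 = 500 bytes.
  const headers = new Headers({ "Content-Range": "bytes 500-999/123456" });
  const size = getStreamSize(headers); // 500; the total after "/" is ignored by the regex
  // The 202 response then reports the inclusive range received so far in the same
  // "0-N" form, where N is the upper bound the registry returns (res.range[1] above).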
+ */
From b0a9601be5755287f0726e8df7e2c19b42ce93eb Mon Sep 17 00:00:00 2001
From: LIzhifeng
Date: Wed, 25 Feb 2026 18:20:43 +0800
Subject: [PATCH 2/3] update: Reduce backpressure recovery time

---
 src/router.ts | 34 ++++++++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/src/router.ts b/src/router.ts
index 0f9f7f9..d6782bd 100644
--- a/src/router.ts
+++ b/src/router.ts
@@ -292,6 +292,40 @@ v2Router.get("/:name+/blobs/:digest", async (req, env: Env, context: ExecutionCo
     layerResponse = response;
 
     const [s1, s2] = layerResponse.stream.tee();
+
+    // Parallel tee consumption optimization: enabled by default when PUSH_COMPATIBILITY_MODE !== "none"
+    // This significantly reduces backpressure recovery time (from 10-30 seconds to 1-5 seconds)
+    const useParallelConsumption = env.PUSH_COMPATIBILITY_MODE !== "none";
+
+    if (useParallelConsumption) {
+      // Parallel mode: immediately start the R2 upload while returning the response to the client
+      // This allows the R2 write to continue even when the client pauses reading
+      const layerSize = layerResponse.size;
+      const uploadPromise = env.REGISTRY_CLIENT.monolithicUpload(name, digest, s2, layerSize);
+
+      // Use waitUntil so the upload doesn't block the response; the upload itself has already started in parallel
+      context.waitUntil(
+        wrap(uploadPromise).then(([uploadResult, uploadErr]) => {
+          if (uploadErr) {
+            console.error("Error uploading asynchronously the layer ", digest, "into main registry");
+            return;
+          }
+          if (uploadResult === false) {
+            console.error("Layer might be too big for the registry client", layerSize);
+          }
+        })
+      );
+
+      // Immediately return the response to the client (stream s1 has already started being consumed)
+      return new Response(s1, {
+        headers: {
+          "Docker-Content-Digest": layerResponse.digest,
+          "Content-Length": `${layerSize}`,
+        },
+      });
+    }
+
+    // Compatibility mode: keep original behavior (return response first, then async upload)
     layerResponse.stream = s1;
     context.waitUntil(
       (async () => {
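
For context on why patch 2 helps, a minimal sketch of the tee-and-waitUntil pattern it applies to the blob GET handler (illustrative only, not part of the series; serveAndCopy and copyToBackup are hypothetical names, and ExecutionContext is the Workers runtime type referenced above). tee() yields two branches fed from one source, so when only the client-facing branch is being read, data queued for the other branch piles up and that consumer later has to catch up; starting the second consumer immediately, rather than after the response is fully sent, is what shortens that recovery window:

  function serveAndCopy(
    source: ReadableStream,
    context: ExecutionContext,
    copyToBackup: (s: ReadableStream) => Promise<unknown>,
  ): Response {
    const [toClient, toBackup] = source.tee();
    // Start the second consumer right away; waitUntil only keeps the worker alive
    // until the copy settles, the copy itself already runs in parallel with the client download.
    context.waitUntil(copyToBackup(toBackup).catch((e) => console.error("copy failed", e)));
    return new Response(toClient);
  }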
From 889cc0993509050e4bba156eafd098909814ddd9 Mon Sep 17 00:00:00 2001
From: LIzhifeng
Date: Wed, 25 Feb 2026 18:34:24 +0800
Subject: [PATCH 3/3] revert: Add back the deleted comments

---
 src/registry/r2.ts | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/registry/r2.ts b/src/registry/r2.ts
index 47d827d..51f558b 100644
--- a/src/registry/r2.ts
+++ b/src/registry/r2.ts
@@ -652,13 +652,15 @@ export class R2Registry implements Registry {
     if (env.PUSH_COMPATIBILITY_MODE === "full") {
       const [stream1, stream2] = limit(stream, size).tee();
       const partTask = upload.uploadPart(state.parts.length + 1, stream1);
-
+      // We can totally disable this, however we are risking that the client sends another small chunk.
+      // Maybe instead we can throw range error
       const dateInOneHour = new Date();
       dateInOneHour.setTime(dateInOneHour.getTime() + 60 * 60 * 1000);
       const headers = {
+        // https://www.rfc-editor.org/rfc/rfc1123 date format
+        // Objects will typically be removed from a bucket within 24 hours of the x-amz-expiration value.
         "x-amz-expiration": dateInOneHour.toUTCString(),
       } as const;
-
       const r2RegistryObjectTask = env.REGISTRY.put(path, stream2, {
         httpMetadata: new Headers(headers),
         customMetadata: headers,
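
For reference on the comments patch 3 restores (illustrative only; the example date is made up): the expiration metadata written when PUSH_COMPATIBILITY_MODE === "full" is an RFC 1123 timestamp one hour in the future, the same computation as in the hunk above shown standalone:

  const dateInOneHour = new Date(Date.now() + 60 * 60 * 1000);
  const headers = { "x-amz-expiration": dateInOneHour.toUTCString() } as const;
  // toUTCString() produces an RFC 1123 date, e.g. "Wed, 25 Feb 2026 11:34:24 GMT";
  // per the restored comment, the temporary object is typically removed from the
  // bucket within 24 hours of that value.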