Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions src/chunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,34 @@ export async function getChunkBlob(env: Env, chunk: Chunk): Promise<Blob | null>
*/
export function limit(streamInput: ReadableStream, limitBytes: number): ReadableStream {
if (streamInput instanceof FixedLengthStream) return streamInput;
const stream = new FixedLengthStream(limitBytes, {});

// Use standard TransformStream to limit length, ensuring backpressure is correctly handled
const { readable, writable } = new TransformStream();

(async () => {
const w = stream.writable.getWriter();
const r = streamInput.getReader();
const reader = streamInput.getReader();
const writer = writable.getWriter();
let written = 0;
while (true) {
const { done, value } = await r.read();
if (done) break;
await w.write(value);
written += value.length;
if (written >= limitBytes) break;
try {
while (written < limitBytes) {
const { done, value } = await reader.read();
if (done) break;
const toWrite = value.length + written > limitBytes
? value.slice(0, limitBytes - written)
: value;
await writer.write(toWrite);
written += toWrite.length;
if (written >= limitBytes) break;
}
} catch (e) {
writer.abort(e);
} finally {
reader.releaseLock();
try { writer.close(); } catch {}
}

r.releaseLock();
w.releaseLock();
await stream.writable.close();
})();

return stream.readable;
return readable;
}

export async function* split(
Expand Down
8 changes: 5 additions & 3 deletions src/registry/r2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,10 @@ export class R2Registry implements Registry {
httpMetadata: new Headers(headers),
customMetadata: headers,
});
state.parts.push(await partTask);
await r2RegistryObjectTask;

// Run both in parallel and wait for both to complete
const [part] = await Promise.all([partTask, r2RegistryObjectTask]);
state.parts.push(part);
return;
}

Expand Down Expand Up @@ -694,7 +696,7 @@ export class R2Registry implements Registry {
};

if (length === undefined) {
console.error("Length needs to be defined");
console.error("Length needs to be defined for streaming upload to R2. Ensure Content-Length or Content-Range is provided.");
return {
response: new InternalError(),
};
Expand Down
53 changes: 41 additions & 12 deletions src/router.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Router } from "itty-router";
import { BlobUnknownError, ManifestUnknownError } from "./v2-errors";
import { InternalError, ServerError } from "./errors";
import { errorString, jsonHeaders, wrap } from "./utils";
import { errorString, getStreamSize, jsonHeaders, wrap } from "./utils";
import { hexToDigest } from "./user";
import { ManifestTagsListTooBigError } from "./v2-responses";
import { Env } from "..";
Expand Down Expand Up @@ -292,6 +292,40 @@ v2Router.get("/:name+/blobs/:digest", async (req, env: Env, context: ExecutionCo

layerResponse = response;
const [s1, s2] = layerResponse.stream.tee();

// Parallel tee consumption optimization: enabled by default when PUSH_COMPATIBILITY_MODE !== "none"
// This significantly reduces backpressure recovery time (from 10-30 seconds to 1-5 seconds)
const useParallelConsumption = env.PUSH_COMPATIBILITY_MODE !== "none";

if (useParallelConsumption) {
// Parallel mode: immediately start R2 upload while returning response to client
// This allows R2 write to continue even when client pauses reading
const layerSize = layerResponse.size;
const uploadPromise = env.REGISTRY_CLIENT.monolithicUpload(name, digest, s2, layerSize);

// Use waitUntil to ensure upload doesn't block response, but upload has already started in parallel
context.waitUntil(
wrap(uploadPromise).then(([uploadResult, uploadErr]) => {
if (uploadErr) {
console.error("Error uploading asynchronously the layer ", digest, "into main registry");
return;
}
if (uploadResult === false) {
console.error("Layer might be too big for the registry client", layerSize);
}
})
);

// Immediately return response to client (stream s1 has already started being consumed)
return new Response(s1, {
headers: {
"Docker-Content-Digest": layerResponse.digest,
"Content-Length": `${layerSize}`,
},
});
}

// Compatibility mode: keep original behavior (return response first, then async upload)
layerResponse.stream = s1;
context.waitUntil(
(async () => {
Expand Down Expand Up @@ -418,19 +452,15 @@ v2Router.get("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
const { name, uuid } = req.params;
const contentRange = req.headers.get("Content-Range");
const [start, end] = contentRange?.split("-") ?? [undefined, undefined];
const rangeMatch = contentRange?.match(/(?:bytes\s+)?(\d+)-(\d+)/);
const [start, end] = rangeMatch ? [rangeMatch[1], rangeMatch[2]] : [undefined, undefined];

if (req.body == null) {
return new Response(null, { status: 400 });
}

let contentLengthString = req.headers.get("Content-Length");
let stream = req.body;
if (!contentLengthString) {
const blob = await req.blob();
contentLengthString = `${blob.size}`;
stream = blob.stream();
}
const streamSize = getStreamSize(req.headers);
const stream = req.body;

const url = new URL(req.url);
const [res, err] = await wrap<UploadObject | RegistryError, Error>(
Expand All @@ -439,7 +469,7 @@ v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
uuid,
url.pathname + "?" + url.searchParams.toString(),
stream,
+contentLengthString,
streamSize,
end !== undefined && start !== undefined ? [+start, +end] : undefined,
),
);
Expand All @@ -457,8 +487,7 @@ v2Router.patch("/:name+/blobs/uploads/:uuid", async (req, env: Env) => {
status: 202,
headers: {
"Location": res.location,
// Note that the HTTP Range header byte ranges are inclusive and that will be honored, even in non-standard use cases.
"Range": `${res.range.join("-")}`,
"Range": `0-${res.range[1]}`, // Ensure correct Range format (0-N)
"Docker-Upload-UUID": res.id,
},
});
Expand Down
22 changes: 22 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,25 @@ export async function wrap<T, E = unknown>(fn: Promise<T>): Promise<[T, null] |
export function jsonHeaders(): { "content-type": "application/json" } {
return { "content-type": "application/json" };
}

/**
* Get the estimated size of the stream (if possible).
* Does not wait for the entire stream, only checks if known length information is available.
*/
/**
 * Best-effort extraction of the expected byte size of a request body from
 * its headers, without consuming the stream.
 *
 * Checks `Content-Length` first, then falls back to `Content-Range`
 * (supported formats: 'bytes 0-123/456', 'bytes 0-123/*', '0-123', where
 * the size is end - start + 1).
 *
 * @param headers request headers to inspect
 * @returns the size in bytes, or undefined when it cannot be determined
 *          from the headers (including malformed values)
 */
export function getStreamSize(headers: Headers): number | undefined {
  const contentLength = headers.get("Content-Length");
  if (contentLength) {
    const length = Number.parseInt(contentLength, 10);
    // Guard against malformed values: a bare `+contentLength` coercion
    // would yield NaN, which callers would treat as a real size.
    if (Number.isSafeInteger(length) && length >= 0) {
      return length;
    }
    // Malformed Content-Length: fall through to Content-Range.
  }

  const contentRange = headers.get("Content-Range");
  if (contentRange) {
    const match = contentRange.match(/(?:bytes\s+)?(\d+)-(\d+)/);
    if (match) {
      const start = Number.parseInt(match[1], 10);
      const end = Number.parseInt(match[2], 10);
      // Byte ranges are inclusive; an inverted range (end < start) is
      // malformed, so report "unknown" rather than a negative size.
      if (end >= start) {
        return end - start + 1;
      }
    }
  }

  return undefined;
}
67 changes: 67 additions & 0 deletions test/streaming.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { describe, expect, test } from "vitest";
import { getStreamSize } from "../src/utils";
import { limit } from "../src/chunk";

describe("Streaming Utilities", () => {
  // Each case builds a Headers object inline and checks the extracted size.
  test("getStreamSize extracts size correctly from Content-Length", () => {
    expect(getStreamSize(new Headers({ "Content-Length": "1024" }))).toBe(1024);
  });

  test("getStreamSize extracts size correctly from Content-Range (standard)", () => {
    const h = new Headers({ "Content-Range": "bytes 0-52428799/104857600" });
    expect(getStreamSize(h)).toBe(52428800);
  });

  test("getStreamSize extracts size correctly from Content-Range (without total)", () => {
    const h = new Headers({ "Content-Range": "bytes 500-999" });
    expect(getStreamSize(h)).toBe(500);
  });

  test("getStreamSize returns undefined when no size headers are present", () => {
    expect(getStreamSize(new Headers())).toBeUndefined();
  });
});

describe("Stream limit function", () => {
  // Drain a ReadableStream into a flat array of byte values.
  const drain = async (s: ReadableStream): Promise<number[]> => {
    const out: number[] = [];
    const r = s.getReader();
    for (;;) {
      const { done, value } = await r.read();
      if (done) return out;
      out.push(...value);
    }
  };

  test("limit correctly truncates a stream", async () => {
    const source = new Blob([new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])]).stream();
    const bytes = await drain(limit(source, 5));

    expect(bytes).toHaveLength(5);
    expect(new Uint8Array(bytes)).toEqual(new Uint8Array([1, 2, 3, 4, 5]));
  });

  test("limit handles stream smaller than limit", async () => {
    const source = new Blob([new Uint8Array([1, 2, 3])]).stream();
    const bytes = await drain(limit(source, 10));

    expect(bytes).toHaveLength(3);
    expect(new Uint8Array(bytes)).toEqual(new Uint8Array([1, 2, 3]));
  });
});