diff --git a/.changeset/fix-r2-cache-upload.md b/.changeset/fix-r2-cache-upload.md
new file mode 100644
index 000000000..ff58702fa
--- /dev/null
+++ b/.changeset/fix-r2-cache-upload.md
@@ -0,0 +1,18 @@
+---
+"@opennextjs/cloudflare": minor
+---
+
+fix: Use remote R2 binding for cache population to avoid API rate limits
+
+For deployments with prerendered pages, the R2 incremental cache is now populated
+using `unstable_startWorker` with a remote R2 binding instead of `wrangler r2 bulk put`.
+This bypasses the Cloudflare API rate limit of 1,200 requests per 5 minutes that caused
+failures for large applications with thousands of prerendered pages.
+
+The new approach:
+1. Starts a local worker via `unstable_startWorker` with the R2 binding configured programmatically
+2. Sends cache entries to the local worker a few at a time for low memory usage
+3. The worker writes entries to R2 via the binding (no API rate limits)
+4. No deployment, temp config files, or authentication tokens required
+
+Closes #1088
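For orientation before the code: a minimal sketch of the flow the changeset describes, condensed from `populateR2IncrementalCache` in this diff. It must run inside an async context; the bucket name and entrypoint path are placeholders, and the retry logic, temp-directory workaround, and cleanup are omitted.

```ts
import { unstable_startWorker } from "wrangler";

// Start a local worker whose R2 binding targets the remote bucket.
const worker = await unstable_startWorker({
  name: "open-next-cache-populate",
  entrypoint: "./r2-cache-populate-handler.js", // the handler added by this PR
  compatibilityDate: "2026-01-01",
  bindings: {
    NEXT_INC_CACHE_R2_BUCKET: { type: "r2_bucket", bucket_name: "my-bucket", remote: true },
  },
});

await worker.ready;

// POST small batches of { key, value } entries; the worker writes them via the binding.
await fetch(new URL("/populate", await worker.url), {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ entries: [{ key: "cache/key1", value: '{"data":"value1"}' }] }),
});

await worker.dispose();
```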
describe("retry on partial failures", () => { + afterEach(() => { + mockFs.restore(); + vi.clearAllMocks(); + }); - setupMockFileSystem(); - vi.mocked(runWrangler).mockClear(); + test("retries failed entries from 207 response", async () => { + setupMockFileSystem(); - await populateCache( - { - outputDir: "/test/output", - } as BuildOptions, - { - default: { - override: { - incrementalCache: "cf-r2-incremental-cache", - }, - }, - } as any, // eslint-disable-line @typescript-eslint/no-explicit-any - { - r2_buckets: [ - { - binding: "NEXT_INC_CACHE_R2_BUCKET", - bucket_name: "test-bucket", - jurisdiction: "eu", - }, - ], - } as any, // eslint-disable-line @typescript-eslint/no-explicit-any - { target, shouldUsePreviewId: false }, - {} as any // eslint-disable-line @typescript-eslint/no-explicit-any + // First call: partial failure (207), second call: success + global.fetch = vi + .fn() + .mockResolvedValueOnce( + new Response( + JSON.stringify({ + written: 0, + failed: 1, + errors: [ + "incremental-cache/buildID/abc123.cache: put: Unspecified error (0)", + ], + }), + { status: 207, headers: { "Content-Type": "application/json" } } + ) + ) + .mockResolvedValue( + new Response(JSON.stringify({ written: 1, failed: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) ); - expect(runWrangler).toHaveBeenCalledWith( - expect.anything(), - expect.arrayContaining(["r2 bulk put", "test-bucket", "--jurisdiction eu"]), - expect.objectContaining({ target }) + await populateCache( + { outputDir: "/test/output" } as BuildOptions, + { + default: { override: { incrementalCache: "cf-r2-incremental-cache" } }, + } as any, // eslint-disable-line @typescript-eslint/no-explicit-any + { + r2_buckets: [{ binding: "NEXT_INC_CACHE_R2_BUCKET", bucket_name: "test-bucket" }], + } as any, // eslint-disable-line @typescript-eslint/no-explicit-any + { target: "remote", shouldUsePreviewId: false }, + {} as any // eslint-disable-line @typescript-eslint/no-explicit-any + ); + + // Should have been called at least twice (initial + retry) + expect(global.fetch).toHaveBeenCalledTimes(2); + }); + + test("retries on network errors", async () => { + setupMockFileSystem(); + + // First call: network error, second call: success + global.fetch = vi + .fn() + .mockRejectedValueOnce(new Error("fetch failed")) + .mockResolvedValue( + new Response(JSON.stringify({ written: 1, failed: 0 }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) ); - }); - } - ); + + await populateCache( + { outputDir: "/test/output" } as BuildOptions, + { + default: { override: { incrementalCache: "cf-r2-incremental-cache" } }, + } as any, // eslint-disable-line @typescript-eslint/no-explicit-any + { + r2_buckets: [{ binding: "NEXT_INC_CACHE_R2_BUCKET", bucket_name: "test-bucket" }], + } as any, // eslint-disable-line @typescript-eslint/no-explicit-any + { target: "remote", shouldUsePreviewId: false }, + {} as any // eslint-disable-line @typescript-eslint/no-explicit-any + ); + + // Should have been called at least twice (initial + retry) + expect(global.fetch).toHaveBeenCalledTimes(2); + }); + }); }); diff --git a/packages/cloudflare/src/cli/commands/populate-cache.ts b/packages/cloudflare/src/cli/commands/populate-cache.ts index e80f88e13..dce82c5d7 100644 --- a/packages/cloudflare/src/cli/commands/populate-cache.ts +++ b/packages/cloudflare/src/cli/commands/populate-cache.ts @@ -2,6 +2,7 @@ import fs from "node:fs"; import fsp from "node:fs/promises"; import os from "node:os"; import path from "node:path"; 
diff --git a/packages/cloudflare/src/cli/commands/populate-cache.ts b/packages/cloudflare/src/cli/commands/populate-cache.ts
index e80f88e13..dce82c5d7 100644
--- a/packages/cloudflare/src/cli/commands/populate-cache.ts
+++ b/packages/cloudflare/src/cli/commands/populate-cache.ts
@@ -2,6 +2,7 @@ import fs from "node:fs";
 import fsp from "node:fs/promises";
 import os from "node:os";
 import path from "node:path";
+import { fileURLToPath } from "node:url";
 
 import type { BuildOptions } from "@opennextjs/aws/build/helper.js";
 import logger from "@opennextjs/aws/logger.js";
@@ -14,6 +15,7 @@ import type {
 import type { IncrementalCache, TagCache } from "@opennextjs/aws/types/overrides.js";
 import { globSync } from "glob";
 import { tqdm } from "ts-tqdm";
+import { unstable_startWorker } from "wrangler";
 import type { Unstable_Config as WranglerConfig } from "wrangler";
 import type yargs from "yargs";
 
@@ -196,9 +198,9 @@
    */
   wranglerConfigPath?: string;
   /**
-   * Chunk sizes to use when populating KV cache. Ignored for R2.
+   * Chunk sizes to use when populating the cache.
    *
-   * @default 25 for KV, 50 for R2
+   * @default 25 for KV, 5 for R2
    */
   cacheChunkSize?: number;
   /**
@@ -207,6 +209,28 @@
   shouldUsePreviewId: boolean;
 };
 
+/**
+ * Resolves the path to the R2 cache populate handler file.
+ *
+ * The handler is a standalone worker used with `unstable_startWorker`.
+ * It's located in the workers directory relative to this file.
+ */
+function getCachePopulateHandlerPath(): string {
+  const currentDir = path.dirname(fileURLToPath(import.meta.url));
+  return path.join(currentDir, "../workers/r2-cache-populate-handler.js");
+}
+
+/**
+ * Populates the R2 incremental cache using a local worker with a remote R2 binding.
+ *
+ * This approach:
+ * 1. Starts a local worker via `unstable_startWorker` with the R2 cache binding.
+ * 2. Sends cache entries to the local worker a few at a time.
+ * 3. The worker writes entries to R2 using the binding (no API rate limits).
+ *
+ * This bypasses the Cloudflare API rate limit of 1,200 requests per 5 minutes
+ * that affects `wrangler r2 bulk put`.
+ */
 async function populateR2IncrementalCache(
   buildOpts: BuildOptions,
   config: WranglerConfig,
@@ -222,58 +246,178 @@ async function populateR2IncrementalCache(
   if (!binding) {
     throw new Error(`No R2 binding ${JSON.stringify(R2_CACHE_BINDING_NAME)} found!`);
   }
 
-  const bucket = binding.bucket_name;
-  if (!bucket) {
-    throw new Error(`R2 binding ${JSON.stringify(R2_CACHE_BINDING_NAME)} should have a 'bucket_name'`);
+  const prefix = envVars[R2_CACHE_PREFIX_ENV_NAME];
+  const assets = getCacheAssets(buildOpts);
+
+  if (assets.length === 0) {
+    logger.info("No cache assets to populate");
+    return;
   }
 
-  const prefix = envVars[R2_CACHE_PREFIX_ENV_NAME];
+  const useRemote = populateCacheOptions.target === "remote";
+  const handlerPath = getCachePopulateHandlerPath();
+
+  // Start the worker from a temp directory to prevent unstable_startWorker
+  // from picking up the project's wrangler.jsonc (which would merge in all
+  // bindings like DOs, KV, services, AI, etc. and cause hangs/errors).
+  const tempDir = await fsp.mkdtemp(path.join(os.tmpdir(), "open-next-r2-populate-"));
+  const originalCwd = process.cwd();
+
+  try {
+    process.chdir(tempDir);
+
+    const worker = await unstable_startWorker({
+      name: "open-next-cache-populate",
+      entrypoint: handlerPath,
+      compatibilityDate: "2026-01-01",
+      bindings: {
+        [R2_CACHE_BINDING_NAME]: {
+          type: "r2_bucket",
+          bucket_name: binding.bucket_name,
+          ...(binding.jurisdiction && { jurisdiction: binding.jurisdiction }),
+          remote: useRemote,
+        },
+      },
+      dev: {
+        server: { port: 0 },
+        inspector: false,
+        watch: false,
+        liveReload: false,
+      },
+    });
+
+    try {
+      await worker.ready;
+      const url = await worker.url;
+      const populateUrl = new URL("/populate", url).href;
+      await sendCacheEntries(populateUrl, assets, prefix, populateCacheOptions.cacheChunkSize);
+    } finally {
+      await worker.dispose();
+    }
+  } finally {
+    process.chdir(originalCwd);
+    fs.rmSync(tempDir, { recursive: true, force: true });
+  }
 
-  const assets = getCacheAssets(buildOpts);
+  logger.info(`Successfully populated cache with ${assets.length} assets`);
+}
 
-  const objectList = assets.map(({ fullPath, key, buildId, isFetch }) => ({
-    key: computeCacheKey(key, {
-      prefix,
-      buildId,
-      cacheType: isFetch ? "fetch" : "cache",
-    }),
-    file: fullPath,
-  }));
+/**
+ * Sends cache entries to the local populate worker a few at a time.
+ *
+ * Since the worker is local, small batches avoid excessive memory usage
+ * without meaningful overhead.
+ */
+async function sendCacheEntries(
+  workerUrl: string,
+  assets: CacheAsset[],
+  prefix: string | undefined,
+  cacheChunkSize?: number
+): Promise<void> {
+  const batchSize = Math.max(1, cacheChunkSize ?? 5);
+  const totalBatches = Math.ceil(assets.length / batchSize);
+
+  logger.info(`Populating ${assets.length} cache entries in batches of ${batchSize}`);
+
+  let totalWritten = 0;
+  let totalFailed = 0;
+
+  for (const batchIndex of tqdm(Array.from({ length: totalBatches }, (_, i) => i))) {
+    const batchAssets = assets.slice(batchIndex * batchSize, (batchIndex + 1) * batchSize);
+
+    const entries = batchAssets.map(({ fullPath, key, buildId, isFetch }) => ({
+      key: computeCacheKey(key, {
+        prefix,
+        buildId,
+        cacheType: isFetch ? "fetch" : "cache",
+      }),
+      value: fs.readFileSync(fullPath, "utf8"),
+    }));
 
-  const tempDir = await fsp.mkdtemp(path.join(os.tmpdir(), "open-next-"));
-  const listFile = path.join(tempDir, `r2-bulk-list.json`);
-  fs.writeFileSync(listFile, JSON.stringify(objectList));
+    const result = await sendBatchWithRetry(workerUrl, entries);
 
-  const concurrency = Math.max(1, populateCacheOptions.cacheChunkSize ?? 50);
-  const jurisdiction = binding.jurisdiction ? `--jurisdiction ${binding.jurisdiction}` : "";
+    totalWritten += result.written;
+    totalFailed += result.failed;
 
-  const result = runWrangler(
-    buildOpts,
-    [
-      "r2 bulk put",
-      bucket,
-      `--filename ${quoteShellMeta(listFile)}`,
-      `--concurrency ${concurrency}`,
-      jurisdiction,
-    ],
-    {
-      target: populateCacheOptions.target,
-      configPath: populateCacheOptions.wranglerConfigPath,
-      // R2 does not support the environment flag and results in the following error:
-      // Incorrect type for the 'cacheExpiry' field on 'HttpMetadata': the provided value is not of type 'date'.
-      environment: undefined,
-      logging: "error",
+    if (result.failed > 0 && result.errors) {
+      logger.warn(
+        `Batch ${batchIndex + 1} had ${result.failed} failures: ${result.errors.slice(0, 3).join(", ")}`
+      );
     }
-  );
+  }
 
-  fs.rmSync(listFile, { force: true });
+  if (totalFailed > 0) {
+    logger.warn(
+      `Cache population completed with ${totalFailed} failures out of ${totalWritten + totalFailed} entries`
+    );
+  }
+}
 
-  if (!result.success) {
-    logger.error(`Wrangler r2 bulk put command failed${result.stderr ? `:\n${result.stderr}` : ""}`);
-    process.exit(1);
+/**
+ * Sends a batch of cache entries to the local worker with retry logic.
+ *
+ * Retries on:
+ * - HTTP errors (non-200/207 responses)
+ * - Network failures
+ * - Partial failures (207 responses) — only the failed entries are retried
+ */
+async function sendBatchWithRetry(
+  workerUrl: string,
+  entries: { key: string; value: string }[],
+  maxRetries = 3,
+  retryDelayMs = 1000
+): Promise<{ written: number; failed: number; errors?: string[] }> {
+  let lastError: Error | undefined;
+  let totalWritten = 0;
+  let remainingEntries = entries;
+
+  for (let attempt = 0; attempt < maxRetries; attempt++) {
+    if (attempt > 0) {
+      logger.info(
+        `Retrying ${remainingEntries.length} failed entries (attempt ${attempt + 1}/${maxRetries})...`
+      );
+      await sleep(retryDelayMs * Math.pow(2, attempt - 1));
+    }
+
+    try {
+      const response = await fetch(workerUrl, {
+        method: "POST",
+        headers: { "Content-Type": "application/json" },
+        body: JSON.stringify({ entries: remainingEntries }),
+      });
+
+      if (!response.ok && response.status !== 207) {
+        const text = await response.text().catch(() => "");
+        throw new Error(`HTTP ${response.status}: ${text || response.statusText}`);
+      }
+
+      const result = (await response.json()) as { written: number; failed: number; errors?: string[] };
+      totalWritten += result.written;
+
+      if (result.failed === 0) {
+        return { written: totalWritten, failed: 0 };
+      }
+
+      // Extract the keys that failed so we can retry just those
+      const failedKeys = new Set(result.errors?.map((e) => e.split(":")[0]?.trim()).filter(Boolean));
+      if (failedKeys.size > 0) {
+        remainingEntries = remainingEntries.filter((e) => failedKeys.has(e.key));
+      }
+
+      lastError = new Error(`${result.failed} entries failed: ${result.errors?.slice(0, 3).join(", ")}`);
+    } catch (error) {
+      lastError = error instanceof Error ? error : new Error(String(error));
+      logger.warn(`Batch failed: ${lastError.message}`);
+    }
   }
 
-  logger.info(`Successfully populated cache with ${assets.length} assets`);
+  return {
+    written: totalWritten,
+    failed: remainingEntries.length,
+    errors: lastError ? [lastError.message] : undefined,
+  };
+}
+
+function sleep(ms: number): Promise<void> {
+  return new Promise((resolve) => setTimeout(resolve, ms));
+}
 
 async function populateKVIncrementalCache(
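Note the coupling between the retry logic above and the worker handler added below: `sendBatchWithRetry` recovers failed keys by parsing error strings that the handler formats as `<key>: <message>`. A minimal sketch of that round trip (values illustrative; if a key ever contained a `:`, `split(":")[0]` would truncate it, though the cache key formats used here do not):

```ts
// Handler side (see r2-cache-populate-handler.ts below):
//   errors.push(`${result.value.key}: ${result.value.error}`);
const errors = ["incremental-cache/buildID/abc123.cache: put: Unspecified error (0)"];

// Client side (sendBatchWithRetry): recover the keys to retry.
const failedKeys = new Set(errors.map((e) => e.split(":")[0]?.trim()).filter(Boolean));
// => Set(1) { "incremental-cache/buildID/abc123.cache" }
```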
diff --git a/packages/cloudflare/src/cli/workers/r2-cache-populate-handler.spec.ts b/packages/cloudflare/src/cli/workers/r2-cache-populate-handler.spec.ts
new file mode 100644
index 000000000..e9f5d83ca
--- /dev/null
+++ b/packages/cloudflare/src/cli/workers/r2-cache-populate-handler.spec.ts
@@ -0,0 +1,179 @@
+import { describe, expect, test, vi, beforeEach } from "vitest";
+
+import handler, { populateCache } from "./r2-cache-populate-handler.js";
+
+// Mock the R2 bucket
+const mockR2Bucket = {
+  put: vi.fn(),
+};
+
+// Create a mock env
+const createMockEnv = (withR2 = true) => ({
+  NEXT_INC_CACHE_R2_BUCKET: withR2 ? mockR2Bucket : undefined,
+});
+
+describe("r2-cache-populate-handler", () => {
+  beforeEach(() => {
+    vi.clearAllMocks();
+  });
+
+  describe("populateCache", () => {
+    test("returns 500 when R2 bucket is not configured", async () => {
+      const request = new Request("https://example.com/populate", {
+        method: "POST",
+        body: JSON.stringify({ entries: [] }),
+      });
+      const env = createMockEnv(false) as unknown as { NEXT_INC_CACHE_R2_BUCKET: R2Bucket };
+
+      const response = await populateCache(request, env);
+
+      expect(response.status).toBe(500);
+      expect(await response.text()).toBe("R2 bucket not configured");
+    });
+
+    test("returns 400 for invalid JSON body", async () => {
+      const request = new Request("https://example.com/populate", {
+        method: "POST",
+        body: "not json",
+      });
+      const env = createMockEnv() as unknown as { NEXT_INC_CACHE_R2_BUCKET: R2Bucket };
+
+      const response = await populateCache(request, env);
+
+      expect(response.status).toBe(400);
+      expect(await response.text()).toBe("Invalid JSON body");
+    });
+
+    test("returns 400 when entries is not an array", async () => {
+      const request = new Request("https://example.com/populate", {
+        method: "POST",
+        headers: { "Content-Type": "application/json" },
+        body: JSON.stringify({ entries: "not an array" }),
+      });
+      const env = createMockEnv() as unknown as { NEXT_INC_CACHE_R2_BUCKET: R2Bucket };
+
+      const response = await populateCache(request, env);
+
+      expect(response.status).toBe(400);
+      expect(await response.text()).toBe("Invalid request: entries must be an array");
+    });
+
+    test("successfully writes entries to R2", async () => {
+      mockR2Bucket.put.mockResolvedValue(undefined);
+
+      const request = new Request("https://example.com/populate", {
+        method: "POST",
+        headers: { "Content-Type": "application/json" },
+        body: JSON.stringify({
+          entries: [
+            { key: "cache/key1", value: '{"data":"value1"}' },
+            { key: "cache/key2", value: '{"data":"value2"}' },
+          ],
+        }),
+      });
+      const env = createMockEnv() as unknown as { NEXT_INC_CACHE_R2_BUCKET: R2Bucket };
+
+      const response = await populateCache(request, env);
+
+      expect(response.status).toBe(200);
+      const body = await response.json();
+      expect(body).toEqual({
+        success: true,
+        written: 2,
+        failed: 0,
+      });
+
+      expect(mockR2Bucket.put).toHaveBeenCalledTimes(2);
+      expect(mockR2Bucket.put).toHaveBeenCalledWith("cache/key1", '{"data":"value1"}');
+      expect(mockR2Bucket.put).toHaveBeenCalledWith("cache/key2", '{"data":"value2"}');
+    });
+
+    test("handles partial failures", async () => {
+      mockR2Bucket.put.mockResolvedValueOnce(undefined).mockRejectedValueOnce(new Error("R2 error"));
+
+      const request = new Request("https://example.com/populate", {
+        method: "POST",
+        headers: { "Content-Type": "application/json" },
+        body: JSON.stringify({
+          entries: [
+            { key: "cache/key1", value: '{"data":"value1"}' },
+            { key: "cache/key2", value: '{"data":"value2"}' },
+          ],
+        }),
+      });
+      const env = createMockEnv() as unknown as { NEXT_INC_CACHE_R2_BUCKET: R2Bucket };
+
+      const response = await populateCache(request, env);
+
+      expect(response.status).toBe(207);
+      const body = await response.json();
+      expect(body.success).toBe(false);
+      expect(body.written).toBe(1);
+      expect(body.failed).toBe(1);
+      expect(body.errors).toBeDefined();
+      expect(body.errors.length).toBe(1);
+    });
+
+    test("handles empty entries array", async () => {
+      const request = new Request("https://example.com/populate", {
+        method: "POST",
headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ entries: [] }), + }); + const env = createMockEnv() as unknown as { NEXT_INC_CACHE_R2_BUCKET: R2Bucket }; + + const response = await populateCache(request, env); + + expect(response.status).toBe(200); + const body = await response.json(); + expect(body).toEqual({ + success: true, + written: 0, + failed: 0, + }); + + expect(mockR2Bucket.put).not.toHaveBeenCalled(); + }); + }); + + describe("fetch handler routing", () => { + test("routes POST /populate to populateCache", async () => { + mockR2Bucket.put.mockResolvedValue(undefined); + + const request = new Request("https://example.com/populate", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ entries: [{ key: "k", value: "v" }] }), + }); + const env = createMockEnv() as unknown as { NEXT_INC_CACHE_R2_BUCKET: R2Bucket }; + + const response = await handler.fetch(request, env); + + expect(response.status).toBe(200); + expect(mockR2Bucket.put).toHaveBeenCalledWith("k", "v"); + }); + + test("returns 404 for non-POST requests", async () => { + const request = new Request("https://example.com/populate", { method: "GET" }); + const env = createMockEnv() as unknown as { NEXT_INC_CACHE_R2_BUCKET: R2Bucket }; + + const response = await handler.fetch(request, env); + + expect(response.status).toBe(404); + }); + + test("returns 404 for wrong pathname", async () => { + const request = new Request("https://example.com/other", { + method: "POST", + body: JSON.stringify({ entries: [] }), + }); + const env = createMockEnv() as unknown as { NEXT_INC_CACHE_R2_BUCKET: R2Bucket }; + + const response = await handler.fetch(request, env); + + expect(response.status).toBe(404); + }); + }); +}); diff --git a/packages/cloudflare/src/cli/workers/r2-cache-populate-handler.ts b/packages/cloudflare/src/cli/workers/r2-cache-populate-handler.ts new file mode 100644 index 000000000..975f5a803 --- /dev/null +++ b/packages/cloudflare/src/cli/workers/r2-cache-populate-handler.ts @@ -0,0 +1,123 @@ +/** + * Standalone worker for R2 cache population via remote binding. + * + * This worker is started locally with `wrangler dev` during cache population. + * The R2 cache binding is configured with `remote: true`, allowing this local + * worker to write directly to the remote R2 bucket. + * + * This bypasses the Cloudflare API rate limit of 1,200 requests per 5 minutes + * that affects `wrangler r2 bulk put`. + */ + +/** R2 bucket binding name (must match the one in r2-incremental-cache.ts) */ +const R2_CACHE_BINDING_NAME = "NEXT_INC_CACHE_R2_BUCKET"; + +interface CachePopulateEnv { + NEXT_INC_CACHE_R2_BUCKET: R2Bucket; +} + +/** Single cache entry to be written */ +export interface CacheEntry { + /** The R2 object key */ + key: string; + /** The cache value (JSON stringified) */ + value: string; +} + +/** Request body for cache population */ +export interface CachePopulateRequest { + /** Array of cache entries to write */ + entries: CacheEntry[]; +} + +/** Response from cache population */ +export interface CachePopulateResponse { + success: boolean; + written: number; + failed: number; + errors?: string[]; +} + +/** + * Writes cache entries to R2 via the binding. + * + * Accepts a POST request with a JSON body containing entries to write. + * Returns a JSON response with write results. 
diff --git a/packages/cloudflare/src/cli/workers/r2-cache-populate-handler.ts b/packages/cloudflare/src/cli/workers/r2-cache-populate-handler.ts
new file mode 100644
index 000000000..975f5a803
--- /dev/null
+++ b/packages/cloudflare/src/cli/workers/r2-cache-populate-handler.ts
@@ -0,0 +1,123 @@
+/**
+ * Standalone worker for R2 cache population via remote binding.
+ *
+ * This worker is started locally via `unstable_startWorker` during cache
+ * population. The R2 cache binding is configured with `remote: true`, allowing
+ * this local worker to write directly to the remote R2 bucket.
+ *
+ * This bypasses the Cloudflare API rate limit of 1,200 requests per 5 minutes
+ * that affects `wrangler r2 bulk put`.
+ */
+
+/** R2 bucket binding name (must match the one in r2-incremental-cache.ts) */
+const R2_CACHE_BINDING_NAME = "NEXT_INC_CACHE_R2_BUCKET";
+
+interface CachePopulateEnv {
+  NEXT_INC_CACHE_R2_BUCKET: R2Bucket;
+}
+
+/** Single cache entry to be written */
+export interface CacheEntry {
+  /** The R2 object key */
+  key: string;
+  /** The cache value (JSON stringified) */
+  value: string;
+}
+
+/** Request body for cache population */
+export interface CachePopulateRequest {
+  /** Array of cache entries to write */
+  entries: CacheEntry[];
+}
+
+/** Response from cache population */
+export interface CachePopulateResponse {
+  success: boolean;
+  written: number;
+  failed: number;
+  errors?: string[];
+}
+
+/**
+ * Writes cache entries to R2 via the binding.
+ *
+ * Accepts a POST request with a JSON body containing entries to write.
+ * Returns a JSON response with write results.
+ */
+export async function populateCache(request: Request, env: CachePopulateEnv): Promise<Response> {
+  const r2 = env[R2_CACHE_BINDING_NAME];
+  if (!r2) {
+    return new Response("R2 bucket not configured", { status: 500 });
+  }
+
+  let body: CachePopulateRequest;
+  try {
+    body = await request.json();
+  } catch {
+    return new Response("Invalid JSON body", { status: 400 });
+  }
+
+  if (!Array.isArray(body.entries)) {
+    return new Response("Invalid request: entries must be an array", { status: 400 });
+  }
+
+  let written = 0;
+  let failed = 0;
+  const errors: string[] = [];
+
+  const CONCURRENCY = 50;
+  const entries = body.entries;
+
+  for (let i = 0; i < entries.length; i += CONCURRENCY) {
+    const batch = entries.slice(i, i + CONCURRENCY);
+    const results = await Promise.allSettled(
+      batch.map(async (entry) => {
+        try {
+          await r2.put(entry.key, entry.value);
+          return { success: true as const };
+        } catch (e) {
+          const errorMsg = e instanceof Error ? e.message : String(e);
+          return { success: false as const, error: errorMsg, key: entry.key };
+        }
+      })
+    );
+
+    for (const result of results) {
+      if (result.status === "fulfilled") {
+        if (result.value.success) {
+          written++;
+        } else {
+          failed++;
+          if (result.value.error) {
+            errors.push(`${result.value.key}: ${result.value.error}`);
+          }
+        }
+      } else {
+        failed++;
+        errors.push(result.reason?.message || "Unknown error");
+      }
+    }
+  }
+
+  const response: CachePopulateResponse = {
+    success: failed === 0,
+    written,
+    failed,
+    ...(errors.length > 0 && { errors: errors.slice(0, 10) }),
+  };
+
+  return new Response(JSON.stringify(response), {
+    status: failed === 0 ? 200 : 207,
+    headers: { "Content-Type": "application/json" },
+  });
+}
+
+export default {
+  async fetch(request: Request, env: CachePopulateEnv): Promise<Response> {
+    const url = new URL(request.url);
+    if (request.method === "POST" && url.pathname === "/populate") {
+      return populateCache(request, env);
+    }
+    return new Response("Not found", { status: 404 });
+  },
+};