Skip to content

Commit 92b3cf2

Browse files
committed
fix: retry 1102 and upstream 5xx errors during R2 cache population
1 parent 274c20e commit 92b3cf2

2 files changed

Lines changed: 64 additions & 26 deletions

File tree

packages/cloudflare/src/cli/commands/populate-cache.spec.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ describe("populateCache", () => {
133133
describe("R2 incremental cache", () => {
134134
afterEach(() => {
135135
vi.resetAllMocks();
136+
vi.useRealTimers();
136137
mockFs.restore();
137138
});
138139

@@ -150,6 +151,7 @@ describe("populateCache", () => {
150151
const mockWorkerDispose = vi.fn();
151152

152153
setupMockFileSystem();
154+
vi.useFakeTimers();
153155
// @ts-expect-error - Mock unstable_startWorker to return a mock worker instance
154156
vi.mocked(unstable_startWorker).mockResolvedValueOnce({
155157
ready: Promise.resolve(),

packages/cloudflare/src/cli/commands/populate-cache.ts

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import fs from "node:fs";
22
import fsp from "node:fs/promises";
33
import os from "node:os";
44
import path from "node:path";
5+
import { setTimeout } from "node:timers/promises";
56
import { fileURLToPath } from "node:url";
67

78
import type { BuildOptions } from "@opennextjs/aws/build/helper.js";
@@ -361,6 +362,14 @@ async function sendEntriesToR2Worker(options: {
361362
let concurrency = 1;
362363

363364
for (const entry of tqdm(entries)) {
365+
const task = sendEntryToR2Worker({
366+
workerUrl,
367+
key: entry.key,
368+
filename: entry.filename,
369+
}).finally(() => pending.delete(task));
370+
371+
pending.add(task);
372+
364373
// If we've reached the concurrency limit, wait for one to finish.
365374
if (pending.size >= concurrency) {
366375
await Promise.race(pending);
@@ -370,18 +379,13 @@ async function sendEntriesToR2Worker(options: {
370379
concurrency++;
371380
}
372381
}
373-
374-
const task = sendEntryToR2Worker({
375-
workerUrl,
376-
key: entry.key,
377-
filename: entry.filename,
378-
}).finally(() => pending.delete(task));
379-
pending.add(task);
380382
}
381383

382384
await Promise.all(pending);
383385
}
384386

387+
class RetryableWorkerError extends Error {}
388+
385389
/**
386390
* Sends a single cache entry to the R2 worker.
387391
*
@@ -400,30 +404,62 @@ async function sendEntryToR2Worker(options: {
400404
filename: string;
401405
}): Promise<void> {
402406
const { workerUrl, key, filename } = options;
407+
const payload = fs.readFileSync(filename);
408+
409+
const CLIENT_RETRY_ATTEMPTS = 4;
410+
const CLIENT_RETRY_BASE_DELAY_MS = 250;
411+
412+
for (let attempt = 0; attempt < CLIENT_RETRY_ATTEMPTS; attempt++) {
413+
try {
414+
const response = await fetch(workerUrl, {
415+
method: "POST",
416+
headers: {
417+
"x-opennext-cache-key": key,
418+
},
419+
body: payload,
420+
});
403421

404-
const response = await fetch(workerUrl, {
405-
method: "POST",
406-
headers: {
407-
"x-opennext-cache-key": key,
408-
},
409-
body: fs.readFileSync(filename),
410-
});
422+
const body = await response.text();
423+
let result: R2Response;
424+
425+
try {
426+
result = JSON.parse(body) as R2Response;
427+
} catch (e) {
428+
if (body.includes("Worker exceeded resource limits")) {
429+
throw new RetryableWorkerError("Worker exceeded resource limits", { cause: e });
430+
}
431+
432+
if (response.status > 500) {
433+
throw new RetryableWorkerError(
434+
`Worker returned a ${response.status} ${response.statusText} response`,
435+
{ cause: e }
436+
);
437+
}
438+
439+
throw new Error(`Unexpected ${response.status} response from R2 worker: ${body}`, {
440+
cause: e,
441+
});
442+
}
411443

412-
const body = await response.text();
413-
let result: R2Response;
444+
if (!result.success) {
445+
throw new Error(`Failed to write "${key}" to R2: ${result.error}`);
446+
}
414447

415-
try {
416-
result = JSON.parse(body) as R2Response;
417-
} catch (e) {
418-
throw new Error(`Unexpected response from R2 worker: ${body}`, {
419-
cause: e,
420-
});
421-
}
448+
return;
449+
} catch (e) {
450+
if (e instanceof RetryableWorkerError && attempt < CLIENT_RETRY_ATTEMPTS - 1) {
451+
logger.error(
452+
`Attempt ${attempt + 1} to write "${key}" failed with a retryable error: ${e.message}. Retrying...`
453+
);
454+
await setTimeout(CLIENT_RETRY_BASE_DELAY_MS * Math.pow(2, attempt));
455+
continue;
456+
}
422457

423-
if (!result.success) {
424-
logger.error(`Failed to write "${key}" to R2: ${result.error}`);
425-
throw new Error(result.error);
458+
throw e;
459+
}
426460
}
461+
462+
throw new Error(`Failed to write "${key}" to R2 after ${CLIENT_RETRY_ATTEMPTS} attempts`);
427463
}
428464

429465
async function populateKVIncrementalCache(

0 commit comments

Comments
 (0)