Skip to content

Commit 7349fe3

Browse files
committed
Use current time for reply backoff
Base remote reply scrape completion, failure, and 429 backoff timestamps on the worker clock at the point where processing finishes instead of the job start time. This keeps retry windows from becoming immediately claimable after long-running scrapes. #447 (comment) Assisted-by: Codex:gpt-5.5
1 parent 4c9a209 commit 7349fe3

2 files changed

Lines changed: 57 additions & 10 deletions

File tree

src/federation/replies-worker.test.ts

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,52 @@ describe("remote replies scrape worker", () => {
364364
expect(origin?.nextRequestAt.getTime()).toBe(now.getTime() + 120_000);
365365
});
366366

367+
it("bases 429 backoff on the actual failure time", async () => {
368+
expect.assertions(3);
369+
const { jobId, postIri, repliesIri } = await seedPostWithScrapeJob();
370+
const now = new Date("2026-04-25T00:00:00.000Z");
371+
const failureTime = new Date("2026-04-25T00:10:00.000Z");
372+
const error = new Error("rate limited") as Error & {
373+
response: Response;
374+
};
375+
error.response = new Response(null, {
376+
status: 429,
377+
headers: { "Retry-After": "120" },
378+
});
379+
380+
await processDueRemoteReplyScrapeJobs({
381+
clock: () => failureTime,
382+
documentLoader: async (url): Promise<RemoteDocument> => {
383+
if (url === repliesIri) {
384+
return {
385+
contextUrl: null,
386+
document: collection(repliesIri, [
387+
reply({
388+
id: "https://remote.test/@replyer/posts/1",
389+
replyTarget: postIri,
390+
}),
391+
]),
392+
documentUrl: url,
393+
};
394+
}
395+
if (url === "https://remote.test/@replyer") throw error;
396+
throw new Error(`Unexpected fetch: ${url}`);
397+
},
398+
now,
399+
sleep: async () => undefined,
400+
});
401+
402+
const job = await db.query.remoteReplyScrapeJobs.findFirst({
403+
where: eq(remoteReplyScrapeJobs.id, jobId),
404+
});
405+
const origin = await db.query.remoteReplyScrapeOrigins.findFirst();
406+
expect(job?.status).toBe("pending");
407+
expect(job?.nextAttemptAt.getTime()).toBe(failureTime.getTime() + 120_000);
408+
expect(origin?.nextRequestAt.getTime()).toBe(
409+
failureTime.getTime() + 120_000,
410+
);
411+
});
412+
367413
it("records per-request timestamps for throttled origin request fields", async () => {
368414
expect.assertions(3);
369415
const { postIri, repliesIri } = await seedPostWithScrapeJob();
@@ -397,6 +443,6 @@ describe("remote replies scrape worker", () => {
397443
expect(origin?.nextRequestAt.toISOString()).toBe(
398444
"2026-04-25T00:00:12.000Z",
399445
);
400-
expect(origin?.updated.toISOString()).toBe("2026-04-25T00:00:02.000Z");
446+
expect(origin?.updated.toISOString()).toBe("2026-04-25T00:00:03.000Z");
401447
});
402448
});

src/federation/replies-worker.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -196,13 +196,13 @@ async function processRemoteReplyScrapeJob(
196196
job: RemoteReplyScrapeJob,
197197
options: ProcessRemoteReplyScrapeJobsOptions,
198198
): Promise<number> {
199-
const now = options.now ?? new Date();
199+
const clock = options.clock ?? (() => options.now ?? new Date());
200200
const post = await db.query.posts.findFirst({
201201
where: eq(posts.id, job.postId),
202202
});
203203

204204
if (post == null) {
205-
await failJob(job, "Post not found", now);
205+
await failJob(job, "Post not found", clock());
206206
return 0;
207207
}
208208

@@ -213,7 +213,7 @@ async function processRemoteReplyScrapeJob(
213213
documentLoader,
214214
intervalSeconds:
215215
options.intervalSeconds ?? REMOTE_REPLIES_SCRAPE_INTERVAL_SECONDS,
216-
clock: options.clock ?? (() => new Date()),
216+
clock,
217217
sleep: options.sleep ?? sleep,
218218
});
219219
let lastFetchError: unknown;
@@ -275,24 +275,25 @@ async function processRemoteReplyScrapeJob(
275275

276276
await updatePostStats(db, { id: job.postId });
277277
await updateScrapedRepliesCount(job.postId);
278-
await completeJob(job, fetchedItems, now);
278+
await completeJob(job, fetchedItems, clock());
279279
return fetchedItems;
280280
} catch (error) {
281281
if (getErrorStatus(error) === 429) {
282+
const failedAt = clock();
282283
await backOffJob(
283284
job,
284-
retryAfterSeconds(error) ??
285+
retryAfterSeconds(error, failedAt) ??
285286
options.backoffSeconds ??
286287
REMOTE_REPLIES_SCRAPE_BACKOFF_SECONDS,
287288
error,
288-
now,
289+
failedAt,
289290
);
290291
return 0;
291292
}
292293
await failJob(
293294
job,
294295
error instanceof Error ? error.message : String(error),
295-
now,
296+
clock(),
296297
);
297298
return 0;
298299
}
@@ -458,7 +459,7 @@ function getErrorStatus(error: unknown): number | null {
458459
return error.response.status;
459460
}
460461

461-
function retryAfterSeconds(error: unknown): number | null {
462+
function retryAfterSeconds(error: unknown, now = new Date()): number | null {
462463
if (
463464
error == null ||
464465
typeof error !== "object" ||
@@ -476,7 +477,7 @@ function retryAfterSeconds(error: unknown): number | null {
476477

477478
const date = Date.parse(retryAfter);
478479
if (Number.isNaN(date)) return null;
479-
return Math.max(0, Math.ceil((date - Date.now()) / 1000));
480+
return Math.max(0, Math.ceil((date - now.getTime()) / 1000));
480481
}
481482

482483
function sleep(milliseconds: number): Promise<void> {

0 commit comments

Comments
 (0)