Skip to content

Commit 437eacb

Browse files
authored
fix: clean up old jobs regardless of pending status (#23260)
.
1 parent 498b961 commit 437eacb

3 files changed

Lines changed: 15 additions & 20 deletions

File tree

.test_patterns.yml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,6 @@ tests:
197197
- *phil
198198
- *palla
199199

200-
# http://ci.aztec-labs.com/64a972aafaa40dd0
201-
# ProvingBroker › Retries › does not retry if job is stale — kv-store closes
202-
# before the broker's final reportProvingJobError write lands.
203-
- regex: "prover-client/src/proving_broker/proving_broker.test.ts"
204-
error_regex: "does not retry if job is stale|Store is closed"
205-
owners:
206-
- *alex
207-
208200
# Nightly GKE tests
209201
- regex: "spartan/bootstrap.sh"
210202
owners:

yarn-project/prover-client/src/proving_broker/proving_broker.test.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ describe.each([
856856
await assertJobTransition(id, 'in-progress', 'in-queue');
857857
});
858858

859-
it('cancel stale jobs that time out', async () => {
859+
it('cleans up stale in-progress jobs before deleting their epoch database', async () => {
860860
const id = makeRandomProvingJobId();
861861
await broker.enqueueProvingJob({
862862
id,
@@ -887,10 +887,9 @@ describe.each([
887887
inputsUri: makeInputsUri(),
888888
});
889889

890-
// advance time again so job times out. Since the job was in-progress, it won't be cleaned up as stale
891-
// but will be rejected when it times out
892-
await sleep(jobTimeoutMs + brokerIntervalMs);
893-
await assertJobStatus(id, 'rejected');
890+
// the epoch-1 database is old enough to delete, so the broker closes any remaining epoch-1 jobs
891+
await (broker as any).cleanupPass();
892+
await assertJobStatus(id, 'not-found');
894893
});
895894

896895
it('rejects jobs that time out more than maxRetries times', async () => {
@@ -1070,13 +1069,15 @@ describe.each([
10701069
inputsUri: makeInputsUri(),
10711070
});
10721071

1073-
await sleep(brokerIntervalMs);
1072+
await (broker as any).cleanupPass();
1073+
await assertJobStatus(id, 'not-found');
10741074

1075-
// job was in-progress so it won't be cleaned up as stale, but will be rejected on error
1075+
// the epoch-1 database has been deleted, so late worker reports are ignored
1076+
jest.spyOn(database, 'setProvingJobError');
10761077
await broker.reportProvingJobError(id, 'test error', true);
1078+
expect(database.setProvingJobError).not.toHaveBeenCalled();
10771079
await expect(broker.getProvingJobStatus(id)).resolves.toEqual({
1078-
status: 'rejected',
1079-
reason: 'test error',
1080+
status: 'not-found',
10801081
});
10811082
});
10821083
});

yarn-project/prover-client/src/proving_broker/proving_broker.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr
319319
}
320320

321321
private cleanUpProvingJobState(ids: ProvingJobId[]) {
322+
const idsToClean = new Set(ids);
322323
for (const id of ids) {
323324
this.jobsCache.delete(id);
324325
const deferred = this.promises.get(id);
@@ -331,6 +332,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr
331332
this.retries.delete(id);
332333
this.enqueuedAt.delete(id);
333334
}
335+
this.completedJobNotifications = this.completedJobNotifications.filter(id => !idsToClean.has(id));
334336
}
335337

336338
#getProvingJobStatus(id: ProvingJobId): ProvingJobStatus {
@@ -598,21 +600,21 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr
598600
}
599601

600602
private async cleanupPass() {
601-
this.cleanupStaleJobs();
602603
this.reEnqueueExpiredJobs();
603604
const oldestEpochToKeep = this.oldestEpochToKeep();
604605
if (oldestEpochToKeep > 0) {
606+
this.cleanupJobsOlderThanEpoch(EpochNumber(oldestEpochToKeep));
605607
await this.database.deleteAllProvingJobsOlderThanEpoch(EpochNumber(oldestEpochToKeep));
606608
this.logger.trace(`Deleted all epochs older than ${oldestEpochToKeep}`);
607609
}
608610
}
609611

610-
private cleanupStaleJobs() {
612+
private cleanupJobsOlderThanEpoch(epochNumber: EpochNumber) {
611613
const jobIds = Array.from(this.jobsCache.keys());
612614
const jobsToClean: ProvingJobId[] = [];
613615
for (const id of jobIds) {
614616
const job = this.jobsCache.get(id)!;
615-
if (this.isJobStale(job) && !this.inProgress.has(id) && !this.resultsCache.has(id)) {
617+
if (job.epochNumber < epochNumber) {
616618
jobsToClean.push(id);
617619
}
618620
}

0 commit comments

Comments
 (0)