From fd2000293baee94f09704c74d018825dbdb8b477 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 13 May 2026 20:26:18 +0000 Subject: [PATCH] fix: clean up old jobs regardless of pending status --- .test_patterns.yml | 8 -------- .../src/proving_broker/proving_broker.test.ts | 19 ++++++++++--------- .../src/proving_broker/proving_broker.ts | 8 +++++--- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/.test_patterns.yml b/.test_patterns.yml index 10cd59cca8fd..389e57e035cd 100644 --- a/.test_patterns.yml +++ b/.test_patterns.yml @@ -197,14 +197,6 @@ tests: - *phil - *palla - # http://ci.aztec-labs.com/64a972aafaa40dd0 - # ProvingBroker › Retries › does not retry if job is stale — kv-store closes - # before the broker's final reportProvingJobError write lands. - - regex: "prover-client/src/proving_broker/proving_broker.test.ts" - error_regex: "does not retry if job is stale|Store is closed" - owners: - - *alex - # Nightly GKE tests - regex: "spartan/bootstrap.sh" owners: diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index 34fc09fe7b06..4652413271d4 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -856,7 +856,7 @@ describe.each([ await assertJobTransition(id, 'in-progress', 'in-queue'); }); - it('cancel stale jobs that time out', async () => { + it('cleans up stale in-progress jobs before deleting their epoch database', async () => { const id = makeRandomProvingJobId(); await broker.enqueueProvingJob({ id, @@ -887,10 +887,9 @@ describe.each([ inputsUri: makeInputsUri(), }); - // advance time again so job times out. Since the job was in-progress, it won't be cleaned up as stale - // but will be rejected when it times out - await sleep(jobTimeoutMs + brokerIntervalMs); - await assertJobStatus(id, 'rejected'); + // the epoch-1 database is old enough to delete, so the broker closes any remaining epoch-1 jobs + await (broker as any).cleanupPass(); + await assertJobStatus(id, 'not-found'); }); it('rejects jobs that time out more than maxRetries times', async () => { @@ -1070,13 +1069,15 @@ describe.each([ inputsUri: makeInputsUri(), }); - await sleep(brokerIntervalMs); + await (broker as any).cleanupPass(); + await assertJobStatus(id, 'not-found'); - // job was in-progress so it won't be cleaned up as stale, but will be rejected on error + // the epoch-1 database has been deleted, so late worker reports are ignored + jest.spyOn(database, 'setProvingJobError'); await broker.reportProvingJobError(id, 'test error', true); + expect(database.setProvingJobError).not.toHaveBeenCalled(); await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ - status: 'rejected', - reason: 'test error', + status: 'not-found', }); }); }); diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index decb4835eff3..27364938d5e1 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -319,6 +319,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr } private cleanUpProvingJobState(ids: ProvingJobId[]) { + const idsToClean = new Set(ids); for (const id of ids) { this.jobsCache.delete(id); const deferred = this.promises.get(id); @@ -331,6 +332,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr this.retries.delete(id); this.enqueuedAt.delete(id); } + this.completedJobNotifications = this.completedJobNotifications.filter(id => !idsToClean.has(id)); } #getProvingJobStatus(id: ProvingJobId): ProvingJobStatus { @@ -598,21 +600,21 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr } private async cleanupPass() { - this.cleanupStaleJobs(); this.reEnqueueExpiredJobs(); const oldestEpochToKeep = this.oldestEpochToKeep(); if (oldestEpochToKeep > 0) { + this.cleanupJobsOlderThanEpoch(EpochNumber(oldestEpochToKeep)); await this.database.deleteAllProvingJobsOlderThanEpoch(EpochNumber(oldestEpochToKeep)); this.logger.trace(`Deleted all epochs older than ${oldestEpochToKeep}`); } } - private cleanupStaleJobs() { + private cleanupJobsOlderThanEpoch(epochNumber: EpochNumber) { const jobIds = Array.from(this.jobsCache.keys()); const jobsToClean: ProvingJobId[] = []; for (const id of jobIds) { const job = this.jobsCache.get(id)!; - if (this.isJobStale(job) && !this.inProgress.has(id) && !this.resultsCache.has(id)) { + if (job.epochNumber < epochNumber) { jobsToClean.push(id); } }