Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions .test_patterns.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,6 @@ tests:
- *phil
- *palla

# http://ci.aztec-labs.com/64a972aafaa40dd0
# ProvingBroker › Retries › does not retry if job is stale — kv-store closes
# before the broker's final reportProvingJobError write lands.
- regex: "prover-client/src/proving_broker/proving_broker.test.ts"
error_regex: "does not retry if job is stale|Store is closed"
owners:
- *alex

# Nightly GKE tests
- regex: "spartan/bootstrap.sh"
owners:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ describe.each([
await assertJobTransition(id, 'in-progress', 'in-queue');
});

it('cancel stale jobs that time out', async () => {
it('cleans up stale in-progress jobs before deleting their epoch database', async () => {
const id = makeRandomProvingJobId();
await broker.enqueueProvingJob({
id,
Expand Down Expand Up @@ -887,10 +887,9 @@ describe.each([
inputsUri: makeInputsUri(),
});

// advance time again so job times out. Since the job was in-progress, it won't be cleaned up as stale
// but will be rejected when it times out
await sleep(jobTimeoutMs + brokerIntervalMs);
await assertJobStatus(id, 'rejected');
// the epoch-1 database is old enough to delete, so the broker closes any remaining epoch-1 jobs
await (broker as any).cleanupPass();
await assertJobStatus(id, 'not-found');
});

it('rejects jobs that time out more than maxRetries times', async () => {
Expand Down Expand Up @@ -1070,13 +1069,15 @@ describe.each([
inputsUri: makeInputsUri(),
});

await sleep(brokerIntervalMs);
await (broker as any).cleanupPass();
await assertJobStatus(id, 'not-found');

// job was in-progress so it won't be cleaned up as stale, but will be rejected on error
// the epoch-1 database has been deleted, so late worker reports are ignored
jest.spyOn(database, 'setProvingJobError');
await broker.reportProvingJobError(id, 'test error', true);
expect(database.setProvingJobError).not.toHaveBeenCalled();
await expect(broker.getProvingJobStatus(id)).resolves.toEqual({
status: 'rejected',
reason: 'test error',
status: 'not-found',
});
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr
}

private cleanUpProvingJobState(ids: ProvingJobId[]) {
const idsToClean = new Set(ids);
for (const id of ids) {
this.jobsCache.delete(id);
const deferred = this.promises.get(id);
Expand All @@ -331,6 +332,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr
this.retries.delete(id);
this.enqueuedAt.delete(id);
}
this.completedJobNotifications = this.completedJobNotifications.filter(id => !idsToClean.has(id));
}

#getProvingJobStatus(id: ProvingJobId): ProvingJobStatus {
Expand Down Expand Up @@ -598,21 +600,21 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr
}

private async cleanupPass() {
this.cleanupStaleJobs();
this.reEnqueueExpiredJobs();
const oldestEpochToKeep = this.oldestEpochToKeep();
if (oldestEpochToKeep > 0) {
this.cleanupJobsOlderThanEpoch(EpochNumber(oldestEpochToKeep));
await this.database.deleteAllProvingJobsOlderThanEpoch(EpochNumber(oldestEpochToKeep));
this.logger.trace(`Deleted all epochs older than ${oldestEpochToKeep}`);
}
}

private cleanupStaleJobs() {
private cleanupJobsOlderThanEpoch(epochNumber: EpochNumber) {
const jobIds = Array.from(this.jobsCache.keys());
const jobsToClean: ProvingJobId[] = [];
for (const id of jobIds) {
const job = this.jobsCache.get(id)!;
if (this.isJobStale(job) && !this.inProgress.has(id) && !this.resultsCache.has(id)) {
if (job.epochNumber < epochNumber) {
jobsToClean.push(id);
}
}
Expand Down
Loading