diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 27364938d5e1..a9baf923b641 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -412,6 +412,17 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr return; } + if (this.isJobStale(item)) { + // Job belongs to an epoch that has been (or is being) cleaned up. Don't persist a + // late error report — the cleanupPass will drop in-memory state for it, and writing + // to the deleted-epoch database racing with that teardown surfaces as 'Store is closed'. + this.logger.warn(`Discarding error report for stale proving job id=${id} epochNumber=${item.epochNumber}`, { + provingJobId: id, + }); + this.inProgress.delete(id); + return; + } + if (!info) { this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, { provingJobId: id, @@ -554,6 +565,14 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Pr return; } + if (this.isJobStale(item)) { + this.logger.warn(`Discarding result for stale proving job id=${id} epochNumber=${item.epochNumber}`, { + provingJobId: id, + }); + this.inProgress.delete(id); + return; + } + if (!info) { this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, { provingJobId: id, diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts index 021a1e64c84c..32cd291c7e16 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts @@ -87,6 +87,8 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase { public readonly tracer: Tracer; + private deletedEpochs = new Set(); + private constructor( private epochs: Map, private config: ProverBrokerConfig, @@ -113,11 +115,26 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase { // exposed for testing public async commitWrites(items: Array, epochNumber: number) { + if (this.deletedEpochs.has(epochNumber)) { + // Epoch was already pruned; the broker no longer cares about these writes. + return; + } + const jobsToAdd = items.filter((item): item is ProvingJob => 'id' in item); const resultsToAdd = items.filter((item): item is [ProvingJobId, ProvingJobSettledResult] => Array.isArray(item)); const db = await this.getEpochDatabase(EpochNumber(epochNumber)); - await db.batchWrite(jobsToAdd, resultsToAdd); + try { + await db.batchWrite(jobsToAdd, resultsToAdd); + } catch (err) { + // The store can be closed concurrently by deleteAllProvingJobsOlderThanEpoch while a + // batch is mid-flight. Treat this as a benign no-op — the epoch is being torn down. + if (err instanceof Error && err.message === 'Store is closed') { + this.logger.verbose(`Dropping batch for closed epoch ${epochNumber} store`); + return; + } + throw err; + } } private async estimateSize() { @@ -181,14 +198,19 @@ export class KVBrokerDatabase implements ProvingBrokerDatabase { })) async deleteAllProvingJobsOlderThanEpoch(epochNumber: EpochNumber): Promise { const oldEpochs = Array.from(this.epochs.keys()).filter(e => e < Number(epochNumber)); + // Mark before tearing down: this prevents commitWrites from reopening a deleted epoch's + // directory if a stale batch arrives mid-delete. + for (const old of oldEpochs) { + this.deletedEpochs.add(old); + } for (const old of oldEpochs) { const db = this.epochs.get(old); if (!db) { continue; } this.logger.verbose(`Deleting broker database for epoch ${old}`); - await db.delete(); this.epochs.delete(old); + await db.delete(); } }