From 7342efcf812f701dfadc0de791dbeecc021eceea Mon Sep 17 00:00:00 2001
From: Phil Windle
Date: Wed, 6 May 2026 16:47:50 +0000
Subject: [PATCH] refactor(prover-client): split orchestrator into sub-tree + top-tree pair
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Introduces a sub-tree + top-tree orchestrator pair that decomposes the existing single-class proving orchestrator along the natural state-coupling boundary — per-checkpoint block-level work vs. epoch-level top-tree work — while leaving every existing API on the legacy `EpochProver` / `ProvingOrchestrator` / `EpochProvingState` path untouched. The prover-node and e2e tests build unchanged; this PR is purely additive in surface area, with structural refactors on `ProvingOrchestrator` to share scheduling and top-tree drivers with the new `TopTreeOrchestrator`.

Split out from #22990 so it can land independently.

## What's new

- **`CheckpointSubTreeOrchestrator`** (`checkpoint-sub-tree-orchestrator.ts`): extends `ProvingOrchestrator` and is single-checkpoint by construction. Drives chonk-verifier / base / merge / block-root / block-merge proving for one checkpoint and resolves a `SubTreeResult` instead of escalating to the checkpoint root — the parent's `checkAndEnqueueCheckpointRootRollup` is overridden to short-circuit. The constructor calls `super.startNewEpoch(epoch, 1, empty challenges)` to set up a single-checkpoint mini-epoch; the count and challenges are never read because the override prevents the parent's finalize / root path from running.
- **`TopTreeOrchestrator`** + **`TopTreeProvingState`**: self-contained driver from checkpoint-root through epoch-root rollup. Takes per-checkpoint block-proof promises and pipelines its hint chain against them. Cancellation surfaces as `TopTreeCancelledError` so callers can distinguish a reorg-driven cancel from a genuine proving failure.
- **`EpochProvingContext`** (`epoch-proving-context.ts`): per-epoch shared cache for chonk-verifier proofs. Survives sub-tree cancellation, so a tx that gets reorged out and re-appears in a replacement checkpoint reuses the cached proof.
- **`ProvingScheduler`** (`proving-scheduler.ts`): abstract base owning the `SerialQueue` deferred-job lifecycle, the `pendingProvingJobs` controller list, and a unified `deferredProving(state, request, callback, isCancelled?)` submit envelope. The minimal `ProvingStateLike` contract is just `verifyState()` + `reject(reason)`.
- **`TopTreeProvingScheduler`** (`top-tree-proving-scheduler.ts`): extends `ProvingScheduler` and holds the checkpoint-merge, padding, and root-rollup drivers (plus tree-walking helpers) shared by both orchestrators. Wraps circuit calls via a `wrapCircuitCall` hook (the orchestrator overrides it for spans; the top-tree leaves it as the identity) and resolves via an `onRootRollupComplete` hook to bridge the two states' differing `resolve` signatures. The per-checkpoint root driver stays subclass-specific because the input-building flows differ.
- **`EpochProverFactory` interface on `ProverClient`**: new factory methods `createEpochProvingContext(epochNumber)`, `createCheckpointSubTreeOrchestrator(...)`, and `createTopTreeOrchestrator()`. A single shared `BrokerCircuitProverFacade` is owned by `ProverClient` and shared across every orchestrator.

## What changes in existing code

- `ProvingOrchestrator` extends `TopTreeProvingScheduler`; the inline broker-job submit envelope, the queue lifecycle, and the top-tree-section drivers are inherited. `cancel()` delegates the queue-recreate + abort-jobs logic to `resetSchedulerState(this.cancelJobsOnStop)`. Three internal methods (`getOrEnqueueChonkVerifier`, `checkAndEnqueueBaseRollup`, `checkAndEnqueueCheckpointRootRollup`) become `protected` so the sub-tree can override them; `provingState` and `provingPromise` likewise become `protected` so the sub-tree can hook the parent's failure stream onto `subTreeResult`. No public API change on `ProvingOrchestrator`.
- `CheckpointProvingState`: gains two read-only accessors used by the sub-tree's checkpoint-root override — `getSubTreeOutputProofs()` and `getLastArchiveSiblingPath()`. No state changes.
- `ProverClient` keeps `createEpochProver()` exactly as before (each call spawns its own `BrokerCircuitProverFacade`); the new factory methods share a single facade (`getFacade()`) set up in `start()` and torn down in `stop()`. `EpochProver`, `EpochProverManager`, `ServerEpochProver`, `EpochProvingState`, the integration tests in `orchestrator_*.test.ts` and `bb_prover_full_rollup.test.ts`, and `stdlib/interfaces/*` are all unchanged from `merge-train/spartan` — the prover-node and e2e tests continue to build against the existing `EpochProver` API. Migrating the prover-node onto the new factories (and the deferred-finalize flow that goes with optimistic proving) is the follow-up PR.

## Test plan

- [x] 261 prover-client tests pass (full `yarn workspace @aztec/prover-client test`).
- [x] `yarn build` is clean against current `merge-train/spartan` (modulo the pre-existing `@aztec/sqlite3mc-wasm` issue inherited from baseline).
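The `EpochProvingContext` cache semantics above (dedupe concurrent enqueues, survive sub-tree cancellation, self-clean on rejection so a replacement checkpoint can re-prove) reduce to a small promise-map pattern. A minimal sketch, with a hypothetical `ProofCache` standing in for the real context (which additionally tracks `AbortController`s and a stopped flag):

```typescript
// Illustrative sketch only — `ProofCache` is a hypothetical stand-in for
// `EpochProvingContext`; it shows just the dedupe + evict-on-rejection behavior.
class ProofCache<T> {
  private readonly cache = new Map<string, Promise<T>>();

  /** Start (or join) the proof for `key`; concurrent callers share one promise. */
  enqueue(key: string, prove: () => Promise<T>): Promise<T> {
    const cached = this.cache.get(key);
    if (cached) {
      return cached;
    }
    const promise = prove();
    // Evict on rejection so a later caller (e.g. a replacement sub-tree after a
    // reorg) re-issues the proof; also marks the rejection path as observed.
    promise.catch(() => this.cache.delete(key));
    this.cache.set(key, promise);
    return promise;
  }

  /** Non-mutating read of the cached promise, if any. */
  getCached(key: string): Promise<T> | undefined {
    return this.cache.get(key);
  }
}
```

A rejected proof evicts its entry in a microtask, so the next `enqueue` for the same tx re-issues the job rather than handing back the failed promise.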
--- .../prover-client/src/mocks/test_context.ts | 1 + .../orchestrator/checkpoint-proving-state.ts | 13 + .../checkpoint-sub-tree-orchestrator.test.ts | 149 +++++++++ .../checkpoint-sub-tree-orchestrator.ts | 271 +++++++++++++++ .../epoch-proving-context.test.ts | 84 +++++ .../src/orchestrator/epoch-proving-context.ts | 101 ++++++ .../prover-client/src/orchestrator/index.ts | 8 + .../src/orchestrator/orchestrator.ts | 241 ++------------ .../src/orchestrator/proving-scheduler.ts | 156 +++++++++ .../top-tree-orchestrator.test.ts | 203 +++++++++++ .../src/orchestrator/top-tree-orchestrator.ts | 314 ++++++++++++++++++ .../top-tree-proving-scheduler.ts | 154 +++++++++ .../orchestrator/top-tree-proving-state.ts | 220 ++++++++++++ .../src/prover-client/prover-client.ts | 126 ++++++- .../prover-client/src/proving_broker/index.ts | 1 + 15 files changed, 1830 insertions(+), 212 deletions(-) create mode 100644 yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.test.ts create mode 100644 yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.ts create mode 100644 yarn-project/prover-client/src/orchestrator/epoch-proving-context.test.ts create mode 100644 yarn-project/prover-client/src/orchestrator/epoch-proving-context.ts create mode 100644 yarn-project/prover-client/src/orchestrator/proving-scheduler.ts create mode 100644 yarn-project/prover-client/src/orchestrator/top-tree-orchestrator.test.ts create mode 100644 yarn-project/prover-client/src/orchestrator/top-tree-orchestrator.ts create mode 100644 yarn-project/prover-client/src/orchestrator/top-tree-proving-scheduler.ts create mode 100644 yarn-project/prover-client/src/orchestrator/top-tree-proving-state.ts diff --git a/yarn-project/prover-client/src/mocks/test_context.ts b/yarn-project/prover-client/src/mocks/test_context.ts index 750237c6d1b5..c3a38c8cf7e8 100644 --- a/yarn-project/prover-client/src/mocks/test_context.ts +++ 
b/yarn-project/prover-client/src/mocks/test_context.ts @@ -284,6 +284,7 @@ export class TestContext { return { constants, + checkpoint, header: checkpoint.header, blocks, l1ToL2Messages, diff --git a/yarn-project/prover-client/src/orchestrator/checkpoint-proving-state.ts b/yarn-project/prover-client/src/orchestrator/checkpoint-proving-state.ts index cebf6465e257..d8dcecf95f97 100644 --- a/yarn-project/prover-client/src/orchestrator/checkpoint-proving-state.ts +++ b/yarn-project/prover-client/src/orchestrator/checkpoint-proving-state.ts @@ -346,4 +346,17 @@ export class CheckpointProvingState { ? [this.blockProofs.getNode(rootLocation)?.provingOutput] // If there's only 1 block, its proof will be stored at the root. : this.blockProofs.getChildren(rootLocation).map(c => c?.provingOutput); } + + /** + * Returns the block-level proof outputs that feed into the checkpoint root rollup. + * Used by `CheckpointSubTreeOrchestrator` to surface its sub-tree result. + */ + public getSubTreeOutputProofs() { + return this.#getChildProofsForRoot(); + } + + /** Sibling path of the archive tree captured before any block in this checkpoint landed. 
*/ + public getLastArchiveSiblingPath() { + return this.lastArchiveSiblingPath; + } } diff --git a/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.test.ts b/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.test.ts new file mode 100644 index 000000000000..667764d22dc4 --- /dev/null +++ b/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.test.ts @@ -0,0 +1,149 @@ +import { FinalBlobBatchingChallenges } from '@aztec/blob-lib'; +import { EpochNumber } from '@aztec/foundation/branded-types'; +import { EthAddress } from '@aztec/foundation/eth-address'; +import { createLogger } from '@aztec/foundation/log'; + +import { TestContext } from '../mocks/test_context.js'; +import { CheckpointSubTreeOrchestrator } from './checkpoint-sub-tree-orchestrator.js'; +import { EpochProvingContext } from './epoch-proving-context.js'; + +const logger = createLogger('prover-client:test:checkpoint-sub-tree-orchestrator'); + +describe('prover/orchestrator/checkpoint-sub-tree', () => { + let context: TestContext; + let epochContext: EpochProvingContext; + + beforeEach(async () => { + context = await TestContext.new(logger); + epochContext = new EpochProvingContext(context.prover, EpochNumber(1)); + }); + + afterEach(async () => { + epochContext.stop(); + await context.cleanup(); + }); + + it('resolves the sub-tree result with block-level proofs for a single-block checkpoint', async () => { + const numBlocks = 1; + const numTxsPerBlock = 1; + const { constants, blocks, l1ToL2Messages, previousBlockHeader } = await context.makeCheckpoint(numBlocks, { + numTxsPerBlock, + }); + + const subTree = await CheckpointSubTreeOrchestrator.start( + context.worldState, + context.prover, + EthAddress.ZERO, + epochContext, + false, + 10, + constants, + l1ToL2Messages, + numBlocks, + previousBlockHeader, + ); + try { + const resultPromise = subTree.getSubTreeResult(); + + for (const block of blocks) { + const { blockNumber, 
timestamp } = block.header.globalVariables; + await subTree.startNewBlock(blockNumber, timestamp, block.txs.length); + if (block.txs.length > 0) { + await subTree.addTxs(block.txs); + } + await subTree.setBlockCompleted(blockNumber, block.header); + } + + const result = await resultPromise; + expect(result.blockProofOutputs).toHaveLength(1); + expect(result.blockProofOutputs[0].proof).toBeDefined(); + expect(result.previousArchiveSiblingPath).toBeDefined(); + } finally { + await subTree.stop(); + } + }); + + it('resolves with two block proofs for a two-block checkpoint', async () => { + const numBlocks = 2; + const numTxsPerBlock = 1; + const { constants, blocks, l1ToL2Messages, previousBlockHeader } = await context.makeCheckpoint(numBlocks, { + numTxsPerBlock, + }); + + const subTree = await CheckpointSubTreeOrchestrator.start( + context.worldState, + context.prover, + EthAddress.ZERO, + epochContext, + false, + 10, + constants, + l1ToL2Messages, + numBlocks, + previousBlockHeader, + ); + try { + const resultPromise = subTree.getSubTreeResult(); + + for (const block of blocks) { + const { blockNumber, timestamp } = block.header.globalVariables; + await subTree.startNewBlock(blockNumber, timestamp, block.txs.length); + if (block.txs.length > 0) { + await subTree.addTxs(block.txs); + } + await subTree.setBlockCompleted(blockNumber, block.header); + } + + const result = await resultPromise; + expect(result.blockProofOutputs).toHaveLength(2); + } finally { + await subTree.stop(); + } + }); + + it('throws when startNewEpoch is called explicitly', async () => { + const { constants, l1ToL2Messages, previousBlockHeader } = await context.makeCheckpoint(1, { numTxsPerBlock: 0 }); + const subTree = await CheckpointSubTreeOrchestrator.start( + context.worldState, + context.prover, + EthAddress.ZERO, + epochContext, + false, + 10, + constants, + l1ToL2Messages, + 1, + previousBlockHeader, + ); + try { + expect(() => subTree.startNewEpoch(EpochNumber(2), 1, 
FinalBlobBatchingChallenges.empty())).toThrow( + /starts its epoch in the constructor/, + ); + } finally { + await subTree.stop(); + } + }); + + it('throws when startNewCheckpoint is called explicitly', async () => { + const { constants, l1ToL2Messages, previousBlockHeader } = await context.makeCheckpoint(1, { numTxsPerBlock: 0 }); + const subTree = await CheckpointSubTreeOrchestrator.start( + context.worldState, + context.prover, + EthAddress.ZERO, + epochContext, + false, + 10, + constants, + l1ToL2Messages, + 1, + previousBlockHeader, + ); + try { + await expect(subTree.startNewCheckpoint(0, constants, l1ToL2Messages, 1, previousBlockHeader)).rejects.toThrow( + /drives its single checkpoint in `start`/, + ); + } finally { + await subTree.stop(); + } + }); +}); diff --git a/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.ts b/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.ts new file mode 100644 index 000000000000..fed630aff151 --- /dev/null +++ b/yarn-project/prover-client/src/orchestrator/checkpoint-sub-tree-orchestrator.ts @@ -0,0 +1,271 @@ +import { FinalBlobBatchingChallenges } from '@aztec/blob-lib'; +import type { ARCHIVE_HEIGHT, NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH } from '@aztec/constants'; +import type { EpochNumber } from '@aztec/foundation/branded-types'; +import { Fr } from '@aztec/foundation/curves/bn254'; +import type { LoggerBindings } from '@aztec/foundation/log'; +import { type PromiseWithResolvers, promiseWithResolvers } from '@aztec/foundation/promise'; +import type { Tuple } from '@aztec/foundation/serialize'; +import type { EthAddress } from '@aztec/stdlib/block'; +import type { + ForkMerkleTreeOperations, + PublicInputsAndRecursiveProof, + ReadonlyWorldStateAccess, + ServerCircuitProver, +} from '@aztec/stdlib/interfaces/server'; +import type { + BlockRollupPublicInputs, + CheckpointConstantData, + PublicChonkVerifierPublicInputs, +} from '@aztec/stdlib/rollup'; +import type { 
BlockHeader, Tx } from '@aztec/stdlib/tx'; +import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; + +import { getPublicChonkVerifierPrivateInputsFromTx } from './block-building-helpers.js'; +import type { BlockProvingState } from './block-proving-state.js'; +import type { CheckpointProvingState } from './checkpoint-proving-state.js'; +import type { EpochProvingContext } from './epoch-proving-context.js'; +import { ProvingOrchestrator } from './orchestrator.js'; + +/** + * Result of proving a single checkpoint's block-level sub-tree. + * + * Contains the final block-rollup proof outputs that feed the checkpoint root rollup, + * plus the archive sibling path captured before any block in the checkpoint landed + * (the top-tree needs this to assemble the checkpoint root rollup hints). + */ +export type SubTreeResult = { + blockProofOutputs: PublicInputsAndRecursiveProof< + BlockRollupPublicInputs, + typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH + >[]; + previousArchiveSiblingPath: Tuple; +}; + +/** + * Orchestrates block-level proving for a single checkpoint, stopping at the boundary + * where checkpoint root rollup would otherwise begin. + * + * Reuses every circuit driver in `ProvingOrchestrator` (chonk verifier, base, merge, + * block-root, parity, block-merge) but overrides the gating method that escalates to + * checkpoint root rollup. Instead of escalating, the orchestrator resolves + * `getSubTreeResult()` once every block-level proof in the checkpoint's tree is ready. + * + * Wiring: a single-checkpoint epoch is created in the constructor (epoch number sourced + * from the supplied `EpochProvingContext`). The canonical way to obtain a fully-started + * sub-tree is the `start` static factory, which also drives the single internal + * `startNewCheckpoint(0, ...)` call. 
The sub-tree never calls `finalizeEpochStructure`; + * the override of `checkAndEnqueueCheckpointRootRollup` resolves `getSubTreeResult` once + * block-level proving completes. + */ +export class CheckpointSubTreeOrchestrator extends ProvingOrchestrator { + private readonly subTreeResult: PromiseWithResolvers; + + constructor( + dbProvider: ReadonlyWorldStateAccess & ForkMerkleTreeOperations, + prover: ServerCircuitProver, + proverId: EthAddress, + /** + * Per-epoch shared chonk-verifier proof cache. Every chonk-verifier proof started + * by this sub-tree lives on the context and survives the sub-tree's cancellation, + * so a tx whose original checkpoint is reorged out and re-appears in a replacement + * checkpoint reuses the cached proof. The context's `epochNumber` is the epoch + * this sub-tree proves into. + */ + private readonly epochContext: EpochProvingContext, + cancelJobsOnStop: boolean = false, + enqueueConcurrency: number, + telemetryClient: TelemetryClient = getTelemetryClient(), + bindings?: LoggerBindings, + ) { + super(dbProvider, prover, proverId, cancelJobsOnStop, enqueueConcurrency, telemetryClient, bindings); + + // Single-checkpoint mini-epoch by construction. The total/challenges supplied to + // `super.startNewEpoch` are never read, because the sub-tree overrides + // `checkAndEnqueueCheckpointRootRollup` to short-circuit before the parent's + // checkpoint-root / finalize machinery would consume them. + super.startNewEpoch(epochContext.epochNumber, 1, FinalBlobBatchingChallenges.empty()); + + this.subTreeResult = promiseWithResolvers(); + // Mark the rejection branch as observed so a `cancel()` or proving failure does not + // surface an unhandled rejection when no consumer awaits getSubTreeResult(). + this.subTreeResult.promise.catch(() => {}); + + // If the parent's proving state ever rejects, surface the failure on the sub-tree promise. 
+ void this.provingPromise!.then(result => { + if (result.status === 'failure') { + this.subTreeResult.reject(new Error(result.reason)); + } + }); + } + + /** + * Constructs and starts a sub-tree for a single checkpoint. The returned sub-tree + * has had its single internal `startNewCheckpoint(0, ...)` driven; callers proceed + * directly to per-block `startNewBlock` / `addTxs` / `setBlockCompleted`. + * + * If the internal `startNewCheckpoint` rejects, the partially-constructed sub-tree + * is stopped before the error propagates, so no broker resources leak. + */ + public static async start( + dbProvider: ReadonlyWorldStateAccess & ForkMerkleTreeOperations, + prover: ServerCircuitProver, + proverId: EthAddress, + epochContext: EpochProvingContext, + cancelJobsOnStop: boolean, + enqueueConcurrency: number, + checkpointConstants: CheckpointConstantData, + l1ToL2Messages: Fr[], + totalNumBlocks: number, + headerOfLastBlockInPreviousCheckpoint: BlockHeader, + telemetryClient: TelemetryClient = getTelemetryClient(), + bindings?: LoggerBindings, + ): Promise { + const subTree = new CheckpointSubTreeOrchestrator( + dbProvider, + prover, + proverId, + epochContext, + cancelJobsOnStop, + enqueueConcurrency, + telemetryClient, + bindings, + ); + try { + await ProvingOrchestrator.prototype.startNewCheckpoint.call( + subTree, + 0, + checkpointConstants, + l1ToL2Messages, + totalNumBlocks, + headerOfLastBlockInPreviousCheckpoint, + ); + return subTree; + } catch (err) { + await subTree.stop().catch(() => {}); + throw err; + } + } + + /** Returns a promise that resolves when block-level proving completes for the checkpoint. */ + public getSubTreeResult(): Promise { + return this.subTreeResult.promise; + } + + /** + * The epoch is started in the constructor. 
+ */ + public override startNewEpoch( + _epochNumber: EpochNumber, + _totalNumCheckpoints: number, + _finalBlobBatchingChallenges: FinalBlobBatchingChallenges, + ): void { + throw new Error('CheckpointSubTreeOrchestrator starts its epoch in the constructor; do not call startNewEpoch.'); + } + + /** + * The single internal checkpoint is started by the `start` factory + */ + public override startNewCheckpoint( + _checkpointIndex: number, + _constants: CheckpointConstantData, + _l1ToL2Messages: Fr[], + _totalNumBlocks: number, + _headerOfLastBlockInPreviousCheckpoint: BlockHeader, + ): Promise { + return Promise.reject( + new Error( + 'CheckpointSubTreeOrchestrator drives its single checkpoint in `start`; do not call startNewCheckpoint.', + ), + ); + } + + /** + * Returns the archive sibling path captured at the internal `startNewCheckpoint`. + * Available synchronously once `start` has resolved, before block-level proving + * completes. The top-tree consumer uses this to assemble checkpoint root rollup hints + * up-front so checkpoint root proofs can pipeline against in-flight sub-tree proving. + */ + public getPreviousArchiveSiblingPath(): Tuple { + const checkpoint = this.provingState!.getCheckpointProvingState(0); + if (!checkpoint) { + throw new Error('Checkpoint not started; call CheckpointSubTreeOrchestrator.start first.'); + } + return checkpoint.getLastArchiveSiblingPath(); + } + + /** + * Override the checkpoint-root boundary: instead of escalating to checkpoint root, + * resolve the sub-tree promise with the block-level proof outputs once they're all ready. 
+ */ + // eslint-disable-next-line require-await + protected override async checkAndEnqueueCheckpointRootRollup(provingState: CheckpointProvingState): Promise { + const proofs = provingState.getSubTreeOutputProofs(); + const nonEmpty = proofs.filter((p): p is NonNullable => !!p); + if (proofs.length !== nonEmpty.length) { + // Block merge tree not fully resolved yet — will be retried as more block proofs land. + return; + } + + this.subTreeResult.resolve({ + blockProofOutputs: nonEmpty, + previousArchiveSiblingPath: provingState.getLastArchiveSiblingPath(), + }); + } + + /** + * Kickstart chonk-verifier circuits via the shared `EpochProvingContext`. The context + * owns the broker job lifecycle, so the proof survives this sub-tree's `cancel()` — + * a tx that ends up in a replacement checkpoint after a reorg can pick the cached + * promise up and skip re-proving. + */ + public override startChonkVerifierCircuits(txs: Tx[]): Promise { + if (!this.provingState?.verifyState()) { + return Promise.reject(new Error('Sub-tree proving state is not active.')); + } + const publicTxs = txs.filter(tx => tx.data.forPublic); + for (const tx of publicTxs) { + const txHash = tx.getTxHash().toString(); + const inputs = getPublicChonkVerifierPrivateInputsFromTx(tx, this.getProverId().toField()); + // Fire and forget — getOrEnqueueChonkVerifier later picks up the cached promise + // when the tx is processed inside its block. + void this.epochContext.enqueue(txHash, inputs); + } + return Promise.resolve(); + } + + /** + * Route the tx's chonk-verifier dependency through the per-epoch context: read the + * cached promise (or enqueue if missing), then `.then(handleResult)` to progress to + * the base rollup once the proof lands. 
+ */ + protected override getOrEnqueueChonkVerifier(provingState: BlockProvingState, txIndex: number) { + if (!provingState.verifyState()) { + return; + } + + const txProvingState = provingState.getTxProvingState(txIndex); + const txHash = txProvingState.processedTx.hash.toString(); + + const handleResult = ( + result: PublicInputsAndRecursiveProof< + PublicChonkVerifierPublicInputs, + typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH + >, + ) => { + if (!provingState.verifyState()) { + return; + } + txProvingState.setPublicChonkVerifierProof(result); + this.checkAndEnqueueBaseRollup(provingState, txIndex); + }; + + let promise = this.epochContext.getCached(txHash); + if (!promise) { + promise = this.epochContext.enqueue(txHash, txProvingState.getPublicChonkVerifierPrivateInputs()); + } + void promise.then(handleResult).catch(() => { + // The context self-cleans on rejection; a future call (replacement sub-tree + // for this tx) will see the miss and re-enqueue. No action needed here. + }); + } +} diff --git a/yarn-project/prover-client/src/orchestrator/epoch-proving-context.test.ts b/yarn-project/prover-client/src/orchestrator/epoch-proving-context.test.ts new file mode 100644 index 000000000000..a424d2ea5968 --- /dev/null +++ b/yarn-project/prover-client/src/orchestrator/epoch-proving-context.test.ts @@ -0,0 +1,84 @@ +import { EpochNumber } from '@aztec/foundation/branded-types'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; +import type { ServerCircuitProver } from '@aztec/stdlib/interfaces/server'; +import { PublicChonkVerifierPrivateInputs } from '@aztec/stdlib/rollup'; + +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { type ChonkVerifierProofResult, EpochProvingContext } from './epoch-proving-context.js'; + +describe('EpochProvingContext', () => { + let prover: MockProxy; + let context: EpochProvingContext; + + // We don't need a real proof object — assertions only check identity via + // `toHaveBeenCalledTimes` and 
the resolved promise. + const fakeProof = {} as ChonkVerifierProofResult; + const fakeInputs = {} as PublicChonkVerifierPrivateInputs; + + beforeEach(() => { + prover = mock(); + context = new EpochProvingContext(prover, EpochNumber(1)); + }); + + it('caches and dedupes concurrent enqueue calls for the same tx', async () => { + prover.getPublicChonkVerifierProof.mockResolvedValue(fakeProof); + + const a = context.enqueue('tx1', fakeInputs); + const b = context.enqueue('tx1', fakeInputs); + + expect(a).toBe(b); + expect(prover.getPublicChonkVerifierProof).toHaveBeenCalledTimes(1); + + await expect(a).resolves.toBe(fakeProof); + }); + + it('returns the cached promise from getCached after enqueue', () => { + prover.getPublicChonkVerifierProof.mockResolvedValue(fakeProof); + + const promise = context.enqueue('tx1', fakeInputs); + expect(context.getCached('tx1')).toBe(promise); + expect(context.getCached('tx-other')).toBeUndefined(); + }); + + it('self-cleans the cache on rejection so a subsequent enqueue can re-issue the proof', async () => { + // First call rejects; second call should re-enqueue and succeed. + const failResolvers = promiseWithResolvers(); + failResolvers.promise.catch(() => {}); + prover.getPublicChonkVerifierProof.mockReturnValueOnce(failResolvers.promise); + prover.getPublicChonkVerifierProof.mockResolvedValueOnce(fakeProof); + + const first = context.enqueue('tx1', fakeInputs); + failResolvers.reject(new Error('boom')); + await expect(first).rejects.toThrow(/boom/); + + // Cache should now be empty for tx1. 
+ expect(context.getCached('tx1')).toBeUndefined(); + + const second = context.enqueue('tx1', fakeInputs); + expect(prover.getPublicChonkVerifierProof).toHaveBeenCalledTimes(2); + await expect(second).resolves.toBe(fakeProof); + }); + + it('aborts in-flight chonk-verifier jobs on stop', () => { + let capturedSignal: AbortSignal | undefined; + prover.getPublicChonkVerifierProof.mockImplementation((_inputs, signal) => { + capturedSignal = signal; + return new Promise(() => {}); + }); + + const promise = context.enqueue('tx1', fakeInputs); + promise.catch(() => {}); + + expect(capturedSignal?.aborted).toBe(false); + context.stop(); + expect(capturedSignal?.aborted).toBe(true); + }); + + it('rejects new enqueues after stop', async () => { + context.stop(); + const promise = context.enqueue('tx1', fakeInputs); + promise.catch(() => {}); + await expect(promise).rejects.toThrow(/stopped/); + }); +}); diff --git a/yarn-project/prover-client/src/orchestrator/epoch-proving-context.ts b/yarn-project/prover-client/src/orchestrator/epoch-proving-context.ts new file mode 100644 index 000000000000..d2801afbd79d --- /dev/null +++ b/yarn-project/prover-client/src/orchestrator/epoch-proving-context.ts @@ -0,0 +1,101 @@ +import type { NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH } from '@aztec/constants'; +import type { EpochNumber } from '@aztec/foundation/branded-types'; +import { type Logger, type LoggerBindings, createLogger } from '@aztec/foundation/log'; +import type { PublicInputsAndRecursiveProof, ServerCircuitProver } from '@aztec/stdlib/interfaces/server'; +import type { PublicChonkVerifierPrivateInputs, PublicChonkVerifierPublicInputs } from '@aztec/stdlib/rollup'; + +/** + * Result of a chonk-verifier proof, cached per tx hash on `EpochProvingContext`. 
+ */ +export type ChonkVerifierProofResult = PublicInputsAndRecursiveProof< + PublicChonkVerifierPublicInputs, + typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH +>; + +/** + * Per-epoch state shared across every `CheckpointSubTreeOrchestrator` constructed for + * the same epoch. Owns the chonk-verifier proof cache so a tx whose checkpoint is + * reorged out and re-appears in a replacement checkpoint does not have to re-prove + * its chonk circuit. + * + * The context's chonk-verifier broker jobs are deliberately submitted **outside** the + * sub-tree's deferred-proving queue. The sub-tree's `cancel()` therefore does not abort + * them — by design, because their result is tx-scoped, not sub-tree-scoped, and a + * replacement sub-tree should be able to consume the cached proof. + * + * Callers (`EpochProvingJob`, or unit tests) construct one context per epoch and pass + * it into every sub-tree they create. `stop()` aborts every in-flight chonk job. + */ +export class EpochProvingContext { + private readonly cache = new Map>(); + /** Abort controllers for in-flight chonk jobs, keyed by tx hash. */ + private readonly pending = new Map(); + private readonly log: Logger; + private stopped = false; + + constructor( + private readonly prover: ServerCircuitProver, + public readonly epochNumber: EpochNumber, + bindings?: LoggerBindings, + ) { + this.log = createLogger('prover-client:epoch-proving-context', bindings); + } + + /** + * Returns the cached chonk-verifier proof promise for the given tx hash, or + * `undefined` if none has been enqueued yet. Non-mutating. + */ + public getCached(txHash: string): Promise | undefined { + return this.cache.get(txHash); + } + + /** + * Enqueues a chonk-verifier proof for the given tx hash, returning the promise (or + * the already-cached one if already enqueued). The promise resolves when the broker + * delivers the result; on rejection (including `stop()`), the cache entry is removed + * so a subsequent caller can re-enqueue. 
+ */ + public enqueue(txHash: string, inputs: PublicChonkVerifierPrivateInputs): Promise { + if (this.stopped) { + return Promise.reject(new Error('EpochProvingContext is stopped')); + } + + const cached = this.cache.get(txHash); + if (cached) { + return cached; + } + + const controller = new AbortController(); + this.pending.set(txHash, controller); + this.log.debug(`Enqueueing chonk-verifier circuit`, { txHash, epochNumber: this.epochNumber }); + + const promise = this.prover + .getPublicChonkVerifierProof(inputs, controller.signal, this.epochNumber) + .finally(() => this.pending.delete(txHash)); + + // Self-clean on rejection so a future caller can re-enqueue. Mark the rejection + // path as observed to silence unhandled-rejection warnings when no consumer + // awaits the promise (e.g. when the only `.then` chain belonged to a cancelled + // sub-tree's tx-proving state). + promise.catch(err => { + this.cache.delete(txHash); + this.log.debug(`Chonk-verifier proof failed; evicted from cache`, { txHash, error: `${err}` }); + }); + + this.cache.set(txHash, promise); + return promise; + } + + /** + * Aborts every in-flight chonk-verifier broker job and clears the cache. Called by + * the owning `EpochProvingJob` when the job stops. 
+ */ + public stop() { + this.stopped = true; + for (const controller of this.pending.values()) { + controller.abort(); + } + this.pending.clear(); + this.cache.clear(); + } +} diff --git a/yarn-project/prover-client/src/orchestrator/index.ts b/yarn-project/prover-client/src/orchestrator/index.ts index a4b15f3e8916..34ddb02de8d2 100644 --- a/yarn-project/prover-client/src/orchestrator/index.ts +++ b/yarn-project/prover-client/src/orchestrator/index.ts @@ -1 +1,9 @@ export { ProvingOrchestrator } from './orchestrator.js'; +export { CheckpointSubTreeOrchestrator, type SubTreeResult } from './checkpoint-sub-tree-orchestrator.js'; +export { EpochProvingContext, type ChonkVerifierProofResult } from './epoch-proving-context.js'; +export { + TopTreeOrchestrator, + TopTreeCancelledError, + type CheckpointTopTreeData, + type TopTreeResult, +} from './top-tree-orchestrator.js'; diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator.ts b/yarn-project/prover-client/src/orchestrator/orchestrator.ts index a74e7bb452c2..2f4c190f5140 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator.ts @@ -9,12 +9,9 @@ import { import { BlockNumber, EpochNumber } from '@aztec/foundation/branded-types'; import { padArrayEnd } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/curves/bn254'; -import { AbortError } from '@aztec/foundation/error'; -import { type Logger, type LoggerBindings, createLogger } from '@aztec/foundation/log'; +import type { LoggerBindings } from '@aztec/foundation/log'; import { promiseWithResolvers } from '@aztec/foundation/promise'; -import { SerialQueue } from '@aztec/foundation/queue'; import { assertLength } from '@aztec/foundation/serialize'; -import { sleep } from '@aztec/foundation/sleep'; import { pushTestData } from '@aztec/foundation/testing'; import { elapsed } from '@aztec/foundation/timer'; import type { TreeNodeLocation } from 
'@aztec/foundation/trees'; @@ -71,6 +68,7 @@ import type { BlockProvingState } from './block-proving-state.js'; import type { CheckpointProvingState } from './checkpoint-proving-state.js'; import { EpochProvingState, type ProvingResult, type TreeSnapshots } from './epoch-proving-state.js'; import { ProvingOrchestratorMetrics } from './orchestrator_metrics.js'; +import { TopTreeProvingScheduler } from './top-tree-proving-scheduler.js'; import { TxProvingState } from './tx-proving-state.js'; /** @@ -87,28 +85,25 @@ import { TxProvingState } from './tx-proving-state.js'; /** * The orchestrator, managing the flow of recursive proving operations required to build the rollup proof tree. */ -export class ProvingOrchestrator implements EpochProver { - private provingState: EpochProvingState | undefined = undefined; - private pendingProvingJobs: AbortController[] = []; +export class ProvingOrchestrator extends TopTreeProvingScheduler implements EpochProver { + protected provingState: EpochProvingState | undefined = undefined; - private provingPromise: Promise<ProvingResult> | undefined = undefined; + protected provingPromise: Promise<ProvingResult> | undefined = undefined; private metrics: ProvingOrchestratorMetrics; + // eslint-disable-next-line aztec-custom/no-non-primitive-in-collections private dbs: Map<BlockNumber, MerkleTreeWriteOperations> = new Map(); - private logger: Logger; - private deferredJobQueue = new SerialQueue(); constructor( private dbProvider: ReadonlyWorldStateAccess & ForkMerkleTreeOperations, - private prover: ServerCircuitProver, + prover: ServerCircuitProver, private readonly proverId: EthAddress, private readonly cancelJobsOnStop: boolean = false, - private readonly enqueueConcurrency: number, + enqueueConcurrency: number, telemetryClient: TelemetryClient = getTelemetryClient(), bindings?: LoggerBindings, ) { - this.logger = createLogger('prover-client:orchestrator', bindings); + super(prover, enqueueConcurrency, 'prover-client:orchestrator', bindings); this.metrics = new ProvingOrchestratorMetrics(telemetryClient,
'ProvingOrchestrator'); - this.deferredJobQueue.start(this.enqueueConcurrency); } get tracer(): Tracer { @@ -123,11 +118,24 @@ export class ProvingOrchestrator implements EpochProver { return this.dbs.size; } - public async stop(): Promise<void> { - // Grab the old queue before cancel() replaces it, so we can await its draining. - const oldQueue = this.deferredJobQueue; + protected override cancelInternal(): void { this.cancel(); - await oldQueue.cancel(); + } + + protected override wrapCircuitCall<T>( + circuitName: string, + fn: (signal: AbortSignal) => Promise<T>, + ): (signal: AbortSignal) => Promise<T> { + return wrapCallbackInSpan( + this.tracer, + `ProvingOrchestrator.prover.${circuitName}`, + { [Attributes.PROTOCOL_CIRCUIT_NAME]: circuitName as CircuitName }, + fn, + ); + } + + protected override onRootRollupComplete(state: EpochProvingState) { + state.resolve({ status: 'success' }); + } public startNewEpoch( @@ -520,16 +528,7 @@ export class ProvingOrchestrator implements EpochProver { * If cancelJobsOnStop is false (default), jobs remain in the broker queue and can be reused on restart/reorg. */ public cancel() { - void this.deferredJobQueue.cancel(); - // Recreate the queue so it can accept jobs for subsequent epochs.
- this.deferredJobQueue = new SerialQueue(); - this.deferredJobQueue.start(this.enqueueConcurrency); - - if (this.cancelJobsOnStop) { - for (const controller of this.pendingProvingJobs) { - controller.abort(); - } - } + this.resetSchedulerState(this.cancelJobsOnStop); this.provingState?.cancel(); @@ -576,71 +575,6 @@ export class ProvingOrchestrator implements EpochProver { return epochProofResult; } - /** - * Enqueue a job to be scheduled - * @param provingState - The proving state object being operated on - * @param jobType - The type of job to be queued - * @param job - The actual job, returns a promise notifying of the job's completion - */ - private deferredProving<T>( - provingState: EpochProvingState | CheckpointProvingState | BlockProvingState, - request: (signal: AbortSignal) => Promise<T>, - callback: (result: T) => void | Promise<void>, - ) { - if (!provingState.verifyState()) { - this.logger.debug(`Not enqueuing job, state no longer valid`); - return; - } - - const controller = new AbortController(); - this.pendingProvingJobs.push(controller); - - // We use a 'safeJob'. We don't want promise rejections in the proving pool, we want to capture the error here - // and reject the proving job whilst keeping the event loop free of rejections - const safeJob = async () => { - try { - // there's a delay between enqueueing this job and it actually running - if (controller.signal.aborted) { - return; - } - - const result = await request(controller.signal); - if (!provingState.verifyState()) { - this.logger.debug(`State no longer valid, discarding result`); - return; - } - - // we could have been cancelled whilst waiting for the result - // and the prover ignored the signal.
Drop the result in that case - if (controller.signal.aborted) { - return; - } - - await callback(result); - } catch (err) { - if (err instanceof AbortError) { - // operation was cancelled, probably because the block was cancelled - // drop this result - return; - } - - this.logger.error(`Error thrown when proving job`, err); - provingState!.reject(`${err}`); - } finally { - const index = this.pendingProvingJobs.indexOf(controller); - if (index > -1) { - this.pendingProvingJobs.splice(index, 1); - } - } - }; - - void this.deferredJobQueue.put(async () => { - void safeJob(); - // we yield here to the macro task queue such to give Nodejs a chance to run other operatoins in between enqueues - await sleep(0); - }); - } - private async updateL1ToL2MessageTree(l1ToL2Messages: Fr[], db: MerkleTreeWriteOperations) { const l1ToL2MessagesPadded = padArrayEnd( l1ToL2Messages, @@ -763,7 +697,7 @@ export class ProvingOrchestrator implements EpochProver { // Enqueues the public chonk verifier circuit for a given transaction index, or reuses the one already enqueued. // Once completed, will enqueue the the public tx base rollup. - private getOrEnqueueChonkVerifier(provingState: BlockProvingState, txIndex: number) { + protected getOrEnqueueChonkVerifier(provingState: BlockProvingState, txIndex: number) { if (!provingState.verifyState()) { this.logger.debug('Not running chonk verifier circuit, state invalid'); return; @@ -1081,99 +1015,6 @@ export class ProvingOrchestrator implements EpochProver { ); } - private enqueueCheckpointMergeRollup(provingState: EpochProvingState, location: TreeNodeLocation) { - if (!provingState.verifyState()) { - this.logger.debug('Not running checkpoint merge rollup. 
State no longer valid.'); - return; - } - - if (!provingState.tryStartProvingCheckpointMerge(location)) { - this.logger.debug('Checkpoint merge rollup already started.'); - return; - } - - const inputs = provingState.getCheckpointMergeRollupInputs(location); - - this.deferredProving( - provingState, - wrapCallbackInSpan( - this.tracer, - 'ProvingOrchestrator.prover.getCheckpointMergeRollupProof', - { - [Attributes.PROTOCOL_CIRCUIT_NAME]: 'rollup-checkpoint-merge' satisfies CircuitName, - }, - signal => this.prover.getCheckpointMergeRollupProof(inputs, signal, provingState.epochNumber), - ), - result => { - this.logger.debug('Completed proof for checkpoint merge rollup.'); - provingState.setCheckpointMergeRollupProof(location, result); - this.checkAndEnqueueNextCheckpointMergeRollup(provingState, location); - }, - ); - } - - private enqueueEpochPadding(provingState: EpochProvingState) { - if (!provingState.verifyState()) { - this.logger.debug('Not running epoch padding. State no longer valid.'); - return; - } - - if (!provingState.tryStartProvingPaddingCheckpoint()) { - this.logger.debug('Padding checkpoint already started.'); - return; - } - - this.logger.debug('Padding epoch proof with a padding block root proof.'); - - const inputs = provingState.getPaddingCheckpointInputs(); - - this.deferredProving( - provingState, - wrapCallbackInSpan( - this.tracer, - 'ProvingOrchestrator.prover.getCheckpointPaddingRollupProof', - { - [Attributes.PROTOCOL_CIRCUIT_NAME]: 'rollup-checkpoint-padding' satisfies CircuitName, - }, - signal => this.prover.getCheckpointPaddingRollupProof(inputs, signal, provingState.epochNumber), - ), - result => { - this.logger.debug('Completed proof for padding checkpoint.'); - provingState.setCheckpointPaddingProof(result); - this.checkAndEnqueueRootRollup(provingState); - }, - ); - } - - // Executes the root rollup circuit - private enqueueRootRollup(provingState: EpochProvingState) { - if (!provingState.verifyState()) { - this.logger.debug('Not 
running root rollup, state no longer valid'); - return; - } - - this.logger.debug(`Preparing root rollup`); - - const inputs = provingState.getRootRollupInputs(); - - this.deferredProving( - provingState, - wrapCallbackInSpan( - this.tracer, - 'ProvingOrchestrator.prover.getRootRollupProof', - { - [Attributes.PROTOCOL_CIRCUIT_NAME]: 'rollup-root' satisfies CircuitName, - }, - signal => this.prover.getRootRollupProof(inputs, signal, provingState.epochNumber), - ), - result => { - this.logger.verbose(`Orchestrator completed root rollup for epoch ${provingState.epochNumber}`); - provingState.setRootRollupProof(result); - provingState.resolve({ status: 'success' }); - }, - ); - } - private checkAndEnqueueNextMergeRollup(provingState: BlockProvingState, currentLocation: TreeNodeLocation) { if (!provingState.isReadyForMergeRollup(currentLocation)) { return; @@ -1212,7 +1053,7 @@ export class ProvingOrchestrator implements EpochProver { } } - private async checkAndEnqueueCheckpointRootRollup(provingState: CheckpointProvingState) { + protected async checkAndEnqueueCheckpointRootRollup(provingState: CheckpointProvingState) { if (!provingState.isReadyForCheckpointRoot()) { return; } @@ -1220,28 +1061,6 @@ export class ProvingOrchestrator implements EpochProver { await this.enqueueCheckpointRootRollup(provingState); } - private checkAndEnqueueNextCheckpointMergeRollup(provingState: EpochProvingState, currentLocation: TreeNodeLocation) { - if (!provingState.isReadyForCheckpointMerge(currentLocation)) { - return; - } - - const parentLocation = provingState.getParentLocation(currentLocation); - if (parentLocation.level === 0) { - this.checkAndEnqueueRootRollup(provingState); - } else { - this.enqueueCheckpointMergeRollup(provingState, parentLocation); - } - } - - private checkAndEnqueueRootRollup(provingState: EpochProvingState) { - if (!provingState.isReadyForRootRollup()) { - this.logger.debug('Not ready for root rollup'); - return; - } - - 
this.enqueueRootRollup(provingState); - } - /** * Executes the VM circuit for a public function, will enqueue the corresponding kernel if the * previous kernel is ready @@ -1275,7 +1094,7 @@ export class ProvingOrchestrator implements EpochProver { }); } - private checkAndEnqueueBaseRollup(provingState: BlockProvingState, txIndex: number) { + protected checkAndEnqueueBaseRollup(provingState: BlockProvingState, txIndex: number) { const txProvingState = provingState.getTxProvingState(txIndex); if (!txProvingState.ready()) { return; diff --git a/yarn-project/prover-client/src/orchestrator/proving-scheduler.ts b/yarn-project/prover-client/src/orchestrator/proving-scheduler.ts new file mode 100644 index 000000000000..75561d9e911a --- /dev/null +++ b/yarn-project/prover-client/src/orchestrator/proving-scheduler.ts @@ -0,0 +1,156 @@ +import { AbortError } from '@aztec/foundation/error'; +import { type Logger, type LoggerBindings, createLogger } from '@aztec/foundation/log'; +import { SerialQueue } from '@aztec/foundation/queue'; +import { sleep } from '@aztec/foundation/sleep'; + +/** + * Minimal surface a deferred-proving state must expose. Both `EpochProvingState` / + * `CheckpointProvingState` / `BlockProvingState` (used by `ProvingOrchestrator`) and + * `TopTreeProvingState` (used by `TopTreeOrchestrator`) satisfy it. + */ +export interface ProvingStateLike { + /** Returns false once the state has been cancelled or otherwise invalidated. */ + verifyState(): boolean; + /** Surfaces a proving error to the state's owner. */ + reject(reason: string): void; +} + +/** + * Common scheduling infrastructure shared by every orchestrator that drives broker + * proving jobs: + * + * - One `SerialQueue` (`deferredJobQueue`) acting as the enqueue-throttle. + * - A list of `AbortController`s (`pendingProvingJobs`) so a `cancel()` can abort + * in-flight broker jobs when needed. 
+ * - A `deferredProving(state, request, callback, isCancelled?)` method that wraps + * a broker request in the standard "submit, drop result if state invalidated, push + * errors to state.reject" envelope. + * + * Subclasses own their own concrete proving state and define `cancelInternal()` for + * the rest of the cleanup work (closing world-state forks, marking sub-trees + * cancelled, etc.). `stop()` lives on the base class and follows the standard pattern + * of grabbing the old queue, calling `cancelInternal()` (which recreates the queue), + * and awaiting the old queue's drain. + */ +export abstract class ProvingScheduler { + protected pendingProvingJobs: AbortController[] = []; + protected logger: Logger; + private deferredJobQueue: SerialQueue; + + constructor( + private readonly enqueueConcurrency: number, + loggerName = 'prover-client:proving-scheduler', + bindings?: LoggerBindings, + ) { + this.logger = createLogger(loggerName, bindings); + this.deferredJobQueue = new SerialQueue(); + this.deferredJobQueue.start(this.enqueueConcurrency); + } + + /** Number of broker jobs currently in flight. */ + public getNumPendingProvingJobs(): number { + return this.pendingProvingJobs.length; + } + + /** + * Drains the deferred-job queue, recreates it (so the subclass can be reused), and + * optionally aborts every in-flight broker job. Aborting is the right choice on + * reorg-driven cancel (where the in-flight inputs are no longer valid) and the + * wrong choice on shutdown (where leaving jobs in the broker queue lets a restart + * pick them up). + */ + protected resetSchedulerState(abortJobs: boolean): void { + void this.deferredJobQueue.cancel(); + this.deferredJobQueue = new SerialQueue(); + this.deferredJobQueue.start(this.enqueueConcurrency); + if (abortJobs) { + for (const controller of this.pendingProvingJobs) { + controller.abort(); + } + } + } + + /** + * Subclass-defined cancellation. 
Implementations call `resetSchedulerState(...)` + * and then do their own cleanup (close world-state forks, propagate cancel into + * the proving state, etc.). + */ + protected abstract cancelInternal(): void; + + /** + * Standard stop: grab the old queue, cancel (which recreates the queue), then + * await the old queue's drain so any final job tear-down has unwound before we + * return. + */ + public async stop(): Promise<void> { + const oldQueue = this.deferredJobQueue; + this.cancelInternal(); + await oldQueue.cancel(); + } + + /** + * Submits a broker request. The returned-via-callback result is dropped if the + * state has become invalid (re-org, cancellation) by the time it lands. Errors + * are routed to `state.reject` unless they are abort-driven or the state is + * already invalid (in which case they're a stale echo of the cancel). + * + * @param state - Object exposing `verifyState()` and `reject()`. + * @param request - The broker call. Receives the controller's signal. + * @param callback - Runs on success, after `verifyState()` is checked. + * @param isCancelled - Optional extra cancellation predicate (e.g. a `cancelled` + * flag the subclass maintains independently of the state). Defaults to never. + */ + protected deferredProving<T, S extends ProvingStateLike>( + state: S, + request: (signal: AbortSignal) => Promise<T>, + callback: (result: T) => void | Promise<void>, + isCancelled: () => boolean = () => false, + ): void { + if (!state.verifyState()) { + this.logger.debug(`Not enqueuing job, state no longer valid`); + return; + } + + const controller = new AbortController(); + this.pendingProvingJobs.push(controller); + + // We use a 'safeJob'. We don't want promise rejections in the proving pool — we + // want to capture the error here and reject the proving state while keeping the + // event loop free of rejections.
+ const safeJob = async () => { + try { + if (controller.signal.aborted) { + return; + } + const result = await request(controller.signal); + if (controller.signal.aborted || !state.verifyState() || isCancelled()) { + this.logger.debug(`State no longer valid, discarding result`); + return; + } + await callback(result); + } catch (err) { + if (err instanceof AbortError || isCancelled()) { + return; + } + if (!state.verifyState()) { + this.logger.debug(`State no longer valid, discarding error from proving job`, err); + return; + } + this.logger.error(`Error thrown when proving job`, err); + state.reject(`${err}`); + } finally { + const idx = this.pendingProvingJobs.indexOf(controller); + if (idx > -1) { + this.pendingProvingJobs.splice(idx, 1); + } + } + }; + + void this.deferredJobQueue.put(async () => { + void safeJob(); + // Yield to the macrotask queue so Node has a chance to interleave other work + // between enqueues. + await sleep(0); + }); + } +} diff --git a/yarn-project/prover-client/src/orchestrator/top-tree-orchestrator.test.ts b/yarn-project/prover-client/src/orchestrator/top-tree-orchestrator.test.ts new file mode 100644 index 000000000000..0f52cd656e24 --- /dev/null +++ b/yarn-project/prover-client/src/orchestrator/top-tree-orchestrator.test.ts @@ -0,0 +1,203 @@ +import { EpochNumber } from '@aztec/foundation/branded-types'; +import { EthAddress } from '@aztec/foundation/eth-address'; +import { createLogger } from '@aztec/foundation/log'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; + +import { TestContext } from '../mocks/test_context.js'; +import { CheckpointSubTreeOrchestrator } from './checkpoint-sub-tree-orchestrator.js'; +import { EpochProvingContext } from './epoch-proving-context.js'; +import { type CheckpointTopTreeData, TopTreeCancelledError, TopTreeOrchestrator } from './top-tree-orchestrator.js'; + +const logger = createLogger('prover-client:test:top-tree-orchestrator'); + +/** + * End-to-end exercises for 
`TopTreeOrchestrator`. Each test drives one or more + * `CheckpointSubTreeOrchestrator`s to produce block proofs, then feeds them into a + * fresh `TopTreeOrchestrator.prove()` call and verifies the resulting epoch proof + * is well-formed. + */ +describe('prover/orchestrator/top-tree', () => { + let context: TestContext; + + beforeEach(async () => { + context = await TestContext.new(logger); + }); + + afterEach(async () => { + await context.cleanup(); + }); + + /** + * Drives a single checkpoint through `CheckpointSubTreeOrchestrator` and returns + * the assembled `CheckpointTopTreeData` plus the originating checkpoint metadata. + */ + async function driveSubTree(numBlocks: number, numTxsPerBlock: number, numL1ToL2Messages = 0) { + const fixture = await context.makeCheckpoint(numBlocks, { numTxsPerBlock, numL1ToL2Messages }); + + const epochContext = new EpochProvingContext(context.prover, EpochNumber(1)); + const subTree = await CheckpointSubTreeOrchestrator.start( + context.worldState, + context.prover, + EthAddress.ZERO, + epochContext, + false, + 10, + fixture.constants, + fixture.l1ToL2Messages, + numBlocks, + fixture.previousBlockHeader, + ); + const resultPromise = subTree.getSubTreeResult(); + + for (const block of fixture.blocks) { + const { blockNumber, timestamp } = block.header.globalVariables; + await subTree.startNewBlock(blockNumber, timestamp, block.txs.length); + if (block.txs.length > 0) { + await subTree.addTxs(block.txs); + } + await subTree.setBlockCompleted(blockNumber, block.header); + } + + const result = await resultPromise; + await subTree.stop(); + epochContext.stop(); + + const topTreeData: CheckpointTopTreeData = { + blockProofs: Promise.resolve(result.blockProofOutputs), + l2ToL1MsgsPerBlock: fixture.blocks.map(b => b.txs.map(tx => tx.txEffect.l2ToL1Msgs)), + blobFields: fixture.checkpoint.toBlobFields(), + previousBlockHeader: fixture.previousBlockHeader, + previousArchiveSiblingPath: result.previousArchiveSiblingPath, + }; + + 
return { fixture, topTreeData }; + } + + it('produces an epoch proof for a single-checkpoint, single-block, single-tx epoch', async () => { + const { topTreeData } = await driveSubTree(1, 1); + const challenges = await context.getFinalBlobChallenges(); + + const topTree = new TopTreeOrchestrator(context.prover, EthAddress.ZERO, 10); + try { + const result = await topTree.prove(EpochNumber(1), 1, challenges, [topTreeData]); + expect(result.proof).toBeDefined(); + expect(result.publicInputs).toBeDefined(); + expect(result.batchedBlobInputs).toBeDefined(); + } finally { + await topTree.stop(); + } + }); + + it('produces an epoch proof for a multi-checkpoint epoch', async () => { + const a = await driveSubTree(1, 1); + const b = await driveSubTree(1, 1); + const challenges = await context.getFinalBlobChallenges(); + + const topTree = new TopTreeOrchestrator(context.prover, EthAddress.ZERO, 10); + try { + const result = await topTree.prove(EpochNumber(1), 2, challenges, [a.topTreeData, b.topTreeData]); + expect(result.proof).toBeDefined(); + } finally { + await topTree.stop(); + } + }); + + it('pipelines: starts ckpt0 root rollup before ckpt1 sub-tree resolves', async () => { + // Drive both sub-trees synchronously (still no top tree running). + const a = await driveSubTree(1, 1); + const b = await driveSubTree(1, 1); + const challenges = await context.getFinalBlobChallenges(); + + // Replace ckpt1's blockProofs with a deferred promise that resolves later. + const deferred = promiseWithResolvers<CheckpointTopTreeData['blockProofs'] extends Promise<infer T> ? T : never>(); + const ckpt1 = { ...b.topTreeData, blockProofs: deferred.promise } as CheckpointTopTreeData; + + const topTree = new TopTreeOrchestrator(context.prover, EthAddress.ZERO, 10); + try { + // Top tree proves in the background; it should be able to advance ckpt0's root + // rollup before we resolve ckpt1's promise.
+ const provePromise = topTree.prove(EpochNumber(1), 2, challenges, [a.topTreeData, ckpt1]); + + // Give the orchestrator a chance to enqueue ckpt0's root rollup. + await new Promise(resolve => setTimeout(resolve, 50)); + + // Now resolve ckpt1 — the orchestrator should pick it up and continue. + deferred.resolve((await b.topTreeData.blockProofs) as any); + + const result = await provePromise; + expect(result.proof).toBeDefined(); + } finally { + await topTree.stop(); + } + }); + + it('rejects with TopTreeCancelledError when cancelled mid-flight', async () => { + const { topTreeData } = await driveSubTree(1, 1); + const challenges = await context.getFinalBlobChallenges(); + + // Block ckpt0's blockProofs forever so prove() can't finish. + const stuck = new Promise<CheckpointTopTreeData['blockProofs'] extends Promise<infer T> ? T : never>(() => {}); + const stuckData = { ...topTreeData, blockProofs: stuck } as CheckpointTopTreeData; + + const topTree = new TopTreeOrchestrator(context.prover, EthAddress.ZERO, 10); + const provePromise = topTree.prove(EpochNumber(1), 1, challenges, [stuckData]); + + // Yield then cancel.
+ await new Promise(resolve => setTimeout(resolve, 10)); + topTree.cancel({ abortJobs: true }); + + let actual: unknown; + try { + await provePromise; + } catch (err) { + actual = err; + } + expect(actual).toBeInstanceOf(TopTreeCancelledError); + }); + + it('rejects immediately if cancel is called before prove', async () => { + const { topTreeData } = await driveSubTree(1, 1); + const challenges = await context.getFinalBlobChallenges(); + + const topTree = new TopTreeOrchestrator(context.prover, EthAddress.ZERO, 10); + topTree.cancel({ abortJobs: true }); + + let actual: unknown; + try { + await topTree.prove(EpochNumber(1), 1, challenges, [topTreeData]); + } catch (err) { + actual = err; + } + expect(actual).toBeInstanceOf(TopTreeCancelledError); + await topTree.stop(); + }); + + it('rejects when prove is called twice', async () => { + const { topTreeData } = await driveSubTree(1, 1); + const challenges = await context.getFinalBlobChallenges(); + + const topTree = new TopTreeOrchestrator(context.prover, EthAddress.ZERO, 10); + try { + const first = topTree.prove(EpochNumber(1), 1, challenges, [topTreeData]); + // Second call before first settles should throw synchronously inside the function + await expect(topTree.prove(EpochNumber(1), 1, challenges, [topTreeData])).rejects.toThrow(/prove called twice/); + await first; + } finally { + await topTree.stop(); + } + }); + + it('rejects when checkpointData length disagrees with totalNumCheckpoints', async () => { + const { topTreeData } = await driveSubTree(1, 1); + const challenges = await context.getFinalBlobChallenges(); + + const topTree = new TopTreeOrchestrator(context.prover, EthAddress.ZERO, 10); + try { + await expect(topTree.prove(EpochNumber(1), 2, challenges, [topTreeData])).rejects.toThrow( + /does not match totalNumCheckpoints/, + ); + } finally { + await topTree.stop(); + } + }); +}); diff --git a/yarn-project/prover-client/src/orchestrator/top-tree-orchestrator.ts 
b/yarn-project/prover-client/src/orchestrator/top-tree-orchestrator.ts new file mode 100644 index 000000000000..c0479d15d8cd --- /dev/null +++ b/yarn-project/prover-client/src/orchestrator/top-tree-orchestrator.ts @@ -0,0 +1,314 @@ +import { BatchedBlobAccumulator, type FinalBlobBatchingChallenges } from '@aztec/blob-lib'; +import type { BatchedBlob } from '@aztec/blob-lib/types'; +import { + type ARCHIVE_HEIGHT, + BLOBS_PER_CHECKPOINT, + FIELDS_PER_BLOB, + type NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH, + OUT_HASH_TREE_HEIGHT, +} from '@aztec/constants'; +import type { EpochNumber } from '@aztec/foundation/branded-types'; +import { padArrayEnd } from '@aztec/foundation/collection'; +import { BLS12Point } from '@aztec/foundation/curves/bls12'; +import { Fr } from '@aztec/foundation/curves/bn254'; +import type { LoggerBindings } from '@aztec/foundation/log'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; +import type { Tuple } from '@aztec/foundation/serialize'; +import { MerkleTreeCalculator, shaMerkleHash } from '@aztec/foundation/trees'; +import type { EthAddress } from '@aztec/stdlib/block'; +import type { PublicInputsAndRecursiveProof, ServerCircuitProver } from '@aztec/stdlib/interfaces/server'; +import { computeCheckpointOutHash } from '@aztec/stdlib/messaging'; +import type { Proof } from '@aztec/stdlib/proofs'; +import { + type BlockRollupPublicInputs, + CheckpointRootRollupHints, + CheckpointRootRollupPrivateInputs, + CheckpointRootSingleBlockRollupPrivateInputs, + type RootRollupPublicInputs, +} from '@aztec/stdlib/rollup'; +import { AppendOnlyTreeSnapshot } from '@aztec/stdlib/trees'; +import type { BlockHeader } from '@aztec/stdlib/tx'; +import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; + +import { buildBlobHints, toProofData } from './block-building-helpers.js'; +import { TopTreeProvingScheduler } from './top-tree-proving-scheduler.js'; +import { TopTreeProvingState } from 
'./top-tree-proving-state.js'; + +/** Per-checkpoint data fed into the top tree. */ +export type CheckpointTopTreeData = { + /** + * Block-rollup proof outputs from the checkpoint's sub-tree. Passed as a Promise so the + * top tree can start (compute hints, pipeline merges) while sub-trees are still proving. + * The promise resolves to 1 entry for a single-block checkpoint, 2 for multi-block. + */ + blockProofs: Promise< + PublicInputsAndRecursiveProof<BlockRollupPublicInputs, typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH>[] + >; + /** L2-to-L1 messages per block in the checkpoint, used to compute the out hash. */ + l2ToL1MsgsPerBlock: Fr[][][]; + /** Blob fields encoding the checkpoint's tx effects, used to compute the blob accumulator. */ + blobFields: Fr[]; + /** Header of the last block in the previous checkpoint (or the epoch's predecessor for index 0). */ + previousBlockHeader: BlockHeader; + /** Sibling path of the archive tree before any block in this checkpoint landed. */ + previousArchiveSiblingPath: Tuple<Fr, typeof ARCHIVE_HEIGHT>; +}; + +/** Result of proving the top tree. */ +export type TopTreeResult = { + publicInputs: RootRollupPublicInputs; + proof: Proof; + batchedBlobInputs: BatchedBlob; +}; + +/** + * Sentinel thrown by `cancel` so callers can distinguish reorg-driven cancellation from + * a genuine proving failure (and choose to rebuild + retry instead of failing the epoch). + */ +export class TopTreeCancelledError extends Error { + constructor(reason = 'Top-tree proving cancelled') { + super(reason); + this.name = 'TopTreeCancelledError'; + } +} + +type OutHashHint = { + treeSnapshot: AppendOnlyTreeSnapshot; + siblingPath: Tuple<Fr, typeof OUT_HASH_TREE_HEIGHT>; +}; + +/** + * Drives proving from checkpoint root rollups through the epoch root rollup. Owns no + * world-state forks or tx processing — every input is supplied by the caller. + * + * Pipelined start: `prove()` does not wait for block-level proving.
It pre-computes the + * out-hash and blob-accumulator hint chains immediately from archiver-derivable data, + * and each checkpoint's root rollup fires the moment its sub-tree's `blockProofs` + * promise resolves. Later checkpoints can still be block-level proving in parallel. + */ +export class TopTreeOrchestrator extends TopTreeProvingScheduler { + private state: TopTreeProvingState | undefined; + private cancelled = false; + + constructor( + prover: ServerCircuitProver, + private readonly proverId: EthAddress, + enqueueConcurrency: number, + _telemetryClient: TelemetryClient = getTelemetryClient(), + bindings?: LoggerBindings, + ) { + super(prover, enqueueConcurrency, 'prover-client:top-tree-orchestrator', bindings); + } + + public getProverId(): EthAddress { + return this.proverId; + } + + /** + * Proves the top tree from per-checkpoint data and pending block-proofs promises. + * Resolves with the final epoch proof, or rejects with `TopTreeCancelledError` if + * `cancel()` is invoked, or any other error if a circuit fails. + */ + public async prove( + epochNumber: EpochNumber, + totalNumCheckpoints: number, + finalBlobBatchingChallenges: FinalBlobBatchingChallenges, + checkpointData: CheckpointTopTreeData[], + ): Promise<TopTreeResult> { + if (checkpointData.length !== totalNumCheckpoints) { + throw new Error( + `checkpointData length (${checkpointData.length}) does not match totalNumCheckpoints (${totalNumCheckpoints}).`, + ); + } + if (this.state) { + throw new Error('TopTreeOrchestrator.prove called twice; construct a new orchestrator per epoch.'); + } + // If cancel() was already called before prove() ran (e.g. a removeCheckpoint that + // landed while the caller was still preparing inputs), short-circuit the whole + // proving path. Without this, prove() would build its state, the per-checkpoint + // .then handlers would all bail on `this.cancelled`, and the completion promise + // would never resolve — prove() would hang forever.
+ if (this.cancelled) { + throw new TopTreeCancelledError(); + } + + const { promise: completionPromise, resolve, reject } = promiseWithResolvers(); + // The completion promise is awaited inside the try/catch below. Attach a no-op catch + // here as well so any spurious unhandled-rejection detection during cancellation + // (where reject() can fire synchronously before the await microtask installs a handler) + // is silenced. + completionPromise.catch(() => {}); + const startBlobAccumulator = BatchedBlobAccumulator.newWithChallenges(finalBlobBatchingChallenges); + + this.state = new TopTreeProvingState( + epochNumber, + totalNumCheckpoints, + finalBlobBatchingChallenges, + startBlobAccumulator, + resolve, + reason => reject(this.cancelled ? new TopTreeCancelledError(reason) : new Error(reason)), + ); + + // Compute the full out-hash hint chain and per-checkpoint start blob accumulators + // synchronously from archiver data. No proving required. + const outHashHints = await this.computeOutHashHints(checkpointData); + const checkpointStartBlobs: BatchedBlobAccumulator[] = []; + let runningBlobAccumulator = startBlobAccumulator; + for (const cd of checkpointData) { + checkpointStartBlobs.push(runningBlobAccumulator); + runningBlobAccumulator = await runningBlobAccumulator.accumulateFields(cd.blobFields); + } + this.state.setEndBlobAccumulator(runningBlobAccumulator); + + // For each checkpoint, await its block proofs promise then enqueue the checkpoint root. + // Each await runs independently — checkpoints whose sub-trees finish first start their + // root proofs first, in parallel with later checkpoints' block-level proving. 
+ for (let i = 0; i < checkpointData.length; i++) { + const cd = checkpointData[i]; + const checkpointIndex = i; + void cd.blockProofs.then( + blockProofs => { + if (this.cancelled || !this.state?.verifyState()) { + return; + } + this.enqueueCheckpointRoot( + this.state, + checkpointIndex, + blockProofs, + cd, + outHashHints[i], + checkpointStartBlobs[i], + ); + }, + err => { + if (this.cancelled) { + return; + } + this.state?.reject(`Sub-tree for checkpoint ${i} failed: ${err}`); + }, + ); + } + + try { + await completionPromise; + await this.state.finalizeBatchedBlob(); + return this.state.getEpochProofResult(); + } catch (err: any) { + if (this.cancelled) { + throw new TopTreeCancelledError(); + } + throw err; + } + } + + /** + * Cancels in-flight proving. If `abortJobs` is true, each pending broker job is aborted + * (used on reorg, when the surviving checkpoint set differs and the in-flight jobs' + * inputs are no longer valid). On shutdown the caller passes `false` so jobs remain in + * the broker queue for reuse on restart. + */ + public cancel({ abortJobs }: { abortJobs: boolean }) { + this.cancelled = true; + this.resetSchedulerState(abortJobs); + this.state?.cancel(); + } + + /** Standard shutdown — preserve the broker queue (`abortJobs: false`). 
*/ + protected override cancelInternal(): void { + this.cancel({ abortJobs: false }); + } + + // --- internal: per-checkpoint enqueue path --- + + protected override onRootRollupComplete(state: TopTreeProvingState) { + state.resolve(); + } + + private enqueueCheckpointRoot( + state: TopTreeProvingState, + checkpointIndex: number, + blockProofs: PublicInputsAndRecursiveProof< + BlockRollupPublicInputs, + typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH + >[], + cd: CheckpointTopTreeData, + outHashHint: OutHashHint, + startBlobAccumulator: BatchedBlobAccumulator, + ) { + void this.buildCheckpointRootInputs(blockProofs, cd, outHashHint, startBlobAccumulator).then(inputs => { + this.deferredProving( + state, + signal => { + if (inputs instanceof CheckpointRootSingleBlockRollupPrivateInputs) { + return this.prover.getCheckpointRootSingleBlockRollupProof(inputs, signal, state.epochNumber); + } + return this.prover.getCheckpointRootRollupProof(inputs, signal, state.epochNumber); + }, + result => { + this.logger.debug(`Completed checkpoint root proof for checkpoint ${checkpointIndex}`); + const leafLocation = state.setCheckpointRootRollupProof(checkpointIndex, result); + if (state.totalNumCheckpoints === 1) { + this.enqueueEpochPadding(state); + } else { + this.checkAndEnqueueNextCheckpointMergeRollup(state, leafLocation); + } + }, + ); + }); + } + + private async buildCheckpointRootInputs( + blockProofs: PublicInputsAndRecursiveProof< + BlockRollupPublicInputs, + typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH + >[], + cd: CheckpointTopTreeData, + outHashHint: OutHashHint, + startBlobAccumulator: BatchedBlobAccumulator, + ) { + const { blobCommitments, blobsHash } = await buildBlobHints(cd.blobFields); + + const hints = CheckpointRootRollupHints.from({ + previousBlockHeader: cd.previousBlockHeader, + previousArchiveSiblingPath: cd.previousArchiveSiblingPath, + previousOutHash: outHashHint.treeSnapshot, + newOutHashSiblingPath: outHashHint.siblingPath, + 
startBlobAccumulator: startBlobAccumulator.toBlobAccumulator(), + finalBlobChallenges: this.state!.finalBlobBatchingChallenges, + blobFields: padArrayEnd(cd.blobFields, Fr.ZERO, FIELDS_PER_BLOB * BLOBS_PER_CHECKPOINT), + blobCommitments: padArrayEnd(blobCommitments, BLS12Point.ZERO, BLOBS_PER_CHECKPOINT), + blobsHash, + }); + + const proofDatas = blockProofs.map(p => toProofData(p)); + return proofDatas.length === 1 + ? new CheckpointRootSingleBlockRollupPrivateInputs(proofDatas[0], hints) + : new CheckpointRootRollupPrivateInputs([proofDatas[0], proofDatas[1]], hints); + } + + private async computeOutHashHints(checkpointData: CheckpointTopTreeData[]): Promise<OutHashHint[]> { + const treeCalculator = await MerkleTreeCalculator.create(OUT_HASH_TREE_HEIGHT, undefined, (left, right) => + Promise.resolve(shaMerkleHash(left, right)), + ); + + const computeHint = async (leaves: Fr[]): Promise<OutHashHint> => { + const tree = await treeCalculator.computeTree(leaves.map(l => l.toBuffer())); + const nextAvailableLeafIndex = leaves.length; + return { + treeSnapshot: new AppendOnlyTreeSnapshot(Fr.fromBuffer(tree.root), nextAvailableLeafIndex), + siblingPath: tree.getSiblingPath(nextAvailableLeafIndex).map(Fr.fromBuffer) as Tuple< + Fr, + typeof OUT_HASH_TREE_HEIGHT + >, + }; + }; + + const hints: OutHashHint[] = []; + const outHashes: Fr[] = []; + for (const cd of checkpointData) { + hints.push(await computeHint(outHashes)); + outHashes.push(computeCheckpointOutHash(cd.l2ToL1MsgsPerBlock)); + } + return hints; + } +} diff --git a/yarn-project/prover-client/src/orchestrator/top-tree-proving-scheduler.ts b/yarn-project/prover-client/src/orchestrator/top-tree-proving-scheduler.ts new file mode 100644 index 000000000000..0d65cd2b1b62 --- /dev/null +++ b/yarn-project/prover-client/src/orchestrator/top-tree-proving-scheduler.ts @@ -0,0 +1,154 @@ +import type { NESTED_RECURSIVE_PROOF_LENGTH, NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH } from '@aztec/constants'; +import type { EpochNumber } from 
'@aztec/foundation/branded-types'; +import type { LoggerBindings } from '@aztec/foundation/log'; +import type { TreeNodeLocation } from '@aztec/foundation/trees'; +import type { PublicInputsAndRecursiveProof, ServerCircuitProver } from '@aztec/stdlib/interfaces/server'; +import type { + CheckpointMergeRollupPrivateInputs, + CheckpointPaddingRollupPrivateInputs, + CheckpointRollupPublicInputs, + RootRollupPrivateInputs, + RootRollupPublicInputs, +} from '@aztec/stdlib/rollup'; + +import { ProvingScheduler, type ProvingStateLike } from './proving-scheduler.js'; + +type CheckpointRollupProof = PublicInputsAndRecursiveProof< + CheckpointRollupPublicInputs, + typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH +>; + +type RootRollupProof = PublicInputsAndRecursiveProof<RootRollupPublicInputs, typeof NESTED_RECURSIVE_PROOF_LENGTH>; + +/** + * State interface required by the top-tree proving drivers (checkpoint-merge → padding → + * root-rollup). Both `EpochProvingState` and `TopTreeProvingState` satisfy it structurally; + * the per-checkpoint state in `EpochProvingState` (block/tx proving, world-state forks) + * is owned outside this surface. 
+ */ +export interface TopTreeStateLike extends ProvingStateLike { + readonly epochNumber: EpochNumber; + readonly totalNumCheckpoints: number; + + tryStartProvingCheckpointMerge(location: TreeNodeLocation): boolean; + setCheckpointMergeRollupProof(location: TreeNodeLocation, provingOutput: CheckpointRollupProof): void; + isReadyForCheckpointMerge(location: TreeNodeLocation): boolean; + getParentLocation(location: TreeNodeLocation): TreeNodeLocation; + getCheckpointMergeRollupInputs(location: TreeNodeLocation): CheckpointMergeRollupPrivateInputs; + + tryStartProvingPaddingCheckpoint(): boolean; + setCheckpointPaddingProof(provingOutput: CheckpointRollupProof): void; + getPaddingCheckpointInputs(): CheckpointPaddingRollupPrivateInputs; + + tryStartProvingRootRollup(): boolean; + setRootRollupProof(provingOutput: RootRollupProof): void; + isReadyForRootRollup(): boolean; + getRootRollupInputs(): RootRollupPrivateInputs; +} + +/** + * Shared scheduling for the top-tree section of epoch proving — checkpoint-merge, + * padding (single-checkpoint case), and root rollup. Both `ProvingOrchestrator` and + * `TopTreeOrchestrator` extend this; their per-checkpoint-root drivers diverge (one + * drains state-derived inputs once block-merge is done, the other builds inputs from + * caller-supplied checkpoint data), but the rest of the tree is identical. + * + * Subclasses provide a `wrapCircuitCall` hook for telemetry (the orchestrator wraps + * each call in a span; the top-tree leaves it as identity), and an + * `onRootRollupComplete` hook to invoke the right shape of `state.resolve()` — + * `EpochProvingState.resolve` takes a `ProvingResult`, `TopTreeProvingState.resolve` + * is no-arg. 
+ */ +export abstract class TopTreeProvingScheduler extends ProvingScheduler { + constructor( + protected readonly prover: ServerCircuitProver, + enqueueConcurrency: number, + loggerName?: string, + bindings?: LoggerBindings, + ) { + super(enqueueConcurrency, loggerName, bindings); + } + + /** + * Wraps a circuit call for telemetry. Default is identity; the orchestrator overrides + * to wrap with `wrapCallbackInSpan`. + */ + protected wrapCircuitCall<T>( + _circuitName: string, + fn: (signal: AbortSignal) => Promise<T>, + ): (signal: AbortSignal) => Promise<T> { + return fn; + } + + /** Called once the root rollup proof has been set; subclasses call `state.resolve(...)` with the right shape. */ + protected abstract onRootRollupComplete(state: TopTreeStateLike): void; + + protected enqueueCheckpointMergeRollup(state: TopTreeStateLike, location: TreeNodeLocation) { + if (!state.verifyState() || !state.tryStartProvingCheckpointMerge(location)) { + return; + } + const inputs = state.getCheckpointMergeRollupInputs(location); + this.deferredProving( + state, + this.wrapCircuitCall('rollup-checkpoint-merge', signal => + this.prover.getCheckpointMergeRollupProof(inputs, signal, state.epochNumber), + ), + result => { + state.setCheckpointMergeRollupProof(location, result); + this.checkAndEnqueueNextCheckpointMergeRollup(state, location); + }, + ); + } + + protected enqueueEpochPadding(state: TopTreeStateLike) { + if (!state.verifyState() || !state.tryStartProvingPaddingCheckpoint()) { + return; + } + const inputs = state.getPaddingCheckpointInputs(); + this.deferredProving( + state, + this.wrapCircuitCall('rollup-checkpoint-padding', signal => + this.prover.getCheckpointPaddingRollupProof(inputs, signal, state.epochNumber), + ), + result => { + state.setCheckpointPaddingProof(result); + this.checkAndEnqueueRootRollup(state); + }, + ); + } + + protected enqueueRootRollup(state: TopTreeStateLike) { + if (!state.verifyState() || !state.tryStartProvingRootRollup()) { + return; + } + 
const inputs = state.getRootRollupInputs(); + this.deferredProving( + state, + this.wrapCircuitCall('rollup-root', signal => this.prover.getRootRollupProof(inputs, signal, state.epochNumber)), + result => { + this.logger.verbose(`Completed root rollup for epoch ${state.epochNumber}`); + state.setRootRollupProof(result); + this.onRootRollupComplete(state); + }, + ); + } + + protected checkAndEnqueueNextCheckpointMergeRollup(state: TopTreeStateLike, currentLocation: TreeNodeLocation) { + if (!state.isReadyForCheckpointMerge(currentLocation)) { + return; + } + const parentLocation = state.getParentLocation(currentLocation); + if (parentLocation.level === 0) { + this.checkAndEnqueueRootRollup(state); + } else { + this.enqueueCheckpointMergeRollup(state, parentLocation); + } + } + + protected checkAndEnqueueRootRollup(state: TopTreeStateLike) { + if (!state.isReadyForRootRollup()) { + return; + } + this.enqueueRootRollup(state); + } +} diff --git a/yarn-project/prover-client/src/orchestrator/top-tree-proving-state.ts b/yarn-project/prover-client/src/orchestrator/top-tree-proving-state.ts new file mode 100644 index 000000000000..add5b990fb5c --- /dev/null +++ b/yarn-project/prover-client/src/orchestrator/top-tree-proving-state.ts @@ -0,0 +1,220 @@ +import type { BatchedBlob, BatchedBlobAccumulator, FinalBlobBatchingChallenges } from '@aztec/blob-lib'; +import type { NESTED_RECURSIVE_PROOF_LENGTH, NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH } from '@aztec/constants'; +import { EpochNumber } from '@aztec/foundation/branded-types'; +import { type TreeNodeLocation, UnbalancedTreeStore } from '@aztec/foundation/trees'; +import type { PublicInputsAndRecursiveProof } from '@aztec/stdlib/interfaces/server'; +import type { Proof } from '@aztec/stdlib/proofs'; +import { + CheckpointMergeRollupPrivateInputs, + CheckpointPaddingRollupPrivateInputs, + CheckpointRollupPublicInputs, + RootRollupPrivateInputs, + type RootRollupPublicInputs, +} from '@aztec/stdlib/rollup'; + +import { 
toProofData } from './block-building-helpers.js'; +import type { ProofState } from './block-proving-state.js'; + +enum TOP_TREE_LIFECYCLE { + CREATED, + RESOLVED, + REJECTED, +} + +/** + * Lean top-tree-only state. Owns the merge tree of checkpoint root proofs, the + * single-checkpoint padding proof slot, the final root rollup proof, and the blob + * accumulator endpoints needed to finalise the epoch's batched blob proof. + * + * Constructed with `totalNumCheckpoints` and `finalBlobBatchingChallenges` upfront — + * by the time the top tree starts, all checkpoints are known and the challenges are + * derivable from their blob fields. + */ +export class TopTreeProvingState { + private checkpointProofs: UnbalancedTreeStore< + ProofState<PublicInputsAndRecursiveProof<CheckpointRollupPublicInputs, typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH>> + >; + private checkpointPaddingProof: + | ProofState<PublicInputsAndRecursiveProof<CheckpointRollupPublicInputs, typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH>> + | undefined; + private rootRollupProof: + | ProofState<PublicInputsAndRecursiveProof<RootRollupPublicInputs, typeof NESTED_RECURSIVE_PROOF_LENGTH>> + | undefined; + private endBlobAccumulator: BatchedBlobAccumulator | undefined; + private finalBatchedBlob: BatchedBlob | undefined; + private lifecycle = TOP_TREE_LIFECYCLE.CREATED; + + constructor( + public readonly epochNumber: EpochNumber, + public readonly totalNumCheckpoints: number, + public readonly finalBlobBatchingChallenges: FinalBlobBatchingChallenges, + public readonly startBlobAccumulator: BatchedBlobAccumulator, + private readonly completionCallback: () => void, + private readonly rejectionCallback: (reason: string) => void, + ) { + if (totalNumCheckpoints < 1) { + throw new Error(`TopTreeProvingState requires at least one checkpoint; got ${totalNumCheckpoints}.`); + } + this.checkpointProofs = new UnbalancedTreeStore(totalNumCheckpoints); + } + + // --- checkpoint root rollup --- + + public setCheckpointRootRollupProof( + checkpointIndex: number, + provingOutput: PublicInputsAndRecursiveProof< + CheckpointRollupPublicInputs, + typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH + >, + ): TreeNodeLocation { + return this.checkpointProofs.setLeaf(checkpointIndex, { provingOutput }); + } + + // --- checkpoint merge 
rollup --- + + public tryStartProvingCheckpointMerge(location: TreeNodeLocation) { + if (this.checkpointProofs.getNode(location)?.isProving) { + return false; + } + this.checkpointProofs.setNode(location, { isProving: true }); + return true; + } + + public setCheckpointMergeRollupProof( + location: TreeNodeLocation, + provingOutput: PublicInputsAndRecursiveProof< + CheckpointRollupPublicInputs, + typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH + >, + ) { + this.checkpointProofs.setNode(location, { provingOutput }); + } + + public isReadyForCheckpointMerge(location: TreeNodeLocation) { + return !!this.checkpointProofs.getSibling(location)?.provingOutput; + } + + public getParentLocation(location: TreeNodeLocation) { + return this.checkpointProofs.getParentLocation(location); + } + + public getCheckpointMergeRollupInputs(mergeLocation: TreeNodeLocation) { + const [left, right] = this.checkpointProofs.getChildren(mergeLocation).map(c => c?.provingOutput); + if (!left || !right) { + throw new Error('At least one child is not ready for the checkpoint merge rollup.'); + } + return new CheckpointMergeRollupPrivateInputs([toProofData(left), toProofData(right)]); + } + + // --- padding (single-checkpoint case) --- + + public tryStartProvingPaddingCheckpoint() { + if (this.checkpointPaddingProof?.isProving) { + return false; + } + this.checkpointPaddingProof = { isProving: true }; + return true; + } + + public setCheckpointPaddingProof( + provingOutput: PublicInputsAndRecursiveProof< + CheckpointRollupPublicInputs, + typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH + >, + ) { + this.checkpointPaddingProof = { provingOutput }; + } + + public getPaddingCheckpointInputs() { + return new CheckpointPaddingRollupPrivateInputs(); + } + + // --- root rollup --- + + public tryStartProvingRootRollup() { + if (this.rootRollupProof?.isProving) { + return false; + } + this.rootRollupProof = { isProving: true }; + return true; + } + + public setRootRollupProof(provingOutput: 
PublicInputsAndRecursiveProof<RootRollupPublicInputs, typeof NESTED_RECURSIVE_PROOF_LENGTH>) { + this.rootRollupProof = { provingOutput }; + } + + public isReadyForRootRollup() { + const childProofs = this.#getChildProofsForRoot(); + return childProofs.every(p => !!p); + } + + public getRootRollupInputs() { + const [left, right] = this.#getChildProofsForRoot(); + if (!left || !right) { + throw new Error('At least one child is not ready for the root rollup.'); + } + return RootRollupPrivateInputs.from({ + previousRollups: [toProofData(left), toProofData(right)], + }); + } + + // --- blob accumulator finalisation --- + + /** + * Sets the end-of-epoch blob accumulator, computed by the top-tree orchestrator + * from the surviving checkpoints' blob fields. Required before `finalizeBatchedBlob`. + */ + public setEndBlobAccumulator(accumulator: BatchedBlobAccumulator) { + this.endBlobAccumulator = accumulator; + } + + public async finalizeBatchedBlob() { + if (!this.endBlobAccumulator) { + throw new Error('End blob accumulator not set; call setEndBlobAccumulator before finalize.'); + } + this.finalBatchedBlob = await this.endBlobAccumulator.finalize(true /* verifyProof */); + } + + public getEpochProofResult(): { proof: Proof; publicInputs: RootRollupPublicInputs; batchedBlobInputs: BatchedBlob } { + const provingOutput = this.rootRollupProof?.provingOutput; + if (!provingOutput || !this.finalBatchedBlob) { + throw new Error('Top-tree proof not ready; root rollup or batched blob missing.'); + } + return { + proof: provingOutput.proof.binaryProof, + publicInputs: provingOutput.inputs, + batchedBlobInputs: this.finalBatchedBlob, + }; + } + + // --- lifecycle --- + + public verifyState() { + return this.lifecycle === TOP_TREE_LIFECYCLE.CREATED; + } + + public resolve() { + if (!this.verifyState()) { + return; + } + this.lifecycle = TOP_TREE_LIFECYCLE.RESOLVED; + this.completionCallback(); + } + + public reject(reason: string) { + if (!this.verifyState()) { + return; + } + this.lifecycle = TOP_TREE_LIFECYCLE.REJECTED; + 
this.rejectionCallback(reason); + } + + public cancel() { + this.reject('Top-tree proving cancelled'); + } + + #getChildProofsForRoot() { + const rootLocation = { level: 0, index: 0 }; + return this.totalNumCheckpoints === 1 + ? [this.checkpointProofs.getNode(rootLocation)?.provingOutput, this.checkpointPaddingProof?.provingOutput] + : this.checkpointProofs.getChildren(rootLocation).map(c => c?.provingOutput); + } +} diff --git a/yarn-project/prover-client/src/prover-client/prover-client.ts b/yarn-project/prover-client/src/prover-client/prover-client.ts index 27c55717a2a6..727ed3858363 100644 --- a/yarn-project/prover-client/src/prover-client/prover-client.ts +++ b/yarn-project/prover-client/src/prover-client/prover-client.ts @@ -1,5 +1,7 @@ import { type ACVMConfig, type BBConfig, BBNativeRollupProver, TestCircuitProver } from '@aztec/bb-prover'; +import type { EpochNumber } from '@aztec/foundation/branded-types'; import { times } from '@aztec/foundation/collection'; +import type { Fr } from '@aztec/foundation/curves/bn254'; import type { EthAddress } from '@aztec/foundation/eth-address'; import { type Logger, createLogger } from '@aztec/foundation/log'; import { NativeACVMSimulator } from '@aztec/simulator/server'; @@ -15,19 +17,64 @@ import { type ServerCircuitProver, tryStop, } from '@aztec/stdlib/interfaces/server'; +import type { CheckpointConstantData } from '@aztec/stdlib/rollup'; +import type { BlockHeader } from '@aztec/stdlib/tx'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; import type { ProverClientConfig } from '../config.js'; +import { CheckpointSubTreeOrchestrator } from '../orchestrator/checkpoint-sub-tree-orchestrator.js'; +import { EpochProvingContext } from '../orchestrator/epoch-proving-context.js'; import { ProvingOrchestrator } from '../orchestrator/orchestrator.js'; +import { TopTreeOrchestrator } from '../orchestrator/top-tree-orchestrator.js'; import { BrokerCircuitProverFacade } from 
'../proving_broker/broker_prover_facade.js'; import { InlineProofStore, type ProofStore, createProofStore } from '../proving_broker/proof_store/index.js'; import { ProvingAgent } from '../proving_broker/proving_agent.js'; import { ServerEpochProver } from './server-epoch-prover.js'; +/** + * The factory surface that `EpochProvingJob` (in `prover-node`) depends on. Implemented + * by `ProverClient`. Defined here rather than in stdlib because the return types + * (`CheckpointSubTreeOrchestrator`, `TopTreeOrchestrator`) are concrete classes from + * this package. + * + * A single `BrokerCircuitProverFacade` is owned by `ProverClient` and shared across + * every orchestrator (every sub-tree and every top-tree across every concurrent epoch + * job). The broker delivers each completed-job notification exactly once (drained on + * the first `getCompletedJobs` poll), so multiple facades polling the same broker + * would race and lose notifications. + * + * The facade's job map cleans up entries on resolve/reject, and the prover-node + * keeps `ProverClient` alive for its whole lifetime. + */ +export interface EpochProverFactory { + getProverId(): EthAddress; + /** + * Constructs a per-epoch shared context for caching e.g. chonk-verifier results. + */ + createEpochProvingContext(epochNumber: EpochNumber): EpochProvingContext; + /** + * Constructs and starts a `CheckpointSubTreeOrchestrator` for a single checkpoint. + */ + createCheckpointSubTreeOrchestrator( + epochContext: EpochProvingContext, + checkpointConstants: CheckpointConstantData, + l1ToL2Messages: Fr[], + totalNumBlocks: number, + headerOfLastBlockInPreviousCheckpoint: BlockHeader, + ): Promise<CheckpointSubTreeOrchestrator>; + createTopTreeOrchestrator(): TopTreeOrchestrator; +} + /** Manages proving of epochs by orchestrating the proving of individual blocks relying on a pool of prover agents. 
*/ -export class ProverClient implements EpochProverManager { +export class ProverClient implements EpochProverManager, EpochProverFactory { private running = false; private agents: ProvingAgent[] = []; + /** + * The single broker facade shared by every orchestrator created from this client. + * Constructed lazily on first factory use and torn down on `stop()` — see the comment on + * `EpochProverFactory` for why a single shared facade is required. + */ + private facade: BrokerCircuitProverFacade | undefined; private constructor( private config: ProverClientConfig, @@ -40,6 +87,38 @@ export class ProverClient implements EpochProverManager { private log: Logger = createLogger('prover-client:tx-prover'), ) {} + /** + * Lazy-init the shared facade. The broker delivers each completed-job notification + * exactly once (drained on the first `getCompletedJobs` poll), so we cannot start + * a shared facade alongside the per-call facades that `createEpochProver` builds — + * they would race for notifications and one side would silently drop them. Starting + * the shared facade only on first use of one of the new factory methods keeps the + * legacy `createEpochProver` path race-free. + */ + private getFacade(): BrokerCircuitProverFacade { + if (!this.running) { + throw new Error('ProverClient is not running; call start() before constructing orchestrators.'); + } + if (!this.facade) { + this.facade = new BrokerCircuitProverFacade( + this.orchestratorClient, + this.proofStore, + this.failedProofStore, + undefined, + this.log.getBindings(), + ); + this.facade.start(); + } + return this.facade; + } + + /** + * Legacy single-class epoch prover. Each call constructs its own + * `BrokerCircuitProverFacade`; the new factory methods (`createCheckpointSubTreeOrchestrator`, + * `createTopTreeOrchestrator`, `createEpochProvingContext`) share a single facade + * owned by `ProverClient`. Both APIs coexist while the prover-node migrates onto + * the new pair. 
+ */ public createEpochProver(): EpochProver { const bindings = this.log.getBindings(); const facade = new BrokerCircuitProverFacade( @@ -61,6 +140,43 @@ export class ProverClient implements EpochProverManager { return new ServerEpochProver(facade, orchestrator); } + public createEpochProvingContext(epochNumber: EpochNumber): EpochProvingContext { + return new EpochProvingContext(this.getFacade(), epochNumber, this.log.getBindings()); + } + + public createCheckpointSubTreeOrchestrator( + epochContext: EpochProvingContext, + checkpointConstants: CheckpointConstantData, + l1ToL2Messages: Fr[], + totalNumBlocks: number, + headerOfLastBlockInPreviousCheckpoint: BlockHeader, + ): Promise { + return CheckpointSubTreeOrchestrator.start( + this.worldState, + this.getFacade(), + this.config.proverId, + epochContext, + this.config.cancelJobsOnStop, + this.config.enqueueConcurrency, + checkpointConstants, + l1ToL2Messages, + totalNumBlocks, + headerOfLastBlockInPreviousCheckpoint, + this.telemetry, + this.log.getBindings(), + ); + } + + public createTopTreeOrchestrator(): TopTreeOrchestrator { + return new TopTreeOrchestrator( + this.getFacade(), + this.config.proverId, + this.config.enqueueConcurrency, + this.telemetry, + this.log.getBindings(), + ); + } + public getProverId(): EthAddress { return this.config.proverId; } @@ -100,6 +216,14 @@ export class ProverClient implements EpochProverManager { } this.running = false; await this.stopAgents(); + if (this.facade) { + try { + await this.facade.stop(); + } catch (err) { + this.log.error('Error stopping shared broker facade', err); + } + this.facade = undefined; + } await tryStop(this.orchestratorClient); } diff --git a/yarn-project/prover-client/src/proving_broker/index.ts b/yarn-project/prover-client/src/proving_broker/index.ts index 70440f4982a9..6de93f116f94 100644 --- a/yarn-project/prover-client/src/proving_broker/index.ts +++ b/yarn-project/prover-client/src/proving_broker/index.ts @@ -7,3 +7,4 @@ export * from 
'./proving_broker_database/persisted.js'; export * from './proof_store/index.js'; export * from './factory.js'; export * from './config.js'; +export { BrokerCircuitProverFacade } from './broker_prover_facade.js';
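For reference, the per-checkpoint start-accumulator computation in `TopTreeOrchestrator.prove` is a prefix scan: checkpoint `i`'s start accumulator is the running value after folding in checkpoints `0..i-1`, and the final running value becomes the epoch-end accumulator handed to `setEndBlobAccumulator`. A minimal numeric stand-in (hypothetical `prefixStarts`; plain addition stands in for `accumulateFields`):

```typescript
// Prefix scan over per-checkpoint field batches: starts[i] is the accumulator
// state *before* checkpoint i folds in its own fields; `end` is the epoch-end value.
export function prefixStarts(
  fieldsPerCheckpoint: number[][],
  start = 0,
): { starts: number[]; end: number } {
  const starts: number[] = [];
  let running = start;
  for (const fields of fieldsPerCheckpoint) {
    starts.push(running); // this checkpoint's root rollup proves from here...
    running = fields.reduce((acc, f) => acc + f, running); // ...to here (accumulateFields stand-in)
  }
  return { starts, end: running };
}
```

This mirrors why the start accumulators can be computed upfront with no proving: they depend only on the (archiver-derivable) blob fields of earlier checkpoints, never on any proof output.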