Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion yarn-project/archiver/src/archiver-misc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ describe('Archiver misc', () => {
slashingProposerAddress: EthAddress.random(),
},
archiverStore,
{ pollingIntervalMs: 1000, batchSize: 1000, maxAllowedEthClientDriftSeconds: 300 },
{
pollingIntervalMs: 1000,
batchSize: 1000,
maxAllowedEthClientDriftSeconds: 300,
orphanProposedBlockPruneGraceSeconds: 2,
},
blobClient,
instrumentation,
l1Constants,
Expand Down
1 change: 1 addition & 0 deletions yarn-project/archiver/src/archiver-store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ describe('Archiver Store', () => {
batchSize: 1000,
maxAllowedEthClientDriftSeconds: 300,
ethereumAllowNoDebugHosts: true,
orphanProposedBlockPruneGraceSeconds: 2,
};

const events = new EventEmitter() as ArchiverEmitter;
Expand Down
124 changes: 124 additions & 0 deletions yarn-project/archiver/src/archiver-sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ describe('Archiver Sync', () => {
// Create epoch cache mock (separate from fake)
epochCache = mock<EpochCache>();
epochCache.getCommitteeForEpoch.mockResolvedValue({ committee: [] as EthAddress[] } as EpochCommitteeInfo);
// Default to no pipelining offset; the orphan-prune tests below override this. Keeps the prune
// deadline well ahead of wall-clock time for the other tests so it never fires spuriously.
epochCache.pipeliningOffset.mockReturnValue(0);

// Create instrumentation mock
const tracer = getTelemetryClient().getTracer('');
Expand All @@ -118,6 +121,7 @@ describe('Archiver Sync', () => {
maxAllowedEthClientDriftSeconds: 300,
ethereumAllowNoDebugHosts: true,
skipHistoricalLogsCheck: true,
orphanProposedBlockPruneGraceSeconds: 2,
};

// Create event emitter shared by archiver and synchronizer
Expand Down Expand Up @@ -2143,4 +2147,124 @@ describe('Archiver Sync', () => {
expect(tips.proposedCheckpoint.block.number).toEqual(tips.checkpointed.block.number);
}, 15_000);
});

describe('pruning orphan proposed blocks', () => {
let pruneSpy: jest.Mock;

// Slot the orphan block targets. With slotDuration=24, slot S starts at l1GenesisTime + S*24.
const orphanSlot = SlotNumber(1);
// Grace period configured for these tests (see the `config` object above).
const graceSeconds = 2;

beforeEach(() => {
pruneSpy = jest.fn();
archiver.events.on(L2BlockSourceEvents.L2PruneUncheckpointed, pruneSpy);
// Normal proposer pipelining: a block targeting slot S is built during slot S-1, so its proposed
// checkpoint is expected by the start of slot S.
epochCache.pipeliningOffset.mockReturnValue(1);
});

afterEach(() => {
archiver.events.off(L2BlockSourceEvents.L2PruneUncheckpointed, pruneSpy);
});

// Wall-clock time (seconds) at which the orphan tip becomes prunable: start(orphanSlot) + grace.
const pruneDeadline = () => now + Number(orphanSlot) * l1Constants.slotDuration + graceSeconds;

// Syncs checkpoint 1 (slot 0), then writes uncheckpointed blocks for slot 1 (checkpoint 2) straight
// into the store as a block-only tip with no matching proposed checkpoint. L1 is held at slot 1 so
// the L1-sync prune (which only fires once the build slot has ended on L1) stays out of the way.
const setupOrphanTip = async () => {
const { checkpoint: cp1 } = await fake.addCheckpoint(CheckpointNumber(1), {
l1BlockNumber: 1n,
messagesL1BlockNumber: 1n,
numL1ToL2Messages: 3,
slotNumber: SlotNumber(0),
});
const cp1Archive = cp1.blocks.at(-1)!.archive;
fake.setL1BlockNumber(1n);
await archiver.syncImmediate();
expect(await archiver.getCheckpointNumber()).toEqual(CheckpointNumber(1));

const lastBlockInCp1 = cp1.blocks.at(-1)!.number;
const provisionalBlocks = await fake.makeBlocks(CheckpointNumber(2), {
l1BlockNumber: 2n,
previousArchive: cp1Archive,
slotNumber: orphanSlot,
});
for (const block of provisionalBlocks) {
await archiverStore.blocks.addProposedBlock(block, { force: true });
}

// Hold L1 at slot 1 so the slot has not ended from L1's perspective.
fake.setL1BlockNumber(2n);
return { lastBlockInCp1, lastProvisional: provisionalBlocks.at(-1)!.number, provisionalBlocks };
};

const makeProposedCheckpoint = (lastBlockInCp1: BlockNumber, blockCount: number): ProposedCheckpointInput => ({
checkpointNumber: CheckpointNumber(2),
header: CheckpointHeader.empty({ slotNumber: orphanSlot }),
startBlock: BlockNumber(lastBlockInCp1 + 1),
blockCount,
totalManaUsed: 0n,
feeAssetPriceModifier: 0n,
});

it('does not prune before the grace window elapses', async () => {
const { lastProvisional } = await setupOrphanTip();

dateProvider.setTime((pruneDeadline() - 1) * 1000);
await archiver.syncImmediate();

expect(pruneSpy).not.toHaveBeenCalled();
expect(await archiver.getBlockNumber()).toEqual(lastProvisional);
}, 15_000);

it('prunes the orphan tip once the grace window elapses', async () => {
const { lastBlockInCp1, provisionalBlocks } = await setupOrphanTip();

dateProvider.setTime((pruneDeadline() + 1) * 1000);
await archiver.syncImmediate();

expect(pruneSpy).toHaveBeenCalledWith(
expect.objectContaining({
type: L2BlockSourceEvents.L2PruneUncheckpointed,
slotNumber: orphanSlot,
blocks: expect.arrayContaining(provisionalBlocks.map(b => expect.objectContaining({ number: b.number }))),
}),
);
expect(await archiver.getBlockNumber()).toEqual(lastBlockInCp1);
expect(await archiver.getCheckpointNumber()).toEqual(CheckpointNumber(1));
}, 15_000);

it('does not prune when a matching proposed checkpoint exists', async () => {
const { lastBlockInCp1, lastProvisional, provisionalBlocks } = await setupOrphanTip();

await archiver.addProposedCheckpoint(makeProposedCheckpoint(lastBlockInCp1, provisionalBlocks.length));

dateProvider.setTime((pruneDeadline() + 100) * 1000);
await archiver.syncImmediate();

expect(pruneSpy).not.toHaveBeenCalled();
expect(await archiver.getBlockNumber()).toEqual(lastProvisional);
expect(await archiverStore.blocks.getLastProposedCheckpoint()).toBeDefined();
}, 15_000);

it('processes a queued proposed checkpoint before pruning, sparing the tip', async () => {
const { lastBlockInCp1, lastProvisional, provisionalBlocks } = await setupOrphanTip();

// Past the grace window: without the matching checkpoint the next sync would prune the tip.
dateProvider.setTime((pruneDeadline() + 100) * 1000);

// Queue the proposed checkpoint. The triggered sync drains the inbound queue (storing the
// checkpoint) before running the orphan prune, so the prune sees it and stands down. If the
// order were reversed, this sync would prune the tip before storing the checkpoint.
await archiver.addProposedCheckpoint(makeProposedCheckpoint(lastBlockInCp1, provisionalBlocks.length));
await archiver.syncImmediate();

expect(pruneSpy).not.toHaveBeenCalled();
expect(await archiver.getBlockNumber()).toEqual(lastProvisional);
expect(await archiverStore.blocks.getLastProposedCheckpoint()).toBeDefined();
}, 15_000);
});
});
5 changes: 5 additions & 0 deletions yarn-project/archiver/src/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra
maxAllowedEthClientDriftSeconds: number;
ethereumAllowNoDebugHosts?: boolean;
skipHistoricalLogsCheck?: boolean;
orphanProposedBlockPruneGraceSeconds: number;
},
private readonly blobClient: BlobClientInterface,
instrumentation: ArchiverInstrumentation,
Expand Down Expand Up @@ -336,6 +337,10 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra
private async sync() {
// Process any queued blocks first, before doing L1 sync
await this.processInboundQueue();
// Prune orphan proposed blocks (block-only tips with no matching proposed checkpoint) on wall-clock
// time. Runs after the queue is drained so freshly-arrived proposed checkpoints are seen first, and
// before L1 sync so it fires even when L1 has not advanced.
await this.synchronizer.pruneOrphanProposedBlocks();
// Now perform L1 sync
await this.syncFromL1();
}
Expand Down
9 changes: 9 additions & 0 deletions yarn-project/archiver/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ export const archiverConfigMappings: ConfigMappingsType<ArchiverConfig> = {
'Set to true to bypass the check when the connected RPC node is known to prune old logs.',
...booleanConfigHelper(false),
},
orphanProposedBlockPruneGraceSeconds: {
env: 'ARCHIVER_ORPHAN_PROPOSED_BLOCK_PRUNE_GRACE_SECONDS',
description:
'Grace period in seconds, measured from the end of a proposed block build slot, after which a ' +
'proposed block with no matching proposed checkpoint is pruned as an orphan. Defaults from the ' +
'sequencer block duration at the node wiring layer when unset.',
...optionalNumberConfigHelper(),
},
...chainConfigMappings,
...l1ReaderConfigMappings,
viemPollingIntervalMS: {
Expand Down Expand Up @@ -107,5 +115,6 @@ export function mapArchiverConfig(config: Partial<ArchiverConfig>) {
maxAllowedEthClientDriftSeconds: config.maxAllowedEthClientDriftSeconds,
ethereumAllowNoDebugHosts: config.ethereumAllowNoDebugHosts,
skipHistoricalLogsCheck: config.archiverSkipHistoricalLogsCheck,
orphanProposedBlockPruneGraceSeconds: config.orphanProposedBlockPruneGraceSeconds,
};
}
2 changes: 2 additions & 0 deletions yarn-project/archiver/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { FunctionType, decodeFunctionSignature } from '@aztec/stdlib/abi';
import type { ArchiverEmitter, BlockHash } from '@aztec/stdlib/block';
import { type ContractClassPublicWithCommitment, computePublicBytecodeCommitment } from '@aztec/stdlib/contract';
import type { DataStoreConfig } from '@aztec/stdlib/kv-store';
import { MIN_EXECUTION_TIME } from '@aztec/stdlib/timetable';
import type { BlockHeader } from '@aztec/stdlib/tx';
import { getTelemetryClient } from '@aztec/telemetry-client';

Expand Down Expand Up @@ -129,6 +130,7 @@ export async function createArchiver(
maxAllowedEthClientDriftSeconds: 300,
ethereumAllowNoDebugHosts: false,
skipHistoricalLogsCheck: false,
orphanProposedBlockPruneGraceSeconds: MIN_EXECUTION_TIME,
},
mapArchiverConfig(config),
);
Expand Down
92 changes: 88 additions & 4 deletions yarn-project/archiver/src/modules/l1_synchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { getFinalizedL1Block } from '@aztec/ethereum/queries';
import type { ViemPublicClient, ViemPublicDebugClient } from '@aztec/ethereum/types';
import { asyncPool } from '@aztec/foundation/async-pool';
import { maxBigint } from '@aztec/foundation/bigint';
import { BlockNumber, CheckpointNumber, EpochNumber } from '@aztec/foundation/branded-types';
import { BlockNumber, CheckpointNumber, EpochNumber, SlotNumber } from '@aztec/foundation/branded-types';
import { Buffer16, Buffer32 } from '@aztec/foundation/buffer';
import { compactArray, partition, pick } from '@aztec/foundation/collection';
import { Fr } from '@aztec/foundation/curves/bn254';
Expand All @@ -18,7 +18,12 @@ import { DateProvider, Timer, elapsed } from '@aztec/foundation/timer';
import { isDefined, isErrorClass } from '@aztec/foundation/types';
import { type ArchiverEmitter, L2BlockSourceEvents, type ValidateCheckpointResult } from '@aztec/stdlib/block';
import { Checkpoint, type CheckpointData, PublishedCheckpoint } from '@aztec/stdlib/checkpoint';
import { type L1RollupConstants, getEpochAtSlot, getSlotAtNextL1Block } from '@aztec/stdlib/epoch-helpers';
import {
type L1RollupConstants,
getEpochAtSlot,
getSlotAtNextL1Block,
getTimestampForSlot,
} from '@aztec/stdlib/epoch-helpers';
import { computeInHashFromL1ToL2Messages } from '@aztec/stdlib/messaging';
import type { CoordinationSignatureContext } from '@aztec/stdlib/p2p';
import { type Traceable, type Tracer, execInSpan, trackSpan } from '@aztec/telemetry-client';
Expand Down Expand Up @@ -76,6 +81,7 @@ export class ArchiverL1Synchronizer implements Traceable {
skipValidateCheckpointAttestations?: boolean;
skipPromoteProposedCheckpointDuringL1Sync?: boolean;
maxAllowedEthClientDriftSeconds: number;
orphanProposedBlockPruneGraceSeconds: number;
},
private readonly blobClient: BlobClientInterface,
private readonly epochCache: EpochCache,
Expand All @@ -102,6 +108,7 @@ export class ArchiverL1Synchronizer implements Traceable {
skipValidateCheckpointAttestations?: boolean;
skipPromoteProposedCheckpointDuringL1Sync?: boolean;
maxAllowedEthClientDriftSeconds: number;
orphanProposedBlockPruneGraceSeconds: number;
}) {
this.config = newConfig;
}
Expand Down Expand Up @@ -294,12 +301,89 @@ export class ArchiverL1Synchronizer implements Traceable {
{ firstUncheckpointedBlockHeader: firstUncheckpointedBlockData?.header.toInspect(), slotAtNextL1Block },
);

const prunedBlocks = await this.updater.removeUncheckpointedBlocksAfter(lastCheckpointedBlockNumber);
await this.removeUncheckpointedBlocksAndEmit(lastCheckpointedBlockNumber, firstUncheckpointedBlockSlot);
}

/**
* Prunes a block-only local tip that was built atop a checkpoint that was never itself proposed.
*
* Under pipelining, a proposer publishes the blocks for a checkpoint (block-only proposals) before
* assembling and publishing the enclosing proposed checkpoint at the end of the build slot. A node
* that received those blocks but never the proposed checkpoint is left with an orphan tip it must
* not build on. We prune it once enough wall-clock time has elapsed that the proposed checkpoint
* should have arrived. This runs on wall-clock time (not L1 block advancement) so it fires during
* quiet L1 periods, and is the liveness counterpart to the sequencer's checkSync guard.
*
* Only the first uncheckpointed block is inspected: if its checkpoint is backed by a proposed
* checkpoint, the tip is legitimate and left for promotion (or for the L1-sync prune to clear if it
* later goes stale); if not, every uncheckpointed block chains off the orphan and is pruned.
*/
public async pruneOrphanProposedBlocks(): Promise<void> {
const [lastCheckpointedBlockNumber, lastProposedBlockNumber] = await Promise.all([
this.stores.blocks.getCheckpointedL2BlockNumber(),
this.stores.blocks.getLatestL2BlockNumber(),
]);

// If there are no uncheckpointed blocks, we got nothing to do
if (lastProposedBlockNumber === lastCheckpointedBlockNumber) {
return;
}

const firstUncheckpointedBlockNumber = BlockNumber(lastCheckpointedBlockNumber + 1);
const firstUncheckpointedBlockData = await this.stores.blocks.getBlockData({
number: firstUncheckpointedBlockNumber,
});
if (firstUncheckpointedBlockData === undefined) {
return;
}

const blockCheckpointNumber = firstUncheckpointedBlockData.checkpointNumber;
const blockSlot = firstUncheckpointedBlockData.header.getSlot();

// A proposed checkpoint covering this block's checkpoint means the tip is not an orphan.
const proposedCheckpoint = await this.stores.blocks.getProposedCheckpointByNumber(blockCheckpointNumber);
if (proposedCheckpoint !== undefined) {
return;
}

// The proposed checkpoint should have landed by the start of the slot after the block's build slot
// (build slot = blockSlot - pipeliningOffset). Wait a grace period beyond that to tolerate propagation.
const pipeliningOffset = this.epochCache.pipeliningOffset();
const deadlineSlot = SlotNumber(Number(blockSlot) - pipeliningOffset + 1);
const pruneAfter =
getTimestampForSlot(deadlineSlot, this.l1Constants) + BigInt(this.config.orphanProposedBlockPruneGraceSeconds);
const now = BigInt(this.dateProvider.nowInSeconds());
if (now < pruneAfter) {
return;
}

this.log.warn(
`Pruning orphan blocks after block ${lastCheckpointedBlockNumber}: block at slot ${blockSlot} belongs to ` +
`checkpoint ${blockCheckpointNumber} which has no matching proposed checkpoint`,
{
firstUncheckpointedBlockHeader: firstUncheckpointedBlockData.header.toInspect(),
blockCheckpointNumber,
blockSlot,
pipeliningOffset,
deadlineSlot,
pruneAfter,
now,
},
);

await this.removeUncheckpointedBlocksAndEmit(lastCheckpointedBlockNumber, blockSlot);
}

/** Removes uncheckpointed blocks after the checkpointed tip and emits a prune event for any removed. */
private async removeUncheckpointedBlocksAndEmit(
lastCheckpointedBlockNumber: BlockNumber,
slotNumber: SlotNumber,
): Promise<void> {
const prunedBlocks = await this.updater.removeUncheckpointedBlocksAfter(lastCheckpointedBlockNumber);
if (prunedBlocks.length > 0) {
this.events.emit(L2BlockSourceEvents.L2PruneUncheckpointed, {
type: L2BlockSourceEvents.L2PruneUncheckpointed,
slotNumber: firstUncheckpointedBlockSlot,
slotNumber,
blocks: prunedBlocks,
});
}
Expand Down
4 changes: 4 additions & 0 deletions yarn-project/archiver/src/test/noop_l1_archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class NoopL1Synchronizer implements FunctionsOf<ArchiverL1Synchronizer> {
syncFromL1(_initialSyncComplete: boolean): Promise<void> {
return Promise.resolve();
}
pruneOrphanProposedBlocks(): Promise<void> {
return Promise.resolve();
}
}

/**
Expand Down Expand Up @@ -89,6 +92,7 @@ export class NoopL1Archiver extends Archiver {
maxAllowedEthClientDriftSeconds: 300,
ethereumAllowNoDebugHosts: true, // Skip trace validation
skipHistoricalLogsCheck: true, // Skip historical logs validation
orphanProposedBlockPruneGraceSeconds: 2,
},
blobClient,
instrumentation,
Expand Down
6 changes: 6 additions & 0 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ import type { DebugLogStore, LogFilter, SiloedTag, Tag, TxScopedL2Log } from '@a
import { InMemoryDebugLogStore, NullDebugLogStore } from '@aztec/stdlib/logs';
import { InboxLeaf, type L1ToL2MessageSource, appendL1ToL2MessagesToTree } from '@aztec/stdlib/messaging';
import type { Offense } from '@aztec/stdlib/slashing';
import { MIN_EXECUTION_TIME } from '@aztec/stdlib/timetable';
import type { NullifierLeafPreimage, PublicDataTreeLeafPreimage } from '@aztec/stdlib/trees';
import { MerkleTreeId, NullifierMembershipWitness, PublicDataWitness } from '@aztec/stdlib/trees';
import {
Expand Down Expand Up @@ -577,6 +578,11 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, AztecNodeDeb
// Track started resources so we can clean up on partial failure during node creation.
const started: { stop?(): Promise<void> | void }[] = [];
try {
// Default the orphan-prune grace window from the block build duration when unset, so the archiver
// waits roughly one build slot for a proposed checkpoint to arrive before pruning a block-only tip.
config.orphanProposedBlockPruneGraceSeconds ??=
config.blockDurationMs !== undefined ? Math.ceil(config.blockDurationMs / 1000) : MIN_EXECUTION_TIME;

// Create world-state first so we can retrieve the initial header before constructing the archiver.
const nativeWs = await createWorldState(config, options.genesis);
const initialHeader = nativeWs.getInitialHeader();
Expand Down
1 change: 1 addition & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export type EnvVar =
| 'ARCHIVER_URL'
| 'ARCHIVER_VIEM_POLLING_INTERVAL_MS'
| 'ARCHIVER_BATCH_SIZE'
| 'ARCHIVER_ORPHAN_PROPOSED_BLOCK_PRUNE_GRACE_SECONDS'
| 'AZTEC_ADMIN_PORT'
| 'AZTEC_NODE_DEBUG'
| 'AZTEC_ADMIN_API_KEY_HASH'
Expand Down
Loading
Loading