Skip to content

Commit 69d7b58

Browse files
authored
fix(sequencer): set own proposed checkpoint locally instead of via p2p loopback (#23659)
## Motivation The proposer relied on looping its own checkpoint proposal back through the p2p receive path to advance its local proposed-checkpoint tip before propagating. Under `broadcastInvalidBlockProposal` the broadcast checkpoint archive is deliberately corrupted, so the loopback handler's archive-based block lookup (`getBlockData({ archive })`) found nothing and retried until the next slot. By the time the proposer returned from broadcast, propagation had slipped past the p2p validator's stale window — producing intermittent failures (e.g. peers rejecting a late slot proposal). ## Approach The proposer's optimistic proposed-checkpoint tip is the proposer's own local state, so it is now set directly in the sequencer's checkpoint proposal job rather than via a p2p loopback. The job adds the proposed checkpoint to the archiver from local checkpoint data (block numbers and counts, never the possibly-corrupted broadcast archive) immediately before gossiping, failing closed if the local insert fails. Because every block is already added to the archiver's FIFO queue (and awaited) during block building, the checkpoint insert needs no retry. The `notifyOwnCheckpointProposal` loopback is removed entirely, so the path is identical whether p2p is enabled or not. ## Changes - **stdlib**: New `ProposedCheckpointSink` interface alongside `L2BlockSink`. - **sequencer-client**: `CheckpointProposalJob` now pushes the proposed checkpoint to the archiver from local data before broadcast, gated on proposer pipelining and skipped when block-push is disabled (`skipPushProposedBlocksToArchiver`, fisherman mode); widened the sequencer/client `l2BlockSource` types to `ProposedCheckpointSink`. - **p2p**: Removed `notifyOwnCheckpointProposal` from the `P2PService` interface, the libp2p and dummy services, and the `P2PClient.broadcastCheckpointProposal` call site (own proposals are still stored in the attestation pool before propagation). - **validator-client**: The all-nodes own-proposal branch now skips validation and returns; removed the now-dead `setProposedCheckpointFromBlocks` and narrowed the archiver `Pick`. - **tests**: Added job tests (push-from-local-data and order-before-gossip, abort-on-push-failure, no-push-when-pipelining-disabled, fisherman) and a proposal_handler own-proposal test; removed the obsolete libp2p loopback test and the e2e slash-test stub; widened affected mock types.
1 parent 1aa8f10 commit 69d7b58

15 files changed

Lines changed: 165 additions & 101 deletions

File tree

yarn-project/end-to-end/src/e2e_p2p/duplicate_attestation_slash.test.ts

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -213,18 +213,6 @@ describe('e2e_p2p_duplicate_attestation_slash', () => {
213213

214214
nodes = [maliciousNode1, maliciousNode2, honestNode1, honestNode2];
215215

216-
// Stub the proposer's own-checkpoint-proposal loopback on the malicious nodes. The default
217-
// path awaits a local handleCheckpointProposal → validateCheckpointProposal that retries
218-
// until the proposed block lands in the archiver — but skipPushProposedBlocksToArchiver
219-
// means it never does, so the await hangs until the retry deadline (~one slot). By the
220-
// time the proposer returns from broadcast, the wallclock is in the target slot and the
221-
// staleness gate refuses the self-attestation, so no duplicate attestations are ever
222-
// broadcast.
223-
for (const node of [maliciousNode1, maliciousNode2]) {
224-
const p2pService: any = (node as any).p2pClient.p2pService;
225-
jest.spyOn(p2pService, 'notifyOwnCheckpointProposal').mockResolvedValue(undefined);
226-
}
227-
228216
// Wait for P2P mesh on all needed topics before starting sequencers
229217
await t.waitForP2PMeshConnectivity(nodes, NUM_VALIDATORS, 30, 0.1, [
230218
TopicType.tx,

yarn-project/p2p/src/client/p2p_client.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,8 +377,6 @@ export class P2PClient extends WithTracer implements P2P {
377377
throw new Error(`Attempted to broadcast a duplicate checkpoint proposal for slot ${proposal.slotNumber}`);
378378
}
379379
}
380-
// Gossipsub doesn't deliver own messages, so fire the all-nodes handler locally
381-
await this.p2pService.notifyOwnCheckpointProposal(checkpointCore);
382380
return this.p2pService.propagate(proposal);
383381
}
384382

yarn-project/p2p/src/services/dummy_service.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { EthAddress } from '@aztec/foundation/eth-address';
22
import type { PeerInfo } from '@aztec/stdlib/interfaces/server';
3-
import type { CheckpointProposalCore, Gossipable, PeerErrorSeverity, TopicType } from '@aztec/stdlib/p2p';
3+
import type { Gossipable, PeerErrorSeverity, TopicType } from '@aztec/stdlib/p2p';
44
import { Tx, TxHash } from '@aztec/stdlib/tx';
55

66
import type { PeerId } from '@libp2p/interface';
@@ -93,12 +93,6 @@ export class DummyP2PService implements P2PService {
9393
this.allNodesCheckpointReceivedCallback = callback;
9494
}
9595

96-
// Mirror libp2p's own-proposal loopback so the proposer's pipelined `canProposeAt` override sees its own
97-
// in-flight parent checkpoint when running in p2p-disabled (single-node e2e) mode.
98-
public async notifyOwnCheckpointProposal(checkpoint: CheckpointProposalCore): Promise<void> {
99-
await this.allNodesCheckpointReceivedCallback?.(checkpoint, undefined as unknown as PeerId);
100-
}
101-
10296
/**
10397
* Register a callback for when a duplicate proposal is detected
10498
*/

yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,16 +1071,6 @@ describe('LibP2PService', () => {
10711071
expect(reportMessageValidationResultSpy).toHaveBeenCalledWith('msg-1', MOCK_PEER_ID, TopicValidatorResult.Reject);
10721072
});
10731073

1074-
it('notifyOwnCheckpointProposal fires allNodesCheckpointReceivedCallback', async () => {
1075-
const checkpointHeader = makeCheckpointHeader(1, { slotNumber: targetSlot });
1076-
const proposal = await makeCheckpointProposal({ signer, checkpointHeader });
1077-
1078-
await service.notifyOwnCheckpointProposal(proposal.toCore());
1079-
1080-
expect(allNodesCheckpointReceivedCallback).toHaveBeenCalledTimes(1);
1081-
expect(allNodesCheckpointReceivedCallback).toHaveBeenCalledWith(expect.any(Object), expect.anything());
1082-
});
1083-
10841074
// Regression for A-1013: payloads sharing (slot, archive) but differing on feeAssetPriceModifier
10851075
// used to dedup by archive only and silently drop the second one. The pool now dedups by
10861076
// signed-payload hash, so the equivocation surfaces.

yarn-project/p2p/src/services/libp2p/libp2p_service.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -722,10 +722,6 @@ export class LibP2PService extends WithTracer implements P2PService {
722722
this.allNodesCheckpointReceivedCallback = callback;
723723
}
724724

725-
public async notifyOwnCheckpointProposal(checkpoint: CheckpointProposalCore): Promise<void> {
726-
await this.allNodesCheckpointReceivedCallback(checkpoint, this.node.peerId);
727-
}
728-
729725
/**
730726
* Registers a callback to be invoked when a duplicate proposal is detected.
731727
* This callback is triggered on the first duplicate (when count goes from 1 to 2).

yarn-project/p2p/src/services/service.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,6 @@ export interface P2PService {
102102

103103
registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback): void;
104104

105-
/** Fires the all-nodes checkpoint callback for our own proposal (gossipsub doesn't deliver own messages). */
106-
notifyOwnCheckpointProposal(checkpoint: CheckpointProposalCore): Promise<void>;
107-
108105
/**
109106
* Registers a callback invoked when a duplicate proposal is detected (equivocation).
110107
* The callback is triggered on the first duplicate (when count goes from 1 to 2).

yarn-project/sequencer-client/src/client/sequencer-client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import type { DateProvider } from '@aztec/foundation/timer';
1212
import type { KeystoreManager } from '@aztec/node-keystore';
1313
import type { P2P } from '@aztec/p2p';
1414
import type { SlasherClientInterface } from '@aztec/slasher';
15-
import type { L2BlockSink, L2BlockSource } from '@aztec/stdlib/block';
15+
import type { L2BlockSink, L2BlockSource, ProposedCheckpointSink } from '@aztec/stdlib/block';
1616
import type { ValidatorClientFullConfig, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
1717
import type { L1ToL2MessageSource } from '@aztec/stdlib/messaging';
1818
import { L1Metrics, type TelemetryClient } from '@aztec/telemetry-client';
@@ -56,7 +56,7 @@ export class SequencerClient {
5656
worldStateSynchronizer: WorldStateSynchronizer;
5757
slasherClient: SlasherClientInterface | undefined;
5858
checkpointsBuilder: FullNodeCheckpointsBuilder;
59-
l2BlockSource: L2BlockSource & L2BlockSink;
59+
l2BlockSource: L2BlockSource & L2BlockSink & ProposedCheckpointSink;
6060
l1ToL2MessageSource: L1ToL2MessageSource;
6161
telemetry: TelemetryClient;
6262
publisherFactory?: SequencerPublisherFactory;

yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.test.ts

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
L2Block,
2525
type L2BlockSink,
2626
type L2BlockSource,
27+
type ProposedCheckpointSink,
2728
type ValidateCheckpointResult,
2829
} from '@aztec/stdlib/block';
2930
import {
@@ -87,7 +88,7 @@ describe('CheckpointProposalJob', () => {
8788
let checkpointBuilder: MockCheckpointBuilder;
8889
let l1ToL2MessageSource: MockProxy<L1ToL2MessageSource>;
8990
let l2BlockSource: MockProxy<L2BlockSource>;
90-
let blockSink: MockProxy<L2BlockSink>;
91+
let blockSink: MockProxy<L2BlockSink & ProposedCheckpointSink>;
9192
let slasherClient: MockProxy<SlasherClientInterface>;
9293
let dateProvider: TestDateProvider;
9394
let metrics: MockProxy<SequencerMetrics>;
@@ -244,8 +245,9 @@ describe('CheckpointProposalJob', () => {
244245
l2BlockSource = mock<L2BlockSource>();
245246
l2BlockSource.getCheckpointsData.mockResolvedValue([]);
246247

247-
blockSink = mock<L2BlockSink>();
248+
blockSink = mock<L2BlockSink & ProposedCheckpointSink>();
248249
blockSink.addBlock.mockResolvedValue(undefined);
250+
blockSink.addProposedCheckpoint.mockResolvedValue(undefined);
249251

250252
validatorClient = mock<ValidatorClient>();
251253
validatorClient.collectAttestations.mockImplementation(() => Promise.resolve([]));
@@ -1134,6 +1136,40 @@ describe('CheckpointProposalJob', () => {
11341136
expect(mismatchEvents).toHaveLength(0);
11351137
});
11361138

1139+
it('pushes the proposed checkpoint to the archiver from local data before broadcasting', async () => {
1140+
const pipelinedJob = await createPipelinedJobWithBlock(proposedParent);
1141+
mockL2BlockSource({ checkpointedNumber: CheckpointNumber(1), checkpointedHash: parentCheckpointHash });
1142+
1143+
await pipelinedJob.executeAndAwait();
1144+
1145+
// Built from local checkpoint data: startBlock = syncedToBlockNumber + 1, blockCount = blocks built,
1146+
// checkpointNumber from the job — never derived from the (possibly corrupted) broadcast proposal archive.
1147+
expect(blockSink.addProposedCheckpoint).toHaveBeenCalledTimes(1);
1148+
expect(blockSink.addProposedCheckpoint).toHaveBeenCalledWith(
1149+
expect.objectContaining({
1150+
checkpointNumber: CheckpointNumber(2),
1151+
startBlock: BlockNumber(lastBlockNumber + 1),
1152+
blockCount: 1,
1153+
}),
1154+
);
1155+
// The proposed checkpoint must be pushed locally before the proposal is gossiped.
1156+
expect(blockSink.addProposedCheckpoint.mock.invocationCallOrder[0]).toBeLessThan(
1157+
p2p.broadcastCheckpointProposal.mock.invocationCallOrder[0],
1158+
);
1159+
});
1160+
1161+
it('aborts the checkpoint without broadcasting when the proposed checkpoint push fails', async () => {
1162+
blockSink.addProposedCheckpoint.mockRejectedValue(new Error('proposed checkpoint slot expired'));
1163+
const pipelinedJob = await createPipelinedJobWithBlock(proposedParent);
1164+
mockL2BlockSource({ checkpointedNumber: CheckpointNumber(1), checkpointedHash: parentCheckpointHash });
1165+
1166+
const checkpoint = await pipelinedJob.execute();
1167+
1168+
expect(checkpoint).toBeUndefined();
1169+
expect(blockSink.addProposedCheckpoint).toHaveBeenCalledTimes(1);
1170+
expect(p2p.broadcastCheckpointProposal).not.toHaveBeenCalled();
1171+
});
1172+
11371173
it('skips proposal with archiver-sync-timeout when archiver does not sync in time', async () => {
11381174
const pipelinedJob = await createPipelinedJobWithBlock(proposedParent);
11391175
l2BlockSource.getSyncedL2SlotNumber.mockResolvedValue(SlotNumber(0));
@@ -1705,6 +1741,21 @@ describe('CheckpointProposalJob', () => {
17051741
expect(checkpointBuilder.buildBlockCalls).toHaveLength(1);
17061742
// But must NOT push to the archiver — that was the bug causing reorgs on mainnet
17071743
expect(blockSink.addBlock).not.toHaveBeenCalled();
1744+
expect(blockSink.addProposedCheckpoint).not.toHaveBeenCalled();
1745+
});
1746+
1747+
it('does not push the proposed checkpoint when pipelining is disabled', async () => {
1748+
// The proposed-checkpoint tip is a pipelining-only concept, so a non-pipelining proposer
1749+
// must still broadcast but must not advance it.
1750+
epochCache.isProposerPipeliningEnabled.mockReturnValue(false);
1751+
const { txs, block } = await setupTxsAndBlock(p2p, globalVariables, 1, chainId);
1752+
checkpointBuilder.seedBlocks([block], [txs]);
1753+
validatorClient.collectAttestations.mockResolvedValue(getAttestations(block));
1754+
1755+
await job.executeAndAwait();
1756+
1757+
expect(blockSink.addProposedCheckpoint).not.toHaveBeenCalled();
1758+
expect(p2p.broadcastCheckpointProposal).toHaveBeenCalledTimes(1);
17081759
});
17091760

17101761
it('handles empty committee gracefully', async () => {

yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.timing.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type { TypedEventEmitter } from '@aztec/foundation/types';
1010
import { type P2P, P2PClientState } from '@aztec/p2p';
1111
import type { SlasherClientInterface } from '@aztec/slasher';
1212
import { AztecAddress } from '@aztec/stdlib/aztec-address';
13-
import type { L2Block, L2BlockSink, L2BlockSource } from '@aztec/stdlib/block';
13+
import type { L2Block, L2BlockSink, L2BlockSource, ProposedCheckpointSink } from '@aztec/stdlib/block';
1414
import type { L1RollupConstants } from '@aztec/stdlib/epoch-helpers';
1515
import { GasFees } from '@aztec/stdlib/gas';
1616
import type {
@@ -206,7 +206,7 @@ describe('CheckpointProposalJob Timing Tests', () => {
206206
let worldState: MockProxy<WorldStateSynchronizer>;
207207
let l1ToL2MessageSource: MockProxy<L1ToL2MessageSource>;
208208
let l2BlockSource: MockProxy<L2BlockSource>;
209-
let blockSink: MockProxy<L2BlockSink>;
209+
let blockSink: MockProxy<L2BlockSink & ProposedCheckpointSink>;
210210
let slasherClient: MockProxy<SlasherClientInterface>;
211211
let metrics: MockProxy<SequencerMetrics>;
212212
let checkpointMetrics: MockProxy<CheckpointProposalJobMetricsRecorder>;
@@ -438,7 +438,7 @@ describe('CheckpointProposalJob Timing Tests', () => {
438438
l2BlockSource = mock<L2BlockSource>();
439439
l2BlockSource.getCheckpointsData.mockResolvedValue([]);
440440

441-
blockSink = mock<L2BlockSink>();
441+
blockSink = mock<L2BlockSink & ProposedCheckpointSink>();
442442
blockSink.addBlock.mockResolvedValue(undefined);
443443

444444
validatorClient = mock<ValidatorClient>();

yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
type L2BlockSink,
3232
type L2BlockSource,
3333
MaliciousCommitteeAttestationsAndSigners,
34+
type ProposedCheckpointSink,
3435
type ValidateCheckpointResult,
3536
} from '@aztec/stdlib/block';
3637
import {
@@ -137,7 +138,7 @@ export class CheckpointProposalJob implements Traceable {
137138
private readonly l1ToL2MessageSource: L1ToL2MessageSource,
138139
private readonly l2BlockSource: L2BlockSource,
139140
private readonly checkpointsBuilder: FullNodeCheckpointsBuilder,
140-
private readonly blockSink: L2BlockSink,
141+
private readonly blockSink: L2BlockSink & ProposedCheckpointSink,
141142
private readonly l1Constants: SequencerRollupConstants,
142143
private readonly signatureContext: CoordinationSignatureContext,
143144
protected config: ResolvedSequencerConfig,
@@ -774,6 +775,13 @@ export class CheckpointProposalJob implements Traceable {
774775
checkpointProposalOptions,
775776
);
776777

778+
// Advance our own optimistic proposed-checkpoint tip locally before gossiping. Gossipsub
779+
// doesn't echo our own messages back, so this is how the proposer makes its own proposed
780+
// checkpoint visible for pipelining the next slot. Built from local checkpoint data — never
781+
// from the broadcast proposal archive, which may be deliberately corrupted under test flags.
782+
// Fail closed: if this throws, the outer catch aborts the slot before gossiping.
783+
await this.syncProposedCheckpointToArchiver(checkpoint, blocksInCheckpoint.length, feeAssetPriceModifier);
784+
777785
const blockProposedAt = this.dateProvider.now();
778786
if (this.config.skipBroadcastCheckpointProposal) {
779787
// Test-only: suppress the CheckpointProposal so peers never see a proposed checkpoint for
@@ -1489,6 +1497,45 @@ export class CheckpointProposalJob implements Traceable {
14891497
await this.blockSink.addBlock(block);
14901498
}
14911499

1500+
/**
1501+
* Adds the proposed checkpoint to the archiver so the proposer's optimistic proposed-checkpoint
1502+
* tip advances locally. Gossip doesn't echo our own messages back, so without this the proposer
1503+
* would never see its own proposed checkpoint and couldn't pipeline the next slot.
1504+
*
1505+
* Only runs under proposer pipelining (the proposed tip is a pipelining-only concept) and is
1506+
* skipped whenever proposed blocks aren't pushed (`skipPushProposedBlocksToArchiver`, fisherman
1507+
* mode): the archiver derives the checkpoint archive from its stored blocks, so without them the
1508+
* push would fail. All blocks were already added (and awaited) during block building, so this
1509+
* needs no retry — they are guaranteed present by the time we get here.
1510+
*/
1511+
private async syncProposedCheckpointToArchiver(
1512+
checkpoint: Checkpoint,
1513+
blockCount: number,
1514+
feeAssetPriceModifier: bigint,
1515+
): Promise<void> {
1516+
if (this.config.skipPushProposedBlocksToArchiver || this.config.fishermanMode) {
1517+
return;
1518+
}
1519+
if (!this.epochCache.isProposerPipeliningEnabled()) {
1520+
return;
1521+
}
1522+
const startBlock = BlockNumber(this.syncedToBlockNumber + 1);
1523+
this.log.debug(`Syncing proposed checkpoint ${this.checkpointNumber} to archiver`, {
1524+
checkpointNumber: this.checkpointNumber,
1525+
slot: this.targetSlot,
1526+
startBlock,
1527+
blockCount,
1528+
});
1529+
await this.blockSink.addProposedCheckpoint({
1530+
header: checkpoint.header,
1531+
checkpointNumber: this.checkpointNumber,
1532+
startBlock,
1533+
blockCount,
1534+
totalManaUsed: checkpoint.header.totalManaUsed.toBigInt(),
1535+
feeAssetPriceModifier,
1536+
});
1537+
}
1538+
14921539
/** Runs fee analysis and logs checkpoint outcome as fisherman */
14931540
private async handleCheckpointEndAsFisherman(checkpoint: Checkpoint | undefined) {
14941541
// Perform L1 fee analysis before clearing requests

0 commit comments

Comments
 (0)