diff --git a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts index 120e99771fb3..ce4957c34d24 100644 --- a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts @@ -311,7 +311,10 @@ export function getBeaconBlockApi({ chain .processBlock(blockForImport, opts) .catch((e) => { - if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) { + if ( + e instanceof BlockError && + (e.type.code === BlockErrorCode.PARENT_UNKNOWN || e.type.code === BlockErrorCode.PARENT_PAYLOAD_UNKNOWN) + ) { chain.emitter.emit(ChainEvent.blockUnknownParent, { blockInput: blockForImport, peer: IDENTITY_PEER_ID, diff --git a/packages/beacon-node/src/chain/blocks/verifyBlocksSanityChecks.ts b/packages/beacon-node/src/chain/blocks/verifyBlocksSanityChecks.ts index 02d686818d2a..b5efd090cbbd 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlocksSanityChecks.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlocksSanityChecks.ts @@ -90,15 +90,24 @@ export function verifyBlocksSanityChecks( } else { // When importing a block segment, only the first NON-IGNORED block must be known to the fork-choice. const parentRoot = toRootHex(block.message.parentRoot); - parentBlock = isGloasBeaconBlock(block.message) - ? chain.forkChoice.getBlockHexAndBlockHash( - parentRoot, - toRootHex(block.message.body.signedExecutionPayloadBid.message.parentBlockHash) - ) - : chain.forkChoice.getBlockHexDefaultStatus(parentRoot); - if (!parentBlock) { + const parentBlockDefaultStatus = chain.forkChoice.getBlockHexDefaultStatus(parentRoot); + if (!parentBlockDefaultStatus) { throw new BlockError(block, {code: BlockErrorCode.PARENT_UNKNOWN, parentRoot}); } + + parentBlock = parentBlockDefaultStatus; + if (isGloasBeaconBlock(block.message)) { + const parentBlockHash = toRootHex(block.message.body.signedExecutionPayloadBid.message.parentBlockHash); + const parentBlockWithPayload = chain.forkChoice.getBlockHexAndBlockHash(parentRoot, parentBlockHash); + if (!parentBlockWithPayload) { + throw new BlockError(block, { + code: BlockErrorCode.PARENT_PAYLOAD_UNKNOWN, + parentRoot, + parentBlockHash, + }); + } + parentBlock = parentBlockWithPayload; + } // Parent is known to the fork-choice parentBlockSlot = parentBlock.slot; } diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 358054bdd8e9..44df4161692d 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -613,6 +613,10 @@ export function createLodestarMetrics( name: "lodestar_sync_unknown_block_pending_blocks_size", help: "Current size of UnknownBlockSync pending blocks cache", }), + pendingPayloads: register.gauge({ + name: "lodestar_sync_unknown_block_pending_payloads_size", + help: "Current size of UnknownBlockSync pending payloads cache", + }), knownBadBlocks: register.gauge({ name: "lodestar_sync_unknown_block_known_bad_blocks_size", help: "Current size of UnknownBlockSync known bad blocks cache", diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index b9607e77f9d5..3099e56edb46 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -198,7 +198,10 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand } catch (e) { if (e instanceof BlockGossipError) { logger.debug("Gossip block has error", {slot, root: blockShortHex, code: e.type.code}); - if (e.type.code === BlockErrorCode.PARENT_UNKNOWN && blockInput) { + if ( + (e.type.code === BlockErrorCode.PARENT_UNKNOWN || e.type.code === BlockErrorCode.PARENT_PAYLOAD_UNKNOWN) && + blockInput + ) { chain.emitter.emit(ChainEvent.blockUnknownParent, { blockInput, peer: peerIdStr, diff --git a/packages/beacon-node/src/sync/types.ts b/packages/beacon-node/src/sync/types.ts index ca36095c5ca8..b79a8e40e91a 100644 --- a/packages/beacon-node/src/sync/types.ts +++ b/packages/beacon-node/src/sync/types.ts @@ -1,5 +1,8 @@ import {RootHex, Slot} from "@lodestar/types"; +import {SignedExecutionPayloadEnvelope} from "@lodestar/types/gloas"; +import {toRootHex} from "@lodestar/utils"; import {IBlockInput} from "../chain/blocks/blockInput/index.js"; +import {PayloadEnvelopeInput} from "../chain/blocks/payloadEnvelopeInput/payloadEnvelopeInput.js"; export enum PendingBlockType { /** @@ -26,6 +29,14 @@ export enum PendingBlockInputStatus { processing = "processing", } +export enum PendingPayloadInputStatus { + pending = "pending", + fetching = "fetching", + waitingForBlock = "waiting_for_block", + downloaded = "downloaded", + processing = "processing", +} + export type PendingBlockInput = { status: PendingBlockInputStatus; blockInput: IBlockInput; @@ -44,10 +55,47 @@ export type PendingRootHex = { export type BlockInputSyncCacheItem = PendingBlockInput | PendingRootHex; +export type PendingPayloadInput = { + status: + | PendingPayloadInputStatus.pending + | PendingPayloadInputStatus.fetching + | PendingPayloadInputStatus.downloaded + | PendingPayloadInputStatus.processing; + payloadInput: PayloadEnvelopeInput; + timeAddedSec: number; + timeSyncedSec?: number; + peerIdStrings: Set; +}; + +export type PendingPayloadRootHex = { + status: PendingPayloadInputStatus.pending | PendingPayloadInputStatus.fetching; + rootHex: RootHex; + timeAddedSec: number; + timeSyncedSec?: number; + peerIdStrings: Set; +}; + +export type PendingPayloadEnvelope = { + status: PendingPayloadInputStatus.waitingForBlock; + envelope: SignedExecutionPayloadEnvelope; + timeAddedSec: number; + peerIdStrings: Set; +}; + +export type PayloadSyncCacheItem = PendingPayloadInput | PendingPayloadRootHex | PendingPayloadEnvelope; + export function isPendingBlockInput(pending: BlockInputSyncCacheItem): pending is PendingBlockInput { return "blockInput" in pending; } +export function isPendingPayloadInput(pending: PayloadSyncCacheItem): pending is PendingPayloadInput { + return "payloadInput" in pending; +} + +export function isPendingPayloadEnvelope(pending: PayloadSyncCacheItem): pending is PendingPayloadEnvelope { + return "envelope" in pending; +} + export function getBlockInputSyncCacheItemRootHex(block: BlockInputSyncCacheItem): RootHex { return isPendingBlockInput(block) ? block.blockInput.blockRootHex : block.rootHex; } @@ -55,3 +103,27 @@ export function getBlockInputSyncCacheItemRootHex(block: BlockInputSyncCacheItem export function getBlockInputSyncCacheItemSlot(block: BlockInputSyncCacheItem): Slot | string { return isPendingBlockInput(block) ? block.blockInput.slot : "unknown"; } + +export function getPayloadSyncCacheItemRootHex(payload: PayloadSyncCacheItem): RootHex { + if (isPendingPayloadInput(payload)) { + return payload.payloadInput.blockRootHex; + } + + if (isPendingPayloadEnvelope(payload)) { + return toRootHex(payload.envelope.message.beaconBlockRoot); + } + + return payload.rootHex; +} + +export function getPayloadSyncCacheItemSlot(payload: PayloadSyncCacheItem): Slot | string { + if (isPendingPayloadInput(payload)) { + return payload.payloadInput.slot; + } + + if (isPendingPayloadEnvelope(payload)) { + return payload.envelope.message.payload.slotNumber; + } + + return "unknown"; +} diff --git a/packages/beacon-node/src/sync/unknownBlock.ts b/packages/beacon-node/src/sync/unknownBlock.ts index 4875911f28b6..215240ee7fe5 100644 --- a/packages/beacon-node/src/sync/unknownBlock.ts +++ b/packages/beacon-node/src/sync/unknownBlock.ts @@ -1,13 +1,18 @@ +import {routes} from "@lodestar/api"; import {ChainForkConfig} from "@lodestar/config"; import {ForkSeq} from "@lodestar/params"; import {RequestError, RequestErrorCode} from "@lodestar/reqresp"; import {computeTimeAtSlot} from "@lodestar/state-transition"; -import {RootHex} from "@lodestar/types"; -import {Logger, prettyPrintIndices, pruneSetToMax, sleep} from "@lodestar/utils"; +import {RootHex, gloas} from "@lodestar/types"; +import {Logger, fromHex, prettyPrintIndices, pruneSetToMax, sleep, toRootHex} from "@lodestar/utils"; import {isBlockInputBlobs, isBlockInputColumns} from "../chain/blocks/blockInput/blockInput.js"; import {BlockInputSource, IBlockInput} from "../chain/blocks/blockInput/types.js"; +import {PayloadError, PayloadErrorCode} from "../chain/blocks/importExecutionPayload.js"; +import {PayloadEnvelopeInput, PayloadEnvelopeInputSource} from "../chain/blocks/payloadEnvelopeInput/index.js"; import {BlockError, BlockErrorCode} from "../chain/errors/index.js"; import {ChainEvent, ChainEventData, IBeaconChain} from "../chain/index.js"; +import {validateGloasBlockDataColumnSidecars} from "../chain/validation/dataColumnSidecar.js"; +import {validateGossipExecutionPayloadEnvelope} from "../chain/validation/executionPayloadEnvelope.js"; import {Metrics} from "../metrics/index.js"; import {INetwork, NetworkEvent, NetworkEventData, prettyPrintPeerIdStr} from "../network/index.js"; import {PeerSyncMeta} from "../network/peers/peersData.js"; @@ -19,20 +24,37 @@ import {MAX_CONCURRENT_REQUESTS} from "./constants.js"; import {SyncOptions} from "./options.js"; import { BlockInputSyncCacheItem, + PayloadSyncCacheItem, PendingBlockInput, PendingBlockInputStatus, PendingBlockType, + PendingPayloadEnvelope, + PendingPayloadInput, + PendingPayloadInputStatus, + PendingPayloadRootHex, getBlockInputSyncCacheItemRootHex, getBlockInputSyncCacheItemSlot, + getPayloadSyncCacheItemRootHex, + getPayloadSyncCacheItemSlot, isPendingBlockInput, + isPendingPayloadEnvelope, + isPendingPayloadInput, } from "./types.js"; import {DownloadByRootError, downloadByRoot} from "./utils/downloadByRoot.js"; -import {getAllDescendantBlocks, getDescendantBlocks, getUnknownAndAncestorBlocks} from "./utils/pendingBlocksTree.js"; +import {getAllDescendantBlocks, getUnknownAndAncestorBlocks} from "./utils/pendingBlocksTree.js"; const MAX_ATTEMPTS_PER_BLOCK = 5; const MAX_KNOWN_BAD_BLOCKS = 500; const MAX_PENDING_BLOCKS = 100; +type AdvancePendingBlockResult = + | "ready" + | "queued_block" + | "queued_parent_block" + | "queued_parent_payload" + | "blocked" + | "removed"; + enum FetchResult { SuccessResolved = "success_resolved", SuccessMissingParent = "success_missing_parent", @@ -78,6 +100,8 @@ export class BlockInputSync { * block RootHex -> PendingBlock. To avoid finding same root at the same time */ private readonly pendingBlocks = new Map(); + // Payload sync is keyed by beacon block root as well, so block and payload queues can unblock each other. + private readonly pendingPayloads = new Map(); private readonly knownBadBlocks = new Set(); private readonly maxPendingBlocks; private subscribedToNetworkEvents = false; @@ -98,6 +122,9 @@ export class BlockInputSync { metrics.blockInputSync.pendingBlocks.addCollect(() => metrics.blockInputSync.pendingBlocks.set(this.pendingBlocks.size) ); + metrics.blockInputSync.pendingPayloads.addCollect(() => + metrics.blockInputSync.pendingPayloads.set(this.pendingPayloads.size) + ); metrics.blockInputSync.knownBadBlocks.addCollect(() => metrics.blockInputSync.knownBadBlocks.set(this.knownBadBlocks.size) ); @@ -114,8 +141,13 @@ export class BlockInputSync { if (!this.subscribedToNetworkEvents) { this.logger.verbose("BlockInputSync enabled."); this.chain.emitter.on(ChainEvent.unknownBlockRoot, this.onUnknownBlockRoot); + this.chain.emitter.on(ChainEvent.unknownEnvelopeBlockRoot, this.onUnknownEnvelopeBlockRoot); this.chain.emitter.on(ChainEvent.incompleteBlockInput, this.onIncompleteBlockInput); + this.chain.emitter.on(ChainEvent.incompletePayloadEnvelope, this.onIncompletePayloadEnvelope); this.chain.emitter.on(ChainEvent.blockUnknownParent, this.onUnknownParent); + this.chain.emitter.on(ChainEvent.envelopeUnknownBlock, this.onEnvelopeUnknownBlock); + this.chain.emitter.on(routes.events.EventType.block, this.onBlockImported); + this.chain.emitter.on(routes.events.EventType.executionPayload, this.onPayloadImported); this.network.events.on(NetworkEvent.peerConnected, this.onPeerConnected); this.network.events.on(NetworkEvent.peerDisconnected, this.onPeerDisconnected); this.subscribedToNetworkEvents = true; @@ -125,8 +157,13 @@ export class BlockInputSync { unsubscribeFromNetwork(): void { this.logger.verbose("BlockInputSync disabled."); this.chain.emitter.off(ChainEvent.unknownBlockRoot, this.onUnknownBlockRoot); + this.chain.emitter.off(ChainEvent.unknownEnvelopeBlockRoot, this.onUnknownEnvelopeBlockRoot); this.chain.emitter.off(ChainEvent.incompleteBlockInput, this.onIncompleteBlockInput); + this.chain.emitter.off(ChainEvent.incompletePayloadEnvelope, this.onIncompletePayloadEnvelope); this.chain.emitter.off(ChainEvent.blockUnknownParent, this.onUnknownParent); + this.chain.emitter.off(ChainEvent.envelopeUnknownBlock, this.onEnvelopeUnknownBlock); + this.chain.emitter.off(routes.events.EventType.block, this.onBlockImported); + this.chain.emitter.off(routes.events.EventType.executionPayload, this.onPayloadImported); this.network.events.off(NetworkEvent.peerConnected, this.onPeerConnected); this.network.events.off(NetworkEvent.peerDisconnected, this.onPeerDisconnected); this.subscribedToNetworkEvents = false; @@ -168,12 +205,55 @@ export class BlockInputSync { } }; + private onUnknownEnvelopeBlockRoot = (data: ChainEventData[ChainEvent.unknownEnvelopeBlockRoot]): void => { + try { + this.addByPayloadRootHex(data.rootHex, data.peer); + this.triggerUnknownBlockSearch(); + this.metrics?.blockInputSync.requests.inc({type: PendingBlockType.UNKNOWN_DATA}); + this.metrics?.blockInputSync.source.inc({source: data.source}); + } catch (e) { + this.logger.debug("Error handling unknownEnvelopeBlockRoot event", {}, e as Error); + } + }; + + private onIncompletePayloadEnvelope = (data: ChainEventData[ChainEvent.incompletePayloadEnvelope]): void => { + try { + this.addByPayloadInput(data.payloadInput, data.peer); + this.triggerUnknownBlockSearch(); + this.metrics?.blockInputSync.requests.inc({type: PendingBlockType.UNKNOWN_DATA}); + this.metrics?.blockInputSync.source.inc({source: data.source}); + } catch (e) { + this.logger.debug("Error handling incompletePayloadEnvelope event", {}, e as Error); + } + }; + /** * Process an unknownBlockParent event and register the block in `pendingBlocks` Map. */ private onUnknownParent = (data: ChainEventData[ChainEvent.blockUnknownParent]): void => { try { - this.addByRootHex(data.blockInput.parentRootHex, data.peer); + const missingDependency = this.getMissingBlockDependency(data.blockInput); + if (missingDependency.kind === "invalidParentPayload") { + this.addByBlockInput(data.blockInput, data.peer); + + const pendingBlock = this.pendingBlocks.get(data.blockInput.blockRootHex); + if (pendingBlock && isPendingBlockInput(pendingBlock)) { + this.logger.debug("Ignoring block with conflicting parent payload hash", { + slot: pendingBlock.blockInput.slot, + root: pendingBlock.blockInput.blockRootHex, + parentRoot: missingDependency.parentRootHex, + parentBlockHash: missingDependency.parentBlockHashHex, + }); + this.removeAndDownScoreAllDescendants(pendingBlock); + } + return; + } + + if (missingDependency.kind === "parentPayload") { + this.addByPayloadRootHex(missingDependency.rootHex, data.peer); + } else if (missingDependency.kind === "parentBlock") { + this.addByRootHex(missingDependency.rootHex, data.peer); + } this.addByBlockInput(data.blockInput, data.peer); this.triggerUnknownBlockSearch(); this.metrics?.blockInputSync.requests.inc({type: PendingBlockType.UNKNOWN_PARENT}); @@ -183,8 +263,35 @@ export class BlockInputSync { } }; - private addByRootHex = (rootHex: RootHex, peerIdStr?: PeerIdStr): void => { + private onEnvelopeUnknownBlock = (data: ChainEventData[ChainEvent.envelopeUnknownBlock]): void => { + try { + const blockRootHex = toRootHex(data.envelope.message.beaconBlockRoot); + this.addByRootHex(blockRootHex, data.peer); + this.addByPayloadEnvelope(data.envelope, data.peer); + this.triggerUnknownBlockSearch(); + this.metrics?.blockInputSync.requests.inc({type: PendingBlockType.UNKNOWN_DATA}); + this.metrics?.blockInputSync.source.inc({source: data.source}); + } catch (e) { + this.logger.debug("Error handling envelopeUnknownBlock event", {}, e as Error); + } + }; + + private onBlockImported = (): void => { + if (this.pendingPayloads.size > 0) { + this.triggerUnknownBlockSearch(); + } + }; + + private onPayloadImported = ({ + blockRoot, + }: routes.events.EventData[routes.events.EventType.executionPayload]): void => { + this.pendingPayloads.delete(blockRoot); + this.triggerUnknownBlockSearch(); + }; + + private addByRootHex = (rootHex: RootHex, peerIdStr?: PeerIdStr): boolean => { let pendingBlock = this.pendingBlocks.get(rootHex); + let added = false; if (!pendingBlock) { pendingBlock = { status: PendingBlockInputStatus.pending, @@ -193,6 +300,7 @@ export class BlockInputSync { timeAddedSec: Date.now() / 1000, }; this.pendingBlocks.set(rootHex, pendingBlock); + added = true; this.logger.verbose("Added new rootHex to BlockInputSync.pendingBlocks", { root: pendingBlock.rootHex, @@ -210,6 +318,7 @@ export class BlockInputSync { if (prunedItemCount > 0) { this.logger.verbose(`Pruned ${prunedItemCount} items from BlockInputSync.pendingBlocks`); } + return added; }; private addByBlockInput = (blockInput: IBlockInput, peerIdStr?: string): void => { @@ -242,6 +351,93 @@ export class BlockInputSync { } }; + private addByPayloadRootHex = (rootHex: RootHex, peerIdStr?: PeerIdStr): boolean => { + let pendingPayload = this.pendingPayloads.get(rootHex); + let added = false; + if (!pendingPayload) { + pendingPayload = { + status: PendingPayloadInputStatus.pending, + rootHex, + peerIdStrings: new Set(), + timeAddedSec: Date.now() / 1000, + }; + this.pendingPayloads.set(rootHex, pendingPayload); + added = true; + + this.logger.verbose("Added new payload rootHex to BlockInputSync.pendingPayloads", { + root: rootHex, + peerIdStr: peerIdStr ?? "unknown peer", + }); + } + + if (peerIdStr) { + pendingPayload.peerIdStrings.add(peerIdStr); + } + + const prunedItemCount = pruneSetToMax(this.pendingPayloads, this.maxPendingBlocks); + if (prunedItemCount > 0) { + this.logger.verbose(`Pruned ${prunedItemCount} items from BlockInputSync.pendingPayloads`); + } + return added; + }; + + private addByPayloadEnvelope = (envelope: gloas.SignedExecutionPayloadEnvelope, peerIdStr?: PeerIdStr): void => { + const rootHex = toRootHex(envelope.message.beaconBlockRoot); + const existingPendingPayload = this.pendingPayloads.get(rootHex); + let pendingPayload = this.pendingPayloads.get(rootHex); + if (!pendingPayload || !isPendingPayloadEnvelope(pendingPayload)) { + pendingPayload = { + status: PendingPayloadInputStatus.waitingForBlock, + envelope, + peerIdStrings: new Set(existingPendingPayload?.peerIdStrings ?? []), + timeAddedSec: existingPendingPayload?.timeAddedSec ?? Date.now() / 1000, + }; + this.pendingPayloads.set(rootHex, pendingPayload); + + this.logger.verbose("Added payload envelope to BlockInputSync.pendingPayloads", { + slot: envelope.message.payload.slotNumber, + root: rootHex, + }); + } else { + this.logger.debug("Overwriting pending payload envelope for root already waiting for block", { + slot: envelope.message.payload.slotNumber, + root: rootHex, + }); + pendingPayload.envelope = envelope; + } + + if (peerIdStr) { + pendingPayload.peerIdStrings.add(peerIdStr); + } + + const prunedItemCount = pruneSetToMax(this.pendingPayloads, this.maxPendingBlocks); + if (prunedItemCount > 0) { + this.logger.verbose(`Pruned ${prunedItemCount} items from BlockInputSync.pendingPayloads`); + } + }; + + private addByPayloadInput = ( + payloadInput: PayloadEnvelopeInput, + peerIdStr?: PeerIdStr, + envelope?: gloas.SignedExecutionPayloadEnvelope + ): void => { + const pendingPayload = this.toPendingPayloadInput( + payloadInput, + this.pendingPayloads.get(payloadInput.blockRootHex), + envelope + ); + + if (peerIdStr) { + pendingPayload.peerIdStrings.add(peerIdStr); + } + + this.pendingPayloads.set(payloadInput.blockRootHex, pendingPayload); + const prunedItemCount = pruneSetToMax(this.pendingPayloads, this.maxPendingBlocks); + if (prunedItemCount > 0) { + this.logger.verbose(`Pruned ${prunedItemCount} items from BlockInputSync.pendingPayloads`); + } + }; + private onPeerConnected = (data: NetworkEventData[NetworkEvent.peerConnected]): void => { try { const peerId = data.peer; @@ -258,52 +454,207 @@ export class BlockInputSync { this.peerBalancer.onPeerDisconnected(peerId); }; + /** + * Post-gloas, a locally complete block can still be blocked on its parent's execution payload lineage. + * Distinguish which dependency is missing so the scheduler can enqueue the right follow-up work. + */ + private getMissingBlockDependency( + blockInput: IBlockInput + ): + | {kind: "ready"} + | {kind: "block" | "parentBlock" | "parentPayload"; rootHex: RootHex} + | {kind: "invalidParentPayload"; parentRootHex: RootHex; parentBlockHashHex: RootHex} { + const parentRootHex = blockInput.parentRootHex; + if (!this.chain.forkChoice.hasBlockHex(parentRootHex)) { + return {kind: "parentBlock", rootHex: parentRootHex}; + } + + if (!blockInput.hasBlock()) { + return {kind: "block", rootHex: blockInput.blockRootHex}; + } + + if (this.config.getForkSeq(blockInput.slot) < ForkSeq.gloas) { + return {kind: "ready"}; + } + + const block = blockInput.getBlock() as gloas.SignedBeaconBlock; + const parentBlockHashHex = toRootHex(block.message.body.signedExecutionPayloadBid.message.parentBlockHash); + if (this.chain.forkChoice.getBlockHexAndBlockHash(parentRootHex, parentBlockHashHex) !== null) { + return {kind: "ready"}; + } + + if (this.chain.forkChoice.hasPayloadHexUnsafe(parentRootHex)) { + return {kind: "invalidParentPayload", parentRootHex, parentBlockHashHex}; + } + + const parentPayloadInput = this.chain.seenPayloadEnvelopeInputCache.get(parentRootHex); + if (parentPayloadInput) { + if (parentPayloadInput.getBlockHashHex() === parentBlockHashHex) { + return {kind: "parentPayload", rootHex: parentRootHex}; + } + + return {kind: "invalidParentPayload", parentRootHex, parentBlockHashHex}; + } + + return {kind: "parentPayload", rootHex: parentRootHex}; + } + + private advancePendingBlock(pendingBlock: PendingBlockInput): AdvancePendingBlockResult { + const missingDependency = this.getMissingBlockDependency(pendingBlock.blockInput); + + switch (missingDependency.kind) { + case "ready": + return "ready"; + + case "block": + pendingBlock.status = PendingBlockInputStatus.pending; + return "queued_block"; + + case "parentBlock": { + let added = this.addByRootHex(missingDependency.rootHex); + for (const peerIdStr of pendingBlock.peerIdStrings) { + added = this.addByRootHex(missingDependency.rootHex, peerIdStr) || added; + } + return added ? "queued_parent_block" : "blocked"; + } + + case "parentPayload": { + let added = this.addByPayloadRootHex(missingDependency.rootHex); + for (const peerIdStr of pendingBlock.peerIdStrings) { + added = this.addByPayloadRootHex(missingDependency.rootHex, peerIdStr) || added; + } + return added ? "queued_parent_payload" : "blocked"; + } + + case "invalidParentPayload": + this.logger.debug("Removing block with conflicting parent payload hash", { + slot: pendingBlock.blockInput.slot, + root: pendingBlock.blockInput.blockRootHex, + parentRoot: missingDependency.parentRootHex, + parentBlockHash: missingDependency.parentBlockHashHex, + }); + this.removeAndDownScoreAllDescendants(pendingBlock); + return "removed"; + } + } + + private toPendingPayloadInput( + payloadInput: PayloadEnvelopeInput, + previous?: PayloadSyncCacheItem, + envelope?: gloas.SignedExecutionPayloadEnvelope + ): PendingPayloadInput { + // Normalize every payload queueing path into the same cache shape while preserving first-seen + // timing and peer provenance from any earlier by-root or envelope-only entry. + const queuedEnvelope = envelope ?? (previous && isPendingPayloadEnvelope(previous) ? previous.envelope : undefined); + + if (queuedEnvelope && !payloadInput.hasPayloadEnvelope()) { + payloadInput.addPayloadEnvelope({ + envelope: queuedEnvelope, + source: PayloadEnvelopeInputSource.byRoot, + seenTimestampSec: Date.now() / 1000, + }); + } + + return { + status: payloadInput.isComplete() ? PendingPayloadInputStatus.downloaded : PendingPayloadInputStatus.pending, + payloadInput, + timeAddedSec: previous?.timeAddedSec ?? Date.now() / 1000, + timeSyncedSec: payloadInput.isComplete() ? Date.now() / 1000 : undefined, + peerIdStrings: new Set(previous?.peerIdStrings ?? []), + }; + } + /** * Gather tip parent blocks with unknown parent and do a search for all of them */ private triggerUnknownBlockSearch = (): void => { // Cheap early stop to prevent calling the network.getConnectedPeers() - if (this.pendingBlocks.size === 0) { + if (this.pendingBlocks.size === 0 && this.pendingPayloads.size === 0) { return; } - // If the node loses all peers with pending unknown blocks, the sync will stall + // If the node loses all peers with pending unknown blocks or payloads, the sync will stall const connectedPeers = this.network.getConnectedPeers(); - if (connectedPeers.length === 0) { - this.logger.debug("No connected peers, skipping unknown block search."); - return; - } + const hasConnectedPeers = connectedPeers.length > 0; const {unknowns, ancestors} = getUnknownAndAncestorBlocks(this.pendingBlocks); - // it's rare when there is no unknown block - // see https://github.com/ChainSafe/lodestar/issues/5649#issuecomment-1594213550 - if (unknowns.length === 0) { - let processedBlocks = 0; - - for (const block of ancestors) { - // when this happens, it's likely the block and parent block are processed by head sync - if (this.chain.forkChoice.hasBlockHex(block.blockInput.parentRootHex)) { + let processedBlocks = 0; + let shouldRerunBlockSearch = false; + + for (const block of ancestors) { + const advanceResult = this.advancePendingBlock(block); + switch (advanceResult) { + case "ready": processedBlocks++; - this.processBlock(block).catch((e) => { + this.processReadyBlock(block).catch((e) => { this.logger.debug("Unexpected error - process old downloaded block", {}, e); }); - } + break; + + case "queued_block": + case "queued_parent_block": + shouldRerunBlockSearch = true; + break; + + case "queued_parent_payload": + case "blocked": + case "removed": + break; } + } + if (unknowns.length > 0) { + if (!hasConnectedPeers) { + this.logger.debug("No connected peers, skipping unknown block download."); + } else { + // Most of the time there is exactly 1 unknown block + for (const block of unknowns) { + this.downloadBlock(block).catch((e) => { + this.logger.debug("Unexpected error - downloadBlock", {root: getBlockInputSyncCacheItemRootHex(block)}, e); + }); + } + } + } else if (ancestors.length > 0) { + // It's rare when there is no unknown block + // see https://github.com/ChainSafe/lodestar/issues/5649#issuecomment-1594213550 this.logger.verbose("No unknown block, process ancestor downloaded blocks", { pendingBlocks: this.pendingBlocks.size, ancestorBlocks: ancestors.length, processedBlocks, }); - return; } - // most of the time there is exactly 1 unknown block - for (const block of unknowns) { - this.downloadBlock(block).catch((e) => { - this.logger.debug("Unexpected error - downloadBlock", {root: getBlockInputSyncCacheItemRootHex(block)}, e); + // Blocks can unblock payloads and payloads can unblock blocks, so every scheduler pass services both queues. + for (const payload of Array.from(this.pendingPayloads.values())) { + if (isPendingPayloadInput(payload) && payload.status === PendingPayloadInputStatus.downloaded) { + this.processPayload(payload).catch((e) => { + this.logger.debug("Unexpected error - process downloaded payload", {}, e); + }); + continue; + } + + if (isPendingPayloadEnvelope(payload)) { + this.reconcilePayloadEnvelope(payload).catch((e) => { + this.logger.debug("Unexpected error - reconcile pending payload envelope", {}, e); + }); + continue; + } + + if (!hasConnectedPeers) { + this.logger.debug("No connected peers, skipping unknown payload download.", { + root: getPayloadSyncCacheItemRootHex(payload), + }); + continue; + } + + this.downloadPayload(payload).catch((e) => { + this.logger.debug("Unexpected error - downloadPayload", {root: getPayloadSyncCacheItemRootHex(payload)}, e); }); } + + if (shouldRerunBlockSearch) { + this.triggerUnknownBlockSearch(); + } }; private async downloadBlock(block: BlockInputSyncCacheItem): Promise { @@ -342,10 +693,26 @@ export class BlockInputSync { this.logger.verbose("Downloaded unknown block", logCtx2); if (parentInForkChoice) { - // Bingo! Process block. Add to pending blocks anyway for recycle the cache that prevents duplicate processing - this.processBlock(pending).catch((e) => { - this.logger.debug("Unexpected error - process newly downloaded block", logCtx2, e); - }); + // If the direct parent is already in fork choice, let the block state machine decide if + // the next step is block import, parent payload download, or branch removal. + const advanceResult = this.advancePendingBlock(pending); + switch (advanceResult) { + case "ready": + this.processReadyBlock(pending).catch((e) => { + this.logger.debug("Unexpected error - process newly downloaded block", logCtx2, e); + }); + break; + + case "queued_block": + case "queued_parent_block": + case "queued_parent_payload": + this.triggerUnknownBlockSearch(); + break; + + case "blocked": + case "removed": + break; + } } else if (blockSlot <= finalizedSlot) { // the common ancestor of the downloading chain and canonical chain should be at least the finalized slot and // we should found it through forkchoice. If not, we should penalize all peers sending us this block chain @@ -368,26 +735,11 @@ export class BlockInputSync { } /** - * Send block to the processor awaiting completition. If processed successfully, send all children to the processor. - * On error, remove and downscore all descendants. - * This function could run recursively for all descendant blocks + * Import a block that has already passed the local dependency checks in BlockInputSync. + * On error, remove and downscore descendants as appropriate for the failure type. */ - private async processBlock(pendingBlock: PendingBlockInput): Promise { - // pending block status is `downloaded` right after `downloadBlock` - // but could be `pending` if added by `onUnknownBlockParent` event and this function is called recursively + private async processReadyBlock(pendingBlock: PendingBlockInput): Promise { if (pendingBlock.status !== PendingBlockInputStatus.downloaded) { - if (pendingBlock.status === PendingBlockInputStatus.pending) { - const connectedPeers = this.network.getConnectedPeers(); - if (connectedPeers.length === 0) { - this.logger.debug("No connected peers, skipping download block", { - slot: pendingBlock.blockInput.slot, - blockRoot: pendingBlock.blockInput.blockRootHex, - }); - return; - } - // if the download is a success we'll call `processBlock()` for this block - await this.downloadBlock(pendingBlock); - } return; } @@ -432,15 +784,9 @@ export class BlockInputSync { if (!res.err) { // no need to update status to "processed", delete anyway this.pendingBlocks.delete(pendingBlock.blockInput.blockRootHex); - - // Send child blocks to the processor - for (const descendantBlock of getDescendantBlocks(pendingBlock.blockInput.blockRootHex, this.pendingBlocks)) { - if (isPendingBlockInput(descendantBlock)) { - this.processBlock(descendantBlock).catch((e) => { - this.logger.debug("Unexpected error - process descendant block", {}, e); - }); - } - } + // Re-enter the scheduler so descendants blocked on either parent blocks or parent payloads + // are advanced through the same dependency checks as every other pending item. + this.triggerUnknownBlockSearch(); } else { const errorData = {slot: pendingBlock.blockInput.slot, root: pendingBlock.blockInput.blockRootHex}; if (res.err instanceof BlockError) { @@ -456,6 +802,19 @@ export class BlockInputSync { pendingBlock.status = PendingBlockInputStatus.downloaded; break; + case BlockErrorCode.PARENT_PAYLOAD_UNKNOWN: + this.logger.error( + "processReadyBlock() hit unexpected parent payload dependency after readiness checks", + { + ...errorData, + parentRoot: pendingBlock.blockInput.parentRootHex, + parentBlockHash: res.err.type.parentBlockHash, + }, + res.err + ); + pendingBlock.status = PendingBlockInputStatus.downloaded; + break; + case BlockErrorCode.EXECUTION_ENGINE_ERROR: // Removing the block(s) without penalizing the peers, hoping for EL to // recover on a latter download + verify attempt @@ -477,6 +836,375 @@ export class BlockInputSync { } } + /** + * Reconcile an envelope-first payload entry once the block import path has seeded its + * PayloadEnvelopeInput. This may queue block download, validate the speculative envelope, or + * downgrade back to by-root fetching when the cached envelope does not match the imported block. + */ + private async reconcilePayloadEnvelope(pendingPayload: PendingPayloadEnvelope): Promise { + const rootHex = getPayloadSyncCacheItemRootHex(pendingPayload); + if (this.chain.forkChoice.hasPayloadHexUnsafe(rootHex)) { + this.pendingPayloads.delete(rootHex); + return; + } + + const payloadInput = this.chain.seenPayloadEnvelopeInputCache.get(rootHex); + if (!payloadInput) { + if (!this.chain.forkChoice.hasBlockHex(rootHex)) { + // Column commitments live on the block body, so an envelope-only entry has to pull the block first. + if (!this.pendingBlocks.has(rootHex)) { + this.addByRootHex(rootHex); + } + + const pendingBlock = this.pendingBlocks.get(rootHex); + if (pendingBlock && this.network.getConnectedPeers().length > 0) { + await this.downloadBlock(pendingBlock); + } + } else { + this.logger.debug("Missing PayloadEnvelopeInput for known block while reconciling payload envelope", { + root: rootHex, + }); + } + return; + } + + if (!payloadInput.hasPayloadEnvelope()) { + const validationResult = await wrapError( + validateGossipExecutionPayloadEnvelope(this.chain, pendingPayload.envelope) + ); + if (validationResult.err) { + this.logger.debug( + "Pending payload envelope failed validation after block import, refetching by root", + {slot: pendingPayload.envelope.message.payload.slotNumber, root: rootHex}, + validationResult.err + ); + + const pendingPayloadByRoot: PendingPayloadRootHex = { + status: PendingPayloadInputStatus.pending, + rootHex, + timeAddedSec: pendingPayload.timeAddedSec, + peerIdStrings: new Set(pendingPayload.peerIdStrings), + }; + this.pendingPayloads.set(rootHex, pendingPayloadByRoot); + + if (this.network.getConnectedPeers().length > 0) { + await this.downloadPayload(pendingPayloadByRoot); + } + return; + } + } + + const upgradedPayload = this.toPendingPayloadInput(payloadInput, pendingPayload, pendingPayload.envelope); + this.pendingPayloads.set(rootHex, upgradedPayload); + + if (upgradedPayload.status === PendingPayloadInputStatus.downloaded) { + await this.processPayload(upgradedPayload); + return; + } + + await this.downloadPayload(upgradedPayload); + } + + private async downloadPayload(payload: PayloadSyncCacheItem): Promise { + if (isPendingPayloadEnvelope(payload)) { + await this.reconcilePayloadEnvelope(payload); + return; + } + + const rootHex = getPayloadSyncCacheItemRootHex(payload); + if (this.chain.forkChoice.hasPayloadHexUnsafe(rootHex)) { + this.pendingPayloads.delete(rootHex); + return; + } + + if (payload.status !== PendingPayloadInputStatus.pending) { + return; + } + + const logCtx = { + slot: getPayloadSyncCacheItemSlot(payload), + root: rootHex, + pendingPayloads: this.pendingPayloads.size, + }; + + this.logger.verbose("BlockInputSync.downloadPayload()", logCtx); + + payload.status = PendingPayloadInputStatus.fetching; + + const res = await wrapError(this.fetchPayloadInput(payload)); + if (!res.err) { + const pendingPayload = res.result; + this.pendingPayloads.set(getPayloadSyncCacheItemRootHex(pendingPayload), pendingPayload); + + if (isPendingPayloadEnvelope(pendingPayload)) { + await this.reconcilePayloadEnvelope(pendingPayload); + } else if (pendingPayload.status === PendingPayloadInputStatus.downloaded) { + await this.processPayload(pendingPayload); + } + return; + } + + this.logger.debug("Ignoring unknown payload root after failed download", logCtx, res.err); + if (!isPendingPayloadEnvelope(payload)) { + payload.status = PendingPayloadInputStatus.pending; + } + } + + private async processPayload(pendingPayload: PendingPayloadInput): Promise { + const rootHex = pendingPayload.payloadInput.blockRootHex; + const logCtx = {slot: pendingPayload.payloadInput.slot, root: rootHex}; + + if (pendingPayload.status !== PendingPayloadInputStatus.downloaded) { + this.logger.debug("Skipping payload processing before payload input is downloaded", { + ...logCtx, + status: pendingPayload.status, + }); + return; + } + + if (this.chain.forkChoice.hasPayloadHexUnsafe(rootHex)) { + this.logger.debug("Payload already imported while processing unknown payload", logCtx); + this.pendingPayloads.delete(rootHex); + return; + } + + if (!this.chain.forkChoice.hasBlockHex(rootHex)) { + this.logger.debug("Payload input is ready before its block is in fork choice", logCtx); + const added = this.addByRootHex(rootHex); + pendingPayload.status = PendingPayloadInputStatus.downloaded; + if (added) { + this.triggerUnknownBlockSearch(); + } + return; + } + + pendingPayload.status = PendingPayloadInputStatus.processing; + + const res = await wrapError(this.chain.processExecutionPayload(pendingPayload.payloadInput)); + if (!res.err) { + this.logger.debug("Processed payload from unknown sync", logCtx); + this.pendingPayloads.delete(rootHex); + this.triggerUnknownBlockSearch(); + return; + } + + if (res.err instanceof PayloadError) { + switch (res.err.type.code) { + case PayloadErrorCode.BLOCK_NOT_IN_FORK_CHOICE: + // Payload sync discovered the block dependency before the block queue did. Re-enqueue the + // block and keep the payload ready so the scheduler can retry once the block reaches fork choice. + if (this.addByRootHex(rootHex)) { + this.triggerUnknownBlockSearch(); + } + // Keep the payload out of any synchronous requeue pass; a later scheduler pass will retry it. + pendingPayload.status = PendingPayloadInputStatus.downloaded; + break; + + case PayloadErrorCode.EXECUTION_ENGINE_ERROR: + this.logger.debug("Execution engine error while processing payload from unknown sync", logCtx, res.err); + pendingPayload.status = PendingPayloadInputStatus.downloaded; + break; + + case PayloadErrorCode.EXECUTION_ENGINE_INVALID: + case PayloadErrorCode.ENVELOPE_VERIFICATION_ERROR: + case PayloadErrorCode.INVALID_SIGNATURE: + // TODO GLOAS: Decide how invalid payload inputs should eventually leave memory without + // reintroducing envelope replacement / recreation flows. + this.logger.debug("Error processing payload from unknown sync", logCtx, res.err); + this.removePendingPayloadAndDescendants(rootHex); + break; + + default: + this.logger.debug("Error processing payload from unknown sync", logCtx, res.err); + this.pendingPayloads.delete(rootHex); + } + return; + } + + this.logger.debug("Unknown error processing payload from unknown sync", logCtx, res.err); + pendingPayload.status = PendingPayloadInputStatus.downloaded; + } + + /** + * Download payload material keyed by beacon block root. Unlike block download, payload sync may + * already have a locally cached envelope or partial columns, so each attempt starts from local state + * and only asks peers for the remaining pieces. + */ + private async fetchPayloadInput( + cacheItem: PendingPayloadInput | PendingPayloadRootHex + ): Promise { + const rootHex = getPayloadSyncCacheItemRootHex(cacheItem); + const blockRoot = fromHex(rootHex); + const excludedPeers = new Set(); + + let slot = getPayloadSyncCacheItemSlot(cacheItem); + let payloadInput = isPendingPayloadInput(cacheItem) + ? cacheItem.payloadInput + : this.chain.seenPayloadEnvelopeInputCache.get(rootHex); + let envelope = payloadInput?.hasPayloadEnvelope() ? payloadInput.getPayloadEnvelope() : undefined; + + let i = 0; + while (i++ < this.getMaxDownloadAttempts()) { + const pendingColumns = payloadInput?.hasAllData() + ? new Set() + : new Set(payloadInput?.getMissingSampledColumnMeta().missing ?? []); + const peerMeta = this.peerBalancer.bestPeerForPendingColumns(pendingColumns, excludedPeers); + if (peerMeta === null) { + throw Error( + `Error fetching payload by root slot=${slot} root=${rootHex} after ${i}: cannot find peer with needed columns=${prettyPrintIndices(Array.from(pendingColumns))}` + ); + } + + const {peerId, client: peerClient} = peerMeta; + cacheItem.peerIdStrings.add(peerId); + + try { + if (!envelope) { + envelope = await this.fetchExecutionPayloadEnvelope(peerId, blockRoot, rootHex); + slot = envelope.message.payload.slotNumber; + } + + payloadInput ??= this.chain.seenPayloadEnvelopeInputCache.get(rootHex); + if (!payloadInput) { + if (this.chain.forkChoice.hasBlockHex(rootHex)) { + throw new Error(`Missing PayloadEnvelopeInput for known block ${rootHex}`); + } + // Keep the validated envelope around, but wait for the block body before turning it into a full payload input. + return { + status: PendingPayloadInputStatus.waitingForBlock, + envelope, + timeAddedSec: cacheItem.timeAddedSec, + peerIdStrings: cacheItem.peerIdStrings, + }; + } + + if (!payloadInput.hasPayloadEnvelope()) { + await validateGossipExecutionPayloadEnvelope(this.chain, envelope); + } + + let pendingPayload = this.toPendingPayloadInput(payloadInput, cacheItem, envelope); + if (!pendingPayload.payloadInput.hasAllData()) { + const missing = pendingPayload.payloadInput.getMissingSampledColumnMeta().missing; + if (missing.length > 0) { + const columnSidecars = await this.fetchPayloadColumns(peerMeta, pendingPayload.payloadInput, missing); + const seenTimestampSec = Date.now() / 1000; + for (const columnSidecar of columnSidecars) { + if (pendingPayload.payloadInput.hasColumn(columnSidecar.index)) { + continue; + } + + pendingPayload.payloadInput.addColumn({ + columnSidecar, + source: PayloadEnvelopeInputSource.byRoot, + seenTimestampSec, + peerIdStr: peerId, + }); + } + pendingPayload = this.toPendingPayloadInput(pendingPayload.payloadInput, pendingPayload); + } + } + + this.logger.verbose("BlockInputSync.fetchPayloadInput: successful download", { + slot, + rootHex, + peerId, + peerClient, + hasPayload: pendingPayload.payloadInput.hasPayloadEnvelope(), + hasAllData: pendingPayload.payloadInput.hasAllData(), + }); + + if (pendingPayload.status === PendingPayloadInputStatus.downloaded) { + return pendingPayload; + } + + cacheItem = pendingPayload; + payloadInput = pendingPayload.payloadInput; + } catch (e) { + this.logger.debug( + "Error downloading payload in BlockInputSync.fetchPayloadInput", + {slot, rootHex, attempt: i, peer: peerId, peerClient}, + e as Error + ); + + if (e instanceof RequestError) { + switch (e.type.code) { + case RequestErrorCode.REQUEST_RATE_LIMITED: + case RequestErrorCode.REQUEST_TIMEOUT: + break; + default: + excludedPeers.add(peerId); + break; + } + } else { + excludedPeers.add(peerId); + } + } finally { + this.peerBalancer.onRequestCompleted(peerId); + } + } + + throw Error(`Error fetching payload with slot=${slot} root=${rootHex} after ${i - 1} attempts.`); + } + + private async fetchExecutionPayloadEnvelope( + peerIdStr: PeerIdStr, + blockRoot: Uint8Array, + rootHex: RootHex + ): Promise { + const response = await this.network.sendExecutionPayloadEnvelopesByRoot(peerIdStr, [blockRoot]); + const envelope = response.at(0); + if (!envelope) { + throw new Error(`Missing execution payload envelope for root=${rootHex}`); + } + + const receivedRootHex = toRootHex(envelope.message.beaconBlockRoot); + if (receivedRootHex !== rootHex) { + throw new Error(`Execution payload envelope root mismatch requested=${rootHex} received=${receivedRootHex}`); + } + + return envelope; + } + + private async fetchPayloadColumns( + peerMeta: PeerSyncMeta, + payloadInput: PayloadEnvelopeInput, + missing: number[] + ): Promise { + const {peerId: peerIdStr} = peerMeta; + const peerColumns = new Set(peerMeta.custodyColumns ?? []); + const requestedColumns = missing.filter((columnIndex) => peerColumns.has(columnIndex)); + if (requestedColumns.length === 0) { + return []; + } + + const columnSidecars = (await this.network.sendDataColumnSidecarsByRoot(peerIdStr, [ + {blockRoot: fromHex(payloadInput.blockRootHex), columns: requestedColumns}, + ])) as gloas.DataColumnSidecar[]; + + if (columnSidecars.length === 0) { + throw new Error(`No data column sidecars returned for payload root=${payloadInput.blockRootHex}`); + } + + const requestedColumnsSet = new Set(requestedColumns); + const extraColumns = columnSidecars.filter((columnSidecar) => !requestedColumnsSet.has(columnSidecar.index)); + if (extraColumns.length > 0) { + throw new Error( + `Received unexpected payload data columns indices=${prettyPrintIndices(extraColumns.map((column) => column.index))}` + ); + } + + // PayloadEnvelopeInput already carries the block slot, root, and commitments, so reuse the + // block-based Gloas validator rather than maintaining a second payload-specific variant. + await validateGloasBlockDataColumnSidecars( + payloadInput.slot, + fromHex(payloadInput.blockRootHex), + payloadInput.getBlobKzgCommitments(), + columnSidecars, + this.chain.metrics?.peerDas + ); + return columnSidecars; + } + /** * From a set of shuffled peers: * - fetch the block @@ -660,6 +1388,28 @@ export class BlockInputSync { pruneSetToMax(this.knownBadBlocks, MAX_KNOWN_BAD_BLOCKS); } + // Once a parent payload is invalid, every descendant waiting on that payload lineage becomes unrecoverable too. + private removePendingPayloadAndDescendants(rootHex: RootHex): void { + // Keep PayloadEnvelopeInput resident in the seen cache. importBlock() owns that object and + // later validation/finalization logic decides when it can leave memory. + this.pendingPayloads.delete(rootHex); + + const badPendingBlocks = getAllDescendantBlocks(rootHex, this.pendingBlocks); + this.metrics?.blockInputSync.removedBlocks.inc(badPendingBlocks.length); + + for (const block of badPendingBlocks) { + const descendantRootHex = getBlockInputSyncCacheItemRootHex(block); + this.pendingBlocks.delete(descendantRootHex); + this.pendingPayloads.delete(descendantRootHex); + this.chain.seenBlockInputCache.prune(descendantRootHex); + this.logger.debug("Removing pending descendant after invalid parent payload", { + slot: getBlockInputSyncCacheItemSlot(block), + blockRoot: descendantRootHex, + parentPayloadRoot: rootHex, + }); + } + } + private removeAllDescendants(block: BlockInputSyncCacheItem): BlockInputSyncCacheItem[] { const rootHex = getBlockInputSyncCacheItemRootHex(block); const slot = getBlockInputSyncCacheItemSlot(block); @@ -671,7 +1421,10 @@ export class BlockInputSync { for (const block of badPendingBlocks) { const rootHex = getBlockInputSyncCacheItemRootHex(block); this.pendingBlocks.delete(rootHex); + this.pendingPayloads.delete(rootHex); this.chain.seenBlockInputCache.prune(rootHex); + // Keep PayloadEnvelopeInput resident in the seen cache for consistency with the + // importBlock()-owned lifecycle. this.logger.debug("Removing bad/unknown/incomplete BlockInputSyncCacheItem", { slot, blockRoot: rootHex, diff --git a/packages/beacon-node/src/sync/utils/downloadByRoot.ts b/packages/beacon-node/src/sync/utils/downloadByRoot.ts index 84c6388d79c7..c6a4def5bb1d 100644 --- a/packages/beacon-node/src/sync/utils/downloadByRoot.ts +++ b/packages/beacon-node/src/sync/utils/downloadByRoot.ts @@ -1,6 +1,13 @@ import {routes} from "@lodestar/api"; import {ChainForkConfig} from "@lodestar/config"; -import {ForkPostDeneb, ForkPostFulu, ForkPreFulu, isForkPostDeneb, isForkPostFulu} from "@lodestar/params"; +import { + ForkPostDeneb, + ForkPostFulu, + ForkPreFulu, + isForkPostDeneb, + isForkPostFulu, + isForkPostGloas, +} from "@lodestar/params"; import {BlobIndex, ColumnIndex, SignedBeaconBlock, Slot, deneb, fulu} from "@lodestar/types"; import {LodestarError, byteArrayEquals, fromHex, prettyPrintIndices, toHex, toRootHex} from "@lodestar/utils"; import {isBlockInputBlobs, isBlockInputColumns} from "../../chain/blocks/blockInput/blockInput.js"; @@ -263,7 +270,10 @@ export async function fetchByRoot({ blockRoot, }); const forkName = config.getForkName(block.message.slot); - if (isForkPostFulu(forkName)) { + if (isForkPostGloas(forkName)) { + // Post-gloas block sync only needs the block body. Payload columns stay on the + // payload/envelope path and are queued independently in the network processor. + } else if (isForkPostFulu(forkName)) { columnSidecarResult = await fetchAndValidateColumns({ config, chain, diff --git a/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts b/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts index 04c4d1346a3c..bc0bfa804e31 100644 --- a/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts +++ b/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts @@ -40,21 +40,6 @@ function addToDescendantBlocks( return descendantBlocks; } -export function getDescendantBlocks( - blockRootHex: RootHex, - blocks: Map -): BlockInputSyncCacheItem[] { - const descendantBlocks: BlockInputSyncCacheItem[] = []; - - for (const block of blocks.values()) { - if ((isPendingBlockInput(block) ? block.blockInput.parentRootHex : undefined) === blockRootHex) { - descendantBlocks.push(block); - } - } - - return descendantBlocks; -} - export type UnknownAndAncestorBlocks = { unknowns: BlockInputSyncCacheItem[]; ancestors: PendingBlockInput[]; diff --git a/packages/beacon-node/test/e2e/sync/unknownBlockSync.test.ts b/packages/beacon-node/test/e2e/sync/unknownBlockSync.test.ts index af6d99e47623..eaa2869285cb 100644 --- a/packages/beacon-node/test/e2e/sync/unknownBlockSync.test.ts +++ b/packages/beacon-node/test/e2e/sync/unknownBlockSync.test.ts @@ -1,13 +1,11 @@ import {afterEach, describe, it, vi} from "vitest"; -import {fromHexString} from "@chainsafe/ssz"; import {routes} from "@lodestar/api"; import {ChainConfig} from "@lodestar/config"; import {TimestampFormatCode} from "@lodestar/logger"; import {LogLevel, TestLoggerOpts, testLogger} from "@lodestar/logger/test-utils"; import {SLOTS_PER_EPOCH} from "@lodestar/params"; -import {fulu} from "@lodestar/types"; -import {retry} from "@lodestar/utils"; -import {BlockInputColumns} from "../../../src/chain/blocks/blockInput/blockInput.js"; +import {gloas} from "@lodestar/types"; +import {BlockInputNoData} from "../../../src/chain/blocks/blockInput/blockInput.js"; import {BlockInputSource} from "../../../src/chain/blocks/blockInput/types.js"; import {ChainEvent} from "../../../src/chain/emitter.js"; import {BlockError, BlockErrorCode} from "../../../src/chain/errors/index.js"; @@ -17,12 +15,13 @@ import {connect, onPeerConnect} from "../../utils/network.js"; import {getDevBeaconNode} from "../../utils/node/beacon.js"; import {getAndInitDevValidators} from "../../utils/node/validator.js"; -describe("sync / unknown block sync for fulu", () => { - vi.setConfig({testTimeout: 60_000}); +describe("sync / unknown block sync thru gloas", () => { + vi.setConfig({testTimeout: 90_000}); const validatorCount = 8; const ELECTRA_FORK_EPOCH = 0; - const FULU_FORK_EPOCH = 1; + const FULU_FORK_EPOCH = 0; + const GLOAS_FORK_EPOCH = 1; const SLOT_DURATION_MS = 2000; const testParams: Partial = { SLOT_DURATION_MS, @@ -32,9 +31,10 @@ describe("sync / unknown block sync for fulu", () => { DENEB_FORK_EPOCH: ELECTRA_FORK_EPOCH, ELECTRA_FORK_EPOCH: ELECTRA_FORK_EPOCH, FULU_FORK_EPOCH: FULU_FORK_EPOCH, + GLOAS_FORK_EPOCH: GLOAS_FORK_EPOCH, BLOB_SCHEDULE: [ { - EPOCH: 1, + EPOCH: GLOAS_FORK_EPOCH, MAX_BLOBS_PER_BLOCK: 3, }, ], @@ -61,6 +61,14 @@ describe("sync / unknown block sync for fulu", () => { id: "should do an incompleteBlockInput sync from another BN", event: ChainEvent.incompleteBlockInput, }, + { + id: "should do an unknownEnvelopeBlockRoot sync from another BN", + event: ChainEvent.unknownEnvelopeBlockRoot, + }, + { + id: "should do an incompletePayloadEnvelope sync from another BN", + event: ChainEvent.incompletePayloadEnvelope, + }, ]; for (const {id, event} of testCases) { @@ -80,6 +88,8 @@ describe("sync / unknown block sync for fulu", () => { const loggerNodeA = testLogger("UnknownSync-Node-A", testLoggerOpts); const loggerNodeB = testLogger("UnknownSync-Node-B", testLoggerOpts); + const gloasStartSlot = GLOAS_FORK_EPOCH * SLOTS_PER_EPOCH; + const targetSlot = gloasStartSlot + 1; const bn = await getDevBeaconNode({ params: testParams, @@ -94,6 +104,10 @@ describe("sync / unknown block sync for fulu", () => { eth1BlockHash: Uint8Array.from(INTEROP_BLOCK_HASH), }); + const waitForTargetPayloadOnNodeA = waitForEvent< + routes.events.EventData[routes.events.EventType.executionPayload] + >(bn.chain.emitter, routes.events.EventType.executionPayload, 240000, ({slot}) => slot === targetSlot); + const {validators} = await getAndInitDevValidators({ node: bn, logPrefix: "UnknownSync", @@ -109,14 +123,8 @@ describe("sync / unknown block sync for fulu", () => { // stop bn after validators afterEachCallbacks.push(() => bn.close().catch(() => {})); - // wait until the 2nd slot of fulu - await waitForEvent( - bn.chain.emitter, - routes.events.EventType.head, - 240000, - ({slot}) => slot === FULU_FORK_EPOCH * SLOTS_PER_EPOCH + 1 - ); - loggerNodeA.info("Node A emitted head event", {slot: bn.chain.forkChoice.getHead().slot}); + const payloadEvent = await waitForTargetPayloadOnNodeA; + loggerNodeA.info("Node A selected gloas payload target", {slot: payloadEvent.slot, root: payloadEvent.blockRoot}); const bn2 = await getDevBeaconNode({ params: testParams, @@ -133,48 +141,59 @@ describe("sync / unknown block sync for fulu", () => { afterEachCallbacks.push(() => bn2.close().catch(() => {})); - const headSummary = bn.chain.forkChoice.getHead(); - // Retry getting head block from db in case of slow persistence - const head = await retry( - async () => { - const block = await bn.db.block.get(fromHexString(headSummary.blockRoot)); - if (!block) throw Error("First beacon node has no head block"); - return block; - }, - {retries: 5, retryDelay: 500} - ); + const headSlot = payloadEvent.slot; + const headRootHex = payloadEvent.blockRoot; + const headResult = await bn.chain.getBlockByRoot(headRootHex); + if (headResult === null) { + throw Error("Node A is missing the selected gloas head block"); + } + const head = headResult.block as gloas.SignedBeaconBlock; + if (head.message.body.signedExecutionPayloadBid.message.blobKzgCommitments.length === 0) { + throw Error(`Expected gloas test target at slot=${targetSlot} to have blob commitments`); + } + const waitForSynced = waitForEvent( bn2.chain.emitter, routes.events.EventType.head, 100000, - ({block}) => block === headSummary.blockRoot + ({block}) => block === headRootHex ); + const maybeWaitForPayloadImported = + event === ChainEvent.unknownEnvelopeBlockRoot || event === ChainEvent.incompletePayloadEnvelope + ? Promise.resolve() + : waitForEvent( + bn2.chain.emitter, + routes.events.EventType.executionPayload, + 100000, + ({blockRoot}) => blockRoot === headRootHex + ); const connected = Promise.all([onPeerConnect(bn2.network), onPeerConnect(bn.network)]); await connect(bn2.network, bn.network); await connected; loggerNodeA.info("Node A connected to Node B"); - const headInput = BlockInputColumns.createFromBlock({ - block: head as fulu.SignedBeaconBlock, - blockRootHex: headSummary.blockRoot, + const sourcePeerId = bn.network.peerId.toString(); + const headInput = BlockInputNoData.createFromBlock({ + block: head, + blockRootHex: headRootHex, source: BlockInputSource.gossip, seenTimestampSec: Math.floor(Date.now() / 1000), - forkName: bn.chain.config.getForkName(head.message.slot), + forkName: bn.chain.config.getForkName(headSlot), daOutOfRange: false, - sampledColumns: bn2.network.custodyConfig.sampledColumns, - custodyColumns: bn2.network.custodyConfig.custodyColumns, }); - switch (event) { case ChainEvent.blockUnknownParent: await bn2.chain.processBlock(headInput).catch((e) => { loggerNodeB.info("Error processing block", {slot: headInput.slot, code: e.type.code}); - if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) { + if ( + e instanceof BlockError && + (e.type.code === BlockErrorCode.PARENT_UNKNOWN || e.type.code === BlockErrorCode.PARENT_PAYLOAD_UNKNOWN) + ) { // Expected bn2.chain.emitter.emit(ChainEvent.blockUnknownParent, { blockInput: headInput, - peer: bn2.network.peerId.toString(), + peer: sourcePeerId, source: BlockInputSource.gossip, }); } else { @@ -184,24 +203,67 @@ describe("sync / unknown block sync for fulu", () => { break; case ChainEvent.unknownBlockRoot: bn2.chain.emitter.emit(ChainEvent.unknownBlockRoot, { - rootHex: headSummary.blockRoot, - peer: bn2.network.peerId.toString(), + rootHex: headRootHex, + peer: sourcePeerId, source: BlockInputSource.gossip, }); break; case ChainEvent.incompleteBlockInput: bn2.chain.emitter.emit(ChainEvent.incompleteBlockInput, { blockInput: headInput, - peer: bn2.network.peerId.toString(), + peer: sourcePeerId, + source: BlockInputSource.gossip, + }); + break; + case ChainEvent.unknownEnvelopeBlockRoot: + bn2.chain.emitter.emit(ChainEvent.unknownEnvelopeBlockRoot, { + rootHex: headRootHex, + peer: sourcePeerId, + source: BlockInputSource.gossip, + }); + break; + case ChainEvent.incompletePayloadEnvelope: { + // get the chain started with an unknownBlockRoot + bn2.chain.emitter.emit(ChainEvent.unknownBlockRoot, { + rootHex: headRootHex, + peer: sourcePeerId, source: BlockInputSource.gossip, }); break; + } default: throw Error("Unknown event type"); } - // Wait for NODE-A head to be processed in NODE-B without range sync + // Wait for the block root to be processed in node B. Payload-aware entrypoints should also import + // the separated payload envelope for the same root. await waitForSynced; + + switch (event) { + case ChainEvent.incompletePayloadEnvelope: { + // After it syncs, send an incomplete payload envelope + // and assert the payload gets imported + const payloadInput = bn2.chain.seenPayloadEnvelopeInputCache.add({ + blockRootHex: headRootHex, + forkName: bn2.config.getForkName(headSlot), + block: head, + sampledColumns: bn2.chain.custodyConfig.sampledColumns, + custodyColumns: bn2.chain.custodyConfig.custodyColumns, + timeCreatedSec: Math.floor(Date.now() / 1000), + }); + bn2.chain.emitter.emit(ChainEvent.incompletePayloadEnvelope, { + payloadInput, + peer: sourcePeerId, + source: BlockInputSource.gossip, + }); + break; + } + default: + break; + } + + // only await payload import for events that imply importing it + await maybeWaitForPayloadImported; }); } }); diff --git a/packages/beacon-node/test/unit/chain/blocks/verifyBlocksSanityChecks.test.ts b/packages/beacon-node/test/unit/chain/blocks/verifyBlocksSanityChecks.test.ts index 7f4c15f62152..cf11c8c359b6 100644 --- a/packages/beacon-node/test/unit/chain/blocks/verifyBlocksSanityChecks.test.ts +++ b/packages/beacon-node/test/unit/chain/blocks/verifyBlocksSanityChecks.test.ts @@ -1,4 +1,5 @@ import {beforeEach, describe, expect, it} from "vitest"; +import {createChainForkConfig} from "@lodestar/config"; import {config} from "@lodestar/config/default"; import {IForkChoice, PayloadStatus, ProtoBlock} from "@lodestar/fork-choice"; import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; @@ -41,6 +42,24 @@ describe("chain / blocks / verifyBlocksSanityChecks", () => { expectThrowsLodestarError(() => verifyBlocksSanityChecks(modules, [block], {}), BlockErrorCode.PARENT_UNKNOWN); }); + it("PARENT_PAYLOAD_UNKNOWN", () => { + const gloasConfig = createChainForkConfig({ + ...config, + FULU_FORK_EPOCH: 0, + GLOAS_FORK_EPOCH: 0, + }); + const gloasBlock = ssz.gloas.SignedBeaconBlock.defaultValue(); + gloasBlock.message.slot = currentSlot; + + forkChoice.getBlockHexDefaultStatus.mockReturnValue({slot: 0} as ProtoBlock); + forkChoice.getBlockHexAndBlockHash.mockReturnValue(null); + + expectThrowsLodestarError( + () => verifyBlocksSanityChecks({...modules, config: gloasConfig}, [gloasBlock as SignedBeaconBlock], {}), + BlockErrorCode.PARENT_PAYLOAD_UNKNOWN + ); + }); + it("GENESIS_BLOCK", () => { block.message.slot = 0; expectThrowsLodestarError(() => verifyBlocksSanityChecks(modules, [block], {}), BlockErrorCode.GENESIS_BLOCK); diff --git a/packages/beacon-node/test/unit/chain/seenCache/seenPayloadEnvelopeInput.test.ts b/packages/beacon-node/test/unit/chain/seenCache/seenPayloadEnvelopeInput.test.ts new file mode 100644 index 000000000000..a0c30e4fe2be --- /dev/null +++ b/packages/beacon-node/test/unit/chain/seenCache/seenPayloadEnvelopeInput.test.ts @@ -0,0 +1,59 @@ +import {beforeEach, describe, expect, it} from "vitest"; +import {testLogger} from "@lodestar/logger/test-utils"; +import {ForkName} from "@lodestar/params"; +import {ChainEventEmitter} from "../../../../src/chain/emitter.js"; +import {SeenPayloadEnvelopeInput} from "../../../../src/chain/seenCache/seenPayloadEnvelopeInput.js"; +import {SerializedCache} from "../../../../src/util/serializedCache.js"; +import {generateBlock} from "../../../utils/blocksAndData.js"; + +describe("SeenPayloadEnvelopeInput", () => { + let cache: SeenPayloadEnvelopeInput; + let abortController: AbortController; + let chainEvents: ChainEventEmitter; + let serializedCache: SerializedCache; + + beforeEach(() => { + chainEvents = new ChainEventEmitter(); + abortController = new AbortController(); + serializedCache = new SerializedCache(); + + cache = new SeenPayloadEnvelopeInput({ + chainEvents, + signal: abortController.signal, + serializedCache, + metrics: null, + logger: testLogger(), + }); + }); + + function addPayloadInput(slot: number): string { + const {block, rootHex} = generateBlock({forkName: ForkName.gloas, slot}); + cache.add({ + blockRootHex: rootHex, + block, + forkName: ForkName.gloas, + sampledColumns: [], + custodyColumns: [], + timeCreatedSec: Date.now() / 1000, + }); + return rootHex; + } + + it("pruneBelow removes payload inputs below the cutoff slot", () => { + const oldRootHex = addPayloadInput(1); + const newRootHex = addPayloadInput(2); + + cache.pruneBelow(2); + + expect(cache.get(oldRootHex)).toBeUndefined(); + expect(cache.get(newRootHex)).toBeDefined(); + }); + + it("pruneBelow keeps payload inputs at or above the cutoff slot", () => { + const rootHex = addPayloadInput(1); + + cache.pruneBelow(1); + + expect(cache.get(rootHex)).toBeDefined(); + }); +}); diff --git a/packages/beacon-node/test/unit/sync/unknownBlock.test.ts b/packages/beacon-node/test/unit/sync/unknownBlock.test.ts index a12d46e42444..45308776dbd0 100644 --- a/packages/beacon-node/test/unit/sync/unknownBlock.test.ts +++ b/packages/beacon-node/test/unit/sync/unknownBlock.test.ts @@ -1,21 +1,29 @@ import EventEmitter from "node:events"; import {afterEach, beforeEach, describe, expect, it, vi} from "vitest"; -import {toHexString} from "@chainsafe/ssz"; -import {createChainForkConfig} from "@lodestar/config"; +import {routes} from "@lodestar/api"; +import {createBeaconConfig} from "@lodestar/config"; import {config as minimalConfig} from "@lodestar/config/default"; import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; import {testLogger} from "@lodestar/logger/test-utils"; -import {ssz} from "@lodestar/types"; -import {notNullish, sleep} from "@lodestar/utils"; -import {BlockInputPreData} from "../../../src/chain/blocks/blockInput/blockInput.js"; -import {BlockInputSource} from "../../../src/chain/blocks/blockInput/types.js"; +import {ForkName} from "@lodestar/params"; +import {SignedBeaconBlock, gloas, ssz} from "@lodestar/types"; +import {notNullish, sleep, toRootHex} from "@lodestar/utils"; +import {BlockInputNoData} from "../../../src/chain/blocks/blockInput/blockInput.js"; +import {BlockInputSource, DAType, IBlockInput} from "../../../src/chain/blocks/blockInput/types.js"; +import {PayloadError, PayloadErrorCode} from "../../../src/chain/blocks/importExecutionPayload.js"; +import {PayloadEnvelopeInput} from "../../../src/chain/blocks/payloadEnvelopeInput/payloadEnvelopeInput.js"; +import {PayloadEnvelopeInputSource} from "../../../src/chain/blocks/payloadEnvelopeInput/types.js"; import {BlockError, BlockErrorCode} from "../../../src/chain/errors/blockError.js"; import {ChainEvent, ChainEventEmitter, IBeaconChain} from "../../../src/chain/index.js"; import {SeenBlockProposers} from "../../../src/chain/seenCache/seenBlockProposers.js"; import {SeenBlockInput} from "../../../src/chain/seenCache/seenGossipBlockInput.js"; +import {validateGloasBlockDataColumnSidecars} from "../../../src/chain/validation/dataColumnSidecar.js"; +import {validateGossipExecutionPayloadEnvelope} from "../../../src/chain/validation/executionPayloadEnvelope.js"; +import {ExecutionPayloadStatus} from "../../../src/execution/index.js"; import {INetwork, NetworkEvent, NetworkEventBus} from "../../../src/network/index.js"; import {PeerSyncMeta} from "../../../src/network/peers/peersData.js"; import {defaultSyncOptions} from "../../../src/sync/options.js"; +import {BlockInputSyncCacheItem, PendingBlockInputStatus} from "../../../src/sync/types.js"; import {BlockInputSync, UnknownBlockPeerBalancer} from "../../../src/sync/unknownBlock.js"; import {CustodyConfig} from "../../../src/util/dataColumns.js"; import {PeerIdStr} from "../../../src/util/peerId.js"; @@ -23,13 +31,211 @@ import {ClockStopped} from "../../mocks/clock.js"; import {MockedBeaconChain, getMockedBeaconChain} from "../../mocks/mockedBeaconChain.js"; import {getRandPeerIdStr, getRandPeerSyncMeta} from "../../utils/peer.js"; +vi.mock("../../../src/chain/validation/executionPayloadEnvelope.js", () => ({ + validateGossipExecutionPayloadEnvelope: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("../../../src/chain/validation/dataColumnSidecar.js", async (importActual) => { + const mod = await importActual(); + return { + ...mod, + validateGloasBlockDataColumnSidecars: vi.fn().mockResolvedValue(undefined), + }; +}); + +function buildPayloadFixture({ + blobCount, + blockHash, + sampledColumns, + slot, +}: { + blobCount: number; + blockHash?: Uint8Array; + sampledColumns: number[]; + slot: number; +}): { + block: gloas.SignedBeaconBlock; + blockRootHex: string; + blockRoot: Uint8Array; + payloadInput: PayloadEnvelopeInput; + envelope: gloas.SignedExecutionPayloadEnvelope; + columnSidecars: gloas.DataColumnSidecar[]; +} { + const block = ssz.gloas.SignedBeaconBlock.defaultValue(); + block.message.slot = slot; + block.message.body.signedExecutionPayloadBid.message.blobKzgCommitments = Array.from({length: blobCount}, () => + Buffer.alloc(48, 0x11) + ); + if (blockHash) { + block.message.body.signedExecutionPayloadBid.message.blockHash = blockHash; + } + + const blockRoot = ssz.gloas.BeaconBlock.hashTreeRoot(block.message); + const blockRootHex = toRootHex(blockRoot); + const payloadInput = PayloadEnvelopeInput.createFromBlock({ + blockRootHex, + block: block as SignedBeaconBlock, + forkName: ForkName.gloas, + sampledColumns, + custodyColumns: sampledColumns, + timeCreatedSec: Date.now() / 1000, + }); + + const envelope = ssz.gloas.SignedExecutionPayloadEnvelope.defaultValue(); + envelope.message.beaconBlockRoot = blockRoot; + envelope.message.payload.slotNumber = slot; + + const columnSidecars = sampledColumns.map((index) => { + const columnSidecar = ssz.gloas.DataColumnSidecar.defaultValue(); + columnSidecar.beaconBlockRoot = blockRoot; + columnSidecar.slot = slot; + columnSidecar.index = index; + return columnSidecar; + }); + + return {block, blockRootHex, blockRoot, payloadInput, envelope, columnSidecars}; +} + +function createGloasBlockInput({ + block, + blockRootHex, + peerIdStr, + seenTimestampSec, + source, +}: { + block: gloas.SignedBeaconBlock; + blockRootHex: string; + peerIdStr?: PeerIdStr; + seenTimestampSec: number; + source: BlockInputSource; +}): BlockInputNoData { + return BlockInputNoData.createFromBlock({ + block, + blockRootHex, + forkName: ForkName.gloas, + daOutOfRange: false, + seenTimestampSec, + source, + peerIdStr, + }); +} + +function getGloasBlockRoot(block: gloas.SignedBeaconBlock): Uint8Array { + return ssz.gloas.BeaconBlock.hashTreeRoot(block.message); +} + +function getGloasBlockHashHex(block: gloas.SignedBeaconBlock): string { + return toRootHex(block.message.body.signedExecutionPayloadBid.message.blockHash); +} + +function buildIncompleteGloasBlockInput({ + parentRoot, + parentBlockHash, + slot, +}: { + parentRoot: Uint8Array; + parentBlockHash: Uint8Array; + slot: number; +}): { + block: gloas.SignedBeaconBlock; + blockRootHex: string; + parentBlockHashHex: string; + parentRootHex: string; + blockInput: IBlockInput; +} { + const block = ssz.gloas.SignedBeaconBlock.defaultValue(); + block.message.slot = slot; + block.message.parentRoot = parentRoot; + block.message.body.signedExecutionPayloadBid.message.parentBlockHash = parentBlockHash; + + const blockRootHex = toRootHex(ssz.gloas.BeaconBlock.hashTreeRoot(block.message)); + const parentRootHex = toRootHex(parentRoot); + const parentBlockHashHex = toRootHex(parentBlockHash); + + let currentBlock: SignedBeaconBlock | undefined; + let timeCompleteSec = 0; + let blockSource = { + source: BlockInputSource.byRoot, + seenTimestampSec: 0, + peerIdStr: undefined as string | undefined, + }; + + const blockInput: IBlockInput = { + type: DAType.NoData, + daOutOfRange: false, + timeCreatedSec: 0, + forkName: ForkName.gloas, + slot, + blockRootHex, + parentRootHex, + addBlock(props): void { + currentBlock = props.block; + timeCompleteSec = props.seenTimestampSec; + blockSource = { + source: props.source, + seenTimestampSec: props.seenTimestampSec, + peerIdStr: props.peerIdStr, + }; + }, + hasBlock(): boolean { + return currentBlock !== undefined; + }, + getBlock(): SignedBeaconBlock { + if (!currentBlock) { + throw new Error("Missing block"); + } + return currentBlock; + }, + getBlockSource() { + if (!currentBlock) { + throw new Error("Missing block source"); + } + return blockSource; + }, + hasAllData(): boolean { + return true; + }, + hasBlockAndAllData(): boolean { + return currentBlock !== undefined; + }, + getLogMeta() { + return {slot, blockRoot: blockRootHex, timeCreatedSec: 0}; + }, + getTimeComplete(): number { + if (!currentBlock) { + throw new Error("Missing completion time"); + } + return timeCompleteSec; + }, + getSerializedCacheKeys(): object[] { + return currentBlock ? [currentBlock] : []; + }, + waitForBlock(): Promise> { + return currentBlock ? Promise.resolve(currentBlock) : Promise.reject(new Error("Missing block")); + }, + waitForAllData(): Promise { + return Promise.resolve(null); + }, + waitForBlockAndAllData(): Promise> { + return currentBlock ? Promise.resolve(blockInput) : Promise.reject(new Error("Missing block")); + }, + }; + + return {block, blockRootHex, parentBlockHashHex, parentRootHex, blockInput}; +} + describe("sync by UnknownBlockSync", {timeout: 20_000}, () => { const logger = testLogger(); const slotSec = 0.3; - const config = createChainForkConfig({ - ...minimalConfig, - SLOT_DURATION_MS: slotSec * 1000, - }); + const config = createBeaconConfig( + { + ...minimalConfig, + FULU_FORK_EPOCH: 0, + GLOAS_FORK_EPOCH: 0, + SLOT_DURATION_MS: slotSec * 1000, + }, + Buffer.alloc(32, 0) + ); beforeEach(() => { vi.useFakeTimers({shouldAdvanceTime: true}); @@ -103,22 +309,36 @@ describe("sync by UnknownBlockSync", {timeout: 20_000}, () => { } of testCases) { it(id, async () => { const peer = await getRandPeerIdStr(); - const blockA = ssz.phase0.SignedBeaconBlock.defaultValue(); - const blockB = ssz.phase0.SignedBeaconBlock.defaultValue(); - const blockC = ssz.phase0.SignedBeaconBlock.defaultValue(); + const blockA = ssz.gloas.SignedBeaconBlock.defaultValue(); + const blockB = ssz.gloas.SignedBeaconBlock.defaultValue(); + const blockC = ssz.gloas.SignedBeaconBlock.defaultValue(); blockA.message.slot = 1; blockB.message.slot = 2; blockC.message.slot = 3; const blockRoot0 = Buffer.alloc(32, 0x00); - const blockRootA = ssz.phase0.BeaconBlock.hashTreeRoot(blockA.message); + const blockHash0 = Buffer.alloc(32, 0x00); + const blockHashA = Buffer.alloc(32, 0xa1); + const blockHashB = Buffer.alloc(32, 0xb2); + const blockHashC = Buffer.alloc(32, 0xc3); + blockA.message.parentRoot = blockRoot0; + blockA.message.body.signedExecutionPayloadBid.message.parentBlockRoot = blockRoot0; + blockA.message.body.signedExecutionPayloadBid.message.parentBlockHash = blockHash0; + blockA.message.body.signedExecutionPayloadBid.message.blockHash = blockHashA; + const blockRootA = getGloasBlockRoot(blockA); blockB.message.parentRoot = blockRootA; - const blockRootB = ssz.phase0.BeaconBlock.hashTreeRoot(blockB.message); + blockB.message.body.signedExecutionPayloadBid.message.parentBlockRoot = blockRootA; + blockB.message.body.signedExecutionPayloadBid.message.parentBlockHash = blockHashA; + blockB.message.body.signedExecutionPayloadBid.message.blockHash = blockHashB; + const blockRootB = getGloasBlockRoot(blockB); blockC.message.parentRoot = blockRootB; - const blockRootC = ssz.phase0.BeaconBlock.hashTreeRoot(blockC.message); - const blockRootHex0 = toHexString(blockRoot0); - const blockRootHexA = toHexString(blockRootA); - const blockRootHexB = toHexString(blockRootB); - const blockRootHexC = toHexString(blockRootC); + blockC.message.body.signedExecutionPayloadBid.message.parentBlockRoot = blockRootB; + blockC.message.body.signedExecutionPayloadBid.message.parentBlockHash = blockHashB; + blockC.message.body.signedExecutionPayloadBid.message.blockHash = blockHashC; + const blockRootC = getGloasBlockRoot(blockC); + const blockRootHex0 = toRootHex(blockRoot0); + const blockRootHexA = toRootHex(blockRootA); + const blockRootHexB = toRootHex(blockRootB); + const blockRootHexC = toRootHex(blockRootC); const blocksByRoot = new Map([ [blockRootHexA, blockA], @@ -141,20 +361,27 @@ describe("sync by UnknownBlockSync", {timeout: 20_000}, () => { custodyColumns: [], earliestAvailableSlot: 0, }), - custodyConfig: {sampledColumns: []} as unknown as CustodyConfig, + custodyConfig: {sampledColumns: [], sampleGroups: [[]]} as unknown as CustodyConfig, sendBeaconBlocksByRoot: async (_peerId, roots) => { sendBeaconBlocksByRootResolveFn([_peerId, roots]); const correctBlocks = Array.from(roots) - .map((root) => blocksByRoot.get(toHexString(root))) + .map((root) => blocksByRoot.get(toRootHex(root))) .filter(notNullish); - return wrongBlockRoot ? [ssz.phase0.SignedBeaconBlock.defaultValue()] : correctBlocks; + return wrongBlockRoot ? [ssz.gloas.SignedBeaconBlock.defaultValue()] : correctBlocks; }, }; const forkChoiceKnownRoots = new Set([blockRootHex0]); - const forkChoice: Pick = { - hasBlock: (root) => forkChoiceKnownRoots.has(toHexString(root)), + const forkChoiceKnownPayloadHashes = new Map([[blockRootHex0, toRootHex(blockHash0)]]); + const forkChoice: Pick< + IForkChoice, + "getBlockHexAndBlockHash" | "getFinalizedBlock" | "hasBlock" | "hasBlockHex" | "hasPayloadHexUnsafe" + > = { + hasBlock: (root) => forkChoiceKnownRoots.has(toRootHex(root)), hasBlockHex: (rootHex) => forkChoiceKnownRoots.has(rootHex), + hasPayloadHexUnsafe: (rootHex) => forkChoiceKnownPayloadHashes.has(rootHex), + getBlockHexAndBlockHash: (rootHex, blockHashHex) => + forkChoiceKnownPayloadHashes.get(rootHex) === blockHashHex ? ({slot: 0} as ProtoBlock) : null, getFinalizedBlock: () => ({ slot: finalizedSlot, @@ -179,15 +406,29 @@ describe("sync by UnknownBlockSync", {timeout: 20_000}, () => { genesisTime: 0, processBlock: async (blockInput, opts) => { const block = blockInput.getBlock(); - if (!forkChoice.hasBlock(block.message.parentRoot)) throw Error("Unknown parent"); + const parentRootHex = toRootHex(block.message.parentRoot); + if (!forkChoice.hasBlockHex(parentRootHex)) throw Error("Unknown parent"); + + const parentBlockHash = toRootHex( + (block as gloas.SignedBeaconBlock).message.body.signedExecutionPayloadBid.message.parentBlockHash + ); + if (!forkChoice.getBlockHexAndBlockHash(parentRootHex, parentBlockHash)) { + throw new BlockError(block, { + code: BlockErrorCode.PARENT_PAYLOAD_UNKNOWN, + parentRoot: parentRootHex, + parentBlockHash, + }); + } + const blockSlot = block.message.slot; if (blockSlot <= finalizedSlot && !opts?.ignoreIfFinalized) { // same behavior to BeaconChain to reproduce https://github.com/ChainSafe/lodestar/issues/5650 throw new BlockError(block, {code: BlockErrorCode.WOULD_REVERT_FINALIZED_SLOT, blockSlot, finalizedSlot}); } // Simulate adding the block to the forkchoice - const blockRootHex = toHexString(ssz.phase0.BeaconBlock.hashTreeRoot(block.message)); + const blockRootHex = toRootHex(getGloasBlockRoot(block as gloas.SignedBeaconBlock)); forkChoiceKnownRoots.add(blockRootHex); + forkChoiceKnownPayloadHashes.set(blockRootHex, getGloasBlockHashHex(block as gloas.SignedBeaconBlock)); if (blockRootHex === blockRootHexC) blockCResolver(); if (blockRootHex === blockRootHexA) blockAResolver(); }, @@ -199,21 +440,23 @@ describe("sync by UnknownBlockSync", {timeout: 20_000}, () => { seenTimestampSec, source, }: { - block: any; + block: gloas.SignedBeaconBlock; blockRootHex: string; seenTimestampSec: number; source: BlockInputSource; }) => - BlockInputPreData.createFromBlock({ + createGloasBlockInput({ block, blockRootHex, - forkName: config.getForkName(block.message.slot), - daOutOfRange: false, seenTimestampSec, source, }), prune: () => {}, } as unknown as SeenBlockInput, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockReturnValue(undefined), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], }; const setTimeoutSpy = vi.spyOn(global, "setTimeout"); @@ -227,18 +470,16 @@ describe("sync by UnknownBlockSync", {timeout: 20_000}, () => { // Register the peer in the peerBalancer via NetworkEvent.peerConnected networkEvents.emit(NetworkEvent.peerConnected, { peer, - status: {} as any, + status: {} as never, custodyColumns: [], clientAgent: "test-client", }); if (event === ChainEvent.blockUnknownParent) { emitter.emit(ChainEvent.blockUnknownParent, { - blockInput: BlockInputPreData.createFromBlock({ + blockInput: createGloasBlockInput({ block: blockC, blockRootHex: blockRootHexC, - forkName: config.getForkName(blockC.message.slot), - daOutOfRange: false, seenTimestampSec: Math.floor(Date.now() / 1000), source: BlockInputSource.gossip, }), @@ -266,11 +507,12 @@ describe("sync by UnknownBlockSync", {timeout: 20_000}, () => { // (peer reporting is currently disabled in removeAndDownScoreAllDescendants) expect(processBlockSpy).not.toHaveBeenCalled(); } else if (maxPendingBlocks !== undefined) { - // With maxPendingBlocks=1 and unknownParent event, addByRootHex adds parent (blockB), - // then addByBlockInput adds blockC, exceeding the limit. Pruning removes the oldest - // entry (blockB), so blockC can't resolve its parent chain and no blocks get processed. + // With maxPendingBlocks=1 and unknownParent event, the scheduler can re-queue one pruned + // parent root at a time, so it partially recovers the chain. It still cannot retain enough + // pending state to import the full descendant chain, so only the earliest ancestor lands in + // fork choice. await sleep(500); - expect(Array.from(forkChoiceKnownRoots.values())).toEqual([blockRootHex0]); + expect(Array.from(forkChoiceKnownRoots.values())).toEqual([blockRootHex0, blockRootHexA]); } else { // Wait for all blocks to be in ForkChoice store await blockCProcessed; @@ -341,15 +583,1059 @@ describe("UnknownBlockSync", () => { if (expected) { expect(events.listenerCount(ChainEvent.unknownBlockRoot)).toBe(1); expect(events.listenerCount(ChainEvent.blockUnknownParent)).toBe(1); + expect(events.listenerCount(ChainEvent.unknownEnvelopeBlockRoot)).toBe(1); + expect(events.listenerCount(ChainEvent.envelopeUnknownBlock)).toBe(1); + expect(events.listenerCount(ChainEvent.incompletePayloadEnvelope)).toBe(1); + expect(events.listenerCount(routes.events.EventType.block)).toBe(1); + expect(events.listenerCount(routes.events.EventType.executionPayload)).toBe(1); expect(service.isSubscribedToNetwork()).toBe(true); } else { expect(events.listenerCount(ChainEvent.unknownBlockRoot)).toBe(0); expect(events.listenerCount(ChainEvent.blockUnknownParent)).toBe(0); + expect(events.listenerCount(ChainEvent.unknownEnvelopeBlockRoot)).toBe(0); + expect(events.listenerCount(ChainEvent.envelopeUnknownBlock)).toBe(0); + expect(events.listenerCount(ChainEvent.incompletePayloadEnvelope)).toBe(0); + expect(events.listenerCount(routes.events.EventType.block)).toBe(0); + expect(events.listenerCount(routes.events.EventType.executionPayload)).toBe(0); expect(service.isSubscribedToNetwork()).toBe(false); } }); } }); + + describe("payload sync flows", () => { + const gloasConfig = createBeaconConfig( + {...minimalConfig, FULU_FORK_EPOCH: 0, GLOAS_FORK_EPOCH: 0}, + Buffer.alloc(32, 0) + ); + type PayloadSyncTestPeer = { + peerId: PeerIdStr; + client?: string; + custodyColumns?: number[]; + earliestAvailableSlot?: number; + }; + + function setupPayloadSyncTest({ + chainOverrides, + custodyConfig = {sampledColumns: [], sampleGroups: [[]]} as unknown as CustodyConfig, + networkOverrides, + peers = [], + }: { + chainOverrides?: Partial; + custodyConfig?: CustodyConfig; + networkOverrides?: Partial; + peers?: PayloadSyncTestPeer[]; + }): { + chain: IBeaconChain; + emitter: ChainEventEmitter; + network: INetwork; + networkEvents: NetworkEventBus; + } { + const emitter = new ChainEventEmitter(); + const networkEvents = new NetworkEventBus(); + const peersById = new Map(peers.map((peer) => [peer.peerId, peer])); + + const chain = { + emitter, + clock: new ClockStopped(0), + config: gloasConfig, + genesisTime: 0, + metrics: null, + serializedCache: {delete: vi.fn()} as unknown as IBeaconChain["serializedCache"], + getBlockByRoot: vi.fn().mockResolvedValue(null), + processExecutionPayload: vi.fn().mockResolvedValue(undefined), + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockReturnValue(undefined), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + seenBlockInputCache: {prune: vi.fn()} as unknown as SeenBlockInput, + seenBlockProposers: {isKnown: vi.fn().mockReturnValue(false)} as unknown as SeenBlockProposers, + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockReturnValue(false), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + ...chainOverrides, + } as IBeaconChain; + + const network = { + events: networkEvents, + getConnectedPeers: () => peers.map(({peerId}) => peerId), + getConnectedPeerSyncMeta: (peerId: string) => { + const peer = peersById.get(peerId); + if (!peer) { + throw new Error(`Unknown peer ${peerId}`); + } + + return { + peerId, + client: peer.client ?? "payload-test-client", + custodyColumns: peer.custodyColumns ?? [], + earliestAvailableSlot: peer.earliestAvailableSlot ?? 0, + }; + }, + custodyConfig, + ...networkOverrides, + } as INetwork; + + service = new BlockInputSync(gloasConfig, network, chain, logger, null, defaultSyncOptions); + service.subscribeToNetwork(); + + for (const peer of peers) { + networkEvents.emit(NetworkEvent.peerConnected, { + peer: peer.peerId, + status: {} as never, + custodyColumns: peer.custodyColumns ?? [], + clientAgent: peer.client ?? "payload-test-client", + }); + } + + return {chain, emitter, network, networkEvents}; + } + + beforeEach(() => { + vi.useFakeTimers({shouldAdvanceTime: true}); + vi.mocked(validateGossipExecutionPayloadEnvelope).mockClear(); + vi.mocked(validateGloasBlockDataColumnSidecars).mockClear(); + }); + + it("fetches and processes unknown envelope by root when payload input exists", async () => { + const peer = await getRandPeerIdStr(); + const {blockRoot, blockRootHex, payloadInput, envelope, columnSidecars} = buildPayloadFixture({ + blobCount: 1, + sampledColumns: [0], + slot: 1, + }); + + const sendExecutionPayloadEnvelopesByRoot = vi.fn().mockResolvedValue([envelope]); + const sendDataColumnSidecarsByRoot = vi.fn().mockResolvedValue(columnSidecars); + const processExecutionPayload = vi.fn().mockResolvedValue(undefined); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === blockRootHex ? payloadInput : undefined)), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === blockRootHex), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + custodyConfig: {sampledColumns: [0], sampleGroups: [[0]]} as unknown as CustodyConfig, + networkOverrides: { + sendExecutionPayloadEnvelopesByRoot, + sendDataColumnSidecarsByRoot, + }, + peers: [{peerId: peer, custodyColumns: [0]}], + }); + + emitter.emit(ChainEvent.unknownEnvelopeBlockRoot, { + rootHex: blockRootHex, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(50); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledWith(peer, [blockRoot]); + expect(sendDataColumnSidecarsByRoot).toHaveBeenCalledTimes(1); + expect(sendDataColumnSidecarsByRoot).toHaveBeenCalledWith(peer, [{blockRoot, columns: [0]}]); + expect(validateGossipExecutionPayloadEnvelope).toHaveBeenCalledOnce(); + expect(validateGloasBlockDataColumnSidecars).toHaveBeenCalledOnce(); + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledWith(payloadInput); + expect(payloadInput.hasPayloadEnvelope()).toBe(true); + expect(payloadInput.hasAllData()).toBe(true); + }); + + it("continues fetching sampled columns across peers until payload input is complete", async () => { + const peerA = await getRandPeerIdStr(); + const peerB = await getRandPeerIdStr(); + const {blockRoot, blockRootHex, payloadInput, envelope, columnSidecars} = buildPayloadFixture({ + blobCount: 1, + sampledColumns: [0, 1], + slot: 1, + }); + + const sendExecutionPayloadEnvelopesByRoot = vi.fn().mockResolvedValue([envelope]); + const sendDataColumnSidecarsByRoot = vi + .fn() + .mockImplementation(async (peerId: string, requests: {blockRoot: Uint8Array; columns: number[]}[]) => { + const [{blockRoot: requestedRoot, columns}] = requests; + expect(requestedRoot).toEqual(blockRoot); + expect(columns).toHaveLength(1); + + if (peerId === peerA) { + expect(columns).toEqual([0]); + return [columnSidecars[0]]; + } + + expect(peerId).toBe(peerB); + expect(columns).toEqual([1]); + return [columnSidecars[1]]; + }); + + const processExecutionPayload = vi.fn().mockResolvedValue(undefined); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === blockRootHex ? payloadInput : undefined)), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === blockRootHex), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + custodyConfig: {sampledColumns: [0, 1], sampleGroups: [[0], [1]]} as unknown as CustodyConfig, + networkOverrides: { + sendExecutionPayloadEnvelopesByRoot, + sendDataColumnSidecarsByRoot, + }, + peers: [ + {peerId: peerA, client: "payload-test-client-a", custodyColumns: [0]}, + {peerId: peerB, client: "payload-test-client-b", custodyColumns: [1]}, + ], + }); + + emitter.emit(ChainEvent.unknownEnvelopeBlockRoot, { + rootHex: blockRootHex, + peer: peerA, + source: BlockInputSource.gossip, + }); + + await sleep(50); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(sendDataColumnSidecarsByRoot).toHaveBeenCalledTimes(2); + expect(sendDataColumnSidecarsByRoot.mock.calls.map(([peerId]) => peerId)).toEqual( + expect.arrayContaining([peerA, peerB]) + ); + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledWith(payloadInput); + expect(payloadInput.hasPayloadEnvelope()).toBe(true); + expect(payloadInput.hasAllData()).toBe(true); + }); + + it("downloads the block immediately after fetching an envelope for an unknown root", async () => { + const peer = await getRandPeerIdStr(); + const {block, blockRoot, blockRootHex, payloadInput, envelope} = buildPayloadFixture({ + blobCount: 0, + sampledColumns: [], + slot: 1, + }); + const parentRootHex = toRootHex(block.message.parentRoot); + + let cachedPayloadInput: PayloadEnvelopeInput | undefined; + const knownRoots = new Set([parentRootHex]); + + const sendExecutionPayloadEnvelopesByRoot = vi.fn().mockResolvedValue([envelope]); + const sendBeaconBlocksByRoot = vi.fn().mockResolvedValue([block]); + const processExecutionPayload = vi.fn().mockResolvedValue(undefined); + const processBlock = vi.fn().mockImplementation(async () => { + cachedPayloadInput = payloadInput; + knownRoots.add(blockRootHex); + emitter.emit(routes.events.EventType.block, {slot: 1, block: blockRootHex, executionOptimistic: false}); + }); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processBlock, + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === blockRootHex ? cachedPayloadInput : undefined)), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + seenBlockInputCache: { + getByBlock: ({ + block, + blockRootHex, + seenTimestampSec, + source, + }: { + block: gloas.SignedBeaconBlock; + blockRootHex: string; + seenTimestampSec: number; + source: BlockInputSource; + }) => + createGloasBlockInput({ + block, + blockRootHex, + seenTimestampSec, + source, + }), + prune: vi.fn(), + } as unknown as SeenBlockInput, + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => knownRoots.has(root)), + getBlockHexAndBlockHash: vi + .fn() + .mockImplementation((root: string, hash: string) => + root === parentRootHex && + hash === toRootHex(block.message.body.signedExecutionPayloadBid.message.parentBlockHash) + ? ({slot: 0} as ProtoBlock) + : null + ), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + networkOverrides: { + sendExecutionPayloadEnvelopesByRoot, + sendBeaconBlocksByRoot, + }, + peers: [{peerId: peer}], + }); + + emitter.emit(ChainEvent.unknownEnvelopeBlockRoot, { + rootHex: blockRootHex, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(50); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledWith(peer, [blockRoot]); + expect(sendBeaconBlocksByRoot).toHaveBeenCalledTimes(1); + expect(sendBeaconBlocksByRoot).toHaveBeenCalledWith(peer, [blockRoot]); + expect(processBlock).toHaveBeenCalledTimes(1); + expect(validateGossipExecutionPayloadEnvelope).toHaveBeenCalledOnce(); + expect(processExecutionPayload).toHaveBeenCalledWith(payloadInput); + }); + + it("downloads the block and retries payload import when EL reports block not in fork choice", async () => { + const peer = await getRandPeerIdStr(); + const {block, blockRoot, blockRootHex, payloadInput, envelope} = buildPayloadFixture({ + blobCount: 0, + sampledColumns: [], + slot: 1, + }); + const parentRootHex = toRootHex(block.message.parentRoot); + const knownRoots = new Set([parentRootHex, blockRootHex]); + + const sendExecutionPayloadEnvelopesByRoot = vi.fn().mockResolvedValue([envelope]); + const sendBeaconBlocksByRoot = vi.fn().mockResolvedValue([block]); + const processExecutionPayload = vi + .fn() + .mockRejectedValueOnce( + new PayloadError({ + code: PayloadErrorCode.BLOCK_NOT_IN_FORK_CHOICE, + blockRootHex, + }) + ) + .mockResolvedValueOnce(undefined); + + let emitter!: ChainEventEmitter; + const processBlock = vi.fn().mockImplementation(async () => { + knownRoots.add(blockRootHex); + emitter.emit(routes.events.EventType.block, {slot: 1, block: blockRootHex, executionOptimistic: false}); + }); + + ({emitter} = setupPayloadSyncTest({ + chainOverrides: { + processBlock, + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === blockRootHex ? payloadInput : undefined)), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + seenBlockInputCache: { + getByBlock: ({ + block, + blockRootHex, + seenTimestampSec, + source, + }: { + block: gloas.SignedBeaconBlock; + blockRootHex: string; + seenTimestampSec: number; + source: BlockInputSource; + }) => + createGloasBlockInput({ + block, + blockRootHex, + seenTimestampSec, + source, + }), + prune: vi.fn(), + } as unknown as SeenBlockInput, + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => knownRoots.has(root)), + getBlockHexAndBlockHash: vi + .fn() + .mockImplementation((root: string, hash: string) => + root === parentRootHex && + hash === toRootHex(block.message.body.signedExecutionPayloadBid.message.parentBlockHash) + ? ({slot: 0} as ProtoBlock) + : null + ), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + networkOverrides: { + sendExecutionPayloadEnvelopesByRoot, + sendBeaconBlocksByRoot, + }, + peers: [{peerId: peer}], + })); + + emitter.emit(ChainEvent.unknownEnvelopeBlockRoot, { + rootHex: blockRootHex, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(80); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledWith(peer, [blockRoot]); + expect(sendBeaconBlocksByRoot).toHaveBeenCalledTimes(1); + expect(sendBeaconBlocksByRoot).toHaveBeenCalledWith(peer, [blockRoot]); + expect(processBlock).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledTimes(2); + expect(processExecutionPayload).toHaveBeenNthCalledWith(1, payloadInput); + expect(processExecutionPayload).toHaveBeenNthCalledWith(2, payloadInput); + }); + + it("waits for block after envelopeUnknownBlock and processes payload on block import", async () => { + const peer = await getRandPeerIdStr(); + const {blockRootHex, payloadInput, envelope} = buildPayloadFixture({ + blobCount: 0, + sampledColumns: [], + slot: 1, + }); + + let cachedPayloadInput: PayloadEnvelopeInput | undefined; + let blockKnown = false; + const processExecutionPayload = vi.fn().mockResolvedValue(undefined); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === blockRootHex ? cachedPayloadInput : undefined)), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === blockRootHex && blockKnown), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + }); + + emitter.emit(ChainEvent.envelopeUnknownBlock, { + envelope, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(20); + expect(processExecutionPayload).not.toHaveBeenCalled(); + + cachedPayloadInput = payloadInput; + blockKnown = true; + emitter.emit(routes.events.EventType.block, {slot: 1, block: blockRootHex, executionOptimistic: false}); + + await sleep(50); + + expect(validateGossipExecutionPayloadEnvelope).toHaveBeenCalledOnce(); + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledWith(payloadInput); + expect(payloadInput.hasPayloadEnvelope()).toBe(true); + }); + + it("reuses a queued envelope when incomplete payload input arrives for the same root", async () => { + const peer = await getRandPeerIdStr(); + const {payloadInput, envelope} = buildPayloadFixture({ + blobCount: 0, + sampledColumns: [], + slot: 1, + }); + + const sendExecutionPayloadEnvelopesByRoot = vi.fn(); + const processExecutionPayload = vi.fn().mockResolvedValue(undefined); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockReturnValue(undefined), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === payloadInput.blockRootHex), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + networkOverrides: {sendExecutionPayloadEnvelopesByRoot}, + }); + + emitter.emit(ChainEvent.envelopeUnknownBlock, { + envelope, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(20); + + expect(processExecutionPayload).not.toHaveBeenCalled(); + + emitter.emit(ChainEvent.incompletePayloadEnvelope, { + payloadInput, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(20); + + expect(sendExecutionPayloadEnvelopesByRoot).not.toHaveBeenCalled(); + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledWith(payloadInput); + expect(payloadInput.hasPayloadEnvelope()).toBe(true); + expect(payloadInput.getPayloadEnvelope()).toBe(envelope); + }); + + it("refetches by root if a queued envelope fails validation after block import", async () => { + const peer = await getRandPeerIdStr(); + const {blockRoot, blockRootHex, payloadInput, envelope} = buildPayloadFixture({ + blobCount: 0, + sampledColumns: [], + slot: 1, + }); + + const invalidEnvelope = ssz.gloas.SignedExecutionPayloadEnvelope.defaultValue(); + invalidEnvelope.message.beaconBlockRoot = blockRoot; + invalidEnvelope.message.payload.slotNumber = 1; + + vi.mocked(validateGossipExecutionPayloadEnvelope).mockImplementationOnce(async (_chain, signedEnvelope) => { + if (signedEnvelope === invalidEnvelope) { + throw new Error("invalid queued envelope"); + } + }); + + let connected = false; + let cachedPayloadInput: PayloadEnvelopeInput | undefined; + let blockKnown = false; + const processExecutionPayload = vi.fn().mockResolvedValue(undefined); + const sendExecutionPayloadEnvelopesByRoot = vi.fn().mockResolvedValueOnce([envelope]); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === blockRootHex ? cachedPayloadInput : undefined)), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === blockRootHex && blockKnown), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + networkOverrides: { + getConnectedPeers: () => (connected ? [peer] : []), + sendExecutionPayloadEnvelopesByRoot, + }, + peers: [{peerId: peer}], + }); + + emitter.emit(ChainEvent.envelopeUnknownBlock, { + envelope: invalidEnvelope, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(20); + + expect(sendExecutionPayloadEnvelopesByRoot).not.toHaveBeenCalled(); + expect(processExecutionPayload).not.toHaveBeenCalled(); + + connected = true; + cachedPayloadInput = payloadInput; + blockKnown = true; + emitter.emit(routes.events.EventType.block, {slot: 1, block: blockRootHex, executionOptimistic: false}); + + await sleep(50); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenNthCalledWith(1, peer, [blockRoot]); + expect(validateGossipExecutionPayloadEnvelope).toHaveBeenCalledTimes(2); + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledWith(payloadInput); + expect(payloadInput.hasPayloadEnvelope()).toBe(true); + }); + + it("retries payload processing on a later scheduler pass after an execution engine error", async () => { + const peer = await getRandPeerIdStr(); + const {blockRootHex, payloadInput, envelope} = buildPayloadFixture({ + blobCount: 0, + sampledColumns: [], + slot: 1, + }); + + const sendExecutionPayloadEnvelopesByRoot = vi.fn().mockResolvedValue([envelope]); + const processExecutionPayload = vi + .fn() + .mockRejectedValueOnce( + new PayloadError({ + code: PayloadErrorCode.EXECUTION_ENGINE_ERROR, + execStatus: ExecutionPayloadStatus.ELERROR, + errorMessage: "execution engine offline", + }) + ) + .mockResolvedValueOnce(undefined); + + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === blockRootHex ? payloadInput : undefined)), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === blockRootHex), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + networkOverrides: {sendExecutionPayloadEnvelopesByRoot}, + peers: [{peerId: peer}], + }); + + emitter.emit(ChainEvent.unknownEnvelopeBlockRoot, { + rootHex: blockRootHex, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(20); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + + emitter.emit(routes.events.EventType.block, {slot: 1, block: blockRootHex, executionOptimistic: false}); + + await sleep(20); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledTimes(2); + expect(processExecutionPayload).toHaveBeenNthCalledWith(1, payloadInput); + expect(processExecutionPayload).toHaveBeenNthCalledWith(2, payloadInput); + }); + + it("ignores a fetched payload envelope whose block root does not match the requested root", async () => { + const peer = await getRandPeerIdStr(); + const {blockRoot, blockRootHex, payloadInput} = buildPayloadFixture({ + blobCount: 0, + sampledColumns: [], + slot: 1, + }); + const {envelope: mismatchedEnvelope} = buildPayloadFixture({ + blobCount: 0, + sampledColumns: [], + slot: 2, + }); + + const sendExecutionPayloadEnvelopesByRoot = vi.fn().mockResolvedValue([mismatchedEnvelope]); + const processExecutionPayload = vi.fn().mockResolvedValue(undefined); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === blockRootHex ? payloadInput : undefined)), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + }, + networkOverrides: {sendExecutionPayloadEnvelopesByRoot}, + peers: [{peerId: peer}], + }); + + emitter.emit(ChainEvent.unknownEnvelopeBlockRoot, { + rootHex: blockRootHex, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(50); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledWith(peer, [blockRoot]); + expect(validateGossipExecutionPayloadEnvelope).not.toHaveBeenCalled(); + expect(processExecutionPayload).not.toHaveBeenCalled(); + }); + + it("processes incomplete payload envelope input without network fetch", async () => { + const peer = await getRandPeerIdStr(); + const {payloadInput, envelope} = buildPayloadFixture({blobCount: 0, sampledColumns: [], slot: 1}); + payloadInput.addPayloadEnvelope({ + envelope, + source: PayloadEnvelopeInputSource.gossip, + seenTimestampSec: Date.now() / 1000, + }); + + const processExecutionPayload = vi.fn().mockResolvedValue(undefined); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockReturnValue(payloadInput), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === payloadInput.blockRootHex), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + }); + + emitter.emit(ChainEvent.incompletePayloadEnvelope, { + payloadInput, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(20); + + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledWith(payloadInput); + }); + + it("downloads parent payload for unknown parent block when parent block is already known", async () => { + const peer = await getRandPeerIdStr(); + const parentPayloadHash = Buffer.alloc(32, 0x33); + const { + blockRoot: parentRoot, + blockRootHex: parentRootHex, + payloadInput, + envelope, + } = buildPayloadFixture({ + blobCount: 0, + blockHash: parentPayloadHash, + sampledColumns: [], + slot: 1, + }); + + const childBlock = ssz.gloas.SignedBeaconBlock.defaultValue(); + childBlock.message.slot = 2; + childBlock.message.parentRoot = parentRoot; + childBlock.message.body.signedExecutionPayloadBid.message.parentBlockRoot = parentRoot; + childBlock.message.body.signedExecutionPayloadBid.message.parentBlockHash = parentPayloadHash; + const childBlockRootHex = toRootHex(ssz.gloas.BeaconBlock.hashTreeRoot(childBlock.message)); + const childBlockInput = createGloasBlockInput({ + block: childBlock, + blockRootHex: childBlockRootHex, + seenTimestampSec: Date.now() / 1000, + source: BlockInputSource.gossip, + }); + + let hasParentPayload = false; + const sendExecutionPayloadEnvelopesByRoot = vi.fn().mockResolvedValue([envelope]); + const sendBeaconBlocksByRoot = vi.fn(); + const processExecutionPayload = vi.fn().mockImplementation(async () => { + hasParentPayload = true; + }); + const processBlock = vi.fn().mockResolvedValue(undefined); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + processBlock, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === parentRootHex ? payloadInput : undefined)), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + forkChoice: { + hasPayloadHexUnsafe: vi + .fn() + .mockImplementation((root: string) => root === parentRootHex && hasParentPayload), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === parentRootHex), + getBlockHexDefaultStatus: vi + .fn() + .mockImplementation((root: string) => (root === parentRootHex ? ({slot: 1} as ProtoBlock) : null)), + getBlockHexAndBlockHash: vi + .fn() + .mockImplementation((root: string, hash: string) => + root === parentRootHex && hash === toRootHex(parentPayloadHash) && hasParentPayload + ? ({slot: 1} as ProtoBlock) + : null + ), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + networkOverrides: { + sendExecutionPayloadEnvelopesByRoot, + sendBeaconBlocksByRoot, + }, + peers: [{peerId: peer}], + }); + + emitter.emit(ChainEvent.blockUnknownParent, { + blockInput: childBlockInput, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(50); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledWith(peer, [parentRoot]); + expect(sendBeaconBlocksByRoot).not.toHaveBeenCalled(); + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledWith(payloadInput); + expect(processBlock).toHaveBeenCalledTimes(1); + expect(processBlock).toHaveBeenCalledWith( + childBlockInput, + expect.objectContaining({ignoreIfKnown: true, ignoreIfFinalized: true, blsVerifyOnMainThread: true}) + ); + }); + + it("drops a child block when its parent payload hash conflicts with the known parent block", async () => { + const peer = await getRandPeerIdStr(); + + const parentBlock = ssz.gloas.SignedBeaconBlock.defaultValue(); + parentBlock.message.slot = 1; + parentBlock.message.body.signedExecutionPayloadBid.message.parentBlockHash = Buffer.alloc(32, 0x11); + parentBlock.message.body.signedExecutionPayloadBid.message.blockHash = Buffer.alloc(32, 0x22); + const parentRoot = ssz.gloas.BeaconBlock.hashTreeRoot(parentBlock.message); + const parentRootHex = toRootHex(parentRoot); + const parentPayloadInput = PayloadEnvelopeInput.createFromBlock({ + blockRootHex: parentRootHex, + block: parentBlock as SignedBeaconBlock, + forkName: ForkName.gloas, + sampledColumns: [], + custodyColumns: [], + timeCreatedSec: Date.now() / 1000, + }); + + const childBlock = ssz.gloas.SignedBeaconBlock.defaultValue(); + childBlock.message.slot = 2; + childBlock.message.parentRoot = parentRoot; + childBlock.message.body.signedExecutionPayloadBid.message.parentBlockRoot = parentRoot; + childBlock.message.body.signedExecutionPayloadBid.message.parentBlockHash = Buffer.alloc(32, 0x33); + const childBlockRootHex = toRootHex(ssz.gloas.BeaconBlock.hashTreeRoot(childBlock.message)); + const childBlockInput = createGloasBlockInput({ + block: childBlock, + blockRootHex: childBlockRootHex, + seenTimestampSec: Date.now() / 1000, + source: BlockInputSource.gossip, + }); + + const sendExecutionPayloadEnvelopesByRoot = vi.fn(); + const processBlock = vi.fn(); + const seenBlockInputPrune = vi.fn(); + const seenPayloadPrune = vi.fn(); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processBlock, + seenPayloadEnvelopeInputCache: { + get: vi + .fn() + .mockImplementation((root: string) => (root === parentRootHex ? parentPayloadInput : undefined)), + prune: seenPayloadPrune, + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + seenBlockInputCache: {prune: seenBlockInputPrune} as unknown as SeenBlockInput, + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === parentRootHex), + getBlockHexAndBlockHash: vi.fn().mockReturnValue(null), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + networkOverrides: {sendExecutionPayloadEnvelopesByRoot}, + peers: [{peerId: peer}], + }); + + emitter.emit(ChainEvent.blockUnknownParent, { + blockInput: childBlockInput, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(20); + + expect(sendExecutionPayloadEnvelopesByRoot).not.toHaveBeenCalled(); + expect(processBlock).not.toHaveBeenCalled(); + expect(seenBlockInputPrune).toHaveBeenCalledWith(childBlockRootHex); + expect(seenPayloadPrune).not.toHaveBeenCalled(); + }); + + it("removes pending descendants after invalid parent payload", async () => { + const peer = await getRandPeerIdStr(); + const parentPayloadHash = Buffer.alloc(32, 0x33); + const { + blockRoot: parentRoot, + blockRootHex: parentRootHex, + payloadInput, + envelope, + } = buildPayloadFixture({ + blobCount: 0, + blockHash: parentPayloadHash, + sampledColumns: [], + slot: 1, + }); + + const childBlock = ssz.gloas.SignedBeaconBlock.defaultValue(); + childBlock.message.slot = 2; + childBlock.message.parentRoot = parentRoot; + childBlock.message.body.signedExecutionPayloadBid.message.parentBlockRoot = parentRoot; + childBlock.message.body.signedExecutionPayloadBid.message.parentBlockHash = parentPayloadHash; + const childBlockRootHex = toRootHex(ssz.gloas.BeaconBlock.hashTreeRoot(childBlock.message)); + const childBlockInput = createGloasBlockInput({ + block: childBlock, + blockRootHex: childBlockRootHex, + seenTimestampSec: Date.now() / 1000, + source: BlockInputSource.gossip, + }); + + const sendExecutionPayloadEnvelopesByRoot = vi.fn().mockResolvedValue([envelope]); + const processExecutionPayload = vi + .fn() + .mockRejectedValue(new PayloadError({code: PayloadErrorCode.INVALID_SIGNATURE})); + const processBlock = vi.fn().mockResolvedValue(undefined); + const seenPayloadPrune = vi.fn(); + const {emitter} = setupPayloadSyncTest({ + chainOverrides: { + processExecutionPayload, + processBlock, + seenPayloadEnvelopeInputCache: { + get: vi.fn().mockImplementation((root: string) => (root === parentRootHex ? payloadInput : undefined)), + prune: seenPayloadPrune, + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + forkChoice: { + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + hasBlockHex: vi.fn().mockImplementation((root: string) => root === parentRootHex), + getBlockHexDefaultStatus: vi + .fn() + .mockImplementation((root: string) => (root === parentRootHex ? ({slot: 1} as ProtoBlock) : null)), + getBlockHexAndBlockHash: vi.fn().mockReturnValue(null), + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + } as unknown as IForkChoice, + }, + networkOverrides: {sendExecutionPayloadEnvelopesByRoot}, + peers: [{peerId: peer}], + }); + + emitter.emit(ChainEvent.blockUnknownParent, { + blockInput: childBlockInput, + peer, + source: BlockInputSource.gossip, + }); + + await sleep(50); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + expect(processBlock).not.toHaveBeenCalled(); + expect(seenPayloadPrune).not.toHaveBeenCalled(); + + emitter.emit(routes.events.EventType.executionPayload, { + slot: 99, + builderIndex: 0, + blockHash: toRootHex(Buffer.alloc(32, 0x44)), + blockRoot: toRootHex(Buffer.alloc(32, 0x55)), + executionOptimistic: false, + }); + + await sleep(20); + + expect(sendExecutionPayloadEnvelopesByRoot).toHaveBeenCalledTimes(1); + expect(processExecutionPayload).toHaveBeenCalledTimes(1); + expect(processBlock).not.toHaveBeenCalled(); + }); + }); + + it("re-queues downloaded gloas ancestors that are still missing the block body", async () => { + const gloasConfig = createBeaconConfig( + {...minimalConfig, FULU_FORK_EPOCH: 0, GLOAS_FORK_EPOCH: 0}, + Buffer.alloc(32, 0) + ); + const peer = await getRandPeerIdStr(); + const parentRoot = Buffer.alloc(32, 0x11); + const parentBlockHash = Buffer.alloc(32, 0x22); + const {block, blockInput, blockRootHex, parentBlockHashHex, parentRootHex} = buildIncompleteGloasBlockInput({ + parentRoot, + parentBlockHash, + slot: 1, + }); + + const sendBeaconBlocksByRoot = vi.fn().mockResolvedValue([block]); + + const processBlock = vi.fn().mockResolvedValue(undefined); + const networkEvents = new NetworkEventBus(); + network = { + events: networkEvents, + getConnectedPeers: () => [peer], + getConnectedPeerSyncMeta: () => ({ + peerId: peer, + client: "gloas-test-client", + custodyColumns: [], + earliestAvailableSlot: 0, + }), + custodyConfig: { + sampledColumns: [], + sampleGroups: Array.from({length: gloasConfig.SAMPLES_PER_SLOT}, () => []), + } as unknown as CustodyConfig, + sendBeaconBlocksByRoot, + } as Partial as INetwork; + + const chainForTest: Partial = { + emitter: new ChainEventEmitter(), + config: gloasConfig, + clock: new ClockStopped(0), + genesisTime: 0, + metrics: null, + processBlock, + forkChoice: { + getFinalizedBlock: vi.fn().mockReturnValue({slot: 0} as ProtoBlock), + hasBlockHex: vi.fn().mockImplementation((rootHex: string) => rootHex === parentRootHex), + getBlockHexAndBlockHash: vi + .fn() + .mockImplementation((rootHex: string, blockHashHex: string) => + rootHex === parentRootHex && blockHashHex === parentBlockHashHex ? ({} as ProtoBlock) : null + ), + hasPayloadHexUnsafe: vi.fn().mockReturnValue(false), + } as unknown as IForkChoice, + seenPayloadEnvelopeInputCache: { + get: vi.fn(), + prune: vi.fn(), + } as unknown as IBeaconChain["seenPayloadEnvelopeInputCache"], + seenBlockInputCache: {prune: vi.fn()} as unknown as SeenBlockInput, + seenBlockProposers: { + isKnown: vi.fn().mockReturnValue(false), + } as unknown as SeenBlockProposers, + }; + + service = new BlockInputSync(gloasConfig, network, chainForTest as IBeaconChain, logger, null, defaultSyncOptions); + service.subscribeToNetwork(); + + const pendingBlocks = (service as unknown as {pendingBlocks: Map}).pendingBlocks; + pendingBlocks.set(blockRootHex, { + status: PendingBlockInputStatus.downloaded, + blockInput, + timeAddedSec: 0, + peerIdStrings: new Set([peer]), + }); + + networkEvents.emit(NetworkEvent.peerConnected, { + peer, + status: {} as never, + custodyColumns: [], + clientAgent: "gloas-test-client", + }); + + await sleep(20); + + expect(sendBeaconBlocksByRoot).toHaveBeenCalledOnce(); + expect(processBlock).toHaveBeenCalledOnce(); + expect(pendingBlocks.has(blockRootHex)).toBe(false); + + service.close(); + }); }); describe("UnknownBlockPeerBalancer", async () => { diff --git a/packages/beacon-node/test/unit/sync/utils/downloadByRoot.test.ts b/packages/beacon-node/test/unit/sync/utils/downloadByRoot.test.ts index 281644b7d149..209756e312a9 100644 --- a/packages/beacon-node/test/unit/sync/utils/downloadByRoot.test.ts +++ b/packages/beacon-node/test/unit/sync/utils/downloadByRoot.test.ts @@ -7,12 +7,14 @@ import {BlobSidecarValidationError} from "../../../../src/chain/errors/blobSidec import {DataColumnSidecarValidationError} from "../../../../src/chain/errors/dataColumnSidecarError.js"; import {INetwork} from "../../../../src/network/index.js"; import {PeerSyncMeta} from "../../../../src/network/peers/peersData.js"; +import {PendingBlockInputStatus} from "../../../../src/sync/types.js"; import { DownloadByRootError, fetchAndValidateBlobs, fetchAndValidateBlock, fetchAndValidateColumns, fetchBlobsByRoot, + fetchByRoot, fetchColumnsByRoot, } from "../../../../src/sync/utils/downloadByRoot.js"; import {ROOT_SIZE} from "../../../../src/util/sszBytes.js"; @@ -320,6 +322,41 @@ describe("downloadByRoot.ts", () => { }); }); + describe("fetchByRoot", () => { + afterEach(() => { + vi.resetAllMocks(); + }); + + it("does not fetch columns for bare-root gloas block sync", async () => { + const gloasBlockWithColumns = generateBlockWithColumnSidecars({forkName: ForkName.gloas}); + const sendBeaconBlocksByRoot = vi.fn(() => Promise.resolve([gloasBlockWithColumns.block])); + const sendDataColumnSidecarsByRoot = vi.fn(); + network = { + sendBeaconBlocksByRoot, + sendDataColumnSidecarsByRoot, + } as unknown as INetwork; + + const response = await fetchByRoot({ + config, + chain: null, + network, + peerMeta, + blockRoot: gloasBlockWithColumns.blockRoot, + cacheItem: { + status: PendingBlockInputStatus.pending, + rootHex: gloasBlockWithColumns.rootHex, + timeAddedSec: 0, + peerIdStrings: new Set(), + }, + }); + + expect(sendBeaconBlocksByRoot).toHaveBeenCalledOnce(); + expect(sendDataColumnSidecarsByRoot).not.toHaveBeenCalled(); + expect(response.result.block).toEqual(gloasBlockWithColumns.block); + expect(response.result.columnSidecars).toBeUndefined(); + }); + }); + describe("fetchColumnsByRoot", () => { let fuluBlockWithColumns: BlockWithColumnsTestSet; beforeAll(() => { diff --git a/packages/beacon-node/test/unit/sync/utils/pendingBlocksTree.test.ts b/packages/beacon-node/test/unit/sync/utils/pendingBlocksTree.test.ts index 88749eca1101..05a1ebc6ba46 100644 --- a/packages/beacon-node/test/unit/sync/utils/pendingBlocksTree.test.ts +++ b/packages/beacon-node/test/unit/sync/utils/pendingBlocksTree.test.ts @@ -9,7 +9,6 @@ import { import { UnknownAndAncestorBlocks, getAllDescendantBlocks, - getDescendantBlocks, getUnknownAndAncestorBlocks, } from "../../../../src/sync/utils/pendingBlocksTree.js"; import {MockBlockInput} from "../../../utils/blockInput.js"; @@ -19,14 +18,12 @@ describe("sync / pendingBlocksTree", () => { id: string; blocks: {block: string; parent: string | null}[]; getAllDescendantBlocks: {block: string; res: string[]}[]; - getDescendantBlocks: {block: string; res: string[]}[]; getUnknownOrAncestorBlocks: {unknowns: string[]; ancestors: string[]}; }[] = [ { id: "empty case", blocks: [], getAllDescendantBlocks: [{block: "0A", res: []}], - getDescendantBlocks: [{block: "0A", res: []}], getUnknownOrAncestorBlocks: {unknowns: [], ancestors: []}, }, { @@ -45,12 +42,6 @@ describe("sync / pendingBlocksTree", () => { {block: "3C", res: ["4C"]}, {block: "3B", res: []}, ], - getDescendantBlocks: [ - {block: "0A", res: ["1A"]}, - {block: "1A", res: ["2A", "2B"]}, - {block: "3C", res: ["4C"]}, - {block: "3B", res: []}, - ], getUnknownOrAncestorBlocks: {unknowns: ["0A"], ancestors: ["4C"]}, }, ]; @@ -74,12 +65,6 @@ describe("sync / pendingBlocksTree", () => { }); } - for (const {block, res} of testCase.getDescendantBlocks) { - it(`getDescendantBlocks(${block})`, () => { - expect(toRes(getDescendantBlocks(block, blocks))).toEqual(res); - }); - } - it("getUnknownBlocks", () => { expect(toRes2(getUnknownAndAncestorBlocks(blocks))).toEqual(testCase.getUnknownOrAncestorBlocks); });