Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,24 @@ export class BeaconChain implements IBeaconChain {
return null;
}

async getSerializedExecutionPayloadEnvelope(blockSlot: Slot, blockRootHex: string): Promise<Uint8Array | null> {
const payloadInput = this.seenPayloadEnvelopeInputCache.get(blockRootHex);
if (payloadInput?.hasPayloadEnvelope()) {
const envelope = payloadInput.getPayloadEnvelope();
const serialized = this.serializedCache.get(envelope);
if (serialized) {
return serialized;
}
return ssz.gloas.SignedExecutionPayloadEnvelope.serialize(envelope);
}

return (
(await this.db.executionPayloadEnvelope.getBinary(fromHex(blockRootHex))) ??
(await this.db.executionPayloadEnvelopeArchive.getBinary(blockSlot)) ??
null
);
}

async getDataColumnSidecars(blockSlot: Slot, blockRootHex: string): Promise<DataColumnSidecars> {
const blockInput = this.seenBlockInputCache.get(blockRootHex);
if (blockInput) {
Expand Down
13 changes: 13 additions & 0 deletions packages/beacon-node/src/chain/emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ export enum ChainEvent {
* cut-off window passes for waiting on gossip
*/
incompleteBlockInput = "incompleteBlockInput",
/**
* Trigger BlockInputSync to fetch a missing execution payload envelope for a known beacon block root
*/
unknownPayloadEnvelope = "unknownPayloadEnvelope",
/**
* Trigger BlockInputSync when a gossip block's parent block is known but parent payload is missing.
* Tracks the child block in pendingBlocks and triggers parent payload fetch.
*/
unknownParentPayload = "unknownParentPayload",
}

export type HeadEventData = routes.events.EventData[routes.events.EventType.head];
Expand All @@ -78,6 +87,8 @@ export type ChainEventData = {
[ChainEvent.unknownParent]: {blockInput: IBlockInput; peer: PeerIdStr; source: BlockInputSource};
[ChainEvent.unknownBlockRoot]: {rootHex: RootHex; peer?: PeerIdStr; source: BlockInputSource};
[ChainEvent.incompleteBlockInput]: {blockInput: IBlockInput; peer: PeerIdStr; source: BlockInputSource};
[ChainEvent.unknownPayloadEnvelope]: {blockRootHex: RootHex; peer?: PeerIdStr; source: BlockInputSource};
[ChainEvent.unknownParentPayload]: {blockInput: IBlockInput; peer: PeerIdStr; source: BlockInputSource};
};

export type IChainEvents = ApiEvents & {
Expand All @@ -99,6 +110,8 @@ export type IChainEvents = ApiEvents & {
[ChainEvent.unknownParent]: (data: ChainEventData[ChainEvent.unknownParent]) => void;
[ChainEvent.unknownBlockRoot]: (data: ChainEventData[ChainEvent.unknownBlockRoot]) => void;
[ChainEvent.incompleteBlockInput]: (data: ChainEventData[ChainEvent.incompleteBlockInput]) => void;
[ChainEvent.unknownPayloadEnvelope]: (data: ChainEventData[ChainEvent.unknownPayloadEnvelope]) => void;
[ChainEvent.unknownParentPayload]: (data: ChainEventData[ChainEvent.unknownParentPayload]) => void;
};

/**
Expand Down
5 changes: 4 additions & 1 deletion packages/beacon-node/src/chain/errors/blockError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ export enum BlockErrorCode {
TOO_MANY_KZG_COMMITMENTS = "BLOCK_ERROR_TOO_MANY_KZG_COMMITMENTS",
/** Bid parent block root does not match block parent root */
BID_PARENT_ROOT_MISMATCH = "BLOCK_ERROR_BID_PARENT_ROOT_MISMATCH",
/** Parent block is known but the matching FULL payload variant is missing */
PARENT_PAYLOAD_UNKNOWN = "BLOCK_ERROR_PARENT_PAYLOAD_UNKNOWN",
}

type ExecutionErrorStatus = Exclude<
Expand Down Expand Up @@ -114,7 +116,8 @@ export type BlockErrorType =
| {code: BlockErrorCode.EXECUTION_ENGINE_ERROR; execStatus: ExecutionErrorStatus; errorMessage: string}
| {code: BlockErrorCode.DATA_UNAVAILABLE}
| {code: BlockErrorCode.TOO_MANY_KZG_COMMITMENTS; blobKzgCommitmentsLen: number; commitmentLimit: number}
| {code: BlockErrorCode.BID_PARENT_ROOT_MISMATCH; bidParentRoot: RootHex; blockParentRoot: RootHex};
| {code: BlockErrorCode.BID_PARENT_ROOT_MISMATCH; bidParentRoot: RootHex; blockParentRoot: RootHex}
| {code: BlockErrorCode.PARENT_PAYLOAD_UNKNOWN; parentRoot: RootHex; parentBlockHash: RootHex};

export class BlockGossipError extends GossipActionError<BlockErrorType> {}

Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ export interface IBeaconChain {
blockRootHex: string,
indices: number[]
): Promise<(Uint8Array | undefined)[]>;
getSerializedExecutionPayloadEnvelope(blockSlot: Slot, blockRootHex: string): Promise<Uint8Array | null>;

produceCommonBlockBody(blockAttributes: BlockAttributes): Promise<CommonBlockBody>;
produceBlock(blockAttributes: BlockAttributes & {commonBlockBodyPromise: Promise<CommonBlockBody>}): Promise<{
Expand Down
9 changes: 9 additions & 0 deletions packages/beacon-node/src/chain/validation/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ export async function validateGossipBlock(
)
: chain.forkChoice.getBlockHexDefaultStatus(parentRoot);
if (parentBlock === null) {
// For Gloas blocks: if the parent block root is known but the matching FULL variant
// (by parentBlockHash) is missing, this is a payload availability issue, not a missing block.
if (isGloasBeaconBlock(block) && chain.forkChoice.hasBlockHexUnsafe(parentRoot)) {
throw new BlockGossipError(GossipAction.IGNORE, {
code: BlockErrorCode.PARENT_PAYLOAD_UNKNOWN,
parentRoot,
parentBlockHash: toRootHex(block.body.signedExecutionPayloadBid.message.parentBlockHash),
});
}
// If fork choice does *not* consider the parent to be a descendant of the finalized block,
// then there are two more cases:
//
Expand Down
40 changes: 40 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,23 @@ export function createLodestarMetrics(
labelNames: ["code", "client"],
}),
},
pendingPayloads: register.gauge({
name: "lodestar_sync_block_input_pending_payloads_size",
help: "Current size of pending payloads cache in BlockInputSync",
}),
payloadFetchSuccess: register.gauge({
name: "lodestar_sync_block_input_payload_fetch_success_total",
help: "Total successful payload envelope fetches",
}),
Comment on lines +687 to +690
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The metric lodestar_sync_block_input_payload_fetch_success_total is defined as a gauge, but it's used as a counter since its value only ever increases. For metrics that represent a cumulative count, it's more idiomatic and correct to use a counter. This provides clearer intent and allows monitoring systems like Prometheus to apply counter-specific functions (e.g., rate()).

This feedback also applies to other new metrics in this file that are used as counters but defined as gauges, such as payloadFetchError, payloadRequests, and the metrics within awaitingEnvelopeGossipMessages.

Suggested change
payloadFetchSuccess: register.gauge({
name: "lodestar_sync_block_input_payload_fetch_success_total",
help: "Total successful payload envelope fetches",
}),
payloadFetchSuccess: register.counter({
name: "lodestar_sync_block_input_payload_fetch_success_total",
help: "Total successful payload envelope fetches",
}),

payloadFetchError: register.gauge({
name: "lodestar_sync_block_input_payload_fetch_error_total",
help: "Total failed payload envelope fetches",
}),
payloadRequests: register.gauge<{source: string}>({
name: "lodestar_sync_block_input_payload_requests_total",
help: "Total unknown payload events by source",
labelNames: ["source"],
}),
peerBalancer: {
peersMetaCount: register.gauge({
name: "lodestar_sync_unknown_block_peer_balancer_peers_meta_count",
Expand Down Expand Up @@ -1746,6 +1763,29 @@ export function createLodestarMetrics(
}),
},

awaitingEnvelopeGossipMessages: {
resolve: register.gauge<{topic: GossipType}>({
name: "lodestar_awaiting_envelope_gossip_messages_resolve_total",
help: "Total number of gossip messages reprocessed after payload envelope import",
labelNames: ["topic"],
}),
waitSecBeforeResolve: register.gauge<{topic: GossipType}>({
name: "lodestar_awaiting_envelope_gossip_messages_wait_time_resolve_seconds",
help: "Time to wait for unknown payload envelope in seconds",
labelNames: ["topic"],
}),
reject: register.gauge<{reason: ReprocessRejectReason; topic: GossipType}>({
name: "lodestar_awaiting_envelope_gossip_messages_reject_total",
help: "Total number of gossip messages rejected while waiting for payload envelope",
labelNames: ["reason", "topic"],
}),
waitSecBeforeReject: register.gauge<{reason: ReprocessRejectReason; topic: GossipType}>({
name: "lodestar_awaiting_envelope_gossip_messages_wait_time_reject_seconds",
help: "Time to wait for unknown payload envelope before being rejected",
labelNames: ["reason", "topic"],
}),
},

lightclientServer: {
onSyncAggregate: register.gauge<{event: string}>({
name: "lodestar_lightclient_server_on_sync_aggregate_event_total",
Expand Down
15 changes: 14 additions & 1 deletion packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ import {
import {BlockInputSource} from "../chain/blocks/blockInput/types.js";
import {CustodyConfig} from "../util/dataColumns.js";
import {PeerIdStr} from "../util/peerId.js";
import {BeaconBlocksByRootRequest, BlobSidecarsByRootRequest, DataColumnSidecarsByRootRequest} from "../util/types.js";
import {
BeaconBlocksByRootRequest,
BlobSidecarsByRootRequest,
DataColumnSidecarsByRootRequest,
ExecutionPayloadEnvelopesByRootRequest,
} from "../util/types.js";
import {INetworkCorePublic} from "./core/types.js";
import {INetworkEventBus} from "./events.js";
import {GossipType} from "./gossip/interface.js";
Expand Down Expand Up @@ -82,6 +87,14 @@ export interface INetwork extends INetworkCorePublic {
peerId: PeerIdStr,
request: DataColumnSidecarsByRootRequest
): Promise<fulu.DataColumnSidecar[]>;
sendExecutionPayloadEnvelopesByRange(
peerId: PeerIdStr,
request: gloas.ExecutionPayloadEnvelopesByRangeRequest
): Promise<gloas.SignedExecutionPayloadEnvelope[]>;
sendExecutionPayloadEnvelopesByRoot(
peerId: PeerIdStr,
request: ExecutionPayloadEnvelopesByRootRequest
): Promise<gloas.SignedExecutionPayloadEnvelope[]>;

// Gossip
publishBeaconBlock(signedBlock: SignedBeaconBlock): Promise<number>;
Expand Down
30 changes: 29 additions & 1 deletion packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ import {IClock} from "../util/clock.js";
import {CustodyConfig} from "../util/dataColumns.js";
import {PeerIdStr, peerIdToString} from "../util/peerId.js";
import {promiseAllMaybeAsync} from "../util/promises.js";
import {BeaconBlocksByRootRequest, BlobSidecarsByRootRequest, DataColumnSidecarsByRootRequest} from "../util/types.js";
import {
BeaconBlocksByRootRequest,
BlobSidecarsByRootRequest,
DataColumnSidecarsByRootRequest,
ExecutionPayloadEnvelopesByRootRequest,
} from "../util/types.js";
import {INetworkCore, NetworkCore, WorkerNetworkCore} from "./core/index.js";
import {INetworkEventBus, NetworkEvent, NetworkEventBus, NetworkEventData} from "./events.js";
import {getActiveForkBoundaries} from "./forks.js";
Expand Down Expand Up @@ -636,6 +641,29 @@ export class Network implements INetwork {
);
}

async sendExecutionPayloadEnvelopesByRange(
peerId: PeerIdStr,
request: gloas.ExecutionPayloadEnvelopesByRangeRequest
): Promise<gloas.SignedExecutionPayloadEnvelope[]> {
return collectMaxResponseTyped(
this.sendReqRespRequest(peerId, ReqRespMethod.ExecutionPayloadEnvelopesByRange, [Version.V1], request),
request.count,
responseSszTypeByMethod[ReqRespMethod.ExecutionPayloadEnvelopesByRange]
);
}

async sendExecutionPayloadEnvelopesByRoot(
peerId: PeerIdStr,
request: ExecutionPayloadEnvelopesByRootRequest
): Promise<gloas.SignedExecutionPayloadEnvelope[]> {
return collectMaxResponseTyped(
this.sendReqRespRequest(peerId, ReqRespMethod.ExecutionPayloadEnvelopesByRoot, [Version.V1], request),
request.length,
responseSszTypeByMethod[ReqRespMethod.ExecutionPayloadEnvelopesByRoot],
this.chain.serializedCache
);
}

private sendReqRespRequest<Req>(
peerId: PeerIdStr,
method: ReqRespMethod,
Expand Down
12 changes: 11 additions & 1 deletion packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand
throw e;
}

if (e.type.code === BlockErrorCode.PARENT_PAYLOAD_UNKNOWN && blockInput) {
logger.debug("Gossip block has parent payload unknown", {slot, root: blockShortHex, code: e.type.code});
chain.emitter.emit(ChainEvent.unknownParentPayload, {
blockInput,
peer: peerIdStr,
source: BlockInputSource.gossip,
});
throw e;
}

if (e.action === GossipAction.REJECT) {
chain.persistInvalidSszValue(forkTypes.SignedBeaconBlock, signedBlock, `gossip_reject_slot_${slot}`);
}
Expand Down Expand Up @@ -842,7 +852,7 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand
}: GossipHandlerParamGeneric<GossipType.execution_payload>) => {
const {serializedData} = gossipData;
const executionPayloadEnvelope = sszDeserialize(topic, serializedData);
// TODO GLOAS: handle BLOCK_ROOT_UNKNOWN error to trigger sync
// BLOCK_ROOT_UNKNWON should not happen here. It would be caught early on and be queued in awaitingMessagesByBlockRoot
await validateGossipExecutionPayloadEnvelope(chain, executionPayloadEnvelope);

const slot = executionPayloadEnvelope.message.slot;
Expand Down
80 changes: 80 additions & 0 deletions packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {routes} from "@lodestar/api";
import {PayloadStatus} from "@lodestar/fork-choice";
import {ForkSeq} from "@lodestar/params";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {RootHex, Slot, SlotRootHex} from "@lodestar/types";
Expand Down Expand Up @@ -159,7 +160,10 @@ export class NetworkProcessor {
// we may not receive the block for messages like Attestation and SignedAggregateAndProof messages, in that case PendingGossipsubMessage needs
// to be stored in this Map and reprocessed once the block comes
private readonly awaitingMessagesByBlockRoot: MapDef<RootHex, Set<PendingGossipsubMessage>>;
// Messages waiting for a payload envelope to be imported for a known beacon block root
private readonly awaitingMessagesByEnvelopeBlockRoot: MapDef<RootHex, Set<PendingGossipsubMessage>>;
private unknownBlocksBySlot = new MapDef<Slot, Set<RootHex>>(() => new Set());
private unknownEnvelopesBySlot = new MapDef<Slot, Set<RootHex>>(() => new Set());

constructor(
modules: NetworkProcessorModules,
Expand All @@ -181,9 +185,11 @@ export class NetworkProcessor {

events.on(NetworkEvent.pendingGossipsubMessage, this.onPendingGossipsubMessage.bind(this));
this.chain.emitter.on(routes.events.EventType.block, this.onBlockProcessed.bind(this));
this.chain.emitter.on(routes.events.EventType.executionPayloadAvailable, this.onPayloadProcessed.bind(this));
this.chain.clock.on(ClockEvent.slot, this.onClockSlot.bind(this));

this.awaitingMessagesByBlockRoot = new MapDef<RootHex, Set<PendingGossipsubMessage>>(() => new Set());
this.awaitingMessagesByEnvelopeBlockRoot = new MapDef<RootHex, Set<PendingGossipsubMessage>>(() => new Set());

// TODO: Implement queues and priorization for ReqResp incoming requests
// Listens to NetworkEvent.reqRespIncomingRequest event
Expand Down Expand Up @@ -212,6 +218,7 @@ export class NetworkProcessor {
async stop(): Promise<void> {
this.events.off(NetworkEvent.pendingGossipsubMessage, this.onPendingGossipsubMessage);
this.chain.emitter.off(routes.events.EventType.block, this.onBlockProcessed);
this.chain.emitter.off(routes.events.EventType.executionPayloadAvailable, this.onPayloadProcessed);
this.chain.emitter.off(ClockEvent.slot, this.onClockSlot);
}

Expand Down Expand Up @@ -248,6 +255,22 @@ export class NetworkProcessor {
this.chain.emitter.emit(ChainEvent.unknownBlockRoot, {rootHex: root, peer, source});
}

/**
* Search for a missing execution payload envelope for a known beacon block root.
* Only emits if the block is known but FULL variant is missing.
*/
searchUnknownEnvelope({slot, root: blockRoot}: SlotRootHex, source: BlockInputSource, peer?: PeerIdStr): void {
if (this.awaitingMessagesByEnvelopeBlockRoot.has(blockRoot)) return;
if (this.unknownEnvelopesBySlot.getOrDefault(slot).has(blockRoot)) return;
// Only search if block is known — if block is unknown, searchUnknownBlock handles it
if (!this.chain.forkChoice.hasBlockHexUnsafe(blockRoot)) return;
// If FULL variant already exists, no need to search
if (this.chain.forkChoice.getBlockHex(blockRoot, PayloadStatus.FULL)) return;

this.unknownEnvelopesBySlot.getOrDefault(slot).add(blockRoot);
this.chain.emitter.emit(ChainEvent.unknownPayloadEnvelope, {blockRootHex: blockRoot, peer, source});
}

private onPendingGossipsubMessage(message: PendingGossipsubMessage): void {
const topicType = message.topic.type;
const extractBlockSlotRootFn = this.extractBlockSlotRootFns[topicType];
Expand Down Expand Up @@ -301,6 +324,14 @@ export class NetworkProcessor {
return;
}

// Block is known — check if message requires FULL payload state
// TODO GLOAS (PR #9025): Add evidence routing for specific gossip types:
// - attestation index === 1: needs FULL payload
// - PTC payloadPresent === true: needs FULL payload
// - data_column_sidecar in Gloas: needs FULL payload
// For now, these messages proceed to validation where they will fail with
// appropriate errors if FULL variant is missing

this.pushPendingGossipsubMessageToQueue(message);
}

Expand Down Expand Up @@ -345,6 +376,32 @@ export class NetworkProcessor {
this.awaitingMessagesByBlockRoot.delete(rootHex);
}

private async onPayloadProcessed({blockRoot}: {slot: Slot; blockRoot: string}): Promise<void> {
const waitingMessages = this.awaitingMessagesByEnvelopeBlockRoot.get(blockRoot);
if (!waitingMessages || waitingMessages.size === 0) {
return;
}

const nowSec = Date.now() / 1000;
let count = 0;
for (const message of waitingMessages) {
const topicType = message.topic.type;
this.metrics?.awaitingEnvelopeGossipMessages.waitSecBeforeResolve.set(
{topic: topicType},
nowSec - message.seenTimestampSec
);
this.metrics?.awaitingEnvelopeGossipMessages.resolve.inc({topic: topicType});
this.pushPendingGossipsubMessageToQueue(message);
count++;
if (count === MAX_AWAITING_GOSSIP_OBJECTS_PER_TICK) {
count = 0;
await sleep(AWAITING_GOSSIP_OBJECTS_YIELD_EVERY_MS);
}
}

this.awaitingMessagesByEnvelopeBlockRoot.delete(blockRoot);
}

private onClockSlot(clockSlot: Slot): void {
const nowSec = Date.now() / 1000;
const minSlot = clockSlot - MAX_UNKNOWN_ROOTS_SLOT_CACHE_SIZE;
Expand All @@ -371,6 +428,29 @@ export class NetworkProcessor {
}
this.unknownBlocksBySlot.delete(slot);
}

// Prune envelope waiting maps in parallel
for (const [slot, roots] of this.unknownEnvelopesBySlot) {
if (slot > minSlot) continue;
for (const rootHex of roots) {
const gossipMessages = this.awaitingMessagesByEnvelopeBlockRoot.get(rootHex);
if (gossipMessages !== undefined) {
for (const message of gossipMessages) {
const topicType = message.topic.type;
this.metrics?.awaitingEnvelopeGossipMessages.reject.inc({
topic: topicType,
reason: ReprocessRejectReason.expired,
});
this.metrics?.awaitingEnvelopeGossipMessages.waitSecBeforeReject.set(
{topic: topicType, reason: ReprocessRejectReason.expired},
nowSec - message.seenTimestampSec
);
}
this.awaitingMessagesByEnvelopeBlockRoot.delete(rootHex);
}
}
this.unknownEnvelopesBySlot.delete(slot);
}
}

private executeWork(): void {
Expand Down
Loading
Loading