11import { ChainForkConfig } from "@lodestar/config" ;
2+ import { PayloadStatus } from "@lodestar/fork-choice" ;
23import { ForkSeq } from "@lodestar/params" ;
34import { RequestError , RequestErrorCode } from "@lodestar/reqresp" ;
45import { computeTimeAtSlot } from "@lodestar/state-transition" ;
56import { RootHex } from "@lodestar/types" ;
6- import { Logger , prettyPrintIndices , pruneSetToMax , sleep } from "@lodestar/utils" ;
7+ import { Logger , fromHex , prettyPrintIndices , pruneSetToMax , sleep } from "@lodestar/utils" ;
78import { isBlockInputBlobs , isBlockInputColumns } from "../chain/blocks/blockInput/blockInput.js" ;
89import { BlockInputSource , IBlockInput } from "../chain/blocks/blockInput/types.js" ;
10+ import { PayloadEnvelopeInputSource } from "../chain/blocks/payloadEnvelopeInput/index.js" ;
911import { BlockError , BlockErrorCode } from "../chain/errors/index.js" ;
1012import { ChainEvent , ChainEventData , IBeaconChain } from "../chain/index.js" ;
1113import { Metrics } from "../metrics/index.js" ;
@@ -22,6 +24,7 @@ import {
2224 PendingBlockInput ,
2325 PendingBlockInputStatus ,
2426 PendingBlockType ,
27+ PendingPayloadEnvelope ,
2528 getBlockInputSyncCacheItemRootHex ,
2629 getBlockInputSyncCacheItemSlot ,
2730 isPendingBlockInput ,
@@ -32,6 +35,8 @@ import {getAllDescendantBlocks, getDescendantBlocks, getUnknownAndAncestorBlocks
3235const MAX_ATTEMPTS_PER_BLOCK = 5 ;
3336const MAX_KNOWN_BAD_BLOCKS = 500 ;
3437const MAX_PENDING_BLOCKS = 100 ;
38+ const MAX_PENDING_PAYLOADS = 100 ;
39+ const MAX_ATTEMPTS_PER_PAYLOAD = 5 ;
3540
3641enum FetchResult {
3742 SuccessResolved = "success_resolved" ,
@@ -78,6 +83,7 @@ export class BlockInputSync {
7883 * block RootHex -> PendingBlock. To avoid finding same root at the same time
7984 */
8085 private readonly pendingBlocks = new Map < RootHex , BlockInputSyncCacheItem > ( ) ;
86+ private readonly pendingPayloads = new Map < RootHex , PendingPayloadEnvelope > ( ) ;
8187 private readonly knownBadBlocks = new Set < RootHex > ( ) ;
8288 private readonly maxPendingBlocks ;
8389 private subscribedToNetworkEvents = false ;
@@ -101,6 +107,9 @@ export class BlockInputSync {
101107 metrics . blockInputSync . knownBadBlocks . addCollect ( ( ) =>
102108 metrics . blockInputSync . knownBadBlocks . set ( this . knownBadBlocks . size )
103109 ) ;
110+ metrics . blockInputSync . pendingPayloads ?. addCollect ( ( ) =>
111+ metrics . blockInputSync . pendingPayloads ?. set ( this . pendingPayloads . size )
112+ ) ;
104113 }
105114 }
106115
@@ -116,6 +125,7 @@ export class BlockInputSync {
116125 this . chain . emitter . on ( ChainEvent . unknownBlockRoot , this . onUnknownBlockRoot ) ;
117126 this . chain . emitter . on ( ChainEvent . incompleteBlockInput , this . onIncompleteBlockInput ) ;
118127 this . chain . emitter . on ( ChainEvent . blockUnknownParent , this . onUnknownParent ) ;
128+ this . chain . emitter . on ( ChainEvent . unknownEnvelopeBlockRoot , this . onUnknownPayloadEnvelope ) ;
119129 this . network . events . on ( NetworkEvent . peerConnected , this . onPeerConnected ) ;
120130 this . network . events . on ( NetworkEvent . peerDisconnected , this . onPeerDisconnected ) ;
121131 this . subscribedToNetworkEvents = true ;
@@ -127,6 +137,7 @@ export class BlockInputSync {
127137 this . chain . emitter . off ( ChainEvent . unknownBlockRoot , this . onUnknownBlockRoot ) ;
128138 this . chain . emitter . off ( ChainEvent . incompleteBlockInput , this . onIncompleteBlockInput ) ;
129139 this . chain . emitter . off ( ChainEvent . blockUnknownParent , this . onUnknownParent ) ;
140+ this . chain . emitter . off ( ChainEvent . unknownEnvelopeBlockRoot , this . onUnknownPayloadEnvelope ) ;
130141 this . network . events . off ( NetworkEvent . peerConnected , this . onPeerConnected ) ;
131142 this . network . events . off ( NetworkEvent . peerDisconnected , this . onPeerDisconnected ) ;
132143 this . subscribedToNetworkEvents = false ;
@@ -183,6 +194,21 @@ export class BlockInputSync {
183194 }
184195 } ;
185196
197+ /**
198+ * Process an unknownEnvelopeBlockRoot event - fetch missing payload envelope for a known block.
199+ */
200+ private onUnknownPayloadEnvelope = ( data : ChainEventData [ ChainEvent . unknownEnvelopeBlockRoot ] ) : void => {
201+ try {
202+ const { rootHex : blockRootHex , peer} = data ;
203+ const block = this . chain . forkChoice . getBlockHexDefaultStatus ( blockRootHex ) ;
204+ if ( ! block ) return ;
205+ this . addPendingPayload ( blockRootHex , block . slot , peer ) ;
206+ this . metrics ?. blockInputSync . payloadRequests ?. inc ( { source : data . source } ) ;
207+ } catch ( e ) {
208+ this . logger . debug ( "Error handling unknownPayloadEnvelope event" , { } , e as Error ) ;
209+ }
210+ } ;
211+
186212 private addByRootHex = ( rootHex : RootHex , peerIdStr ?: PeerIdStr ) : void => {
187213 let pendingBlock = this . pendingBlocks . get ( rootHex ) ;
188214 if ( ! pendingBlock ) {
@@ -248,6 +274,7 @@ export class BlockInputSync {
248274 const peerSyncMeta = this . network . getConnectedPeerSyncMeta ( peerId ) ;
249275 this . peerBalancer . onPeerConnected ( data . peer , peerSyncMeta ) ;
250276 this . triggerUnknownBlockSearch ( ) ;
277+ this . triggerPayloadSearch ( ) ;
251278 } catch ( e ) {
252279 this . logger . debug ( "Error handling peerConnected event" , { } , e as Error ) ;
253280 }
@@ -258,6 +285,105 @@ export class BlockInputSync {
258285 this . peerBalancer . onPeerDisconnected ( peerId ) ;
259286 } ;
260287
288+ addPendingPayload ( rootHex : RootHex , slot : number , peer ?: PeerIdStr ) : void {
289+ const payloadInput = this . chain . seenPayloadEnvelopeInputCache . get ( rootHex ) ;
290+ if ( payloadInput ?. hasPayloadEnvelope ( ) ) return ;
291+ if ( this . chain . forkChoice . getBlockHex ( rootHex , PayloadStatus . FULL ) ) return ;
292+ if ( this . pendingPayloads . size >= MAX_PENDING_PAYLOADS ) return ;
293+
294+ let pending = this . pendingPayloads . get ( rootHex ) ;
295+ if ( ! pending ) {
296+ pending = {
297+ status : "pending" ,
298+ blockRootHex : rootHex ,
299+ slot,
300+ attempts : 0 ,
301+ peerIdStrings : new Set ( ) ,
302+ timeAddedSec : Date . now ( ) / 1000 ,
303+ } ;
304+ this . pendingPayloads . set ( rootHex , pending ) ;
305+ }
306+ if ( peer ) pending . peerIdStrings . add ( peer ) ;
307+ this . triggerPayloadSearch ( ) ;
308+ }
309+
310+ private triggerPayloadSearch = ( ) : void => {
311+ if ( this . pendingPayloads . size === 0 ) return ;
312+ if ( this . network . getConnectedPeers ( ) . length === 0 ) return ;
313+
314+ const finalizedSlot = this . chain . forkChoice . getFinalizedBlock ( ) . slot ;
315+
316+ for ( const [ rootHex , pending ] of this . pendingPayloads ) {
317+ if ( pending . slot <= finalizedSlot ) {
318+ this . pendingPayloads . delete ( rootHex ) ;
319+ continue ;
320+ }
321+ if (
322+ this . chain . seenPayloadEnvelopeInputCache . get ( rootHex ) ?. hasPayloadEnvelope ( ) ||
323+ this . chain . forkChoice . getBlockHex ( rootHex , PayloadStatus . FULL )
324+ ) {
325+ this . pendingPayloads . delete ( rootHex ) ;
326+ continue ;
327+ }
328+ if ( ! this . chain . forkChoice . hasBlockHexUnsafe ( rootHex ) ) continue ;
329+ if ( pending . status !== "pending" ) continue ;
330+ if ( pending . attempts >= MAX_ATTEMPTS_PER_PAYLOAD ) {
331+ this . pendingPayloads . delete ( rootHex ) ;
332+ continue ;
333+ }
334+ this . fetchPayloadEnvelope ( pending ) . catch ( ( e ) => {
335+ this . logger . debug ( "Unexpected error - fetchPayloadEnvelope" , { root : pending . blockRootHex } , e ) ;
336+ } ) ;
337+ }
338+ } ;
339+
340+ private async fetchPayloadEnvelope ( pending : PendingPayloadEnvelope ) : Promise < void > {
341+ pending . status = "fetching" ;
342+ pending . attempts ++ ;
343+ try {
344+ const peerMeta = this . peerBalancer . bestPeerForPendingColumns ( new Set ( ) , new Set ( ) ) ;
345+ if ( ! peerMeta ) {
346+ pending . status = "pending" ;
347+ return ;
348+ }
349+ const { peerId : peer } = peerMeta ;
350+
351+ const envelopes = await this . network . sendExecutionPayloadEnvelopesByRoot ( peer , [ fromHex ( pending . blockRootHex ) ] ) ;
352+ if ( envelopes . length === 0 ) {
353+ pending . status = "pending" ;
354+ return ;
355+ }
356+
357+ const envelope = envelopes [ 0 ] ;
358+ const payloadInput = this . chain . seenPayloadEnvelopeInputCache . get ( pending . blockRootHex ) ;
359+ if ( ! payloadInput ) {
360+ this . logger . debug ( "PayloadEnvelopeInput missing for fetched envelope" , { root : pending . blockRootHex } ) ;
361+ pending . status = "pending" ;
362+ return ;
363+ }
364+
365+ payloadInput . addPayloadEnvelope ( {
366+ envelope,
367+ source : PayloadEnvelopeInputSource . byRoot ,
368+ seenTimestampSec : Date . now ( ) / 1000 ,
369+ peerIdStr : peer ,
370+ } ) ;
371+
372+ if ( payloadInput . isComplete ( ) ) {
373+ await this . chain . processExecutionPayload ( payloadInput ) ;
374+ }
375+
376+ this . pendingPayloads . delete ( pending . blockRootHex ) ;
377+ this . metrics ?. blockInputSync . payloadFetchSuccess ?. inc ( ) ;
378+ // Re-trigger block search since pending blocks may now be processable
379+ this . triggerUnknownBlockSearch ( ) ;
380+ } catch ( e ) {
381+ this . logger . debug ( "Error fetching payload envelope" , { root : pending . blockRootHex } , e as Error ) ;
382+ pending . status = "pending" ;
383+ this . metrics ?. blockInputSync . payloadFetchError ?. inc ( ) ;
384+ }
385+ }
386+
261387 /**
262388 * Gather tip parent blocks with unknown parent and do a search for all of them
263389 */
@@ -456,6 +582,12 @@ export class BlockInputSync {
456582 pendingBlock . status = PendingBlockInputStatus . downloaded ;
457583 break ;
458584
585+ case BlockErrorCode . PARENT_PAYLOAD_UNKNOWN :
586+ this . logger . debug ( "Block parent payload unknown" , errorData , res . err ) ;
587+ this . addPendingPayload ( res . err . type . parentRoot , pendingBlock . blockInput . slot - 1 ) ;
588+ pendingBlock . status = PendingBlockInputStatus . downloaded ;
589+ break ;
590+
459591 case BlockErrorCode . EXECUTION_ENGINE_ERROR :
460592 // Removing the block(s) without penalizing the peers, hoping for EL to
461593 // recover on a latter download + verify attempt
@@ -671,6 +803,7 @@ export class BlockInputSync {
671803 for ( const block of badPendingBlocks ) {
672804 const rootHex = getBlockInputSyncCacheItemRootHex ( block ) ;
673805 this . pendingBlocks . delete ( rootHex ) ;
806+ this . pendingPayloads . delete ( rootHex ) ;
674807 this . chain . seenBlockInputCache . prune ( rootHex ) ;
675808 this . logger . debug ( "Removing bad/unknown/incomplete BlockInputSyncCacheItem" , {
676809 slot,
0 commit comments