@@ -2,6 +2,7 @@ import { chunk } from '@aztec/foundation/collection';
22import { createLogger } from '@aztec/foundation/log' ;
33import { Semaphore } from '@aztec/foundation/queue' ;
44import { sleep } from '@aztec/foundation/sleep' ;
5+ import { DateProvider } from '@aztec/foundation/timer' ;
56import type { BlockProposal } from '@aztec/stdlib/p2p' ;
67import { type TxArray , TxHash } from '@aztec/stdlib/tx' ;
78
@@ -33,7 +34,8 @@ export class BatchTxRequester {
3334 private readonly timeoutMs : number ,
3435 private readonly reqresp : ReqRespInterface ,
3536 private readonly connectionSampler : ConnectionSampler ,
36- private logger = createLogger ( 'p2p:reqresp_batch' ) ,
37+ private readonly logger = createLogger ( 'p2p:reqresp_batch' ) ,
38+ private readonly dateProvider : DateProvider = new DateProvider ( ) ,
3739 ) {
3840 this . txsMetadata = new MissingTxMetadataCollection (
3941 missingTxs . map ( txHash => [ txHash . toString ( ) , new MissingTxMetadata ( txHash ) ] ) ,
@@ -46,7 +48,7 @@ export class BatchTxRequester {
4648 this . peers . markPeerInFlight ( this . pinnedPeer ) ;
4749 }
4850
49- this . deadline = Date . now ( ) + this . timeoutMs ;
51+ this . deadline = this . dateProvider . now ( ) + this . timeoutMs ;
5052 }
5153
5254 public async run ( ) {
@@ -108,7 +110,7 @@ export class BatchTxRequester {
108110 } ;
109111
110112 const workers = Array . from (
111- { length : Math . min ( DUMB_PEERS_TO_QUERY_IN_PARALLEL , this . peers . getAllPeers ( ) . size ) } ,
113+ { length : Math . min ( SMART_PEERS_TO_QUERY_IN_PARALLEL , this . peers . getAllPeers ( ) . size ) } ,
112114 ( ) => this . workerLoop ( nextPeer , makeRequest , 'smart' ) ,
113115 ) ;
114116
@@ -135,7 +137,7 @@ export class BatchTxRequester {
135137 return undefined ;
136138 }
137139
138- const txs = txChunks ( ) [ idx ] . map ( t => TxHash . fromString ( t ) ) ;
140+ const txs = txsChunks [ idx ] . map ( t => TxHash . fromString ( t ) ) ;
139141 console . log ( `Dumb batch index: ${ idx } , batches count: ${ txsChunks . length } ` ) ;
140142 txs . forEach ( tx => this . txsMetadata . markRequested ( tx ) ) ;
141143 return { blockRequest : BlockTxsRequest . fromBlockProposalAndMissingTxs ( this . blockProposal , txs ) , txs } ;
@@ -222,12 +224,15 @@ export class BatchTxRequester {
222224 console . log ( `Peer ${ peerId . toString ( ) } \n${ err } ` ) ;
223225 await this . handleFailResponseFromPeer ( peerId , ReqRespStatus . UNKNOWN ) ;
224226 } finally {
225- this . peers . unMarkPeerInFlight ( peerId ) ;
227+ // Don't mark pinned peer as not in flight
228+ if ( ! this . pinnedPeer ?. equals ( peerId ) ) {
229+ this . peers . unMarkPeerInFlight ( peerId ) ;
230+ }
226231 }
227232 }
228233
229234 private handleSuccessResponseFromPeer ( peerId : PeerId , response : BlockTxsResponse ) {
230- this . peers . markPeerAsBad ( peerId ) ;
235+ this . peers . unMarkPeerAsBad ( peerId ) ;
231236 this . logger . debug ( `Received txs: ${ response . txs . length } from peer ${ peerId . toString ( ) } ` ) ;
232237 this . handleReceivedTxs ( peerId , response . txs ) ;
233238
@@ -320,6 +325,6 @@ export class BatchTxRequester {
320325
321326 //TODO: abort signal here?
322327 private shouldStop ( ) {
323- return this . txsMetadata . size === 0 || this . fetchedAllTxs ( ) || Date . now ( ) > this . deadline ;
328+ return this . txsMetadata . size === 0 || this . fetchedAllTxs ( ) || this . dateProvider . now ( ) > this . deadline ;
324329 }
325330}
0 commit comments