@@ -2,7 +2,7 @@ import { chunk } from '@aztec/foundation/collection';
22import { createLogger } from '@aztec/foundation/log' ;
33import { promiseWithResolvers } from '@aztec/foundation/promise' ;
44import type { BlockProposal } from '@aztec/stdlib/p2p' ;
5- import type { TxHash } from '@aztec/stdlib/tx' ;
5+ import type { TxArray , TxHash } from '@aztec/stdlib/tx' ;
66
77import type { PeerId } from '@libp2p/interface' ;
88import { peerIdFromString } from '@libp2p/peer-id' ;
@@ -28,16 +28,20 @@ class MissingTxMetadata {
2828export class BatchTxRequester {
2929 private readonly peers : PeerId [ ] ;
3030 private readonly smartPeers = new Set < string > ( ) ;
31+ private readonly badPeers = new Set < string > ( ) ;
3132 private readonly peersToTxMap = new Map < string , Array < TxHash > > ( ) ;
3233
3334 private readonly txsMetadata ;
3435
36+ private readonly deadline = Date . now ( ) + this . timeoutMs ;
37+
3538 private startSmartRequester : ( ( v : void ) => void ) | undefined = undefined ;
3639
3740 constructor (
3841 private readonly blockProposal : BlockProposal ,
3942 private readonly missingTxs : TxHash [ ] ,
4043 private readonly pinnedPeer : PeerId | undefined ,
44+ private readonly timeoutMs : number ,
4145 private readonly reqresp : ReqRespInterface ,
4246 private readonly connectionSampler : ConnectionSampler ,
4347 private logger = createLogger ( 'p2p:reqresp_batch' ) ,
@@ -58,18 +62,25 @@ export class BatchTxRequester {
5862 const { promise, resolve } = promiseWithResolvers < void > ( ) ;
5963 this . startSmartRequester = resolve ;
6064
61- Promise . allSettled ( [ this . smartRequester ( promise ) , this . dumbRequester ( ) ] ) ;
65+ //TODO: executeTimeout?
66+ await Promise . allSettled ( [ this . smartRequester ( promise ) , this . dumbRequester ( ) ] ) ;
6267 }
6368
6469 private async smartRequester ( start : Promise < void > ) {
65- const nextPeerIndex = this . makeRoundRobinIndexer ( ( ) => this . smartPeers . size ) ;
70+ const nextPeerIndex = this . makeRoundRobinIndexer ( ( ) => getPeers ( ) . length ) ;
71+ const getPeers = ( ) => Array . from ( this . smartPeers . difference ( this . badPeers ) ) ;
72+
6673 // if we don't have a pinned peer we wait to start smart requester
6774 // otherwise we start immediately with the pinned peer
6875 if ( ! this . pinnedPeer ) {
6976 await start ;
7077 }
7178
72- const nextPeer = ( ) => peerIdFromString ( Array . from ( this . smartPeers ) [ nextPeerIndex ( ) ] ) ;
79+ const nextPeer = ( ) => {
80+ const idx = nextPeerIndex ( ) ;
81+ return idx === undefined ? undefined : peerIdFromString ( getPeers ( ) [ idx ] ) ;
82+ } ;
83+
7384 const makeRequest = ( pid : PeerId ) => {
7485 //TODO: for this peer we have to make batch on the fly based on which txs peer has
7586 const txsPeerHas = this . peersToTxMap . get ( pid . toString ( ) ) ;
@@ -85,41 +96,57 @@ export class BatchTxRequester {
8596 return BlockTxsRequest . fromBlockProposalAndMissingTxs ( this . blockProposal , txsToRequest ) ;
8697 } ;
8798
88- // Kick off workers
89- const workers = Array . from ( { length : Math . min ( PEERS_TO_QUERY_IN_PARALLEL , this . smartPeers . size ) } , ( ) =>
90- this . workerLoop ( nextPeer , makeRequest ) ,
99+ const workers = Array . from ( { length : Math . min ( PEERS_TO_QUERY_IN_PARALLEL , getPeers ( ) . length ) } , ( ) =>
100+ this . workerLoop ( nextPeer , makeRequest , 'smart' ) ,
91101 ) ;
92102 await Promise . allSettled ( workers ) ;
93103 }
94104
95105 private async dumbRequester ( ) {
96- const nextPeerIndex = this . makeRoundRobinIndexer ( ( ) => peers . length ) ;
106+ const peers = new Set ( this . peers . map ( peer => peer . toString ( ) ) ) ;
107+ const nextPeerIndex = this . makeRoundRobinIndexer ( ( ) => getPeers ( ) . length ) ;
97108 const nextBatchIndex = this . makeRoundRobinIndexer ( ( ) => txChunks . length ) ;
109+ const getPeers = ( ) => Array . from ( peers . difference ( this . smartPeers . union ( this . badPeers ) ) ) ;
98110
99- const peers = [ ...this . peers ] ;
100111 const txChunks = chunk < TxHash > ( this . missingTxs , TX_BATCH_SIZE ) ;
101112
102113 //TODO: batches should be adaptive
103114 const makeRequest = ( _pid : PeerId ) => {
104- return BlockTxsRequest . fromBlockProposalAndMissingTxs ( this . blockProposal , txChunks [ nextBatchIndex ( ) ] ) ;
115+ const idx = nextBatchIndex ( ) ;
116+ return idx === undefined
117+ ? undefined
118+ : BlockTxsRequest . fromBlockProposalAndMissingTxs ( this . blockProposal , txChunks [ idx ] ) ;
105119 } ;
106120
107- const nextPeer = ( ) => peers . filter ( pid => ! this . smartPeers . has ( pid . toString ( ) ) ) [ nextPeerIndex ( ) ] ;
121+ const nextPeer = ( ) => {
122+ const idx = nextPeerIndex ( ) ;
123+ return idx === undefined ? undefined : peerIdFromString ( Array . from ( getPeers ( ) ) [ idx ] ) ;
124+ } ;
108125
109- const workers = Array . from ( { length : Math . min ( PEERS_TO_QUERY_IN_PARALLEL , peers . length ) } , ( ) =>
110- this . workerLoop ( nextPeer , makeRequest ) ,
126+ const workers = Array . from ( { length : Math . min ( PEERS_TO_QUERY_IN_PARALLEL , getPeers ( ) . length ) } , ( ) =>
127+ this . workerLoop ( nextPeer , makeRequest , 'dumb' ) ,
111128 ) ;
112129 await Promise . allSettled ( workers ) ;
113130 }
114131
115- private async workerLoop ( pickNextPeer : ( ) => PeerId , request : ( pid : PeerId ) => BlockTxsRequest | undefined ) {
132+ private async workerLoop (
133+ pickNextPeer : ( ) => PeerId | undefined ,
134+ request : ( pid : PeerId ) => BlockTxsRequest | undefined ,
135+ type : 'smart' | 'dumb' ,
136+ ) {
116137 while ( ! this . shouldStop ( ) ) {
117138 const peerId = pickNextPeer ( ) ;
118- //
139+ const weRanOutOfPeersToQuery = peerId === undefined ;
140+ if ( weRanOutOfPeersToQuery ) {
141+ this . logger . debug ( `Worker loop: ${ type } : No more peers to query` ) ;
142+ return ;
143+ }
144+
119145 // TODO: think about this a bit more what should we do in this case?
120146 const nextBatchTxRequest = request ( peerId ) ;
121147 if ( ! nextBatchTxRequest ) {
122- this . logger . warn ( 'No txs matched' ) ;
148+ this . logger . warn ( `Worker loop: ${ type } : Could not create next batch request` ) ;
149+ // We retry with the next peer/batch
123150 continue ;
124151 }
125152
@@ -146,34 +173,16 @@ export class BatchTxRequester {
146173 }
147174 }
148175
149- //TODO: 1 mark missing transactions as fetched
150- //TODO: 2 mark peer having this transactions
151- //TODO: 3 stream responses either via async generator or callbacks
152176 private handleSuccessResponseFromPeer ( peerId : PeerId , response : BlockTxsResponse ) {
153177 this . logger . debug ( `Received txs: ${ response . txs . length } from peer ${ peerId . toString ( ) } ` ) ;
154178 if ( ! this . isBlockResponseValid ( response ) ) {
155179 return ;
156180 }
157181
158- //TODO: yield txs
159- for ( const tx of response . txs ) {
160- const key = tx . txHash . toString ( ) ;
161- let meta = this . txsMetadata . get ( key ) ;
162-
163- if ( ! meta ) {
164- meta = new MissingTxMetadata ( tx . txHash , true ) ;
165- this . txsMetadata . set ( key , meta ) ;
166- } else {
167- meta . fetched = true ; // mutate in place; no need to re-set
168- }
169- }
182+ this . smartPeers . add ( peerId . toString ( ) ) ;
170183
171- const peerIdStr = peerId . toString ( ) ;
172- this . smartPeers . add ( peerIdStr ) ;
173- const txsPeerHas = this . extractHashesPeerHasFromResponse ( response ) ;
174- // NOTE: it's ok to override this and not make it union with previous data
175- // because the newer request contains most up to date info
176- this . peersToTxMap . set ( peerIdStr , txsPeerHas ) ;
184+ this . markTxsPeerHas ( peerId , response ) ;
185+ this . handleReceivedTxs ( peerId , response . txs ) ;
177186
178187 //TODO: maybe wait for at least couple of peers so that we don't spam single one?
179188 if ( this . startSmartRequester !== undefined ) {
@@ -190,8 +199,42 @@ export class BatchTxRequester {
190199 return blockIdsMatch && peerHasSomeTxsFromProposal ;
191200 }
192201
193- //TODO:
194- private handleFailResponseFromPeer ( peerId : PeerId ) { }
202+ private handleReceivedTxs ( peerId : PeerId , txs : TxArray ) {
203+ //TODO: yield txs
204+ for ( const tx of txs ) {
205+ const key = tx . txHash . toString ( ) ;
206+ let txMeta = this . txsMetadata . get ( key ) ;
207+ if ( txMeta ) {
208+ txMeta . fetched = true ;
209+ txMeta . peers . add ( peerId . toString ( ) ) ;
210+ } else {
211+ //TODO: what to do about peer which sent txs we didn't request?
212+ // 1. don't request from it in the scope of this batch request
213+ // 2. ban it immediately?
214+ // 3. track it and ban it?
215+ //
216+ // NOTE: don't break immediately peer still might have txs we need
217+ }
218+ }
219+ }
220+
221+ private markTxsPeerHas ( peerId : PeerId , response : BlockTxsResponse ) {
222+ const peerIdStr = peerId . toString ( ) ;
223+ const txsPeerHas = this . extractHashesPeerHasFromResponse ( response ) ;
224+ // NOTE: it's ok to override this and not make it union with previous data
225+ // because the newer request contains most up to date info
226+ this . peersToTxMap . set ( peerIdStr , txsPeerHas ) ;
227+
228+ this . txsMetadata . values ( ) . forEach ( txMeta => {
229+ txMeta . peers . add ( peerIdStr ) ;
230+ } ) ;
231+ }
232+
233+ //TODO: are we missing something here?
234+ // banning the peers?
235+ private handleFailResponseFromPeer ( peerId : PeerId ) {
236+ this . badPeers . add ( peerId . toString ( ) ) ;
237+ }
195238
196239 private extractHashesPeerHasFromResponse ( response : BlockTxsResponse ) : Array < TxHash > {
197240 const hashes : TxHash [ ] = [ ] ;
@@ -208,8 +251,13 @@ export class BatchTxRequester {
208251 private makeRoundRobinIndexer ( size : ( ) => number , start = 0 ) {
209252 let i = start ;
210253 return ( ) => {
254+ const length = size ( ) ;
255+ if ( length === 0 ) {
256+ return undefined ;
257+ }
258+
211259 const current = i ;
212- i = ( current + 1 ) % size ( ) ;
260+ i = ( current + 1 ) % length ;
213261 return current ;
214262 } ;
215263 }
@@ -223,6 +271,6 @@ export class BatchTxRequester {
223271 //2. deadline
224272 //3. received all
225273 private shouldStop ( ) {
226- return this . fetchedAllTxs ( ) || this . txsMetadata . size === 0 ;
274+ return this . txsMetadata . size === 0 || this . fetchedAllTxs ( ) || Date . now ( ) > this . deadline ;
227275 }
228276}
0 commit comments