@@ -10,7 +10,7 @@ import type { PeerId } from '@libp2p/interface';
1010import { peerIdFromString } from '@libp2p/peer-id' ;
1111
1212import type { ConnectionSampler } from '.././connection-sampler/connection_sampler.js' ;
13- import { type ReqRespInterface , ReqRespSubProtocol } from '.././interface.js' ;
13+ import { type ReqRespInterface , ReqRespSubProtocol , type ReqRespSubProtocolValidators } from '.././interface.js' ;
1414import { BlockTxsRequest , BlockTxsResponse } from '.././protocols/index.js' ;
1515import { ReqRespStatus } from '.././status.js' ;
1616import { MissingTxMetadata , MissingTxMetadataCollection , TX_BATCH_SIZE } from './missing_txs.js' ;
@@ -34,6 +34,7 @@ export class BatchTxRequester {
3434 private readonly timeoutMs : number ,
3535 private readonly reqresp : ReqRespInterface ,
3636 private readonly connectionSampler : ConnectionSampler ,
37+ private readonly txValidator : ReqRespSubProtocolValidators [ ReqRespSubProtocol . TX ] ,
3738 private readonly logger = createLogger ( 'p2p:reqresp_batch' ) ,
3839 private readonly dateProvider : DateProvider = new DateProvider ( ) ,
3940 ) {
@@ -90,10 +91,9 @@ export class BatchTxRequester {
9091
9192 private async smartRequester ( ) {
9293 const nextPeerIndex = this . makeRoundRobinIndexer ( ) ;
93- const getPeers = ( ) => this . peers . getSmartPeersToQuery ( ) ;
9494
9595 const nextPeer = ( ) => {
96- const peers = getPeers ( ) ;
96+ const peers = this . peers . getSmartPeersToQuery ( ) ;
9797 const idx = nextPeerIndex ( ( ) => peers . length ) ;
9898 return idx === undefined ? undefined : peerIdFromString ( peers [ idx ] ) ;
9999 } ;
@@ -111,7 +111,7 @@ export class BatchTxRequester {
111111
112112 const workers = Array . from (
113113 { length : Math . min ( SMART_PEERS_TO_QUERY_IN_PARALLEL , this . peers . getAllPeers ( ) . size ) } ,
114- ( ) => this . workerLoop ( nextPeer , makeRequest , 'smart' ) ,
114+ ( ) => this . smartWorkerLoop ( nextPeer , makeRequest ) ,
115115 ) ;
116116
117117 await Promise . allSettled ( workers ) ;
@@ -131,14 +131,14 @@ export class BatchTxRequester {
131131 ) ;
132132
133133 const makeRequest = ( _pid : PeerId ) => {
134- const txsChunks = txChunks ( ) ;
135- const idx = nextBatchIndex ( ( ) => txChunks ( ) . length ) ;
134+ const chunks = txChunks ( ) ;
135+ const idx = nextBatchIndex ( ( ) => chunks . length ) ;
136136 if ( idx === undefined ) {
137137 return undefined ;
138138 }
139139
140- const txs = txsChunks [ idx ] . map ( t => TxHash . fromString ( t ) ) ;
141- console . log ( `Dumb batch index: ${ idx } , batches count: ${ txsChunks . length } ` ) ;
140+ const txs = chunks [ idx ] . map ( t => TxHash . fromString ( t ) ) ;
141+ console . log ( `Dumb batch index: ${ idx } , batches count: ${ chunks . length } ` ) ;
142142 txs . forEach ( tx => this . txsMetadata . markRequested ( tx ) ) ;
143143 return { blockRequest : BlockTxsRequest . fromBlockProposalAndMissingTxs ( this . blockProposal , txs ) , txs } ;
144144 } ;
@@ -151,55 +151,96 @@ export class BatchTxRequester {
151151
152152 const workers = Array . from (
153153 { length : Math . min ( DUMB_PEERS_TO_QUERY_IN_PARALLEL , this . peers . getAllPeers ( ) . size ) } ,
154- ( ) => this . workerLoop ( nextPeer , makeRequest , 'dumb' ) ,
154+ ( ) => this . dumbWorkerLoop ( nextPeer , makeRequest ) ,
155155 ) ;
156156 await Promise . allSettled ( workers ) ;
157157 }
158158
159- //TODO: cleanup the typeing here
160- // splitting this in workerLoopSmart and dumb probably makes sense
161- private async workerLoop (
159+ private async dumbWorkerLoop (
162160 pickNextPeer : ( ) => PeerId | undefined ,
163161 request : ( pid : PeerId ) => { blockRequest : BlockTxsRequest | undefined ; txs : TxHash [ ] } | undefined ,
164- type : 'smart' | 'dumb' ,
165162 ) {
166- if ( type === 'smart' ) {
167- await this . smartRequesterSemaphore . acquire ( ) ;
168- }
169-
170163 let count = 0 ;
171164 while ( ! this . shouldStop ( ) ) {
172165 count ++ ;
173166 const peerId = pickNextPeer ( ) ;
174167 const weRanOutOfPeersToQuery = peerId === undefined ;
175168 if ( weRanOutOfPeersToQuery ) {
176- this . logger . debug ( `Worker loop: ${ type } : No more peers to query` ) ;
177- console . log ( `[${ count } ] Worker loop: ${ type } : No more peers to query` ) ;
169+ this . logger . debug ( `Worker loop dumb : No more peers to query` ) ;
170+ console . log ( `[${ count } ] Worker loop dumb : No more peers to query` ) ;
178171 return ;
179172 }
180173
181174 const nextBatchTxRequest = request ( peerId ) ;
182175 if ( ! nextBatchTxRequest ) {
183- this . logger . warn ( `Worker loop: ${ type } : Could not create next batch request` ) ;
176+ this . logger . warn ( `Worker loop dumb : Could not create next batch request` ) ;
184177 // We retry with the next peer/batch
185178 continue ;
186179 }
187180
181+ //TODO: check this, this should only happen in case something bad happened
188182 const { blockRequest, txs } = nextBatchTxRequest ;
189183 if ( blockRequest === undefined ) {
190184 return ;
191185 }
192186
193187 console . log (
194- `[${ count } ] Worker type: ${ type } : Requesting txs from peer ${ peerId . toString ( ) } : ${ txs . map ( tx => tx . toString ( ) ) . join ( '\n' ) } ` ,
188+ `[${ count } ] Worker type dumb : Requesting txs from peer ${ peerId . toString ( ) } : ${ txs . map ( tx => tx . toString ( ) ) . join ( '\n' ) } ` ,
195189 ) ;
196190
197191 await this . requestTxBatch ( peerId , blockRequest ) ;
198- if ( type === 'smart' ) {
199- txs . forEach ( tx => {
200- this . txsMetadata . markNotInFlightBySmartPeer ( tx ) ;
201- } ) ;
192+ }
193+ }
194+
195+ private async smartWorkerLoop (
196+ pickNextPeer : ( ) => PeerId | undefined ,
197+ request : ( pid : PeerId ) => { blockRequest : BlockTxsRequest | undefined ; txs : TxHash [ ] } | undefined ,
198+ ) {
199+ let count = 0 ;
200+ await this . smartRequesterSemaphore . acquire ( ) ;
201+
202+ while ( ! this . shouldStop ( ) ) {
203+ count ++ ;
204+ const peerId = pickNextPeer ( ) ;
205+ const weRanOutOfPeersToQuery = peerId === undefined ;
206+ if ( weRanOutOfPeersToQuery ) {
207+ this . logger . debug ( `Worker loop smart: No more no more peers to query` ) ;
208+ console . log ( `[${ count } ] Worker loop smart: No more smart peers to query` ) ;
209+
210+ //If there are no more dumb peers to query then none of our peers can become smart,
211+ //thus we can simply exit this worker
212+ const noMoreDumbPeersToQuery = this . peers . getDumbPeersToQuery ( ) . length === 0 ;
213+ if ( noMoreDumbPeersToQuery ) {
214+ return ;
215+ }
216+
217+ await this . smartRequesterSemaphore . acquire ( ) ;
218+ this . logger . debug ( `Worker loop smart: acquired next smart peer` ) ;
219+ console . log ( `[${ count } ] Worker loop smart: acquired next smart peer` ) ;
220+ continue ;
221+ }
222+
223+ const nextBatchTxRequest = request ( peerId ) ;
224+ if ( ! nextBatchTxRequest ) {
225+ this . logger . warn ( `Worker loop smart: Could not create next batch request` ) ;
226+ // We retry with the next peer/batch
227+ continue ;
202228 }
229+
230+ //TODO: check this, this should only happen in case something bad happened
231+ const { blockRequest, txs } = nextBatchTxRequest ;
232+ if ( blockRequest === undefined ) {
233+ return ;
234+ }
235+
236+ console . log (
237+ `[${ count } ] Worker type smart : Requesting txs from peer ${ peerId . toString ( ) } : ${ txs . map ( tx => tx . toString ( ) ) . join ( '\n' ) } ` ,
238+ ) ;
239+
240+ await this . requestTxBatch ( peerId , blockRequest ) ;
241+ txs . forEach ( tx => {
242+ this . txsMetadata . markNotInFlightBySmartPeer ( tx ) ;
243+ } ) ;
203244 }
204245 }
205246
@@ -277,6 +318,7 @@ export class BatchTxRequester {
277318 private markTxsPeerHas ( peerId : PeerId , response : BlockTxsResponse ) {
278319 const txsPeerHas = this . extractHashesPeerHasFromResponse ( response ) ;
279320 console . log ( `${ peerId . toString ( ) } has txs: ${ txsPeerHas . map ( tx => tx . toString ( ) ) . join ( '\n' ) } ` ) ;
321+ //TODO: validate txs
280322 this . txsMetadata . markPeerHas ( peerId , txsPeerHas ) ;
281323 }
282324
0 commit comments