@@ -53,16 +53,22 @@ export class BatchTxRequester {
5353 }
5454
5555 public async run ( ) {
56- if ( this . txsMetadata . getMissingTxHashes ( ) . size === 0 ) {
57- this . logger . debug ( 'No missing txs to request' ) ;
58- return ;
59- }
56+ try {
57+ if ( this . txsMetadata . getMissingTxHashes ( ) . size === 0 ) {
58+ this . logger . debug ( 'No missing txs to request' ) ;
59+ return ;
60+ }
6061
61- //TODO: executeTimeout?
62- await Promise . allSettled ( [ this . smartRequester ( ) , this . dumbRequester ( ) , this . pinnedPeerRequester ( ) ] ) ;
62+ //TODO: executeTimeout?
63+ await Promise . allSettled ( [ this . smartRequester ( ) , this . dumbRequester ( ) , this . pinnedPeerRequester ( ) ] ) ;
6364
64- //TODO: handle this via async iter
65- return this . txsMetadata . getFetchedTxs ( ) ;
65+ //TODO: handle this via async iter
66+ return this . txsMetadata . getFetchedTxs ( ) ;
67+ } finally {
68+ for ( let i = 0 ; i < SMART_PEERS_TO_QUERY_IN_PARALLEL ; i ++ ) {
69+ this . smartRequesterSemaphore . release ( ) ;
70+ }
71+ }
6672 }
6773
6874 //TODO: handle pinned peer properly
@@ -111,7 +117,7 @@ export class BatchTxRequester {
111117
112118 const workers = Array . from (
113119 { length : Math . min ( SMART_PEERS_TO_QUERY_IN_PARALLEL , this . peers . getAllPeers ( ) . size ) } ,
114- ( ) => this . smartWorkerLoop ( nextPeer , makeRequest ) ,
120+ ( _ , index ) => this . smartWorkerLoop ( nextPeer , makeRequest , index + 1 ) ,
115121 ) ;
116122
117123 await Promise . allSettled ( workers ) ;
@@ -124,9 +130,10 @@ export class BatchTxRequester {
124130 const txChunks = ( ) =>
125131 //TODO: wrap around for last batch
126132 chunk < string > (
127- this . txsMetadata
128- . getSortedByRequestedCountThenByInFlightCountAsc ( Array . from ( this . txsMetadata . getMissingTxHashes ( ) ) )
129- . map ( t => t . txHash . toString ( ) ) ,
133+ // this.txsMetadata
134+ // .getSortedByRequestedCountThenByInFlightCountAsc(Array.from(this.txsMetadata.getMissingTxHashes()))
135+ // .map(t => t.txHash.toString()),
136+ Array . from ( this . txsMetadata . getMissingTxHashes ( ) ) ,
130137 TX_BATCH_SIZE ,
131138 ) ;
132139
@@ -137,6 +144,9 @@ export class BatchTxRequester {
137144 return undefined ;
138145 }
139146
147+ if ( chunks [ idx ] === undefined ) {
148+ console . error ( `Dumb requester Chunk at index ${ idx } is undefined, chunk length: ${ chunks . length } ` ) ;
149+ }
140150 const txs = chunks [ idx ] . map ( t => TxHash . fromString ( t ) ) ;
141151 console . log ( `Dumb batch index: ${ idx } , batches count: ${ chunks . length } ` ) ;
142152 txs . forEach ( tx => this . txsMetadata . markRequested ( tx ) ) ;
@@ -151,98 +161,119 @@ export class BatchTxRequester {
151161
152162 const workers = Array . from (
153163 { length : Math . min ( DUMB_PEERS_TO_QUERY_IN_PARALLEL , this . peers . getAllPeers ( ) . size ) } ,
154- ( ) => this . dumbWorkerLoop ( nextPeer , makeRequest ) ,
164+ ( _ , index ) => this . dumbWorkerLoop ( nextPeer , makeRequest , index + 1 ) ,
155165 ) ;
156166 await Promise . allSettled ( workers ) ;
157167 }
158168
159169 private async dumbWorkerLoop (
160170 pickNextPeer : ( ) => PeerId | undefined ,
161171 request : ( pid : PeerId ) => { blockRequest : BlockTxsRequest | undefined ; txs : TxHash [ ] } | undefined ,
172+ workerIndex : number ,
162173 ) {
163- let count = 0 ;
164- while ( ! this . shouldStop ( ) ) {
165- count ++ ;
166- const peerId = pickNextPeer ( ) ;
167- const weRanOutOfPeersToQuery = peerId === undefined ;
168- if ( weRanOutOfPeersToQuery ) {
169- this . logger . debug ( `Worker loop dumb: No more peers to query` ) ;
170- console . log ( `[${ count } ] Worker loop dumb: No more peers to query` ) ;
171- return ;
172- }
174+ try {
175+ console . log ( `Dumb worker ${ workerIndex } started` ) ;
176+ let count = 0 ;
177+ while ( ! this . shouldStop ( ) ) {
178+ count ++ ;
179+ const peerId = pickNextPeer ( ) ;
180+ const weRanOutOfPeersToQuery = peerId === undefined ;
181+ if ( weRanOutOfPeersToQuery ) {
182+ this . logger . debug ( `Worker loop dumb: No more peers to query` ) ;
183+ console . log ( `[${ workerIndex } ][${ count } ] Worker loop dumb: No more peers to query` ) ;
184+ break ;
185+ }
173186
174- const nextBatchTxRequest = request ( peerId ) ;
175- if ( ! nextBatchTxRequest ) {
176- this . logger . warn ( `Worker loop dumb: Could not create next batch request` ) ;
177- // We retry with the next peer/batch
178- console . log ( `[${ count } ] Worker loop dumb: Could not create next batch request for peer ${ peerId . toString ( ) } ` ) ;
179- continue ;
180- }
187+ console . log ( `[${ workerIndex } ] Worker loop dumb: count: ${ count } for peerId: ${ peerId . toString ( ) } ` ) ;
181188
182- //TODO: check this, this should only happen in case something bad happened
183- const { blockRequest, txs } = nextBatchTxRequest ;
184- if ( blockRequest === undefined ) {
185- return ;
186- }
189+ const nextBatchTxRequest = request ( peerId ) ;
190+ if ( ! nextBatchTxRequest ) {
191+ this . logger . warn ( `Worker loop dumb: Could not create next batch request` ) ;
192+ // We retry with the next peer/batch
193+ console . log (
194+ `[${ workerIndex } ][${ count } ] Worker loop dumb: Could not create next batch request for peer ${ peerId . toString ( ) } ` ,
195+ ) ;
196+ continue ;
197+ }
187198
188- console . log (
189- `[${ count } ] Worker type dumb: Requesting txs from peer ${ peerId . toString ( ) } : ${ txs . map ( tx => tx . toString ( ) ) . join ( '\n' ) } ` ,
190- ) ;
199+ //TODO: check this, this should only happen in case something bad happened
200+ const { blockRequest, txs } = nextBatchTxRequest ;
201+ if ( blockRequest === undefined ) {
202+ console . log ( `[${ workerIndex } ] Dumb worker: BLOCK REQ undefined` ) ;
203+ break ;
204+ }
191205
192- await this . requestTxBatch ( peerId , blockRequest ) ;
206+ console . log (
207+ `[${ workerIndex } ][${ count } ] Worker type dumb: Requesting txs from peer ${ peerId . toString ( ) } : ${ txs . map ( tx => tx . toString ( ) ) . join ( '\n' ) } ` ,
208+ ) ;
209+
210+ await this . requestTxBatch ( peerId , blockRequest ) ;
211+ }
212+ } catch ( err : any ) {
213+ console . error ( `Dumb worker ${ workerIndex } encountered an error: ${ err } ` ) ;
214+ } finally {
215+ console . log ( `Dumb worker ${ workerIndex } finished` ) ;
193216 }
194217 }
195218
196219 private async smartWorkerLoop (
197220 pickNextPeer : ( ) => PeerId | undefined ,
198221 request : ( pid : PeerId ) => { blockRequest : BlockTxsRequest | undefined ; txs : TxHash [ ] } | undefined ,
222+ workerIndex : number ,
199223 ) {
224+ console . log ( `Smart worker ${ workerIndex } started` ) ;
200225 let count = 0 ;
201226 await this . smartRequesterSemaphore . acquire ( ) ;
227+ console . log ( `Smart worker ${ workerIndex } acquired semaphore` ) ;
202228
203229 while ( ! this . shouldStop ( ) ) {
204230 count ++ ;
205231 const peerId = pickNextPeer ( ) ;
206232 const weRanOutOfPeersToQuery = peerId === undefined ;
207233 if ( weRanOutOfPeersToQuery ) {
208234 this . logger . debug ( `Worker loop smart: No more no more peers to query` ) ;
209- console . log ( `[${ count } ] Worker loop smart: No more smart peers to query` ) ;
235+ console . log ( `[${ workerIndex } ][ ${ count } ] Worker loop smart: No more smart peers to query` ) ;
210236
211237 //If there are no more dumb peers to query then none of our peers can become smart,
212238 //thus we can simply exit this worker
213239 const noMoreDumbPeersToQuery = this . peers . getDumbPeersToQuery ( ) . length === 0 ;
214240 if ( noMoreDumbPeersToQuery ) {
215- return ;
241+ console . log ( `[${ workerIndex } ][${ count } ] Worker loop smart: No more smart peers to query, EXITING` ) ;
242+ break ;
216243 }
217244
218245 await this . smartRequesterSemaphore . acquire ( ) ;
219246 this . logger . debug ( `Worker loop smart: acquired next smart peer` ) ;
220- console . log ( `[${ count } ] Worker loop smart: acquired next smart peer` ) ;
247+ console . log ( `[${ workerIndex } ][ ${ count } ] Worker loop smart: acquired next smart peer` ) ;
221248 continue ;
222249 }
223250
224251 const nextBatchTxRequest = request ( peerId ) ;
225252 if ( ! nextBatchTxRequest ) {
226253 this . logger . warn ( `Worker loop smart: Could not create next batch request` ) ;
254+ console . log ( `[${ workerIndex } ][${ count } ] Worker loop smart: Could not create next batch request` ) ;
227255 // We retry with the next peer/batch
228256 continue ;
229257 }
230258
231259 //TODO: check this, this should only happen in case something bad happened
232260 const { blockRequest, txs } = nextBatchTxRequest ;
233261 if ( blockRequest === undefined ) {
234- return ;
262+ console . log ( `[${ workerIndex } ] Smart worker: BLOCK REQ undefined` ) ;
263+ break ;
235264 }
236265
237266 console . log (
238- `[${ count } ] Worker type smart : Requesting txs from peer ${ peerId . toString ( ) } : ${ txs . map ( tx => tx . toString ( ) ) . join ( '\n' ) } ` ,
267+ `[${ workerIndex } ][ ${ count } ] Worker type smart : Requesting txs from peer ${ peerId . toString ( ) } : ${ txs . map ( tx => tx . toString ( ) ) . join ( '\n' ) } ` ,
239268 ) ;
240269
241270 await this . requestTxBatch ( peerId , blockRequest ) ;
242271 txs . forEach ( tx => {
243272 this . txsMetadata . markNotInFlightBySmartPeer ( tx ) ;
244273 } ) ;
245274 }
275+
276+ console . log ( `Smart worker ${ workerIndex } finished` ) ;
246277 }
247278
248279 private async requestTxBatch ( peerId : PeerId , request : BlockTxsRequest ) : Promise < BlockTxsResponse | undefined > {
@@ -263,7 +294,7 @@ export class BatchTxRequester {
263294 error : err ,
264295 } ) ;
265296
266- console . log ( `Peer ${ peerId . toString ( ) } \n${ err } ` ) ;
297+ console . error ( `Peer ${ peerId . toString ( ) } \n${ err } ` ) ;
267298 await this . handleFailResponseFromPeer ( peerId , ReqRespStatus . UNKNOWN ) ;
268299 } finally {
269300 // Don't mark pinned peer as not in flight
@@ -314,6 +345,20 @@ export class BatchTxRequester {
314345 txs . forEach ( tx => {
315346 this . txsMetadata . markFetched ( peerId , tx ) ;
316347 } ) ;
348+
349+ const missingTxHashes = this . txsMetadata . getMissingTxHashes ( ) ;
350+ if ( missingTxHashes . size === 0 ) {
351+ // wake sleepers so they can see shouldStop() and exit
352+ for ( let i = 0 ; i < SMART_PEERS_TO_QUERY_IN_PARALLEL ; i ++ ) {
353+ this . smartRequesterSemaphore . release ( ) ;
354+ }
355+ } else {
356+ console . log (
357+ `Missing txs: \n ${ Array . from ( this . txsMetadata . getMissingTxHashes ( ) )
358+ . map ( tx => tx . toString ( ) )
359+ . join ( '\n' ) } `,
360+ ) ;
361+ }
317362 }
318363
319364 private markTxsPeerHas ( peerId : PeerId , response : BlockTxsResponse ) {
@@ -356,7 +401,7 @@ export class BatchTxRequester {
356401 return undefined ;
357402 }
358403
359- const current = i ;
404+ const current = i % length ;
360405 i = ( current + 1 ) % length ;
361406 return current ;
362407 } ;
0 commit comments