Skip to content

Commit 59e8e1e

Browse files
committed
More bugfixes and cleanups
1 parent 146ce68 commit 59e8e1e

File tree

1 file changed

+91
-46
lines changed

1 file changed

+91
-46
lines changed

yarn-project/p2p/src/services/reqresp/batch-tx-requester/reqresp_batch.ts

Lines changed: 91 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,22 @@ export class BatchTxRequester {
5353
}
5454

5555
public async run() {
56-
if (this.txsMetadata.getMissingTxHashes().size === 0) {
57-
this.logger.debug('No missing txs to request');
58-
return;
59-
}
56+
try {
57+
if (this.txsMetadata.getMissingTxHashes().size === 0) {
58+
this.logger.debug('No missing txs to request');
59+
return;
60+
}
6061

61-
//TODO: executeTimeout?
62-
await Promise.allSettled([this.smartRequester(), this.dumbRequester(), this.pinnedPeerRequester()]);
62+
//TODO: executeTimeout?
63+
await Promise.allSettled([this.smartRequester(), this.dumbRequester(), this.pinnedPeerRequester()]);
6364

64-
//TODO: handle this via async iter
65-
return this.txsMetadata.getFetchedTxs();
65+
//TODO: handle this via async iter
66+
return this.txsMetadata.getFetchedTxs();
67+
} finally {
68+
for (let i = 0; i < SMART_PEERS_TO_QUERY_IN_PARALLEL; i++) {
69+
this.smartRequesterSemaphore.release();
70+
}
71+
}
6672
}
6773

6874
//TODO: handle pinned peer properly
@@ -111,7 +117,7 @@ export class BatchTxRequester {
111117

112118
const workers = Array.from(
113119
{ length: Math.min(SMART_PEERS_TO_QUERY_IN_PARALLEL, this.peers.getAllPeers().size) },
114-
() => this.smartWorkerLoop(nextPeer, makeRequest),
120+
(_, index) => this.smartWorkerLoop(nextPeer, makeRequest, index + 1),
115121
);
116122

117123
await Promise.allSettled(workers);
@@ -124,9 +130,10 @@ export class BatchTxRequester {
124130
const txChunks = () =>
125131
//TODO: wrap around for last batch
126132
chunk<string>(
127-
this.txsMetadata
128-
.getSortedByRequestedCountThenByInFlightCountAsc(Array.from(this.txsMetadata.getMissingTxHashes()))
129-
.map(t => t.txHash.toString()),
133+
// this.txsMetadata
134+
// .getSortedByRequestedCountThenByInFlightCountAsc(Array.from(this.txsMetadata.getMissingTxHashes()))
135+
// .map(t => t.txHash.toString()),
136+
Array.from(this.txsMetadata.getMissingTxHashes()),
130137
TX_BATCH_SIZE,
131138
);
132139

@@ -137,6 +144,9 @@ export class BatchTxRequester {
137144
return undefined;
138145
}
139146

147+
if (chunks[idx] === undefined) {
148+
console.error(`Dumb requester Chunk at index ${idx} is undefined, chunk length: ${chunks.length}`);
149+
}
140150
const txs = chunks[idx].map(t => TxHash.fromString(t));
141151
console.log(`Dumb batch index: ${idx}, batches count: ${chunks.length}`);
142152
txs.forEach(tx => this.txsMetadata.markRequested(tx));
@@ -151,98 +161,119 @@ export class BatchTxRequester {
151161

152162
const workers = Array.from(
153163
{ length: Math.min(DUMB_PEERS_TO_QUERY_IN_PARALLEL, this.peers.getAllPeers().size) },
154-
() => this.dumbWorkerLoop(nextPeer, makeRequest),
164+
(_, index) => this.dumbWorkerLoop(nextPeer, makeRequest, index + 1),
155165
);
156166
await Promise.allSettled(workers);
157167
}
158168

159169
private async dumbWorkerLoop(
160170
pickNextPeer: () => PeerId | undefined,
161171
request: (pid: PeerId) => { blockRequest: BlockTxsRequest | undefined; txs: TxHash[] } | undefined,
172+
workerIndex: number,
162173
) {
163-
let count = 0;
164-
while (!this.shouldStop()) {
165-
count++;
166-
const peerId = pickNextPeer();
167-
const weRanOutOfPeersToQuery = peerId === undefined;
168-
if (weRanOutOfPeersToQuery) {
169-
this.logger.debug(`Worker loop dumb: No more peers to query`);
170-
console.log(`[${count}] Worker loop dumb: No more peers to query`);
171-
return;
172-
}
174+
try {
175+
console.log(`Dumb worker ${workerIndex} started`);
176+
let count = 0;
177+
while (!this.shouldStop()) {
178+
count++;
179+
const peerId = pickNextPeer();
180+
const weRanOutOfPeersToQuery = peerId === undefined;
181+
if (weRanOutOfPeersToQuery) {
182+
this.logger.debug(`Worker loop dumb: No more peers to query`);
183+
console.log(`[${workerIndex}][${count}] Worker loop dumb: No more peers to query`);
184+
break;
185+
}
173186

174-
const nextBatchTxRequest = request(peerId);
175-
if (!nextBatchTxRequest) {
176-
this.logger.warn(`Worker loop dumb: Could not create next batch request`);
177-
// We retry with the next peer/batch
178-
console.log(`[${count}] Worker loop dumb: Could not create next batch request for peer ${peerId.toString()}`);
179-
continue;
180-
}
187+
console.log(`[${workerIndex}] Worker loop dumb: count: ${count} for peerId: ${peerId.toString()}`);
181188

182-
//TODO: check this, this should only happen in case something bad happened
183-
const { blockRequest, txs } = nextBatchTxRequest;
184-
if (blockRequest === undefined) {
185-
return;
186-
}
189+
const nextBatchTxRequest = request(peerId);
190+
if (!nextBatchTxRequest) {
191+
this.logger.warn(`Worker loop dumb: Could not create next batch request`);
192+
// We retry with the next peer/batch
193+
console.log(
194+
`[${workerIndex}][${count}] Worker loop dumb: Could not create next batch request for peer ${peerId.toString()}`,
195+
);
196+
continue;
197+
}
187198

188-
console.log(
189-
`[${count}] Worker type dumb: Requesting txs from peer ${peerId.toString()}: ${txs.map(tx => tx.toString()).join('\n')}`,
190-
);
199+
//TODO: check this, this should only happen in case something bad happened
200+
const { blockRequest, txs } = nextBatchTxRequest;
201+
if (blockRequest === undefined) {
202+
console.log(`[${workerIndex}] Dumb worker: BLOCK REQ undefined`);
203+
break;
204+
}
191205

192-
await this.requestTxBatch(peerId, blockRequest);
206+
console.log(
207+
`[${workerIndex}][${count}] Worker type dumb: Requesting txs from peer ${peerId.toString()}: ${txs.map(tx => tx.toString()).join('\n')}`,
208+
);
209+
210+
await this.requestTxBatch(peerId, blockRequest);
211+
}
212+
} catch (err: any) {
213+
console.error(`Dumb worker ${workerIndex} encountered an error: ${err}`);
214+
} finally {
215+
console.log(`Dumb worker ${workerIndex} finished`);
193216
}
194217
}
195218

196219
private async smartWorkerLoop(
197220
pickNextPeer: () => PeerId | undefined,
198221
request: (pid: PeerId) => { blockRequest: BlockTxsRequest | undefined; txs: TxHash[] } | undefined,
222+
workerIndex: number,
199223
) {
224+
console.log(`Smart worker ${workerIndex} started`);
200225
let count = 0;
201226
await this.smartRequesterSemaphore.acquire();
227+
console.log(`Smart worker ${workerIndex} acquired semaphore`);
202228

203229
while (!this.shouldStop()) {
204230
count++;
205231
const peerId = pickNextPeer();
206232
const weRanOutOfPeersToQuery = peerId === undefined;
207233
if (weRanOutOfPeersToQuery) {
208234
this.logger.debug(`Worker loop smart: No more no more peers to query`);
209-
console.log(`[${count}] Worker loop smart: No more smart peers to query`);
235+
console.log(`[${workerIndex}][${count}] Worker loop smart: No more smart peers to query`);
210236

211237
//If there are no more dumb peers to query then none of our peers can become smart,
212238
//thus we can simply exit this worker
213239
const noMoreDumbPeersToQuery = this.peers.getDumbPeersToQuery().length === 0;
214240
if (noMoreDumbPeersToQuery) {
215-
return;
241+
console.log(`[${workerIndex}][${count}] Worker loop smart: No more smart peers to query, EXITING`);
242+
break;
216243
}
217244

218245
await this.smartRequesterSemaphore.acquire();
219246
this.logger.debug(`Worker loop smart: acquired next smart peer`);
220-
console.log(`[${count}] Worker loop smart: acquired next smart peer`);
247+
console.log(`[${workerIndex}][${count}] Worker loop smart: acquired next smart peer`);
221248
continue;
222249
}
223250

224251
const nextBatchTxRequest = request(peerId);
225252
if (!nextBatchTxRequest) {
226253
this.logger.warn(`Worker loop smart: Could not create next batch request`);
254+
console.log(`[${workerIndex}][${count}] Worker loop smart: Could not create next batch request`);
227255
// We retry with the next peer/batch
228256
continue;
229257
}
230258

231259
//TODO: check this, this should only happen in case something bad happened
232260
const { blockRequest, txs } = nextBatchTxRequest;
233261
if (blockRequest === undefined) {
234-
return;
262+
console.log(`[${workerIndex}] Smart worker: BLOCK REQ undefined`);
263+
break;
235264
}
236265

237266
console.log(
238-
`[${count}] Worker type smart : Requesting txs from peer ${peerId.toString()}: ${txs.map(tx => tx.toString()).join('\n')}`,
267+
`[${workerIndex}][${count}] Worker type smart : Requesting txs from peer ${peerId.toString()}: ${txs.map(tx => tx.toString()).join('\n')}`,
239268
);
240269

241270
await this.requestTxBatch(peerId, blockRequest);
242271
txs.forEach(tx => {
243272
this.txsMetadata.markNotInFlightBySmartPeer(tx);
244273
});
245274
}
275+
276+
console.log(`Smart worker ${workerIndex} finished`);
246277
}
247278

248279
private async requestTxBatch(peerId: PeerId, request: BlockTxsRequest): Promise<BlockTxsResponse | undefined> {
@@ -263,7 +294,7 @@ export class BatchTxRequester {
263294
error: err,
264295
});
265296

266-
console.log(`Peer ${peerId.toString()}\n${err}`);
297+
console.error(`Peer ${peerId.toString()}\n${err}`);
267298
await this.handleFailResponseFromPeer(peerId, ReqRespStatus.UNKNOWN);
268299
} finally {
269300
// Don't mark pinned peer as not in flight
@@ -314,6 +345,20 @@ export class BatchTxRequester {
314345
txs.forEach(tx => {
315346
this.txsMetadata.markFetched(peerId, tx);
316347
});
348+
349+
const missingTxHashes = this.txsMetadata.getMissingTxHashes();
350+
if (missingTxHashes.size === 0) {
351+
// wake sleepers so they can see shouldStop() and exit
352+
for (let i = 0; i < SMART_PEERS_TO_QUERY_IN_PARALLEL; i++) {
353+
this.smartRequesterSemaphore.release();
354+
}
355+
} else {
356+
console.log(
357+
`Missing txs: \n ${Array.from(this.txsMetadata.getMissingTxHashes())
358+
.map(tx => tx.toString())
359+
.join('\n')}`,
360+
);
361+
}
317362
}
318363

319364
private markTxsPeerHas(peerId: PeerId, response: BlockTxsResponse) {
@@ -356,7 +401,7 @@ export class BatchTxRequester {
356401
return undefined;
357402
}
358403

359-
const current = i;
404+
const current = i % length;
360405
i = (current + 1) % length;
361406
return current;
362407
};

0 commit comments

Comments
 (0)