Skip to content

Commit 7abe32e

Browse files
committed
More improvements, code is in bad shape though
1 parent bfef7ea commit 7abe32e

File tree

3 files changed

+75
-23
lines changed

3 files changed

+75
-23
lines changed

yarn-project/p2p/src/client/test/p2p_client.integration_batch_txs.test.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,11 @@ describe('p2p client integration batch txs', () => {
151151
it.only('batch requester fetches all missing txs from multiple peers', async () => {
152152
const NUMBER_OF_PEERS = 4;
153153

154-
// Create 20 transactions
155154
const txCount = 20;
156155
const txs = await Promise.all(times(txCount, () => mockTx()));
157156
const txHashes = await Promise.all(txs.map(tx => tx.getTxHash()));
157+
console.log(txHashes.map(tx => tx.toString()));
158158

159-
// Create a block proposal with all tx hashes
160159
const blockNumber = 5;
161160
const blockHash = Fr.random();
162161
const blockProposal = createBlockProposal(blockNumber, blockHash, txHashes);
@@ -178,7 +177,6 @@ describe('p2p client integration batch txs', () => {
178177
const peerTxs = txs.slice(start, end);
179178
const peerTxHashSet = new Set(peerTxs.map(tx => tx.txHash.toString()));
180179

181-
// Set default mock implementations
182180
peerTxPool.hasTxs.mockImplementation((hashes: TxHash[]) => {
183181
return Promise.resolve(hashes.map(h => peerTxHashSet.has(h.toString())));
184182
});
@@ -189,13 +187,11 @@ describe('p2p client integration batch txs', () => {
189187
txPoolMocks.push(peerTxPool);
190188
}
191189

192-
// Setup clients with individual txPool mocks
193190
await setupClients(NUMBER_OF_PEERS, txPoolMocks);
194191

195192
const peerIds = clients.map(client => (client as any).p2pService.node.peerId);
196193
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peerIds);
197194

198-
// Set up attestation pool to return the block proposal
199195
attestationPool.getBlockProposal.mockResolvedValue(blockProposal);
200196

201197
// Client 0 is missing all transactions
@@ -217,8 +213,11 @@ describe('p2p client integration batch txs', () => {
217213

218214
// Verify all transactions were fetched
219215
expect(fetchedTxs).toBeDefined();
220-
expect(fetchedTxs).toHaveLength(txCount);
221216
const fetchedHashes = await Promise.all(fetchedTxs!.map(tx => tx.getTxHash()));
217+
if (fetchedTxs?.length !== missingTxHashes.length) {
218+
const diff = new Set(fetchedHashes.map(h => h.toString())).difference(new Set(txHashes.map(h => h.toString())));
219+
console.log(`${Array.from(diff)}`);
220+
}
222221
expect(
223222
new Set(fetchedHashes.map(h => h.toString())).difference(new Set(txHashes.map(h => h.toString()))).size,
224223
).toBe(0);

yarn-project/p2p/src/services/reqresp/reqresp.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ export class ReqResp implements ReqRespInterface {
409409
);
410410
return resp;
411411
} catch (e: any) {
412+
console.warn(`SUBPROTOCOL: ${subProtocol}\n`, e);
412413
// On error we immediately abort the stream, this is preferred way,
413414
// because it signals to the sender that error happened, whereas
414415
// closing the stream only closes our side and is much slower

yarn-project/p2p/src/services/reqresp/reqresp_batch.ts

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { chunk } from '@aztec/foundation/collection';
22
import { createLogger } from '@aztec/foundation/log';
33
import { Semaphore } from '@aztec/foundation/queue';
4+
import { sleep } from '@aztec/foundation/sleep';
45
import type { BlockProposal } from '@aztec/stdlib/p2p';
56
import { type Tx, type TxArray, TxHash } from '@aztec/stdlib/tx';
67

@@ -19,7 +20,8 @@ const DUMB_PEERS_TO_QUERY_IN_PARALLEL = 10;
1920
export class BatchTxRequester {
2021
private readonly peers: PeerId[];
2122
private readonly smartPeers = new Set<string>();
22-
private readonly badPeers = new Set<string>();
23+
private readonly badPeers = new Map<string, number>();
24+
private readonly inFlightPeers = new Set<string>();
2325

2426
private readonly txsMetadata;
2527

@@ -40,8 +42,11 @@ export class BatchTxRequester {
4042
missingTxs.map(txHash => [txHash.toString(), new MissingTxMetadata(txHash)]),
4143
);
4244
this.peers = this.connectionSampler.getPeerListSortedByConnectionCountAsc();
45+
console.log(this.peers);
46+
47+
//Pinned peer is queried separately and thus always considered "in-flight" by both "dumb" and "smart" requester
4348
if (this.pinnedPeer) {
44-
this.smartPeers.add(this.pinnedPeer.toString());
49+
this.inFlightPeers.add(pinnedPeer!.toString());
4550
}
4651

4752
this.deadline = Date.now() + this.timeoutMs;
@@ -85,12 +90,13 @@ export class BatchTxRequester {
8590
}
8691

8792
private async smartRequester() {
88-
const nextPeerIndex = this.makeRoundRobinIndexer(() => getPeers().length);
89-
const getPeers = () => Array.from(this.smartPeers.difference(this.badPeers));
93+
const nextPeerIndex = this.makeRoundRobinIndexer();
94+
const getPeers = () => Array.from(this.smartPeers.difference(this.getBadPeers().union(this.inFlightPeers)));
9095

9196
const nextPeer = () => {
92-
const idx = nextPeerIndex();
93-
return idx === undefined ? undefined : peerIdFromString(getPeers()[idx]);
97+
const peers = getPeers();
98+
const idx = nextPeerIndex(() => peers.length);
99+
return idx === undefined ? undefined : peerIdFromString(peers[idx]);
94100
};
95101

96102
const makeRequest = (pid: PeerId) => {
@@ -113,11 +119,13 @@ export class BatchTxRequester {
113119

114120
private async dumbRequester() {
115121
const peers = new Set(this.peers.map(peer => peer.toString()));
116-
const nextPeerIndex = this.makeRoundRobinIndexer(() => getPeers().length);
117-
const nextBatchIndex = this.makeRoundRobinIndexer(() => txChunks().length);
118-
const getPeers = () => Array.from(peers.difference(this.smartPeers.union(this.badPeers)));
122+
const nextPeerIndex = this.makeRoundRobinIndexer();
123+
const nextBatchIndex = this.makeRoundRobinIndexer();
124+
const getPeers = () =>
125+
Array.from(peers.difference(this.smartPeers.union(this.getBadPeers()).union(this.inFlightPeers)));
119126

120127
const txChunks = () =>
128+
//TODO: wrap around for last batch
121129
chunk<string>(
122130
this.txsMetadata
123131
.getSortedByRequestedCountThenByInFlightCountAsc(Array.from(this.txsMetadata.getMissingTxHashes()))
@@ -126,19 +134,22 @@ export class BatchTxRequester {
126134
);
127135

128136
const makeRequest = (_pid: PeerId) => {
129-
const idx = nextBatchIndex();
137+
const txsChunks = txChunks();
138+
const idx = nextBatchIndex(() => txChunks().length);
130139
if (idx === undefined) {
131140
return undefined;
132141
}
133142

134143
const txs = txChunks()[idx].map(t => TxHash.fromString(t));
144+
console.log(`Dumb batch index: ${idx}, batches count: ${txsChunks.length}`);
135145
txs.forEach(tx => this.txsMetadata.markRequested(tx));
136146
return { blockRequest: BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txs), txs };
137147
};
138148

139149
const nextPeer = () => {
140-
const idx = nextPeerIndex();
141-
return idx === undefined ? undefined : peerIdFromString(Array.from(getPeers())[idx]);
150+
const peers = getPeers();
151+
const idx = nextPeerIndex(() => peers.length);
152+
return idx === undefined ? undefined : peerIdFromString(peers[idx]);
142153
};
143154

144155
const workers = Array.from({ length: Math.min(DUMB_PEERS_TO_QUERY_IN_PARALLEL, this.peers.length) }, () =>
@@ -158,11 +169,14 @@ export class BatchTxRequester {
158169
await this.smartRequesterSemaphore.acquire();
159170
}
160171

172+
let count = 0;
161173
while (!this.shouldStop()) {
174+
count++;
162175
const peerId = pickNextPeer();
163176
const weRanOutOfPeersToQuery = peerId === undefined;
164177
if (weRanOutOfPeersToQuery) {
165178
this.logger.debug(`Worker loop: ${type}: No more peers to query`);
179+
console.log(`[${count}] Worker loop: ${type}: No more peers to query`);
166180
return;
167181
}
168182

@@ -178,6 +192,10 @@ export class BatchTxRequester {
178192
return;
179193
}
180194

195+
console.log(
196+
`[${count}] Worker type: ${type}: Requesting txs from peer ${peerId.toString()}: ${txs.map(tx => tx.toString()).join('\n')}`,
197+
);
198+
181199
await this.requestTxBatch(peerId, blockRequest);
182200
if (type === 'smart') {
183201
txs.forEach(tx => {
@@ -189,8 +207,11 @@ export class BatchTxRequester {
189207

190208
private async requestTxBatch(peerId: PeerId, request: BlockTxsRequest): Promise<BlockTxsResponse | undefined> {
191209
try {
210+
this.inFlightPeers.add(peerId.toString());
192211
const response = await this.reqresp.sendRequestToPeer(peerId, ReqRespSubProtocol.BLOCK_TXS, request.toBuffer());
193212
if (response.status !== ReqRespStatus.SUCCESS) {
213+
console.log(`Peer ${peerId.toString()} failed to respond with status: ${response.status}`);
214+
await this.handleFailResponseFromPeer(peerId, response.status);
194215
return;
195216
}
196217

@@ -202,11 +223,15 @@ export class BatchTxRequester {
202223
error: err,
203224
});
204225

205-
this.handleFailResponseFromPeer(peerId);
226+
console.log(`Peer ${peerId.toString()}\n${err}`);
227+
await this.handleFailResponseFromPeer(peerId, ReqRespStatus.UNKNOWN);
228+
} finally {
229+
this.inFlightPeers.delete(peerId.toString());
206230
}
207231
}
208232

209233
private handleSuccessResponseFromPeer(peerId: PeerId, response: BlockTxsResponse) {
234+
this.unMarkPeerAsBad(peerId);
210235
this.logger.debug(`Received txs: ${response.txs.length} from peer ${peerId.toString()} `);
211236
this.handleReceivedTxs(peerId, response.txs);
212237

@@ -217,6 +242,7 @@ export class BatchTxRequester {
217242
// We mark peer as "smart" only if they have some txs we are missing
218243
// Otherwise we keep them as "dumb" in hope they'll receive some new txs we are missing in the future
219244
if (!this.peerHasSomeTxsWeAreMissing(peerId, response)) {
245+
console.log(`${peerId.toString()} has no txs we are missing, skipping`);
220246
return;
221247
}
222248

@@ -249,13 +275,39 @@ export class BatchTxRequester {
249275

250276
private markTxsPeerHas(peerId: PeerId, response: BlockTxsResponse) {
251277
const txsPeerHas = this.extractHashesPeerHasFromResponse(response);
278+
console.log(`${peerId.toString()} has txs: ${txsPeerHas.map(tx => tx.toString()).join('\n')}`);
252279
this.txsMetadata.markPeerHas(peerId, txsPeerHas);
253280
}
254281

255282
//TODO: are we missing something here?
256283
// banning the peers?
257-
private handleFailResponseFromPeer(peerId: PeerId) {
258-
this.badPeers.add(peerId.toString());
284+
private async handleFailResponseFromPeer(peerId: PeerId, responseStatus: ReqRespStatus) {
285+
if (responseStatus === ReqRespStatus.FAILURE || responseStatus === ReqRespStatus.UNKNOWN) {
286+
this.markPeerAsBad(peerId);
287+
}
288+
289+
//TODO: handle this properly
290+
if (responseStatus === ReqRespStatus.RATE_LIMIT_EXCEEDED) {
291+
await sleep(1000);
292+
}
293+
}
294+
295+
private markPeerAsBad(peerId: PeerId) {
296+
this.badPeers.set(peerId.toString(), (this.badPeers.get(peerId.toString()) ?? 0) + 1);
297+
}
298+
299+
private unMarkPeerAsBad(peerId: PeerId) {
300+
this.badPeers.delete(peerId.toString());
301+
}
302+
303+
private getBadPeers(): Set<string> {
304+
const BADE_PEER_THRESHOLD = 3;
305+
return new Set(
306+
this.badPeers
307+
.entries()
308+
.filter(([_k, v]) => v > BADE_PEER_THRESHOLD)
309+
.map(([k]) => k),
310+
);
259311
}
260312

261313
private extractHashesPeerHasFromResponse(response: BlockTxsResponse): Array<TxHash> {
@@ -270,9 +322,9 @@ export class BatchTxRequester {
270322
return hashes;
271323
}
272324

273-
private makeRoundRobinIndexer(size: () => number, start = 0) {
325+
private makeRoundRobinIndexer(start = 0) {
274326
let i = start;
275-
return () => {
327+
return (size: () => number) => {
276328
const length = size();
277329
if (length === 0) {
278330
return undefined;

0 commit comments

Comments
 (0)