Skip to content

Commit d62a67b

Browse files
committed
P2: Improvements, fixes and cleanups
1 parent 79b111f commit d62a67b

1 file changed

Lines changed: 141 additions & 57 deletions

File tree

yarn-project/p2p/src/services/reqresp/reqresp_batch.ts

Lines changed: 141 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { chunk } from '@aztec/foundation/collection';
22
import { createLogger } from '@aztec/foundation/log';
33
import { promiseWithResolvers } from '@aztec/foundation/promise';
44
import type { BlockProposal } from '@aztec/stdlib/p2p';
5-
import type { TxArray, TxHash } from '@aztec/stdlib/tx';
5+
import type { Tx, TxArray, TxHash } from '@aztec/stdlib/tx';
66

77
import type { PeerId } from '@libp2p/interface';
88
import { peerIdFromString } from '@libp2p/peer-id';
@@ -15,25 +15,14 @@ import { ReqRespStatus } from './status.js';
1515
const TX_BATCH_SIZE = 8;
1616
const PEERS_TO_QUERY_IN_PARALLEL = 10;
1717

18-
class MissingTxMetadata {
19-
constructor(
20-
public readonly txHash: TxHash,
21-
public fetched = false,
22-
public inFlight = false,
23-
public requestedTimes = 0,
24-
public readonly peers = new Set<string>(),
25-
) {}
26-
}
27-
2818
export class BatchTxRequester {
2919
private readonly peers: PeerId[];
3020
private readonly smartPeers = new Set<string>();
3121
private readonly badPeers = new Set<string>();
32-
private readonly peersToTxMap = new Map<string, Array<TxHash>>();
3322

3423
private readonly txsMetadata;
3524

36-
private readonly deadline = Date.now() + this.timeoutMs;
25+
private readonly deadline;
3726

3827
private startSmartRequester: ((v: void) => void) | undefined = undefined;
3928

@@ -46,11 +35,15 @@ export class BatchTxRequester {
4635
private readonly connectionSampler: ConnectionSampler,
4736
private logger = createLogger('p2p:reqresp_batch'),
4837
) {
49-
this.txsMetadata = new Map(this.missingTxs.map(txHash => [txHash.toString(), new MissingTxMetadata(txHash)]));
38+
this.txsMetadata = new MissingTxMetadataCollection(
39+
this.missingTxs.map(txHash => [txHash.toString(), new MissingTxMetadata(txHash)]),
40+
);
5041
this.peers = this.connectionSampler.getPeerListSortedByConnectionCountAsc();
5142
if (this.pinnedPeer) {
5243
this.smartPeers.add(this.pinnedPeer.toString());
5344
}
45+
46+
this.deadline = Date.now() + this.timeoutMs;
5447
}
5548

5649
public async run() {
@@ -64,6 +57,9 @@ export class BatchTxRequester {
6457

6558
//TODO: executeTimeout?
6659
await Promise.allSettled([this.smartRequester(promise), this.dumbRequester()]);
60+
61+
//TODO: handle this via async iter
62+
return this.txsMetadata.getFetchedTxs();
6763
}
6864

6965
private async smartRequester(start: Promise<void>) {
@@ -82,16 +78,8 @@ export class BatchTxRequester {
8278
};
8379

8480
const makeRequest = (pid: PeerId) => {
85-
//TODO: for this peer we have to make batch on the fly based on which txs peer has
86-
const txsPeerHas = this.peersToTxMap.get(pid.toString());
87-
const peerHasTxs = txsPeerHas && txsPeerHas.length > 0;
88-
if (!peerHasTxs) {
89-
return undefined;
90-
}
91-
92-
//TODO: make this smarter, we should only request txs that we don't have
93-
// and we should request txs that have been requested the least times
94-
const txsToRequest = txsPeerHas.slice(0, TX_BATCH_SIZE);
81+
const txsToRequest = this.txsMetadata.getTxsToRequestFromThePeer(pid).slice(0, TX_BATCH_SIZE);
82+
txsToRequest.forEach(tx => this.txsMetadata.markRequested(tx));
9583

9684
return BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txsToRequest);
9785
};
@@ -105,17 +93,24 @@ export class BatchTxRequester {
10593
private async dumbRequester() {
10694
const peers = new Set(this.peers.map(peer => peer.toString()));
10795
const nextPeerIndex = this.makeRoundRobinIndexer(() => getPeers().length);
108-
const nextBatchIndex = this.makeRoundRobinIndexer(() => txChunks.length);
96+
const nextBatchIndex = this.makeRoundRobinIndexer(() => txChunks().length);
10997
const getPeers = () => Array.from(peers.difference(this.smartPeers.union(this.badPeers)));
11098

111-
const txChunks = chunk<TxHash>(this.missingTxs, TX_BATCH_SIZE);
99+
const txChunks = () =>
100+
chunk<TxHash>(
101+
this.missingTxs.filter(t => !this.txsMetadata.isFetched(t)),
102+
TX_BATCH_SIZE,
103+
);
112104

113-
//TODO: batches should be adaptive
114105
const makeRequest = (_pid: PeerId) => {
115106
const idx = nextBatchIndex();
116-
return idx === undefined
117-
? undefined
118-
: BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txChunks[idx]);
107+
if (idx === undefined) {
108+
return undefined;
109+
}
110+
111+
const txs = txChunks()[idx];
112+
txs.forEach(tx => this.txsMetadata.markRequested(tx));
113+
return BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txs);
119114
};
120115

121116
const nextPeer = () => {
@@ -201,33 +196,14 @@ export class BatchTxRequester {
201196

202197
private handleReceivedTxs(peerId: PeerId, txs: TxArray) {
203198
//TODO: yield txs
204-
for (const tx of txs) {
205-
const key = tx.txHash.toString();
206-
let txMeta = this.txsMetadata.get(key);
207-
if (txMeta) {
208-
txMeta.fetched = true;
209-
txMeta.peers.add(peerId.toString());
210-
} else {
211-
//TODO: what to do about peer which sent txs we didn't request?
212-
// 1. don't request from it in the scope of this batch request
213-
// 2. ban it immediately?
214-
// 3. track it and ban it?
215-
//
216-
// NOTE: don't break immediately peer still might have txs we need
217-
}
218-
}
199+
txs.forEach(tx => {
200+
this.txsMetadata.markFetched(peerId, tx);
201+
});
219202
}
220203

221204
private markTxsPeerHas(peerId: PeerId, response: BlockTxsResponse) {
222-
const peerIdStr = peerId.toString();
223205
const txsPeerHas = this.extractHashesPeerHasFromResponse(response);
224-
// NOTE: it's ok to override this and not make it union with previous data
225-
// because the newer request contains most up to date info
226-
this.peersToTxMap.set(peerIdStr, txsPeerHas);
227-
228-
this.txsMetadata.values().forEach(txMeta => {
229-
txMeta.peers.add(peerIdStr);
230-
});
206+
this.txsMetadata.markPeerHas(peerId, txsPeerHas);
231207
}
232208

233209
//TODO: are we missing something here?
@@ -266,11 +242,119 @@ export class BatchTxRequester {
266242
return this.txsMetadata.values().every(tx => tx.fetched);
267243
}
268244

269-
//TODO: stop on:
270-
//1. abort signal
271-
//2. deadline
272-
//3. received all
245+
//TODO: abort signal here?
273246
private shouldStop() {
274247
return this.txsMetadata.size === 0 || this.fetchedAllTxs() || Date.now() > this.deadline;
275248
}
276249
}
250+
251+
class MissingTxMetadata {
252+
constructor(
253+
public readonly txHash: TxHash,
254+
public fetched = false,
255+
public requestedTimes = 0,
256+
public tx: Tx | undefined = undefined,
257+
public readonly peers = new Set<string>(),
258+
) {}
259+
260+
public markAsRequested() {
261+
this.requestedTimes++;
262+
}
263+
264+
public markAsFetched(peerId: PeerId, tx: Tx) {
265+
this.fetched = true;
266+
this.tx = tx;
267+
268+
this.peers.add(peerId.toString());
269+
}
270+
271+
public toString() {
272+
return this.txHash.toString();
273+
}
274+
}
275+
276+
/*
277+
* Single source or truth for transactions we are fetching
278+
* This could be better optimized but given expected count of missing txs (N < 100)
279+
* At the moment there is no need for it. And benefit is that we have everything in single store*/
280+
class MissingTxMetadataCollection extends Map<string, MissingTxMetadata> {
281+
public getSortedByRequestedTimesAsc(): MissingTxMetadata[] {
282+
return Array.from(this.values()).sort((a, b) => a.requestedTimes - b.requestedTimes);
283+
}
284+
285+
public isFetched(txHash: TxHash): boolean {
286+
//If something went' wrong and we don't have txMeta for this hash
287+
// We should not request it, so here we "pretend" that it was fetched
288+
return this.get(txHash.toString())?.fetched ?? true;
289+
}
290+
291+
public getFetchedTxHashes(): Set<TxHash> {
292+
return new Set(
293+
this.values()
294+
.filter(t => t.fetched)
295+
.map(t => t.txHash),
296+
);
297+
}
298+
299+
public getFetchedTxs(): Tx[] {
300+
return Array.from(
301+
this.values()
302+
.map(t => t.tx)
303+
.filter(t => !!t),
304+
);
305+
}
306+
307+
public getTxsPeerHas(peer: PeerId): Set<TxHash> {
308+
const peerIdStr = peer.toString();
309+
const txsPeerHas = new Set<TxHash>();
310+
311+
this.values().forEach(txMeta => {
312+
if (txMeta.peers.has(peerIdStr)) {
313+
txsPeerHas.add(txMeta.txHash);
314+
}
315+
});
316+
317+
return txsPeerHas;
318+
}
319+
320+
//TODO: sort by least requested
321+
public getTxsToRequestFromThePeer(peer: PeerId): TxHash[] {
322+
const txsPeerHas = this.getTxsPeerHas(peer);
323+
const fetchedTxs = this.getFetchedTxHashes();
324+
325+
return Array.from(txsPeerHas.difference(fetchedTxs));
326+
}
327+
328+
public markRequested(txHash: TxHash) {
329+
this.get(txHash.toString())?.markAsRequested();
330+
}
331+
332+
public markFetched(peerId: PeerId, tx: Tx) {
333+
const txHashStr = tx.txHash.toString();
334+
const txMeta = this.get(txHashStr);
335+
if (!txMeta) {
336+
//TODO: what to do about peer which sent txs we didn't request?
337+
// 1. don't request from it in the scope of this batch request
338+
// 2. ban it immediately?
339+
// 3. track it and ban it?
340+
//
341+
return;
342+
}
343+
344+
txMeta.markAsFetched(peerId, tx);
345+
346+
txMeta.fetched = true;
347+
}
348+
349+
public markPeerHas(peerId: PeerId, txHash: TxHash[]) {
350+
const peerIdStr = peerId.toString();
351+
txHash
352+
.map(t => t.toString())
353+
.forEach(txh => {
354+
const txMeta = this.get(txh);
355+
if (txMeta) {
356+
txMeta.peers.add(peerIdStr);
357+
}
358+
});
359+
}
360+
}

0 commit comments

Comments
 (0)