Skip to content

Commit 0a47490

Browse files
committed
Added skeleton implementation
1 parent be82c67 commit 0a47490

4 files changed

Lines changed: 283 additions & 1 deletion

File tree

yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,22 @@ export class ConnectionSampler {
129129
return { peer: lastPeer, sampledPeers };
130130
}
131131

132+
/*
133+
* Returns all peers sorted by connection count ascending,
134+
* meaning that the peers with the least number of active connections are earlier in an array
135+
*
136+
* @param: excluding - peers to exclude
137+
* @return: list of peer ids
138+
* */
139+
public getPeerListSortedByConnectionCountAsc(excluding?: Set<string>): PeerId[] {
140+
return this.libp2p
141+
.getPeers()
142+
.filter(id => !excluding?.has(id.toString()))
143+
.map(id => ({ id, count: this.activeConnectionsCount.get(id) ?? 0 }))
144+
.sort((a, b) => a.count - b.count)
145+
.map(p => p.id);
146+
}
147+
132148
/**
133149
* Samples a batch of unique peers from the libp2p node, prioritizing peers without active connections
134150
*

yarn-project/p2p/src/services/reqresp/protocols/block_txs/bitvector.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ export class BitVector {
6262
return Array.from({ length: this.length }, (_, i) => i).filter(i => this.isSet(i));
6363
}
6464

65+
/**
66+
* Returns true if no indices are set to true
67+
* */
68+
isEmpty(): boolean {
69+
return this.getTrueIndices().length === 0;
70+
}
71+
6572
/**
6673
* Serializes the BitVector object into a Buffer
6774
*

yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs_reqresp.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Fr } from '@aztec/foundation/fields';
22
import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize';
3-
import { TxArray } from '@aztec/stdlib/tx';
3+
import type { BlockProposal } from '@aztec/stdlib/p2p';
4+
import { TxArray, TxHash } from '@aztec/stdlib/tx';
45

56
import { BitVector } from './bitvector.js';
67

@@ -15,6 +16,36 @@ export class BlockTxsRequest {
1516
readonly txIndices: BitVector,
1617
) {}
1718

19+
/**
20+
* Crates new BlockTxsRequest given proposal and missing tx hashes
21+
*
22+
* @param: blockProposal - The block proposal for which we are making request
23+
* @param: missingTxHashes - Tx hashes from the proposal we are missing
24+
*
25+
* @returns undefined if there were no missingTxHashes matching BlockProposal hashes, otherwise
26+
* returns new BlockTxsRequest*/
27+
static fromBlockProposalAndMissingTxs(
28+
blockProposal: BlockProposal,
29+
missingTxHashes: TxHash[],
30+
): BlockTxsRequest | undefined {
31+
if (missingTxHashes.length === 0) {
32+
return undefined; // No missing txs to request
33+
}
34+
35+
const missingHashesSet = new Set(missingTxHashes);
36+
const missingIndices = blockProposal.txHashes
37+
.map((hash, idx) => (missingHashesSet.has(hash) ? idx : -1))
38+
.filter(i => i != -1);
39+
40+
if (missingIndices.length === 0) {
41+
return undefined; // No indices found for missing tx hashes
42+
}
43+
44+
const requestBitVector = BitVector.init(blockProposal.txHashes.length, missingIndices);
45+
46+
return new BlockTxsRequest(blockProposal.archive, requestBitVector);
47+
}
48+
1849
/**
1950
* Deserializes the BlockTxRequest object from a Buffer
2051
* @param buffer - Buffer or BufferReader object to deserialize
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
import { chunk } from '@aztec/foundation/collection';
2+
import { createLogger } from '@aztec/foundation/log';
3+
import { promiseWithResolvers } from '@aztec/foundation/promise';
4+
import type { BlockProposal } from '@aztec/stdlib/p2p';
5+
import type { TxHash } from '@aztec/stdlib/tx';
6+
7+
import type { PeerId } from '@libp2p/interface';
8+
import { peerIdFromString } from '@libp2p/peer-id';
9+
10+
import type { ConnectionSampler } from './connection-sampler/connection_sampler.js';
11+
import { type ReqRespInterface, ReqRespSubProtocol } from './interface.js';
12+
import { BlockTxsRequest, BlockTxsResponse } from './protocols/index.js';
13+
import { ReqRespStatus } from './status.js';
14+
15+
const TX_BATCH_SIZE = 8;
16+
const PEERS_TO_QUERY_IN_PARALLEL = 10;
17+
18+
class MissingTxMetadata {
19+
constructor(
20+
public readonly txHash: TxHash,
21+
public fetched = false,
22+
public inFlight = false,
23+
public requestedTimes = 0,
24+
public readonly peers = new Set<string>(),
25+
) {}
26+
}
27+
28+
export class BatchTxRequester {
29+
private readonly peers: PeerId[];
30+
private readonly smartPeers = new Set<string>();
31+
private readonly peersToTxMap = new Map<string, Array<TxHash>>();
32+
33+
private readonly txsMetadata;
34+
35+
private startSmartRequester: ((v: void) => void) | undefined = undefined;
36+
37+
constructor(
38+
private readonly blockProposal: BlockProposal,
39+
private readonly missingTxs: TxHash[],
40+
private readonly pinnedPeer: PeerId | undefined,
41+
private readonly reqresp: ReqRespInterface,
42+
private readonly connectionSampler: ConnectionSampler,
43+
private logger = createLogger('p2p:reqresp_batch'),
44+
) {
45+
this.txsMetadata = new Map(this.missingTxs.map(txHash => [txHash.toString(), new MissingTxMetadata(txHash)]));
46+
this.peers = this.connectionSampler.getPeerListSortedByConnectionCountAsc();
47+
if (this.pinnedPeer) {
48+
this.smartPeers.add(this.pinnedPeer.toString());
49+
}
50+
}
51+
52+
public async run() {
53+
if (this.missingTxs.length === 0) {
54+
this.logger.debug('No missing txs to request');
55+
return;
56+
}
57+
58+
const { promise, resolve } = promiseWithResolvers<void>();
59+
this.startSmartRequester = resolve;
60+
61+
Promise.allSettled([this.smartRequester(promise), this.dumbRequester()]);
62+
}
63+
64+
private async smartRequester(start: Promise<void>) {
65+
const nextPeerIndex = this.makeRoundRobinIndexer(() => this.smartPeers.size);
66+
// if we don't have a pinned peer we wait to start smart requester
67+
// otherwise we start immediately with the pinned peer
68+
if (!this.pinnedPeer) {
69+
await start;
70+
}
71+
72+
const nextPeer = () => peerIdFromString(Array.from(this.smartPeers)[nextPeerIndex()]);
73+
const makeRequest = (pid: PeerId) => {
74+
//TODO: for this peer we have to make batch on the fly based on which txs peer has
75+
const txsPeerHas = this.peersToTxMap.get(pid.toString());
76+
const peerHasTxs = txsPeerHas && txsPeerHas.length > 0;
77+
if (!peerHasTxs) {
78+
return undefined;
79+
}
80+
81+
//TODO: make this smarter, we should only request txs that we don't have
82+
// and we should request txs that have been requested the least times
83+
const txsToRequest = txsPeerHas.slice(0, TX_BATCH_SIZE);
84+
85+
return BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txsToRequest);
86+
};
87+
88+
// Kick off workers
89+
const workers = Array.from({ length: Math.min(PEERS_TO_QUERY_IN_PARALLEL, this.smartPeers.size) }, () =>
90+
this.workerLoop(nextPeer, makeRequest),
91+
);
92+
await Promise.allSettled(workers);
93+
}
94+
95+
private async dumbRequester() {
96+
const nextPeerIndex = this.makeRoundRobinIndexer(() => peers.length);
97+
const nextBatchIndex = this.makeRoundRobinIndexer(() => txChunks.length);
98+
99+
const peers = [...this.peers];
100+
const txChunks = chunk<TxHash>(this.missingTxs, TX_BATCH_SIZE);
101+
102+
//TODO: batches should be adaptive
103+
const makeRequest = (_pid: PeerId) => {
104+
return BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txChunks[nextBatchIndex()]);
105+
};
106+
107+
const nextPeer = () => peers.filter(pid => !this.smartPeers.has(pid.toString()))[nextPeerIndex()];
108+
109+
const workers = Array.from({ length: Math.min(PEERS_TO_QUERY_IN_PARALLEL, peers.length) }, () =>
110+
this.workerLoop(nextPeer, makeRequest),
111+
);
112+
await Promise.allSettled(workers);
113+
}
114+
115+
private async workerLoop(pickNextPeer: () => PeerId, request: (pid: PeerId) => BlockTxsRequest | undefined) {
116+
while (!this.shouldStop()) {
117+
const peerId = pickNextPeer();
118+
//
119+
// TODO: think about this a bit more what should we do in this case?
120+
const nextBatchTxRequest = request(peerId);
121+
if (!nextBatchTxRequest) {
122+
this.logger.warn('No txs matched');
123+
continue;
124+
}
125+
126+
await this.requestTxBatch(peerId, nextBatchTxRequest);
127+
}
128+
}
129+
130+
private async requestTxBatch(peerId: PeerId, request: BlockTxsRequest): Promise<BlockTxsResponse | undefined> {
131+
try {
132+
const response = await this.reqresp.sendRequestToPeer(peerId, ReqRespSubProtocol.BLOCK_TXS, request.toBuffer());
133+
if (response.status !== ReqRespStatus.SUCCESS) {
134+
return;
135+
}
136+
137+
const block_response = BlockTxsResponse.fromBuffer(response.data);
138+
this.handleSuccessResponseFromPeer(peerId, block_response);
139+
} catch (err: any) {
140+
this.logger.error(`Failed to deserialize response from peer ${peerId.toString()}: ${err.message}`, {
141+
peerId,
142+
error: err,
143+
});
144+
145+
this.handleFailResponseFromPeer(peerId);
146+
}
147+
}
148+
149+
//TODO: 1 mark missing transactions as fetched
150+
//TODO: 2 mark peer having this transactions
151+
//TODO: 3 stream responses either via async generator or callbacks
152+
private handleSuccessResponseFromPeer(peerId: PeerId, response: BlockTxsResponse) {
153+
this.logger.debug(`Received txs: ${response.txs.length} from peer ${peerId.toString()} `);
154+
if (!this.isBlockResponseValid(response)) {
155+
return;
156+
}
157+
158+
//TODO: yield txs
159+
for (const tx of response.txs) {
160+
const key = tx.txHash.toString();
161+
let meta = this.txsMetadata.get(key);
162+
163+
if (!meta) {
164+
meta = new MissingTxMetadata(tx.txHash, true);
165+
this.txsMetadata.set(key, meta);
166+
} else {
167+
meta.fetched = true; // mutate in place; no need to re-set
168+
}
169+
}
170+
171+
const peerIdStr = peerId.toString();
172+
this.smartPeers.add(peerIdStr);
173+
const txsPeerHas = this.extractHashesPeerHasFromResponse(response);
174+
// NOTE: it's ok to override this and not make it union with previous data
175+
// because the newer request contains most up to date info
176+
this.peersToTxMap.set(peerIdStr, txsPeerHas);
177+
178+
//TODO: maybe wait for at least couple of peers so that we don't spam single one?
179+
if (this.startSmartRequester !== undefined) {
180+
this.startSmartRequester();
181+
// We use "undefined" here as marker that startSmartRequester has been called
182+
this.startSmartRequester = undefined;
183+
}
184+
}
185+
186+
private isBlockResponseValid(response: BlockTxsResponse): boolean {
187+
//TODO: maybe ban peer if this does not match?
188+
const blockIdsMatch = this.blockProposal.archive === response.blockHash;
189+
const peerHasSomeTxsFromProposal = !response.txIndices.isEmpty();
190+
return blockIdsMatch && peerHasSomeTxsFromProposal;
191+
}
192+
193+
//TODO:
194+
private handleFailResponseFromPeer(peerId: PeerId) {}
195+
196+
private extractHashesPeerHasFromResponse(response: BlockTxsResponse): Array<TxHash> {
197+
const hashes: TxHash[] = [];
198+
const indicesOfHashesPeerHas = new Set(response.txIndices.getTrueIndices());
199+
this.blockProposal.txHashes.forEach((hash, idx) => {
200+
if (indicesOfHashesPeerHas.has(idx)) {
201+
hashes.push(hash);
202+
}
203+
});
204+
205+
return hashes;
206+
}
207+
208+
private makeRoundRobinIndexer(size: () => number, start = 0) {
209+
let i = start;
210+
return () => {
211+
const current = i;
212+
i = (current + 1) % size();
213+
return current;
214+
};
215+
}
216+
217+
private fetchedAllTxs() {
218+
return this.txsMetadata.values().every(tx => tx.fetched);
219+
}
220+
221+
//TODO: stop on:
222+
//1. abort signal
223+
//2. deadline
224+
//3. received all
225+
private shouldStop() {
226+
return this.fetchedAllTxs() || this.txsMetadata.size === 0;
227+
}
228+
}

0 commit comments

Comments
 (0)