Skip to content

Commit bfef7ea

Browse files
committed
Added test and bugfixes
1 parent 3be95d0 commit bfef7ea

3 files changed

Lines changed: 234 additions & 8 deletions

File tree

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
import type { EpochCache } from '@aztec/epoch-cache';
2+
import { times } from '@aztec/foundation/collection';
3+
import { Secp256k1Signer } from '@aztec/foundation/crypto';
4+
import { Fr } from '@aztec/foundation/fields';
5+
import { type Logger, createLogger } from '@aztec/foundation/log';
6+
import { sleep } from '@aztec/foundation/sleep';
7+
import { emptyChainConfig } from '@aztec/stdlib/config';
8+
import type { WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
9+
import { makeBlockProposal, makeHeader, mockTx } from '@aztec/stdlib/testing';
10+
import { Tx, TxHash } from '@aztec/stdlib/tx';
11+
12+
import { describe, expect, it, jest } from '@jest/globals';
13+
import { type MockProxy, mock } from 'jest-mock-extended';
14+
15+
import type { P2PClient } from '../../client/p2p_client.js';
16+
import { type P2PConfig, getP2PDefaultConfig } from '../../config.js';
17+
import type { AttestationPool } from '../../mem_pools/attestation_pool/attestation_pool.js';
18+
import type { TxPool } from '../../mem_pools/tx_pool/index.js';
19+
import type { ConnectionSampler } from '../../services/reqresp/connection-sampler/connection_sampler.js';
20+
import { BatchTxRequester } from '../../services/reqresp/reqresp_batch.js';
21+
import { generatePeerIdPrivateKeys } from '../../test-helpers/generate-peer-id-private-keys.js';
22+
import { getPorts } from '../../test-helpers/get-ports.js';
23+
import { makeEnrs } from '../../test-helpers/make-enrs.js';
24+
import { makeAndStartTestP2PClient, makeAndStartTestP2PClients } from '../../test-helpers/make-test-p2p-clients.js';
25+
26+
const TEST_TIMEOUT = 30_000;
27+
jest.setTimeout(TEST_TIMEOUT);
28+
29+
describe('p2p client integration batch txs', () => {
30+
let txPool: MockProxy<TxPool>;
31+
let attestationPool: MockProxy<AttestationPool>;
32+
let epochCache: MockProxy<EpochCache>;
33+
let worldState: MockProxy<WorldStateSynchronizer>;
34+
35+
let connectionSampler: MockProxy<ConnectionSampler>;
36+
37+
let logger: Logger;
38+
let p2pBaseConfig: P2PConfig;
39+
40+
let clients: P2PClient[] = [];
41+
42+
beforeEach(() => {
43+
clients = [];
44+
txPool = mock<TxPool>();
45+
attestationPool = mock<AttestationPool>();
46+
epochCache = mock<EpochCache>();
47+
worldState = mock<WorldStateSynchronizer>();
48+
connectionSampler = mock<ConnectionSampler>();
49+
50+
logger = createLogger('p2p:test:integration:batch');
51+
p2pBaseConfig = { ...emptyChainConfig, ...getP2PDefaultConfig() };
52+
53+
//@ts-expect-error - we want to mock the getEpochAndSlotInNextL1Slot method, mocking ts is enough
54+
epochCache.getEpochAndSlotInNextL1Slot.mockReturnValue({ ts: BigInt(0) });
55+
epochCache.getRegisteredValidators.mockResolvedValue([]);
56+
57+
txPool.hasTxs.mockResolvedValue([]);
58+
txPool.getAllTxs.mockImplementation(() => {
59+
return Promise.resolve([] as Tx[]);
60+
});
61+
txPool.addTxs.mockResolvedValue(1);
62+
txPool.getTxsByHash.mockImplementation(() => {
63+
return Promise.resolve([] as Tx[]);
64+
});
65+
66+
worldState.status.mockResolvedValue({
67+
state: mock(),
68+
syncSummary: {
69+
latestBlockNumber: 0,
70+
latestBlockHash: '',
71+
finalisedBlockNumber: 0,
72+
treesAreSynched: false,
73+
oldestHistoricBlockNumber: 0,
74+
},
75+
});
76+
logger.info(`Starting test ${expect.getState().currentTestName}`);
77+
});
78+
79+
afterEach(async () => {
80+
logger.info(`Tearing down state for ${expect.getState().currentTestName}`);
81+
await shutdown(clients);
82+
logger.info('Shut down p2p clients');
83+
84+
jest.restoreAllMocks();
85+
jest.resetAllMocks();
86+
jest.clearAllMocks();
87+
88+
clients = [];
89+
});
90+
91+
// Shutdown all test clients
92+
const shutdown = async (clients: P2PClient[]) => {
93+
await Promise.all(clients.map(client => client.stop()));
94+
await sleep(1000);
95+
};
96+
97+
const createBlockProposal = (blockNumber: number, blockHash: Fr, txHashes: TxHash[]) => {
98+
return makeBlockProposal({
99+
signer: Secp256k1Signer.random(),
100+
header: makeHeader(1, blockNumber),
101+
archive: blockHash,
102+
txHashes,
103+
});
104+
};
105+
106+
const setupClients = async (numberOfPeers: number, txPoolMocks?: MockProxy<TxPool>[]) => {
107+
if (txPoolMocks) {
108+
const peerIdPrivateKeys = generatePeerIdPrivateKeys(numberOfPeers);
109+
let ports = [];
110+
while (true) {
111+
try {
112+
ports = await getPorts(numberOfPeers);
113+
break;
114+
} catch {
115+
await sleep(1000);
116+
}
117+
}
118+
const peerEnrs = await makeEnrs(peerIdPrivateKeys, ports, p2pBaseConfig);
119+
120+
for (let i = 0; i < numberOfPeers; i++) {
121+
const client = await makeAndStartTestP2PClient(peerIdPrivateKeys[i], ports[i], peerEnrs, {
122+
p2pBaseConfig,
123+
mockAttestationPool: attestationPool,
124+
mockTxPool: txPoolMocks[i],
125+
mockEpochCache: epochCache,
126+
mockWorldState: worldState,
127+
logger: createLogger(`p2p:${i}`),
128+
});
129+
clients.push(client);
130+
}
131+
132+
return;
133+
}
134+
135+
clients = (
136+
await makeAndStartTestP2PClients(numberOfPeers, {
137+
p2pBaseConfig,
138+
mockAttestationPool: attestationPool,
139+
mockTxPool: txPool,
140+
mockEpochCache: epochCache,
141+
mockWorldState: worldState,
142+
logger,
143+
})
144+
).map(x => x.client);
145+
146+
// Give the nodes time to discover each other
147+
await sleep(4000);
148+
logger.info('Finished waiting for clients to connect');
149+
};
150+
151+
it.only('batch requester fetches all missing txs from multiple peers', async () => {
152+
const NUMBER_OF_PEERS = 4;
153+
154+
// Create 20 transactions
155+
const txCount = 20;
156+
const txs = await Promise.all(times(txCount, () => mockTx()));
157+
const txHashes = await Promise.all(txs.map(tx => tx.getTxHash()));
158+
159+
// Create a block proposal with all tx hashes
160+
const blockNumber = 5;
161+
const blockHash = Fr.random();
162+
const blockProposal = createBlockProposal(blockNumber, blockHash, txHashes);
163+
164+
// Distribute transactions across peers (simulating partial knowledge)
165+
// Peer 0 has no txs (client requesting)
166+
const peerTxDistribution = [
167+
{ start: 0, end: 0 }, // Peer 0 (requester)
168+
{ start: 0, end: 11 },
169+
{ start: 6, end: 15 },
170+
{ start: 10, end: 20 }, // Peer 3
171+
];
172+
173+
// Create individual txPool mocks for each peer
174+
const txPoolMocks: MockProxy<TxPool>[] = [];
175+
for (let i = 0; i < NUMBER_OF_PEERS; i++) {
176+
const peerTxPool = mock<TxPool>();
177+
const { start, end } = peerTxDistribution[i];
178+
const peerTxs = txs.slice(start, end);
179+
const peerTxHashSet = new Set(peerTxs.map(tx => tx.txHash.toString()));
180+
181+
// Set default mock implementations
182+
peerTxPool.hasTxs.mockImplementation((hashes: TxHash[]) => {
183+
return Promise.resolve(hashes.map(h => peerTxHashSet.has(h.toString())));
184+
});
185+
peerTxPool.getTxsByHash.mockImplementation((hashes: TxHash[]) => {
186+
return Promise.resolve(hashes.map(hash => peerTxs.find(t => t.txHash.equals(hash))));
187+
});
188+
189+
txPoolMocks.push(peerTxPool);
190+
}
191+
192+
// Setup clients with individual txPool mocks
193+
await setupClients(NUMBER_OF_PEERS, txPoolMocks);
194+
195+
const peerIds = clients.map(client => (client as any).p2pService.node.peerId);
196+
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peerIds);
197+
198+
// Set up attestation pool to return the block proposal
199+
attestationPool.getBlockProposal.mockResolvedValue(blockProposal);
200+
201+
// Client 0 is missing all transactions
202+
const missingTxHashes = txHashes;
203+
204+
// Create BatchTxRequester instance
205+
const [client0] = clients;
206+
const requester = new BatchTxRequester(
207+
missingTxHashes,
208+
blockProposal,
209+
undefined, // no pinned peer
210+
5_000,
211+
(client0 as any).p2pService.reqresp,
212+
connectionSampler,
213+
logger,
214+
);
215+
216+
const fetchedTxs = await requester.run();
217+
218+
// Verify all transactions were fetched
219+
expect(fetchedTxs).toBeDefined();
220+
expect(fetchedTxs).toHaveLength(txCount);
221+
const fetchedHashes = await Promise.all(fetchedTxs!.map(tx => tx.getTxHash()));
222+
expect(
223+
new Set(fetchedHashes.map(h => h.toString())).difference(new Set(txHashes.map(h => h.toString()))).size,
224+
).toBe(0);
225+
});
226+
});

yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs_reqresp.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ export class BlockTxsRequest {
3232
return undefined; // No missing txs to request
3333
}
3434

35-
const missingHashesSet = new Set(missingTxHashes);
35+
const missingHashesSet = new Set(missingTxHashes.map(t => t.toString()));
36+
3637
const missingIndices = blockProposal.txHashes
37-
.map((hash, idx) => (missingHashesSet.has(hash) ? idx : -1))
38+
.map((hash, idx) => (missingHashesSet.has(hash.toString()) ? idx : -1))
3839
.filter(i => i != -1);
3940

4041
if (missingIndices.length === 0) {

yarn-project/p2p/src/services/reqresp/reqresp_batch.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ export class BatchTxRequester {
104104
return { blockRequest: BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txs), txs };
105105
};
106106

107-
const workers = Array.from({ length: SMART_PEERS_TO_QUERY_IN_PARALLEL }, () =>
107+
const workers = Array.from({ length: Math.min(DUMB_PEERS_TO_QUERY_IN_PARALLEL, this.peers.length) }, () =>
108108
this.workerLoop(nextPeer, makeRequest, 'smart'),
109109
);
110110

@@ -141,7 +141,7 @@ export class BatchTxRequester {
141141
return idx === undefined ? undefined : peerIdFromString(Array.from(getPeers())[idx]);
142142
};
143143

144-
const workers = Array.from({ length: Math.min(DUMB_PEERS_TO_QUERY_IN_PARALLEL, getPeers().length) }, () =>
144+
const workers = Array.from({ length: Math.min(DUMB_PEERS_TO_QUERY_IN_PARALLEL, this.peers.length) }, () =>
145145
this.workerLoop(nextPeer, makeRequest, 'dumb'),
146146
);
147147
await Promise.allSettled(workers);
@@ -179,7 +179,6 @@ export class BatchTxRequester {
179179
}
180180

181181
await this.requestTxBatch(peerId, blockRequest);
182-
183182
if (type === 'smart') {
184183
txs.forEach(tx => {
185184
this.txsMetadata.markNotInFlightBySmartPeer(tx);
@@ -209,12 +208,12 @@ export class BatchTxRequester {
209208

210209
private handleSuccessResponseFromPeer(peerId: PeerId, response: BlockTxsResponse) {
211210
this.logger.debug(`Received txs: ${response.txs.length} from peer ${peerId.toString()} `);
211+
this.handleReceivedTxs(peerId, response.txs);
212+
212213
if (!this.isBlockResponseValid(response)) {
213214
return;
214215
}
215216

216-
this.handleReceivedTxs(peerId, response.txs);
217-
218217
// We mark peer as "smart" only if they have some txs we are missing
219218
// Otherwise we keep them as "dumb" in hope they'll receive some new txs we are missing in the future
220219
if (!this.peerHasSomeTxsWeAreMissing(peerId, response)) {
@@ -231,7 +230,7 @@ export class BatchTxRequester {
231230

232231
private isBlockResponseValid(response: BlockTxsResponse): boolean {
233232
//TODO: maybe ban peer if this does not match?
234-
const blockIdsMatch = this.blockProposal.archive === response.blockHash;
233+
const blockIdsMatch = this.blockProposal.archive.toString() === response.blockHash.toString();
235234
const peerHasSomeTxsFromProposal = !response.txIndices.isEmpty();
236235
return blockIdsMatch && peerHasSomeTxsFromProposal;
237236
}

0 commit comments

Comments
 (0)