Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a771d1d
Added skeleton implementation
mralj Jul 25, 2025
713c7f8
Improvements, fixes and cleanups
mralj Jul 28, 2025
24d2c4a
P2: Improvements, fixes and cleanups
mralj Jul 28, 2025
08f5017
More improvements and cleanups
mralj Jul 29, 2025
c34ab11
Added test and bugfixes
mralj Jul 30, 2025
d9809df
More improvements, code is in bad shape though
mralj Jul 31, 2025
7600563
post rebase fix
mralj Aug 7, 2025
21adaef
Refactor
mralj Aug 7, 2025
34b2c47
typo fix
mralj Aug 7, 2025
58e4bc3
Bugfixes
mralj Aug 8, 2025
1d8cfa2
cleanups
mralj Aug 17, 2025
e73d73f
test bugfix
mralj Aug 18, 2025
4cec14e
More bugfixes and cleanups
mralj Aug 18, 2025
7ca7d18
Testing and code improvements
mralj Sep 3, 2025
389d136
more tests
mralj Sep 4, 2025
f69521f
cleanup
mralj Sep 4, 2025
352e687
Transition to smart peers test
mralj Sep 5, 2025
5de4e5b
more smart peer tests
mralj Sep 5, 2025
b1b51d8
better naming
mralj Sep 5, 2025
07976bd
testing bad peers
mralj Sep 5, 2025
f5f374f
small test improvements
mralj Sep 8, 2025
d7ca489
proper handlingof reqeust_limit_exceeded and cleanups
mralj Sep 8, 2025
cf2cd88
rate limiting tests
mralj Sep 8, 2025
b842eb6
test aboritng
mralj Sep 8, 2025
259682c
yields txs one by one + tx validation
mralj Sep 9, 2025
be46c7e
added tests for tx validation and some cleanups and fixes
mralj Sep 9, 2025
256f95a
proper pinned peer handling
mralj Sep 10, 2025
667961b
added comments and cleanups
mralj Sep 10, 2025
cde3d86
Cleanup
mralj Sep 10, 2025
2e2c101
block txs request sends the whole list of txs to request
mralj Sep 10, 2025
0d9360b
Properly handle block_request with sent hashes
mralj Sep 10, 2025
1d98cd2
remove finished todos
mralj Sep 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions yarn-project/foundation/src/promise/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { DateProvider } from '../timer/date.js';

export type PromiseWithResolvers<T> = {
promise: Promise<T>;
resolve: (value: T) => void;
Expand Down Expand Up @@ -27,3 +29,25 @@ export function promiseWithResolvers<T>(): PromiseWithResolvers<T> {
reject,
};
}

/**
* Helper function that waits for a predicate to become true.
* @param pred - The predicate function to evaluate
* @param interval - The interval in milliseconds to check the predicate (default: 10ms)
* @param timeout - The maximum time in milliseconds to wait before rejecting (default: 5000ms)
* @param dateProvider - An optional DateProvider instance for getting the current time (default: new DateProvider())
*/
export function waitFor(pred: () => boolean, interval = 10, timeout = 5_000, dateProvider = new DateProvider()) {
const started = dateProvider.now();
return new Promise<void>((resolve, reject) => {
const id = setInterval(() => {
if (pred()) {
clearInterval(id);
resolve();
} else if (dateProvider.now() - started >= timeout) {
clearInterval(id);
reject(new Error('waitFor: timeout'));
}
}, interval);
});
}
5 changes: 5 additions & 0 deletions yarn-project/foundation/src/queue/semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { FifoMemoryQueue } from './fifo_memory_queue.js';

export interface ISemaphore {
acquire(): Promise<void>;
release(): void;
}

/**
* Allows the acquiring of up to `size` tokens before calls to acquire block, waiting for a call to release().
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import type { EpochCache } from '@aztec/epoch-cache';
import { times } from '@aztec/foundation/collection';
import { Secp256k1Signer } from '@aztec/foundation/crypto';
import { Fr } from '@aztec/foundation/fields';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { retryUntil } from '@aztec/foundation/retry';
import { sleep } from '@aztec/foundation/sleep';
import { emptyChainConfig } from '@aztec/stdlib/config';
import type { WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
import { makeBlockProposal, makeHeader, mockTx } from '@aztec/stdlib/testing';
import { Tx, TxHash } from '@aztec/stdlib/tx';

import { describe, expect, it, jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';

import type { P2PClient } from '../../client/p2p_client.js';
import { type P2PConfig, getP2PDefaultConfig } from '../../config.js';
import type { AttestationPool } from '../../mem_pools/attestation_pool/attestation_pool.js';
import type { TxPool } from '../../mem_pools/tx_pool/index.js';
import { BatchTxRequester } from '../../services/reqresp/batch-tx-requester/batch_tx_requester.js';
import type { ConnectionSampler } from '../../services/reqresp/connection-sampler/connection_sampler.js';
import { generatePeerIdPrivateKeys } from '../../test-helpers/generate-peer-id-private-keys.js';
import { getPorts } from '../../test-helpers/get-ports.js';
import { makeEnrs } from '../../test-helpers/make-enrs.js';
import { makeAndStartTestP2PClient, makeAndStartTestP2PClients } from '../../test-helpers/make-test-p2p-clients.js';

const TEST_TIMEOUT = 30_000;
jest.setTimeout(TEST_TIMEOUT);

describe('p2p client integration batch txs', () => {
let txPool: MockProxy<TxPool>;
let attestationPool: MockProxy<AttestationPool>;
let epochCache: MockProxy<EpochCache>;
let worldState: MockProxy<WorldStateSynchronizer>;

let connectionSampler: MockProxy<ConnectionSampler>;

let logger: Logger;
let p2pBaseConfig: P2PConfig;

let clients: P2PClient[] = [];

beforeEach(() => {
clients = [];
txPool = mock<TxPool>();
attestationPool = mock<AttestationPool>();
epochCache = mock<EpochCache>();
worldState = mock<WorldStateSynchronizer>();
connectionSampler = mock<ConnectionSampler>();

logger = createLogger('p2p:test:integration:batch');
p2pBaseConfig = { ...emptyChainConfig, ...getP2PDefaultConfig() };

//@ts-expect-error - we want to mock the getEpochAndSlotInNextL1Slot method, mocking ts is enough
epochCache.getEpochAndSlotInNextL1Slot.mockReturnValue({ ts: BigInt(0) });
epochCache.getRegisteredValidators.mockResolvedValue([]);

txPool.hasTxs.mockResolvedValue([]);
txPool.getAllTxs.mockImplementation(() => {
return Promise.resolve([] as Tx[]);
});
txPool.addTxs.mockResolvedValue(1);
txPool.getTxsByHash.mockImplementation(() => {
return Promise.resolve([] as Tx[]);
});

worldState.status.mockResolvedValue({
state: mock(),
syncSummary: {
latestBlockNumber: 0,
latestBlockHash: '',
finalizedBlockNumber: 0,
treesAreSynched: false,
oldestHistoricBlockNumber: 0,
},
});
logger.info(`Starting test ${expect.getState().currentTestName}`);
});

afterEach(async () => {
logger.info(`Tearing down state for ${expect.getState().currentTestName}`);
await shutdown(clients);
logger.info('Shut down p2p clients');

jest.restoreAllMocks();
jest.resetAllMocks();
jest.clearAllMocks();

clients = [];
});

// Shutdown all test clients
const shutdown = async (clients: P2PClient[]) => {
await Promise.all(clients.map(client => client.stop()));
await sleep(1000);
};

const createBlockProposal = (blockNumber: number, blockHash: Fr, txHashes: TxHash[]) => {
return makeBlockProposal({
signer: Secp256k1Signer.random(),
header: makeHeader(1, blockNumber),
archive: blockHash,
txHashes,
});
};

const setupClients = async (numberOfPeers: number, txPoolMocks?: MockProxy<TxPool>[]) => {
if (txPoolMocks) {
const peerIdPrivateKeys = generatePeerIdPrivateKeys(numberOfPeers);
let ports = [];
while (true) {
try {
ports = await getPorts(numberOfPeers);
break;
} catch {
await sleep(1000);
}
}
const peerEnrs = await makeEnrs(peerIdPrivateKeys, ports, p2pBaseConfig);

for (let i = 0; i < numberOfPeers; i++) {
const client = await makeAndStartTestP2PClient(peerIdPrivateKeys[i], ports[i], peerEnrs, {
p2pBaseConfig,
mockAttestationPool: attestationPool,
mockTxPool: txPoolMocks[i],
mockEpochCache: epochCache,
mockWorldState: worldState,
logger: createLogger(`p2p:${i}`),
});
clients.push(client);
}

return;
}

clients = (
await makeAndStartTestP2PClients(numberOfPeers, {
p2pBaseConfig,
mockAttestationPool: attestationPool,
mockTxPool: txPool,
mockEpochCache: epochCache,
mockWorldState: worldState,
logger,
})
).map(x => x.client);
};

async function makeSureClientsAreStarted() {
// Give the nodes time to discover each other
await sleep(4000);
for (const c of clients) {
await retryUntil(async () => (await c.getPeers()).length == clients.length - 1, 'peers discovered', 12, 0.5);
}

logger.info('Finished waiting for clients to connect');
}

it('batch requester fetches all missing txs from multiple peers', async () => {
const NUMBER_OF_PEERS = 4;

const txCount = 20;
const txs = await Promise.all(times(txCount, () => mockTx()));
const txHashes = await Promise.all(txs.map(tx => tx.getTxHash()));

const blockNumber = 5;
const blockHash = Fr.random();
const blockProposal = createBlockProposal(blockNumber, blockHash, txHashes);

// Distribute transactions across peers (simulating partial knowledge)
// Peer 0 has no txs (client requesting)
const peerTxDistribution = [
{ start: 0, end: 0 }, // Peer 0 (requester)
{ start: 0, end: 11 },
{ start: 6, end: 15 },
{ start: 10, end: 20 }, // Peer 3
];

// Create individual txPool mocks for each peer
const txPoolMocks: MockProxy<TxPool>[] = [];
for (let i = 0; i < NUMBER_OF_PEERS; i++) {
const peerTxPool = mock<TxPool>();
const { start, end } = peerTxDistribution[i];
const peerTxs = txs.slice(start, end);
const peerTxHashSet = new Set(peerTxs.map(tx => tx.txHash.toString()));

peerTxPool.hasTxs.mockImplementation((hashes: TxHash[]) => {
return Promise.resolve(hashes.map(h => peerTxHashSet.has(h.toString())));
});
peerTxPool.getTxsByHash.mockImplementation((hashes: TxHash[]) => {
return Promise.resolve(hashes.map(hash => peerTxs.find(t => t.txHash.equals(hash))));
});

txPoolMocks.push(peerTxPool);
}

await setupClients(NUMBER_OF_PEERS, txPoolMocks);
await makeSureClientsAreStarted();

const peerIds = clients.map(client => (client as any).p2pService.node.peerId);
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peerIds);

attestationPool.getBlockProposal.mockResolvedValue(blockProposal);

// Client 0 is missing all transactions
const missingTxHashes = txHashes;

// Create BatchTxRequester instance
const [client0] = clients;
const requester = new BatchTxRequester(
missingTxHashes,
blockProposal,
undefined, // no pinned peer
5_000,
(client0 as any).p2pService.reqresp,
connectionSampler,
() => Promise.resolve(true),
logger,
);

const fetchedTxs = await BatchTxRequester.collectAllTxs(requester.run());

// Verify all transactions were fetched
expect(fetchedTxs).toBeDefined();
const fetchedHashes = await Promise.all(fetchedTxs!.map(tx => tx.getTxHash()));
expect(
new Set(fetchedHashes.map(h => h.toString())).difference(new Set(txHashes.map(h => h.toString()))).size,
).toBe(0);
});
});
Loading
Loading