Skip to content

Commit 5a04627

Browse files
committed
added tests for tx validation and some cleanups and fixes
1 parent f0a20b3 commit 5a04627

3 files changed

Lines changed: 228 additions & 5 deletions

File tree

yarn-project/foundation/src/queue/semaphore.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { FifoMemoryQueue } from './fifo_memory_queue.js';
22

3+
export interface ISemaphore {
4+
acquire(): Promise<void>;
5+
release(): void;
6+
}
7+
38
/**
49
* Allows the acquiring of up to `size` tokens before calls to acquire block, waiting for a call to release().
510
*/

yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts

Lines changed: 221 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { Tx, TxArray, TxHash } from '@aztec/stdlib/tx';
1313
import { describe, expect, it, jest } from '@jest/globals';
1414
import { type MockProxy, mock } from 'jest-mock-extended';
1515

16-
import { createSecp256k1PeerId } from '../../../index.js';
16+
import { type PeerId, createSecp256k1PeerId } from '../../../index.js';
1717
import type { ConnectionSampler } from '../connection-sampler/connection_sampler.js';
1818
import type { ReqRespInterface } from '../interface.js';
1919
import { BitVector, BlockTxsRequest, BlockTxsResponse } from '../protocols/index.js';
@@ -27,7 +27,7 @@ import {
2727
RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL,
2828
} from './peer_collection.js';
2929

30-
const TEST_TIMEOUT = 10_000;
30+
const TEST_TIMEOUT = 15_000;
3131
jest.setTimeout(TEST_TIMEOUT);
3232

3333
describe('BatchTxRequester', () => {
@@ -1148,6 +1148,225 @@ describe('BatchTxRequester', () => {
11481148
expect(result!.length).toBeLessThan(txCount); // Not all due to abort
11491149
});
11501150
});
1151+
1152+
describe('Transaction validation', () => {
1153+
it('should only yield valid transactions and filter out invalid ones', async () => {
1154+
const txCount = 10;
1155+
const deadline = 5_000;
1156+
const missing = Array.from({ length: txCount }, () => TxHash.random());
1157+
1158+
blockProposal = makeBlockProposal({
1159+
signer: Secp256k1Signer.random(),
1160+
header: makeHeader(1, 1),
1161+
archive: Fr.random(),
1162+
txHashes: missing,
1163+
});
1164+
1165+
const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]);
1166+
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers);
1167+
1168+
// Define which transactions each peer has
1169+
const peerTransactions = new Map([
1170+
[peers[0].toString(), Array.from({ length: 5 }, (_, i) => i)], // peer1: txs 0-4
1171+
[peers[1].toString(), Array.from({ length: 5 }, (_, i) => i + 5)], // peer2: txs 5-9
1172+
]);
1173+
1174+
const validationCalls: Array<{ tx: TxHash; peerId: string }> = [];
1175+
const invalidTxIndices = new Set([2, 3, 7]); // Mark transactions at indices 2, 3, and 7 as invalid
1176+
1177+
const customTxValidator = jest.fn(async (tx: Tx, peerId: PeerId) => {
1178+
validationCalls.push({ tx: tx.txHash, peerId: peerId.toString() });
1179+
const txIndex = missing.findIndex(h => h.equals(tx.txHash));
1180+
1181+
return !invalidTxIndices.has(txIndex);
1182+
});
1183+
1184+
const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions);
1185+
reqresp.sendRequestToPeer.mockImplementation(mockImplementation);
1186+
1187+
const requester = new BatchTxRequester(
1188+
missing,
1189+
blockProposal,
1190+
undefined,
1191+
deadline,
1192+
reqresp,
1193+
connectionSampler,
1194+
customTxValidator,
1195+
logger,
1196+
new DateProvider(),
1197+
{
1198+
smartParallelWorkerCount: 0,
1199+
dumbParallelWorkerCount: 2,
1200+
},
1201+
);
1202+
1203+
const result = await BatchTxRequester.collectAllTxs(requester.run());
1204+
1205+
const expectedValidCount = txCount - invalidTxIndices.size;
1206+
1207+
expect(result.length).toBe(expectedValidCount);
1208+
1209+
// Verify that invalid transactions are NOT in the result
1210+
const resultTxHashes = new Set(result.map(tx => tx.txHash.toString()));
1211+
invalidTxIndices.forEach(invalidIndex => {
1212+
const invalidTxHash = missing[invalidIndex].toString();
1213+
expect(resultTxHashes.has(invalidTxHash)).toBe(false);
1214+
});
1215+
1216+
// Verify that valid transactions ARE in the result
1217+
const validIndices = Array.from({ length: txCount }, (_, i) => i).filter(i => !invalidTxIndices.has(i));
1218+
validIndices.forEach(validIndex => {
1219+
const validTxHash = missing[validIndex].toString();
1220+
expect(resultTxHashes.has(validTxHash)).toBe(true);
1221+
});
1222+
1223+
const peer0Calls = validationCalls.filter(call => call.peerId === peers[0].toString());
1224+
const peer1Calls = validationCalls.filter(call => call.peerId === peers[1].toString());
1225+
1226+
expect(peer0Calls.length).toBeGreaterThan(0);
1227+
expect(peer1Calls.length).toBeGreaterThan(0);
1228+
});
1229+
1230+
it('should handle mixed valid and invalid transactions from multiple peers', async () => {
1231+
const txCount = 12;
1232+
const deadline = 5_000;
1233+
const missing = Array.from({ length: txCount }, () => TxHash.random());
1234+
1235+
blockProposal = makeBlockProposal({
1236+
signer: Secp256k1Signer.random(),
1237+
header: makeHeader(1, 1),
1238+
archive: Fr.random(),
1239+
txHashes: missing,
1240+
});
1241+
1242+
const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId(), createSecp256k1PeerId()]);
1243+
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers);
1244+
1245+
const peerTransactions = new Map([
1246+
[peers[0].toString(), [0, 1, 2, 3, 4]],
1247+
[peers[1].toString(), [2, 3, 4, 5, 6, 7, 8]],
1248+
[peers[2].toString(), [0, 6, 7, 8, 9, 10, 11]],
1249+
]);
1250+
1251+
// Validator that rejects transactions based on peer
1252+
// This simulates different peers having different validity for same transaction
1253+
const peerSpecificValidator = jest.fn(async (tx: Tx, peerId: PeerId) => {
1254+
const txIndex = missing.findIndex(h => h.equals(tx.txHash));
1255+
1256+
// Peer 0: rejects even indices
1257+
if (peerId.toString() === peers[0].toString() && txIndex % 2 === 0) {
1258+
return false;
1259+
}
1260+
1261+
// Peer 1: rejects indices divisible by 3
1262+
if (peerId.toString() === peers[1].toString() && txIndex % 3 === 0) {
1263+
return false;
1264+
}
1265+
1266+
// Peer 2: accepts all
1267+
return true;
1268+
});
1269+
1270+
const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions);
1271+
reqresp.sendRequestToPeer.mockImplementation(mockImplementation);
1272+
1273+
const requester = new BatchTxRequester(
1274+
missing,
1275+
blockProposal,
1276+
undefined,
1277+
deadline,
1278+
reqresp,
1279+
connectionSampler,
1280+
peerSpecificValidator,
1281+
logger,
1282+
new DateProvider(),
1283+
{
1284+
smartParallelWorkerCount: 0,
1285+
dumbParallelWorkerCount: 3,
1286+
},
1287+
);
1288+
1289+
const result = await BatchTxRequester.collectAllTxs(requester.run());
1290+
1291+
// Verify we got all transactions (since peer2 accepts all and has access to 6-11)
1292+
// And other peers can provide the rest
1293+
expect(result.length).toBe(txCount);
1294+
1295+
// Verify no duplicates in result
1296+
const uniqueTxHashes = new Set(result.map(tx => tx.txHash.toString()));
1297+
expect(uniqueTxHashes.size).toBe(result.length);
1298+
});
1299+
1300+
it('should handle validator throwing errors gracefully', async () => {
1301+
const txCount = 8;
1302+
const deadline = 3_000;
1303+
const missing = Array.from({ length: txCount }, () => TxHash.random());
1304+
1305+
blockProposal = makeBlockProposal({
1306+
signer: Secp256k1Signer.random(),
1307+
header: makeHeader(1, 1),
1308+
archive: Fr.random(),
1309+
txHashes: missing,
1310+
});
1311+
1312+
const peer = await createSecp256k1PeerId();
1313+
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer]);
1314+
1315+
// Validator that throws errors for specific transactions
1316+
const throwingValidator = jest.fn(async (tx: Tx, _peerId: PeerId) => {
1317+
const txIndex = missing.findIndex(h => h.equals(tx.txHash));
1318+
1319+
// Throw error for transactions at indices 1 and 3
1320+
if (txIndex === 1 || txIndex === 3) {
1321+
throw new Error(`Validation error for tx at index ${txIndex}`);
1322+
}
1323+
1324+
// Reject transaction at index 5 normally
1325+
if (txIndex === 5) {
1326+
return false;
1327+
}
1328+
1329+
return true;
1330+
});
1331+
1332+
const peerTransactions = new Map([[peer.toString(), Array.from({ length: txCount }, (_, i) => i)]]);
1333+
1334+
const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions);
1335+
reqresp.sendRequestToPeer.mockImplementation(mockImplementation);
1336+
1337+
const requester = new BatchTxRequester(
1338+
missing,
1339+
blockProposal,
1340+
undefined,
1341+
deadline,
1342+
reqresp,
1343+
connectionSampler,
1344+
throwingValidator,
1345+
logger,
1346+
new DateProvider(),
1347+
{
1348+
smartParallelWorkerCount: 0,
1349+
dumbParallelWorkerCount: 1,
1350+
},
1351+
);
1352+
1353+
const result = await BatchTxRequester.collectAllTxs(requester.run());
1354+
1355+
// Expected: 8 total - 2 that threw errors - 1 that returned false = 5 valid txs
1356+
expect(result.length).toBe(5);
1357+
1358+
// Verify that transactions that threw errors are NOT in result
1359+
const resultTxHashes = new Set(result.map(tx => tx.txHash.toString()));
1360+
expect(resultTxHashes.has(missing[1].toString())).toBe(false);
1361+
expect(resultTxHashes.has(missing[3].toString())).toBe(false);
1362+
expect(resultTxHashes.has(missing[5].toString())).toBe(false);
1363+
1364+
// Verify that valid transactions ARE in result
1365+
[0, 2, 4, 6, 7].forEach(validIndex => {
1366+
expect(resultTxHashes.has(missing[validIndex].toString())).toBe(true);
1367+
});
1368+
});
1369+
});
11511370
});
11521371

11531372
const makeTx = (txHash?: string | TxHash) => Tx.random({ txHash }) as Tx;

yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ export class BatchTxRequester {
106106
public static async collectAllTxs(generator: AsyncGenerator<Tx, Tx | undefined, unknown>): Promise<Tx[]> {
107107
const txs: Tx[] = [];
108108
for await (const tx of generator) {
109+
if (tx === undefined) break;
109110
txs.push(tx);
110111
}
111112
return txs;
@@ -295,7 +296,7 @@ export class BatchTxRequester {
295296
}
296297

297298
// Otherwise we wait until some peer becomes smart
298-
await this.smartRequesterSemaphore.acquire();
299+
await executeTimeout((_: AbortSignal) => this.smartRequesterSemaphore.acquire(), this.timeoutMs);
299300
this.logger.debug(`Worker loop smart: acquired next smart peer`);
300301
continue;
301302
}
@@ -347,7 +348,6 @@ export class BatchTxRequester {
347348

348349
this.handleFailResponseFromPeer(peerId, ReqRespStatus.UNKNOWN);
349350
} finally {
350-
// Don't mark pinned peer as not in flight
351351
this.peers.unMarkPeerInFlight(peerId);
352352
}
353353
}
@@ -429,7 +429,6 @@ export class BatchTxRequester {
429429
private markTxsPeerHas(peerId: PeerId, response: BlockTxsResponse) {
430430
const txsPeerHas = this.extractHashesPeerHasFromResponse(response);
431431
this.logger.debug(`${peerId.toString()} has txs: ${txsPeerHas.map(tx => tx.toString()).join(', ')}`);
432-
//TODO: validate txs
433432
this.txsMetadata.markPeerHas(peerId, txsPeerHas);
434433
}
435434

0 commit comments

Comments
 (0)