Skip to content

Commit 6fdcd36

Browse files
committed
proper pinned peer handling
1 parent 5a04627 commit 6fdcd36

2 files changed

Lines changed: 346 additions & 8 deletions

File tree

yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,311 @@ describe('BatchTxRequester', () => {
13671367
});
13681368
});
13691369
});
1370+
1371+
describe('Pinned peer functionality', () => {
1372+
it('Should query pinned peer if available', async () => {
1373+
const txCount = 10;
1374+
const deadline = 5_000;
1375+
const missing = Array.from({ length: txCount }, () => TxHash.random());
1376+
1377+
blockProposal = makeBlockProposal({
1378+
signer: Secp256k1Signer.random(),
1379+
header: makeHeader(1, 1),
1380+
archive: Fr.random(),
1381+
txHashes: missing,
1382+
});
1383+
1384+
const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId(), createSecp256k1PeerId()]);
1385+
1386+
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers);
1387+
const [pinnedPeer, regularPeer1, regularPeer2] = peers;
1388+
1389+
// Pinned peer has all transactions, regular peers have partial
1390+
const peerTransactions = new Map([
1391+
[pinnedPeer.toString(), Array.from({ length: txCount }, (_, i) => i)], // All transactions
1392+
[regularPeer1.toString(), Array.from({ length: 5 }, (_, i) => i)], // First 5
1393+
[regularPeer2.toString(), Array.from({ length: 5 }, (_, i) => i + 5)], // Last 5
1394+
]);
1395+
1396+
const { requestLog, mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions);
1397+
reqresp.sendRequestToPeer.mockImplementation(mockImplementation);
1398+
1399+
const requester = new BatchTxRequester(
1400+
missing,
1401+
blockProposal,
1402+
pinnedPeer,
1403+
deadline,
1404+
reqresp,
1405+
connectionSampler,
1406+
txValidator,
1407+
logger,
1408+
new DateProvider(),
1409+
{
1410+
smartParallelWorkerCount: 0,
1411+
dumbParallelWorkerCount: 2,
1412+
},
1413+
);
1414+
1415+
const results = await BatchTxRequester.collectAllTxs(requester.run());
1416+
expect(results).toHaveLength(txCount);
1417+
1418+
expect(requestLog.has(pinnedPeer.toString())).toBe(true);
1419+
const pinnedPeerRequests = requestLog.get(pinnedPeer.toString())!;
1420+
expect(pinnedPeerRequests[0].indices.length).toEqual(TX_BATCH_SIZE);
1421+
});
1422+
1423+
it('should never mark pinned peer as smart', async () => {
1424+
const txCount = 30;
1425+
const deadline = 5_000;
1426+
const missing = Array.from({ length: txCount }, () => TxHash.random());
1427+
1428+
blockProposal = makeBlockProposal({
1429+
signer: Secp256k1Signer.random(),
1430+
header: makeHeader(1, 1),
1431+
archive: Fr.random(),
1432+
txHashes: missing,
1433+
});
1434+
1435+
const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]);
1436+
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers);
1437+
1438+
const [pinnedPeer, regularPeer] = peers;
1439+
const peerCollection = new TestPeerCollection(new PeerCollection(peers, pinnedPeer, new DateProvider()));
1440+
1441+
// Both peers have all transactions
1442+
const peerTransactions = new Map([
1443+
[pinnedPeer.toString(), Array.from({ length: txCount }, (_, i) => i)],
1444+
[regularPeer.toString(), Array.from({ length: txCount }, (_, i) => i)],
1445+
]);
1446+
1447+
const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions, 50);
1448+
reqresp.sendRequestToPeer.mockImplementation(mockImplementation);
1449+
1450+
const requester = new BatchTxRequester(
1451+
missing,
1452+
blockProposal,
1453+
pinnedPeer,
1454+
deadline,
1455+
reqresp,
1456+
connectionSampler,
1457+
txValidator,
1458+
logger,
1459+
new DateProvider(),
1460+
{
1461+
smartParallelWorkerCount: 1,
1462+
dumbParallelWorkerCount: 1,
1463+
peerCollection,
1464+
},
1465+
);
1466+
1467+
await BatchTxRequester.collectAllTxs(requester.run());
1468+
1469+
// Verify pinned peer was never marked as smart
1470+
expect(peerCollection.getSmartPeers()).not.toContain(pinnedPeer.toString());
1471+
expect(peerCollection.getSmartPeersToQuery()).not.toContain(pinnedPeer.toString());
1472+
});
1473+
1474+
it('should handle pinned peer being rate limited and recover', async () => {
1475+
const txCount = 6;
1476+
const deadline = 8_000;
1477+
const clock = new TestClock();
1478+
const missing = Array.from({ length: txCount }, () => TxHash.random());
1479+
1480+
blockProposal = makeBlockProposal({
1481+
signer: Secp256k1Signer.random(),
1482+
header: makeHeader(1, 1),
1483+
archive: Fr.random(),
1484+
txHashes: missing,
1485+
});
1486+
1487+
const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]);
1488+
const [pinnedPeer, regularPeer] = peers;
1489+
const peerCollection = new TestPeerCollection(new PeerCollection(peers, pinnedPeer, clock));
1490+
1491+
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers);
1492+
1493+
const peerTransactions = new Map([
1494+
[pinnedPeer.toString(), Array.from({ length: txCount }, (_, i) => i)],
1495+
[regularPeer.toString(), Array.from({ length: txCount }, (_, i) => i)],
1496+
]);
1497+
1498+
let pinnedPeerRequestCount = 0;
1499+
reqresp.sendRequestToPeer.mockImplementation(async (peerId: any, _sub: any, data: any) => {
1500+
const peerStr = peerId.toString();
1501+
1502+
// First request to pinned peer returns rate limit
1503+
if (peerStr === pinnedPeer.toString() && pinnedPeerRequestCount === 0) {
1504+
pinnedPeerRequestCount++;
1505+
return {
1506+
status: ReqRespStatus.RATE_LIMIT_EXCEEDED,
1507+
data: Buffer.alloc(0),
1508+
};
1509+
}
1510+
1511+
// All other requests succeed
1512+
const request = BlockTxsRequest.fromBuffer(data);
1513+
const requestedIndices = request.txIndices.getTrueIndices();
1514+
const peerHasIndices = peerTransactions.get(peerStr) || [];
1515+
const availableIndices = requestedIndices.filter(idx => peerHasIndices.includes(idx));
1516+
const availableTxHashes = availableIndices.map(idx => blockProposal.txHashes[idx]);
1517+
const availableTxs = availableTxHashes.map(h => makeTx(h));
1518+
1519+
const response = new BlockTxsResponse(
1520+
blockProposal.archive,
1521+
new TxArray(...availableTxs),
1522+
BitVector.init(blockProposal.txHashes.length, peerHasIndices),
1523+
);
1524+
1525+
return {
1526+
status: ReqRespStatus.SUCCESS,
1527+
data: response.toBuffer(),
1528+
};
1529+
});
1530+
1531+
const requester = new BatchTxRequester(
1532+
missing,
1533+
blockProposal,
1534+
pinnedPeer,
1535+
deadline,
1536+
reqresp,
1537+
connectionSampler,
1538+
txValidator,
1539+
logger,
1540+
clock,
1541+
{
1542+
smartParallelWorkerCount: 0,
1543+
dumbParallelWorkerCount: 2,
1544+
peerCollection,
1545+
},
1546+
);
1547+
1548+
const runPromise = BatchTxRequester.collectAllTxs(requester.run());
1549+
1550+
// Let some time pass for rate limit handling
1551+
await sleep(100);
1552+
clock.advanceTo(RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL + 1);
1553+
1554+
const results = await runPromise;
1555+
expect(results).toHaveLength(txCount);
1556+
1557+
// Verify pinned peer was marked as rate limited
1558+
expect(peerCollection.peersMarkedRateLimitExceeded).toContain(pinnedPeer.toString());
1559+
});
1560+
1561+
it('should handle pinned peer being marked as bad and continue with regular peers', async () => {
1562+
const txCount = 8;
1563+
const deadline = 5_000;
1564+
const missing = Array.from({ length: txCount }, () => TxHash.random());
1565+
1566+
blockProposal = makeBlockProposal({
1567+
signer: Secp256k1Signer.random(),
1568+
header: makeHeader(1, 1),
1569+
archive: Fr.random(),
1570+
txHashes: missing,
1571+
});
1572+
1573+
const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]);
1574+
const [pinnedPeer, regularPeer] = peers;
1575+
const peerCollection = new TestPeerCollection(new PeerCollection(peers, pinnedPeer, new DateProvider()));
1576+
1577+
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers);
1578+
1579+
// Regular peer has all transactions, pinned peer will fail
1580+
const peerTransactions = new Map([[regularPeer.toString(), Array.from({ length: txCount }, (_, i) => i)]]);
1581+
1582+
const peersToReturnFailureFor = new Set([pinnedPeer.toString()]);
1583+
const { mockImplementation } = createRequestLogger(blockProposal, peersToReturnFailureFor, peerTransactions);
1584+
reqresp.sendRequestToPeer.mockImplementation(mockImplementation);
1585+
1586+
const requester = new BatchTxRequester(
1587+
missing,
1588+
blockProposal,
1589+
pinnedPeer,
1590+
deadline,
1591+
reqresp,
1592+
connectionSampler,
1593+
txValidator,
1594+
logger,
1595+
new DateProvider(),
1596+
{
1597+
smartParallelWorkerCount: 0,
1598+
dumbParallelWorkerCount: 1,
1599+
peerCollection,
1600+
},
1601+
);
1602+
1603+
const results = await BatchTxRequester.collectAllTxs(requester.run());
1604+
1605+
expect(results).toHaveLength(txCount);
1606+
expect(peerCollection.peersMarkedBad).toContain(pinnedPeer.toString());
1607+
});
1608+
1609+
it('should validate transactions from pinned peer same as regular peers', async () => {
1610+
const txCount = 8;
1611+
const deadline = 5_000;
1612+
const missing = Array.from({ length: txCount }, () => TxHash.random());
1613+
1614+
blockProposal = makeBlockProposal({
1615+
signer: Secp256k1Signer.random(),
1616+
header: makeHeader(1, 1),
1617+
archive: Fr.random(),
1618+
txHashes: missing,
1619+
});
1620+
1621+
const [pinnedPeer, regularPeer] = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]);
1622+
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([regularPeer]);
1623+
1624+
const peerTransactions = new Map([
1625+
[pinnedPeer.toString(), Array.from({ length: 4 }, (_, i) => i)], // First 4 txs
1626+
[regularPeer.toString(), Array.from({ length: 4 }, (_, i) => i + 4)], // Last 4 txs
1627+
]);
1628+
1629+
const validationCalls: Array<{ tx: TxHash; peerId: string }> = [];
1630+
const invalidTxIndices = new Set([1, 6]); // Mark some transactions as invalid
1631+
1632+
const customTxValidator = jest.fn(async (tx: Tx, peerId: PeerId) => {
1633+
validationCalls.push({ tx: tx.txHash, peerId: peerId.toString() });
1634+
const txIndex = missing.findIndex(h => h.equals(tx.txHash));
1635+
return !invalidTxIndices.has(txIndex);
1636+
});
1637+
1638+
const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions);
1639+
reqresp.sendRequestToPeer.mockImplementation(mockImplementation);
1640+
1641+
const requester = new BatchTxRequester(
1642+
missing,
1643+
blockProposal,
1644+
pinnedPeer,
1645+
deadline,
1646+
reqresp,
1647+
connectionSampler,
1648+
customTxValidator,
1649+
logger,
1650+
new DateProvider(),
1651+
{
1652+
smartParallelWorkerCount: 0,
1653+
dumbParallelWorkerCount: 2,
1654+
},
1655+
);
1656+
1657+
const results = await BatchTxRequester.collectAllTxs(requester.run());
1658+
1659+
// Should receive 6 valid transactions (8 total - 2 invalid)
1660+
expect(results).toHaveLength(6);
1661+
1662+
// Verify validation was called for both pinned and regular peers
1663+
const pinnedPeerValidations = validationCalls.filter(call => call.peerId === pinnedPeer.toString());
1664+
const regularPeerValidations = validationCalls.filter(call => call.peerId === regularPeer.toString());
1665+
1666+
expect(pinnedPeerValidations.length).toBeGreaterThan(0);
1667+
expect(regularPeerValidations.length).toBeGreaterThan(0);
1668+
1669+
// Verify invalid transactions were filtered out
1670+
const resultTxHashes = new Set(results.map(tx => tx.txHash.toString()));
1671+
expect(resultTxHashes.has(missing[1].toString())).toBe(false); // Invalid from pinned
1672+
expect(resultTxHashes.has(missing[6].toString())).toBe(false); // Invalid from regular
1673+
});
1674+
});
13701675
});
13711676

13721677
const makeTx = (txHash?: string | TxHash) => Tx.random({ txHash }) as Tx;

yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,27 +112,55 @@ export class BatchTxRequester {
112112
return txs;
113113
}
114114

115-
//TODO: handle pinned peer properly
115+
/*
116+
* Handles so-called pinned peer
117+
* The pinned peer is the one who sent us block proposal
118+
* We expect pinned peer to have all transactions from the proposal at some point
119+
* This holds because they them selves have to attest to proposal and thus fetch all missing transactions
120+
*
121+
* Given the reasoning above - we query pinned peer separately from dumb/smart peers
122+
* */
116123
private async pinnedPeerRequester() {
124+
if (!this.pinnedPeer) {
125+
this.logger.debug('No pinned peer to request from');
126+
return;
127+
}
128+
117129
while (!this.shouldStop()) {
118-
if (!this.pinnedPeer) {
119-
this.logger.debug('No pinned peer to request from');
130+
// We've hit rate limits on the pinned peer - wait a bit before making another request
131+
if (this.peers.getRateLimitExceededPeers().has(this.pinnedPeer.toString())) {
132+
await sleep(RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL);
133+
continue;
134+
}
135+
136+
//Pinned peer went bad, don't request from it anymore
137+
if (this.peers.getBadPeers().has(this.pinnedPeer.toString())) {
120138
return;
121139
}
122140

123-
const txsToRequest = this.txsMetadata.getTxsToRequestFromThePeer(this.pinnedPeer).slice(0, TX_BATCH_SIZE);
124-
if (txsToRequest.length === 0) {
141+
// From pinned peer we always request transactions so that we first request the least requested and not in flight
142+
// This makes sense since pinned peer should have ALL transactions,
143+
// Thus if it has all it is best to ask pinned first for the transactions we have trouble getting from other peers
144+
const txs = this.txsMetadata.getTxsToRequestFromThePeer(this.pinnedPeer);
145+
if (txs.length === 0) {
125146
this.logger.debug(`Pinned peer ${this.pinnedPeer.toString()} has no txs to request`);
126147
return;
127148
}
128149

129-
txsToRequest.forEach(tx => this.txsMetadata.markRequested(tx));
150+
txs.forEach(tx => {
151+
this.txsMetadata.markRequested(tx);
152+
this.txsMetadata.markInFlightBySmartPeer(tx);
153+
});
130154

131-
const request = BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txsToRequest);
155+
const request = BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txs);
132156
if (!request) {
133157
return;
134158
}
135159
await this.requestTxBatch(this.pinnedPeer, request);
160+
161+
txs.forEach(tx => {
162+
this.txsMetadata.markNotInFlightBySmartPeer(tx);
163+
});
136164
}
137165
}
138166

@@ -357,7 +385,8 @@ export class BatchTxRequester {
357385
this.logger.debug(`Received txs: ${response.txs.length} from peer ${peerId.toString()} `);
358386
await this.handleReceivedTxs(peerId, response.txs);
359387

360-
if (!this.isBlockResponseValid(response)) {
388+
const pinnedPeerShouldNeverBeMarkedAsSmart = this.pinnedPeer && peerId.toString() === this.pinnedPeer.toString();
389+
if (pinnedPeerShouldNeverBeMarkedAsSmart) {
361390
return;
362391
}
363392

@@ -366,6 +395,10 @@ export class BatchTxRequester {
366395
return;
367396
}
368397

398+
if (!this.isBlockResponseValid(response)) {
399+
return;
400+
}
401+
369402
// We mark peer as "smart" only if they have some txs we are missing
370403
// Otherwise we keep them as "dumb" in hope they'll receive some new txs we are missing in the future
371404
if (!this.peerHasSomeTxsWeAreMissing(peerId, response)) {

0 commit comments

Comments
 (0)