Skip to content

Commit 11c18fb

Browse files
committed
refactor(p2p): remove old reqresp mode
1 parent cef95ec commit 11c18fb

18 files changed

Lines changed: 170 additions & 398 deletions

yarn-project/p2p/src/client/factory.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js';
2020
import { AztecKVTxPoolV2 } from '../mem_pools/tx_pool_v2/tx_pool_v2.js';
2121
import {
2222
createCheckAllowedSetupCalls,
23-
createTxValidatorForReqResponseReceivedTxs,
23+
createTxValidatorForOnDemandReceivedTxs,
2424
createTxValidatorForTransactionsEnteringPendingTxPool,
2525
getDefaultAllowedSetupFunctions,
2626
} from '../msg_validators/index.js';
@@ -153,7 +153,7 @@ export async function createP2PClient(
153153
telemetry,
154154
);
155155

156-
const txValidatorForTxCollection = createTxValidatorForReqResponseReceivedTxs(proofVerifier, config);
156+
const txValidatorForTxCollection = createTxValidatorForOnDemandReceivedTxs(proofVerifier, config);
157157
const nodeSources = [
158158
...createNodeRpcTxSources(config.txCollectionNodeRpcUrls, txValidatorForTxCollection, config),
159159
...(deps.rpcTxProviders ?? []).map(

yarn-project/p2p/src/client/test/tx_proposal_collector/p2p_client.proposal_tx_collector.bench.test.ts

Lines changed: 9 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { mkdir, writeFile } from 'fs/promises';
66
import path from 'path';
77

88
import {
9-
type CollectorType,
109
type DistributionPattern,
1110
WorkerClientManager,
1211
testChainConfig,
@@ -15,8 +14,6 @@ import {
1514
const TEST_TIMEOUT_MS = 600_000; // 10 minutes
1615
jest.setTimeout(TEST_TIMEOUT_MS);
1716

18-
const COLLECTOR_TYPES: CollectorType[] = ['batch-requester', 'send-batch-request'];
19-
2017
const PEERS_PER_RUN = 30;
2118
const TIMEOUT_MS = 30_000;
2219

@@ -39,7 +36,6 @@ interface BenchmarkCase extends ScenarioBase {
3936
interface BenchmarkResult {
4037
missingTxCount: number;
4138
distribution: DistributionPattern;
42-
collector: CollectorType;
4339
durationMs: number;
4440
fetchedCount: number;
4541
success: boolean;
@@ -128,7 +124,7 @@ describe('ProposalTxCollector Benchmarks', () => {
128124
});
129125

130126
describe.each(CASES)('$name (missing=$missingTxCount)', benchCase => {
131-
it.each(COLLECTOR_TYPES)('collector: %s', async collectorType => {
127+
it('runs batch tx requester benchmark', async () => {
132128
if (!workerManager) {
133129
throw new Error('Worker manager not initialized');
134130
}
@@ -137,15 +133,12 @@ describe('ProposalTxCollector Benchmarks', () => {
137133
const pinnedPeerIndex = distribution === 'pinned-only' ? benchCase.pinnedPeerIndex : undefined;
138134
const seed = blockNumber * 1_000_000;
139135

140-
logger.info(
141-
`Case=${benchCase.name}, missing=${missingTxCount}, collector=${collectorType}, peers=${peers}, timeoutMs=${timeoutMs}`,
142-
);
136+
logger.info(`Case=${benchCase.name}, missing=${missingTxCount}, peers=${peers}, timeoutMs=${timeoutMs}`);
143137

144138
try {
145139
const result = await workerManager.runReqRespBenchmark({
146140
txCount: missingTxCount,
147141
distribution,
148-
collectorType,
149142
timeoutMs,
150143
pinnedPeerIndex,
151144
blockNumber,
@@ -155,22 +148,18 @@ describe('ProposalTxCollector Benchmarks', () => {
155148
results.push({
156149
missingTxCount,
157150
distribution,
158-
collector: collectorType,
159151
durationMs: result.durationMs,
160152
fetchedCount: result.fetchedCount,
161153
success: result.fetchedCount === missingTxCount,
162154
});
163155

164-
logger.info(
165-
`${collectorType}: fetched ${result.fetchedCount}/${missingTxCount} in ${result.durationMs.toFixed(0)}ms`,
166-
);
156+
logger.info(`fetched ${result.fetchedCount}/${missingTxCount} in ${result.durationMs.toFixed(0)}ms`);
167157
} catch (err: any) {
168-
logger.error(`${collectorType} failed: ${err?.message ?? String(err)}`);
158+
logger.error(`Benchmark failed: ${err?.message ?? String(err)}`);
169159

170160
results.push({
171161
missingTxCount,
172162
distribution,
173-
collector: collectorType,
174163
durationMs: timeoutMs,
175164
fetchedCount: 0,
176165
success: false,
@@ -195,64 +184,23 @@ function toPrettyString(benchResults: BenchmarkResult[]): string {
195184
lines.push('ProposalTxCollector Benchmark Results');
196185
lines.push('='.repeat(80));
197186
lines.push('');
198-
lines.push('| Collector | Distribution | Missing | Duration (ms) | Fetched | Success |');
199-
lines.push('|---------------------|--------------|---------|---------------|---------|---------|');
187+
lines.push('| Distribution | Missing | Duration (ms) | Fetched | Success |');
188+
lines.push('|--------------|---------|---------------|---------|---------|');
200189

201190
const sorted = [...benchResults].sort((a, b) => {
202191
if (a.distribution !== b.distribution) {
203192
return a.distribution.localeCompare(b.distribution);
204193
}
205-
if (a.missingTxCount !== b.missingTxCount) {
206-
return a.missingTxCount - b.missingTxCount;
207-
}
208-
return a.collector.localeCompare(b.collector);
194+
return a.missingTxCount - b.missingTxCount;
209195
});
210196

211197
for (const r of sorted) {
212198
lines.push(
213-
`| ${r.collector.padEnd(19)} | ${r.distribution.padEnd(12)} | ${String(r.missingTxCount).padStart(7)} | ` +
199+
`| ${r.distribution.padEnd(12)} | ${String(r.missingTxCount).padStart(7)} | ` +
214200
`${r.durationMs.toFixed(0).padStart(13)} | ${String(r.fetchedCount).padStart(7)} | ${r.success ? ' Yes ' : ' No '} |`,
215201
);
216202
}
217203

218-
lines.push('');
219-
lines.push('## Comparison Summary');
220-
lines.push('');
221-
222-
const keys = [...new Set(sorted.map(r => `${r.distribution}:${r.missingTxCount}`))];
223-
224-
for (const key of keys) {
225-
const [distRaw, missingRaw] = key.split(':');
226-
const dist = distRaw as DistributionPattern;
227-
const missing = Number(missingRaw);
228-
229-
const batch = sorted.find(
230-
r => r.distribution === dist && r.missingTxCount === missing && r.collector === 'batch-requester',
231-
);
232-
const send = sorted.find(
233-
r => r.distribution === dist && r.missingTxCount === missing && r.collector === 'send-batch-request',
234-
);
235-
236-
if (!batch || !send) {
237-
continue;
238-
}
239-
240-
if (!batch.success || !send.success) {
241-
lines.push(
242-
`- ${dist} (missing=${missing}): cannot compare reliably (success: batch=${batch.success}, send=${send.success})`,
243-
);
244-
continue;
245-
}
246-
247-
const faster = batch.durationMs <= send.durationMs ? 'BatchTxRequester' : 'SendBatchRequest';
248-
const slower = faster === 'BatchTxRequester' ? 'SendBatchRequest' : 'BatchTxRequester';
249-
250-
const delta = Math.abs(send.durationMs - batch.durationMs);
251-
const pct = (delta / Math.max(batch.durationMs, send.durationMs)) * 100;
252-
253-
lines.push(`- ${dist} (missing=${missing}): ${faster} is ${pct.toFixed(1)}% faster than ${slower}`);
254-
}
255-
256204
lines.push('');
257205
lines.push('='.repeat(80));
258206
lines.push('');
@@ -264,7 +212,7 @@ function toBenchmarkJSON(benchResults: BenchmarkResult[], indent = 2): string {
264212
const metrics: JsonBenchmarkResult[] = [];
265213

266214
for (const result of benchResults) {
267-
const baseName = `ProposalTxCollector/${result.collector}/${result.distribution}/missing_${result.missingTxCount}`;
215+
const baseName = `ProposalTxCollector/${result.distribution}/missing_${result.missingTxCount}`;
268216
metrics.push(
269217
{
270218
name: `${baseName}/duration`,

yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker.ts

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import type { PeerId } from '@libp2p/interface';
1717
import { peerIdFromString } from '@libp2p/peer-id';
1818

1919
import type { P2PConfig } from '../../../config.js';
20-
import { BatchTxRequesterCollector, SendBatchRequestCollector } from '../../../services/index.js';
20+
import { BatchTxRequester } from '../../../services/reqresp/batch-tx-requester/batch_tx_requester.js';
2121
import type { IBatchRequestTxValidator } from '../../../services/reqresp/batch-tx-requester/tx_validator.js';
2222
import { RateLimitStatus } from '../../../services/reqresp/rate-limiter/rate_limiter.js';
2323
import { RequestTracker } from '../../../services/tx_collection/request_tracker.js';
@@ -170,7 +170,7 @@ function installUnlimitedRateLimits() {
170170
}
171171

172172
async function runCollector(cmd: Extract<WorkerCommand, { type: 'RUN_COLLECTOR' }>) {
173-
const { collectorType, txHashes, blockProposal, pinnedPeerId, peerIds, timeoutMs } = cmd;
173+
const { txHashes, blockProposal, pinnedPeerId, peerIds, timeoutMs } = cmd;
174174
const reqResp = (ensureClient() as any).p2pService.reqresp as any;
175175
const peerList = peerIds.map(peerId => peerIdFromString(peerId));
176176

@@ -211,37 +211,24 @@ async function runCollector(cmd: Extract<WorkerCommand, { type: 'RUN_COLLECTOR'
211211
};
212212

213213
try {
214-
if (collectorType === 'batch-requester') {
215-
const collector = new BatchTxRequesterCollector(p2pService, logger, new DateProvider(), noopTxValidator);
216-
const fetched = await executeTimeout(
217-
(_signal: AbortSignal) =>
218-
collector.collectTxs(
219-
RequestTracker.create(parsedTxHashes, new Date(Date.now() + internalTimeoutMs)),
220-
parsedProposal,
221-
pinnedPeer,
222-
),
223-
timeoutMs,
224-
() => new Error(`Collector timed out after ${timeoutMs}ms`),
225-
);
226-
fetchedCount = fetched.length;
227-
} else {
228-
const collector = new SendBatchRequestCollector(
229-
p2pService,
230-
BENCHMARK_CONSTANTS.FIXED_MAX_PEERS,
231-
BENCHMARK_CONSTANTS.FIXED_MAX_RETRY_ATTEMPTS,
232-
);
233-
const fetched = await executeTimeout(
234-
(_signal: AbortSignal) =>
235-
collector.collectTxs(
236-
RequestTracker.create(parsedTxHashes, new Date(Date.now() + internalTimeoutMs)),
237-
parsedProposal,
238-
pinnedPeer,
239-
),
240-
timeoutMs,
241-
() => new Error(`Collector timed out after ${timeoutMs}ms`),
242-
);
243-
fetchedCount = fetched.length;
244-
}
214+
const fetched = await executeTimeout(
215+
(_signal: AbortSignal) => {
216+
const tracker = RequestTracker.create(parsedTxHashes, new Date(Date.now() + internalTimeoutMs));
217+
const batchRequester = new BatchTxRequester(
218+
tracker,
219+
parsedProposal,
220+
pinnedPeer,
221+
p2pService,
222+
logger,
223+
new DateProvider(),
224+
{ txValidator: noopTxValidator },
225+
);
226+
return BatchTxRequester.collectAllTxs(batchRequester.run());
227+
},
228+
timeoutMs,
229+
() => new Error(`Collector timed out after ${timeoutMs}ms`),
230+
);
231+
fetchedCount = fetched.length;
245232
} catch (err: any) {
246233
logger.warn(`Collector error: ${err?.message ?? String(err)}`);
247234
}

yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker_protocol.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,13 @@ import type { P2PConfig } from '../../../config.js';
55

66
export type SerializedP2PConfig = Omit<P2PConfig, 'peerIdPrivateKey'> & { peerIdPrivateKey?: string };
77

8-
export type CollectorType = 'batch-requester' | 'send-batch-request';
9-
108
export type WorkerCommand =
119
| { type: 'START'; requestId: string; clientIndex: number; config: SerializedP2PConfig }
1210
| { type: 'SET_TXS'; requestId: string; txs: string[]; mode?: 'replace' | 'append' }
1311
| { type: 'SET_BLOCK_PROPOSAL'; requestId: string; blockProposal: string }
1412
| {
1513
type: 'RUN_COLLECTOR';
1614
requestId: string;
17-
collectorType: CollectorType;
1815
txHashes: string[];
1916
blockProposal: string;
2017
pinnedPeerId?: string;

yarn-project/p2p/src/config.test.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,6 @@ describe('config', () => {
7979
expect(allowList).toEqual([instanceFunction]);
8080
});
8181

82-
it('defaults missing txs collector type to new', () => {
83-
const config = getP2PDefaultConfig();
84-
expect(config.txCollectionMissingTxsCollectorType).toBe('new');
85-
});
86-
8782
it('defaults public IP service URLs', () => {
8883
const config = getP2PDefaultConfig();
8984
expect(config.publicIpServices).toEqual([

yarn-project/p2p/src/msg_validators/tx_validator/factory.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import {
2323
createTxValidatorForAcceptingTxsOverRPC,
2424
createTxValidatorForBlockBuilding,
2525
createTxValidatorForBlockProposalReceivedTxs,
26-
createTxValidatorForReqResponseReceivedTxs,
26+
createTxValidatorForOnDemandReceivedTxs,
2727
createTxValidatorForTransactionsEnteringPendingTxPool,
2828
} from './factory.js';
2929
import { GasLimitsValidator, GasTxValidator, MaxFeePerGasValidator } from './gas_validator.js';
@@ -160,9 +160,9 @@ describe('Validator factory functions', () => {
160160
});
161161
});
162162

163-
describe('createTxValidatorForReqResponseReceivedTxs', () => {
163+
describe('createTxValidatorForOnDemandReceivedTxs', () => {
164164
it('contains well-formedness validators only', () => {
165-
const validator = createTxValidatorForReqResponseReceivedTxs(proofVerifier, {
165+
const validator = createTxValidatorForOnDemandReceivedTxs(proofVerifier, {
166166
l1ChainId: 1,
167167
rollupVersion: 2,
168168
});

yarn-project/p2p/src/msg_validators/tx_validator/factory.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ function createTxValidatorForMinimumTxIntegrityChecks(
234234
* are invalid against current state. State-dependent checks happen when the tx
235235
* enters the pending pool or during block building.
236236
*/
237-
export function createTxValidatorForReqResponseReceivedTxs(
237+
export function createTxValidatorForOnDemandReceivedTxs(
238238
verifier: ClientProtocolCircuitVerifier,
239239
{
240240
l1ChainId,

yarn-project/p2p/src/services/libp2p/libp2p_service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ import {
7474
createFirstStageTxValidationsForGossipedTransactions,
7575
createSecondStageTxValidationsForGossipedTransactions,
7676
createTxValidatorForBlockProposalReceivedTxs,
77-
createTxValidatorForReqResponseReceivedTxs,
77+
createTxValidatorForOnDemandReceivedTxs,
7878
} from '../../msg_validators/tx_validator/factory.js';
7979
import { GossipSubEvent } from '../../types/index.js';
8080
import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js';
@@ -1680,7 +1680,7 @@ export class LibP2PService extends WithTracer implements P2PService {
16801680
}
16811681

16821682
protected createRequestedTxValidator(): TxValidator {
1683-
return createTxValidatorForReqResponseReceivedTxs(this.proofVerifier, {
1683+
return createTxValidatorForOnDemandReceivedTxs(this.proofVerifier, {
16841684
l1ChainId: this.config.l1ChainId,
16851685
rollupVersion: this.config.rollupVersion,
16861686
});

yarn-project/p2p/src/services/reqresp/batch-tx-requester/interface.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export interface ITxMetadataCollection {
3232
*/
3333
export interface BatchTxRequesterLibP2PService {
3434
/** ReqResp interface for sending requests to peers */
35-
reqResp: Pick<ReqRespInterface, 'sendBatchRequest' | 'sendRequestToPeer'>;
35+
reqResp: Pick<ReqRespInterface, 'sendRequestToPeer'>;
3636
/** Connection sampler for getting peer lists */
3737
connectionSampler: Pick<ConnectionSampler, 'getPeerListSortedByConnectionCountAsc'>;
3838
/** Configuration needed for transaction validation */

yarn-project/p2p/src/services/reqresp/batch-tx-requester/tx_validator.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { ClientProtocolCircuitVerifier } from '@aztec/stdlib/interfaces/server';
22
import { Tx, type TxValidationResult, type TxValidator } from '@aztec/stdlib/tx';
33

4-
import { createTxValidatorForReqResponseReceivedTxs } from '../../../msg_validators/index.js';
4+
import { createTxValidatorForOnDemandReceivedTxs } from '../../../msg_validators/index.js';
55

66
export interface BatchRequestTxValidatorConfig {
77
l1ChainId: number;
@@ -29,7 +29,7 @@ export class BatchRequestTxValidator implements IBatchRequestTxValidator {
2929
}
3030

3131
static createRequestedTxValidator(config: BatchRequestTxValidatorConfig): TxValidator {
32-
return createTxValidatorForReqResponseReceivedTxs(config.proofVerifier, {
32+
return createTxValidatorForOnDemandReceivedTxs(config.proofVerifier, {
3333
l1ChainId: config.l1ChainId,
3434
rollupVersion: config.rollupVersion,
3535
});

0 commit comments

Comments
 (0)