diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index a91755a81b00..9e7c486033fc 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -269,7 +269,6 @@ export class P2PClient extends WithTracer implements P2P { throw new Error('Block stream not initialized'); } this.blockStream.start(); - await this.txCollection.start(); this.txFileStore?.start(); // Start slot monitor to call prepareForSlot when the slot changes diff --git a/yarn-project/p2p/src/services/tx_collection/config.ts b/yarn-project/p2p/src/services/tx_collection/config.ts index f8f5ceeea81f..68de3db09303 100644 --- a/yarn-project/p2p/src/services/tx_collection/config.ts +++ b/yarn-project/p2p/src/services/tx_collection/config.ts @@ -14,7 +14,7 @@ export type TxCollectionConfig = { txCollectionNodeRpcMaxBatchSize: number; /** A comma-separated list of file store URLs (s3://, gs://, file://, http://) for tx collection */ txCollectionFileStoreUrls: string[]; - /** Delay in ms before file store collection starts after fast collection is triggered */ + /** Delay in ms from reqresp start before file store collection begins */ txCollectionFileStoreFastDelayMs: number; /** Number of concurrent workers for fast file store collection */ txCollectionFileStoreFastWorkerCount: number; @@ -68,7 +68,7 @@ export const txCollectionConfigMappings: ConfigMappingsType }, txCollectionFileStoreFastDelayMs: { env: 'TX_COLLECTION_FILE_STORE_FAST_DELAY_MS', - description: 'Delay before file store collection starts after fast collection', + description: 'Delay in ms from reqresp start before file store collection begins', ...numberConfigHelper(2_000), }, txCollectionFileStoreFastWorkerCount: { diff --git a/yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts deleted file mode 100644 index 7bcb1366342b..000000000000 --- 
a/yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts +++ /dev/null @@ -1,379 +0,0 @@ -import { BlockNumber } from '@aztec/foundation/branded-types'; -import { times } from '@aztec/foundation/collection'; -import { type Logger, createLogger } from '@aztec/foundation/log'; -import { sleep } from '@aztec/foundation/sleep'; -import { DateProvider, elapsed } from '@aztec/foundation/timer'; -import type { L2BlockInfo } from '@aztec/stdlib/block'; -import { type Tx, TxHash } from '@aztec/stdlib/tx'; - -import type { PeerId } from '@libp2p/interface'; - -import { BatchTxRequester } from '../reqresp/batch-tx-requester/batch_tx_requester.js'; -import type { BatchTxRequesterLibP2PService } from '../reqresp/batch-tx-requester/interface.js'; -import type { BlockTxsSource } from '../reqresp/index.js'; -import type { TxCollectionConfig } from './config.js'; -import { type IRequestTracker, RequestTracker } from './request_tracker.js'; -import type { FastCollectionRequest, FastCollectionRequestInput } from './tx_collection.js'; -import type { TxAddContext, TxCollectionSink } from './tx_collection_sink.js'; -import type { TxSource } from './tx_source.js'; - -/** - * Collect missing transactions for a block or proposal via reqresp. 
- * @param requestTracker - The missing transactions tracker - * @param blockTxsSource - The block or proposal containing the transactions - * @param pinnedPeer - Optional peer expected to have the transactions - * @returns The collected transactions - */ -export type IReqRespTxsCollector = ( - requestTracker: IRequestTracker, - blockTxsSource: BlockTxsSource, - pinnedPeer: PeerId | undefined, -) => Promise; - -export class FastTxCollection { - // eslint-disable-next-line aztec-custom/no-non-primitive-in-collections - protected requests: Set = new Set(); - - constructor( - private readonly p2pService: BatchTxRequesterLibP2PService, - private nodes: TxSource[], - private txCollectionSink: TxCollectionSink, - private config: TxCollectionConfig, - private dateProvider: DateProvider = new DateProvider(), - private log: Logger = createLogger('p2p:tx_collection_service'), - protected reqRespTxsCollector?: IReqRespTxsCollector, - ) { - if (!this.reqRespTxsCollector) { - this.reqRespTxsCollector = (requestTracker, blockTxsSource, pinnedPeer) => - BatchTxRequester.collectAllTxs( - new BatchTxRequester( - requestTracker, - blockTxsSource, - pinnedPeer, - this.p2pService, - this.log, - this.dateProvider, - ).run(), - ); - } - } - - public async stop() { - this.requests.forEach(request => { - request.requestTracker.cancel(); - }); - await Promise.resolve(); - } - - public getFastCollectionRequests() { - return this.requests; - } - - public async collectFastFor( - input: FastCollectionRequestInput, - txHashes: TxHash[] | string[], - opts: { deadline: Date; pinnedPeer?: PeerId }, - ) { - const timeout = opts.deadline.getTime() - this.dateProvider.now(); - if (timeout <= 0) { - this.log.warn(`Deadline for fast tx collection is in the past (${timeout}ms)`, { - deadline: opts.deadline.getTime(), - now: this.dateProvider.now(), - }); - return []; - } - - const blockInfo: L2BlockInfo = - input.type === 'proposal' - ? 
{ ...input.blockProposal.toBlockInfo(), blockNumber: input.blockNumber } - : { ...input.block.toBlockInfo() }; - - const request: FastCollectionRequest = { - ...input, - blockInfo, - requestTracker: RequestTracker.create(txHashes, opts.deadline, this.dateProvider), - }; - - const [duration] = await elapsed(() => this.collectFast(request, { ...opts })); - - this.log.verbose( - `Collected ${request.requestTracker.collectedTxs.length} txs out of ${txHashes.length} for ${input.type} at slot ${blockInfo.slotNumber}`, - { - ...blockInfo, - duration, - requestType: input.type, - missingTxs: [...request.requestTracker.missingTxHashes], - }, - ); - return request.requestTracker.collectedTxs; - } - - protected async collectFast(request: FastCollectionRequest, opts: { pinnedPeer?: PeerId }) { - this.requests.add(request); - const { blockInfo } = request; - - this.log.debug( - `Starting fast collection of ${request.requestTracker.numberOfMissingTxs} txs for ${request.type} at slot ${blockInfo.slotNumber}`, - { ...blockInfo, requestType: request.type, deadline: request.requestTracker.deadline }, - ); - - try { - // Start blasting all nodes for the txs. We give them a little time to respond before we start reqresp. - // We race against the cancellation token to exit as soon as all txs are collected, the deadline expires, - // or the request is externally cancelled. - const nodeCollectionPromise = this.collectFastFromNodes(request); - const waitBeforeReqResp = sleep(this.config.txCollectionFastNodesTimeoutBeforeReqRespMs); - await Promise.race([request.requestTracker.cancellationToken, waitBeforeReqResp]); - - // If we have collected all txs or the request was cancelled, we can stop here. - // Wait for node collection to settle so inner tasks finish before we return. 
- if (request.requestTracker.checkCancelled()) { - if (request.requestTracker.allFetched()) { - this.log.debug(`All txs collected for slot ${blockInfo.slotNumber} without reqresp`, blockInfo); - } - await nodeCollectionPromise; - return; - } - - // Start blasting reqresp for the remaining txs. Note that node collection keeps running in parallel. - // We stop when we have collected all txs, timed out, or both node collection and reqresp have given up. - // Inner tasks observe requestTracker.checkCancelled() and stop themselves, so this settles shortly after cancellation. - await Promise.allSettled([this.collectFastViaReqResp(request, opts), nodeCollectionPromise]); - } catch (err) { - this.log.error(`Error collecting txs for ${request.type} for slot ${blockInfo.slotNumber}`, err, { - ...blockInfo, - missingTxs: request.requestTracker.missingTxHashes.values().map(txHash => txHash.toString()), - }); - } finally { - // Ensure no unresolved promises and remove the request from the set - request.requestTracker.cancel(); - this.requests.delete(request); - } - } - - /** - * Starts collecting txs from all configured nodes. We send `txCollectionFastMaxParallelRequestsPerNode` requests - * in parallel to each node. We keep track of the number of attempts made to collect each tx, so we can prioritize - * the txs that have been requested less often whenever we need to send a new batch of requests. We ensure that no - * tx is requested more than once at the same time to the same node. - */ - private async collectFastFromNodes(request: FastCollectionRequest): Promise { - if (this.nodes.length === 0) { - return; - } - - // Keep a shared priority queue of all txs pending to be requested, sorted by the number of attempts made to collect them. - const attemptsPerTx = [...request.requestTracker.missingTxHashes].map(txHash => ({ - txHash, - attempts: 0, - found: false, - })); - - // Returns once we have finished all node loops. 
Each loop finishes when the deadline is hit, or all txs have been collected. - await Promise.allSettled(this.nodes.map(node => this.collectFastFromNode(request, node, attemptsPerTx))); - } - - private async collectFastFromNode( - request: FastCollectionRequest, - node: TxSource, - attemptsPerTx: { txHash: string; attempts: number; found: boolean }[], - ) { - const notFinished = () => !request.requestTracker.checkCancelled(); - - const maxParallelRequests = this.config.txCollectionFastMaxParallelRequestsPerNode; - const maxBatchSize = this.config.txCollectionNodeRpcMaxBatchSize; - const activeRequestsToThisNode = new Set(); // Track the txs being actively requested to this node - - const processBatch = async () => { - while (notFinished()) { - // Pull tx hashes from the attemptsPerTx array, which is sorted by attempts, - // so we prioritize txs that have been requested less often. - const batch = []; - let index = 0; - while (batch.length < maxBatchSize) { - const txToRequest = attemptsPerTx[index++]; - if (!txToRequest) { - // No more txs to process - break; - } else if (!request.requestTracker.isMissing(txToRequest.txHash)) { - // Mark as found if it was found somewhere else, we'll then remove it from the array. - // We don't delete it now since 'array.splice' is pretty expensive, so we do it after sorting. - txToRequest.found = true; - } else if (!activeRequestsToThisNode.has(txToRequest.txHash)) { - // If the tx is not alredy being requested to this node, add it to the current batch and increase attempts. - // Note that we increase the attempts *before* making the request, so the next `collectFastFromNode` that - // needs to grab txs to send, will pick txs that have been requested less often, instead of all requesting - // the same txs at the same time. 
- batch.push(txToRequest); - activeRequestsToThisNode.add(txToRequest.txHash); - txToRequest.attempts++; - } - } - - // After modifying the array by removing txs or updating attempts, re-sort it and trim the found txs from the end. - attemptsPerTx.sort((a, b) => - a.found === b.found ? a.attempts - b.attempts : Number(a.found) - Number(b.found), - ); - const firstFoundTxIndex = attemptsPerTx.findIndex(tx => tx.found); - if (firstFoundTxIndex !== -1) { - attemptsPerTx.length = firstFoundTxIndex; - } - - // If we see no more txs to request, we can stop this "process" loop - if (batch.length === 0) { - return; - } - - const txHashes = batch.map(({ txHash }) => txHash); - // Collect this batch from the node - await this.txCollectionSink.collect( - async () => { - const result = await node.getTxsByHash(txHashes.map(TxHash.fromString)); - for (const tx of result.validTxs) { - request.requestTracker.markFetched(tx); - } - return result; - }, - txHashes, - { - description: `fast ${node.getInfo()}`, - node: node.getInfo(), - method: 'fast-node-rpc', - ...request.blockInfo, - }, - this.getAddContext(request), - ); - - // Clear from the active requests the txs we just requested - for (const requestedTx of batch) { - activeRequestsToThisNode.delete(requestedTx.txHash); - } - - // Sleep a bit until hitting the node again, but wake up immediately on cancellation - if (notFinished()) { - await Promise.race([ - sleep(this.config.txCollectionFastNodeIntervalMs), - request.requestTracker.cancellationToken, - ]); - } - } - }; - - // Kick off N parallel requests to the node, up to the maxParallelRequests limit - await Promise.all(times(maxParallelRequests, processBatch)); - } - - private async collectFastViaReqResp(request: FastCollectionRequest, opts: { pinnedPeer?: PeerId }) { - const pinnedPeer = opts.pinnedPeer; - const blockInfo = request.blockInfo; - const slotNumber = blockInfo.slotNumber; - if (request.requestTracker.timeoutMs < 100) { - this.log.warn( - `Not initiating fast 
reqresp for txs for ${request.type} at slot ${blockInfo.slotNumber} due to timeout`, - { timeoutMs: request.requestTracker.timeoutMs, ...blockInfo }, - ); - return; - } - - this.log.debug( - `Starting fast reqresp for ${request.requestTracker.numberOfMissingTxs} txs for ${request.type} at slot ${blockInfo.slotNumber}`, - { ...blockInfo, timeoutMs: request.requestTracker.timeoutMs, pinnedPeer }, - ); - - try { - await this.txCollectionSink.collect( - async () => { - let blockTxsSource: BlockTxsSource; - if (request.type === 'proposal') { - blockTxsSource = request.blockProposal; - } else if (request.type === 'block') { - blockTxsSource = { - txHashes: request.block.body.txEffects.map(e => e.txHash), - archive: request.block.archive.root, - }; - } else { - throw new Error(`Unknown request type: ${(request as { type: string }).type}`); - } - - const result = await this.reqRespTxsCollector!(request.requestTracker, blockTxsSource, pinnedPeer); - return { validTxs: result, invalidTxHashes: [] }; - }, - Array.from(request.requestTracker.missingTxHashes), - { description: `reqresp for slot ${slotNumber}`, method: 'fast-req-resp', ...opts, ...request.blockInfo }, - this.getAddContext(request), - ); - } catch (err) { - this.log.error(`Error sending fast reqresp request for txs`, err, { - txs: [...request.requestTracker.missingTxHashes], - ...blockInfo, - }); - } - } - - /** Returns the TxAddContext for the given request, used by the sink to add txs to the pool correctly. */ - private getAddContext(request: FastCollectionRequest): TxAddContext { - if (request.type === 'proposal') { - return { type: 'proposal', blockHeader: request.blockProposal.blockHeader }; - } else { - return { type: 'mined', block: request.block }; - } - } - - /** - * Handle txs by marking them as found for the requests that are waiting for them, and resolves the request if all its txs have been found. 
- * Called internally and from the main tx collection manager whenever the tx pool emits a tx-added event. - */ - public foundTxs(txs: Tx[]) { - for (const request of this.requests) { - for (const tx of txs) { - const txHash = tx.txHash.toString(); - // Remove the tx hash from the missing set, and add it to the found set. - if (request.requestTracker.markFetched(tx)) { - this.log.trace(`Found tx ${txHash} for fast collection request`, { - ...request.blockInfo, - txHash: tx.txHash.toString(), - type: request.type, - }); - if (request.requestTracker.allFetched()) { - this.log.trace(`All txs found for fast collection request`, { - ...request.blockInfo, - type: request.type, - }); - break; - } - } - } - } - } - - /** Returns the tx hashes that are still missing (from all requests). */ - public getMissingTxHashes(): TxHash[] { - return Array.from(this.requests.values()).flatMap(request => - Array.from(request.requestTracker.missingTxHashes).map(TxHash.fromString), - ); - } - - /** - * Stop collecting all txs for blocks less than or requal to the block number specified. - * To be called when we no longer care about gathering txs up to a certain block, eg when they become proven or finalized. - */ - public stopCollectingForBlocksUpTo(blockNumber: BlockNumber): void { - for (const request of this.requests) { - if (request.blockInfo.blockNumber <= blockNumber) { - request.requestTracker.cancel(); - } - } - } - - /** - * Stop collecting all txs for blocks greater than the block number specified. - * To be called when there is a chain prune and previously mined txs are no longer relevant. 
- */ - public stopCollectingForBlocksAfter(blockNumber: BlockNumber): void { - for (const request of this.requests) { - if (request.blockInfo.blockNumber > blockNumber) { - request.requestTracker.cancel(); - } - } - } -} diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts index eacef127cfb2..f41512a1dbca 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts @@ -12,6 +12,7 @@ import { type MockProxy, mock } from 'jest-mock-extended'; import type { TxPoolV2 } from '../../mem_pools/tx_pool_v2/interfaces.js'; import { type FileStoreCollectionConfig, FileStoreTxCollection } from './file_store_tx_collection.js'; import type { FileStoreTxSource } from './file_store_tx_source.js'; +import { type IRequestTracker, RequestTracker } from './request_tracker.js'; import { type TxAddContext, TxCollectionSink } from './tx_collection_sink.js'; describe('FileStoreTxCollection', () => { @@ -26,6 +27,11 @@ describe('FileStoreTxCollection', () => { let txs: Tx[]; let txHashes: TxHash[]; + let requestTracker: IRequestTracker; + + // Track in-flight startCollecting invocations so afterEach can shut them down cleanly. + let activeTrackers: IRequestTracker[]; + let activePromises: Promise[]; const makeFileStoreSource = (name: string) => { const source = mock(); @@ -49,6 +55,14 @@ describe('FileStoreTxCollection', () => { }); }; + /** Spawns a collection run and registers it for afterEach cleanup. */ + const startCollecting = (tracker: IRequestTracker, ctx: TxAddContext): Promise => { + activeTrackers.push(tracker); + const promise = fileStoreCollection.startCollecting(tracker, ctx); + activePromises.push(promise); + return promise; + }; + /** Waits for the sink to emit txs-added events for the expected number of txs. 
*/ const waitForTxsAdded = (expectedCount: number) => { const { promise, resolve } = promiseWithResolvers(); @@ -102,33 +116,38 @@ describe('FileStoreTxCollection', () => { const block = await L2Block.random(BlockNumber(1)); context = { type: 'mined', block }; deadline = new Date(dateProvider.now() + 60 * 60 * 1000); + requestTracker = RequestTracker.create(txHashes, deadline, dateProvider); + + activeTrackers = []; + activePromises = []; }); afterEach(async () => { - await fileStoreCollection.stop(); + for (const t of activeTrackers) { + t.cancel(); + } + await Promise.allSettled(activePromises); jest.restoreAllMocks(); }); it('downloads txs when startCollecting is called', async () => { setFileStoreTxs(fileStoreSources[0], txs); - fileStoreCollection.start(); - const txsAddedPromise = waitForTxsAdded(txs.length); - fileStoreCollection.startCollecting(txHashes, context, deadline); + void startCollecting(requestTracker, context); await txsAddedPromise; expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled(); expect(txPool.addMinedTxs).toHaveBeenCalled(); }); - it('skips txs marked as found', async () => { + it('skips txs already marked fetched on the tracker', async () => { setFileStoreTxs(fileStoreSources[0], txs); - fileStoreCollection.start(); + // Mark first tx as found before queueing so it's never queued in the first place + requestTracker.markFetched(txs[0]); - fileStoreCollection.startCollecting(txHashes, context, deadline); - fileStoreCollection.foundTxs([txs[0]]); + void startCollecting(requestTracker, context); const txsAddedPromise = waitForTxsAdded(2); await txsAddedPromise; @@ -145,53 +164,25 @@ describe('FileStoreTxCollection', () => { // Pin random so we always start at source 0, ensuring we test the fallback to source 1 jest.spyOn(Math, 'random').mockReturnValue(0); - fileStoreCollection.start(); - + const tracker = RequestTracker.create([txHashes[0]], deadline, dateProvider); const txsAddedPromise = waitForTxsAdded(1); - 
fileStoreCollection.startCollecting([txHashes[0]], context, deadline); + void startCollecting(tracker, context); await txsAddedPromise; // Both stores should have been tried expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled(); expect(fileStoreSources[1].getTxsByHash).toHaveBeenCalled(); expect(txPool.addMinedTxs).toHaveBeenCalled(); - - jest.restoreAllMocks(); }); - it('does not start workers if no file store sources are configured', () => { + it('does not start workers if no file store sources are configured', async () => { const log = createLogger('test'); fileStoreCollection = new FileStoreTxCollection([], txCollectionSink, config, dateProvider, log); - fileStoreCollection.start(); - fileStoreCollection.startCollecting(txHashes, context, deadline); - - // With no sources, start() is a no-op (no workers spawned) and startCollecting() returns - // immediately, so no calls should have been made synchronously. - expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled(); - }); - - it('does not re-queue txs that are already pending', async () => { - setFileStoreTxs(fileStoreSources[0], txs); - setFileStoreTxs(fileStoreSources[1], txs); - - // Use single worker for deterministic behavior - const log = createLogger('test'); - config = { workerCount: 1, backoffBaseMs: 1000, backoffMaxMs: 5000 }; - fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); - - fileStoreCollection.start(); - - const txsAddedPromise = waitForTxsAdded(txs.length); - fileStoreCollection.startCollecting(txHashes, context, deadline); - fileStoreCollection.startCollecting(txHashes, context, deadline); // Duplicate call + // With no sources, startCollecting resolves immediately without making any calls. + await startCollecting(requestTracker, context); - await txsAddedPromise; - - // With 1 worker processing sequentially, each tx should be found on the first source. - // Duplicate startCollecting should not create extra entries. 
- const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); - expect(allCalls.length).toBe(txHashes.length); + expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled(); }); it('retries across sources when tx is not found initially', async () => { @@ -200,10 +191,9 @@ describe('FileStoreTxCollection', () => { config = { workerCount: 1, backoffBaseMs: 100, backoffMaxMs: 500 }; fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); - fileStoreCollection.start(); - // Initially both sources return empty - fileStoreCollection.startCollecting([txHashes[0]], context, deadline); + const tracker = RequestTracker.create([txHashes[0]], deadline, dateProvider); + void startCollecting(tracker, context); // Wait for first full cycle (2 sources = 2 calls) await waitForSourceCalls(fileStoreSources, 2); @@ -220,88 +210,54 @@ describe('FileStoreTxCollection', () => { expect(txPool.addMinedTxs).toHaveBeenCalled(); }); - it('expires entries past deadline', async () => { - const log = createLogger('test'); - config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 }; - fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); - - // Set a very short deadline - const shortDeadline = new Date(dateProvider.now() + 100); - - fileStoreCollection.start(); - fileStoreCollection.startCollecting([txHashes[0]], context, shortDeadline); - - // Wait for first full cycle (2 sources = 2 calls) - await waitForSourceCalls(fileStoreSources, 2); - - // Advance time past the deadline - dateProvider.setTime(dateProvider.now() + 200); - - // Clear mocks so we can distinguish new calls from old ones - jest.clearAllMocks(); - - // Add a new entry with a valid deadline and set up source to return it. - // This proves the worker is alive and the expired entry was cleaned up. 
- setFileStoreTxs(fileStoreSources[0], [txs[1]]); - const txsAddedPromise = waitForTxsAdded(1); - fileStoreCollection.startCollecting([txHashes[1]], context, deadline); - await txsAddedPromise; - - // Only txHashes[1] should have been requested after clearing mocks - const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); - const requestedHashes = allCalls.flat().flat(); - expect(requestedHashes).not.toContainEqual(txHashes[0]); - expect(requestedHashes).toContainEqual(txHashes[1]); - }); - - it('does not start collecting if deadline is in the past', () => { - const pastDeadline = new Date(dateProvider.now() - 1000); + it('does not start collecting if tracker is already cancelled', async () => { + requestTracker.cancel(); - fileStoreCollection.start(); - fileStoreCollection.startCollecting(txHashes, context, pastDeadline); + await startCollecting(requestTracker, context); - // startCollecting returns immediately without adding entries when deadline is past + // startCollecting returns immediately without spawning workers when tracker is cancelled expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled(); }); - it('foundTxs stops retry for found txs', async () => { + it('stops trying for txs marked fetched on the tracker after queuing', async () => { const log = createLogger('test'); config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 }; fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); setFileStoreTxs(fileStoreSources[0], [txs[1]]); - fileStoreCollection.start(); - fileStoreCollection.startCollecting(txHashes, context, deadline); + void startCollecting(requestTracker, context); - // Mark first tx as found - fileStoreCollection.foundTxs([txs[0]]); + // Externally mark tx[0] as found via the tracker (simulating node/reqresp/gossip finding it). + // startCollecting yields before spawning workers, so this runs before any source call is made. 
+ requestTracker.markFetched(txs[0]); const txsAddedPromise = waitForTxsAdded(1); await txsAddedPromise; - // tx[0] should never have been attempted + // tx[0] should never have been attempted by the file store const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); const requestedHashes = allCalls.flat().flat(); expect(requestedHashes).not.toContainEqual(txHashes[0]); }); - it('clearPending removes all entries', async () => { - fileStoreCollection.start(); - fileStoreCollection.startCollecting(txHashes, context, deadline); - fileStoreCollection.clearPending(); + it('workers exit when tracker is cancelled', async () => { + // Long backoff so workers spend most of their time sleeping after a single attempt + const log = createLogger('test'); + config = { workerCount: 2, backoffBaseMs: 60_000, backoffMaxMs: 60_000 }; + fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log); + + // Pre-set the tracker timer so a cancellation does not require real-time deadline expiry + const tracker = RequestTracker.create(txHashes, deadline, dateProvider); + const promise = startCollecting(tracker, context); - // Verify workers are alive but the cleared entries are gone by adding - // a new entry and confirming only it gets processed. 
- setFileStoreTxs(fileStoreSources[0], [txs[0]]); - const txsAddedPromise = waitForTxsAdded(1); - fileStoreCollection.startCollecting([txHashes[0]], context, deadline); - await txsAddedPromise; + // Let workers do at least one round of attempts + await waitForSourceCalls(fileStoreSources, 2); - // Only the newly added tx[0] should have been requested, not all 3 original txs - const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); - const requestedHashes = allCalls.flat().flat(); - expect(requestedHashes).not.toContainEqual(txHashes[1]); - expect(requestedHashes).not.toContainEqual(txHashes[2]); + tracker.cancel(); + + // The startCollecting promise resolves once all workers settle. Without this guarantee, the + // test would either hang or leak workers — both are caught by Jest's default timeout. + await promise; }); }); diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts index 165ba3d9928a..abaf1b64ad6e 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts @@ -1,10 +1,11 @@ +import { times } from '@aztec/foundation/collection'; import { type Logger, createLogger } from '@aztec/foundation/log'; -import { type PromiseWithResolvers, promiseWithResolvers } from '@aztec/foundation/promise'; import { sleep } from '@aztec/foundation/sleep'; import { DateProvider } from '@aztec/foundation/timer'; -import { Tx, TxHash } from '@aztec/stdlib/tx'; +import { TxHash } from '@aztec/stdlib/tx'; import type { FileStoreTxSource } from './file_store_tx_source.js'; +import type { IRequestTracker } from './request_tracker.js'; import type { TxAddContext, TxCollectionSink } from './tx_collection_sink.js'; /** Configuration for a FileStoreTxCollection instance. 
*/ @@ -16,8 +17,6 @@ export type FileStoreCollectionConfig = { type FileStoreTxEntry = { txHash: string; - context: TxAddContext; - deadline: Date; attempts: number; lastAttemptTime: number; nextSourceIndex: number; @@ -25,96 +24,60 @@ type FileStoreTxEntry = { /** * Collects txs from file stores as a fallback after P2P methods have been tried. - * Uses a shared worker pool that pulls entries with priority (fewest attempts first), - * retries with round-robin across sources, and applies exponential backoff between - * full cycles through all sources. + * Each call to startCollecting spins up its own worker pool which pulls entries with priority + * (fewest attempts first), retries with round-robin across sources, and applies exponential + * backoff between full cycles through all sources. Workers self-terminate when the request + * tracker is cancelled (deadline / all-fetched / external) or when there is nothing left to do. */ export class FileStoreTxCollection { - /** Map from tx hash string to entry for all pending downloads. */ - private entries = new Map(); - - /** Worker promises for the shared worker pool. */ - private workers: Promise[] = []; - - /** Whether the worker pool is running. */ - private running = false; - - /** Signal used to wake sleeping workers when new entries arrive or stop is called. */ - private wakeSignal: PromiseWithResolvers; - constructor( private readonly sources: FileStoreTxSource[], private readonly txCollectionSink: TxCollectionSink, private readonly config: FileStoreCollectionConfig, private readonly dateProvider: DateProvider = new DateProvider(), private readonly log: Logger = createLogger('p2p:file_store_tx_collection'), - ) { - this.wakeSignal = promiseWithResolvers(); - } - - /** Starts the shared worker pool. 
*/ - public start(): void { - if (this.sources.length === 0) { - this.log.debug('No file store sources configured'); - return; - } - this.running = true; - for (let i = 0; i < this.config.workerCount; i++) { - this.workers.push(this.workerLoop()); - } - } - - /** Stops all workers and clears state. */ - public async stop(): Promise { - this.running = false; - this.wake(); - await Promise.all(this.workers); - this.workers = []; - this.entries.clear(); - } - - /** Adds entries to the shared map and wakes workers. */ - public startCollecting(txHashes: TxHash[], context: TxAddContext, deadline: Date): void { - if (this.sources.length === 0 || txHashes.length === 0) { - return; - } - if (+deadline <= this.dateProvider.now()) { + ) {} + + /** + * Spins up workers to download all txs still missing from the tracker, racing across the + * configured file store sources. Resolves once all workers settle. + */ + public async startCollecting(requestTracker: IRequestTracker, context: TxAddContext): Promise { + if (this.sources.length === 0 || requestTracker.checkCancelled()) { return; } - for (const txHash of txHashes) { - const hashStr = txHash.toString(); - if (!this.entries.has(hashStr)) { - this.entries.set(hashStr, { - txHash: hashStr, - context, - deadline, - attempts: 0, - lastAttemptTime: 0, - nextSourceIndex: Math.floor(Math.random() * this.sources.length), - }); - } + // eslint-disable-next-line aztec-custom/no-non-primitive-in-collections + const entries: Set = new Set(); + for (const hashStr of requestTracker.missingTxHashes) { + entries.add({ + txHash: hashStr, + attempts: 0, + lastAttemptTime: 0, + nextSourceIndex: Math.floor(Math.random() * this.sources.length), + }); } - this.wake(); - } - /** Removes entries for txs that have been found elsewhere. */ - public foundTxs(txs: Tx[]): void { - for (const tx of txs) { - this.entries.delete(tx.getTxHash().toString()); + // Yield before spawning so the synchronous caller can finish any follow-up (eg. 
marking a tx + // as fetched on the tracker, or cancelling it) before workers begin scanning entries. + await Promise.resolve(); + if (requestTracker.checkCancelled()) { + return; } - } - /** Clears all pending entries. */ - public clearPending(): void { - this.entries.clear(); + await Promise.allSettled(times(this.config.workerCount, () => this.workerLoop(entries, requestTracker, context))); } - private async workerLoop(): Promise { - while (this.running) { - const action = this.getNextAction(); + private async workerLoop( + // eslint-disable-next-line aztec-custom/no-non-primitive-in-collections + entries: Set, + requestTracker: IRequestTracker, + context: TxAddContext, + ): Promise { + while (!requestTracker.checkCancelled() && entries.size > 0) { + const action = this.getNextAction(entries, requestTracker); if (action.type === 'sleep') { - await action.promise; + await Promise.race([sleep(action.ms), requestTracker.cancellationToken]); continue; } @@ -133,10 +96,10 @@ export class FileStoreTxCollection { method: 'file-store', fileStore: source.getInfo(), }, - entry.context, + context, ); if (result.txs.length > 0) { - this.entries.delete(entry.txHash); + entries.delete(entry); } } catch (err) { this.log.trace(`Error downloading tx ${entry.txHash} from ${source.getInfo()}`, { err }); @@ -144,15 +107,20 @@ export class FileStoreTxCollection { } } - /** Single-pass scan: removes expired entries, finds the best ready entry, or computes sleep time. */ - private getNextAction(): { type: 'process'; entry: FileStoreTxEntry } | { type: 'sleep'; promise: Promise } { + /** Single-pass scan: removes stale entries, finds the best ready entry, or computes sleep time. 
*/ + private getNextAction( + // eslint-disable-next-line aztec-custom/no-non-primitive-in-collections + entries: Set, + requestTracker: IRequestTracker, + ): { type: 'process'; entry: FileStoreTxEntry } | { type: 'sleep'; ms: number } { const now = this.dateProvider.now(); let best: FileStoreTxEntry | undefined; let earliestReadyAt = Infinity; - for (const [key, entry] of this.entries) { - if (+entry.deadline <= now) { - this.entries.delete(key); + for (const entry of entries) { + // Drop entries whose tx was already found via another collection path. + if (!requestTracker.isMissing(entry.txHash)) { + entries.delete(entry); continue; } const backoffMs = this.getBackoffMs(entry); @@ -169,10 +137,9 @@ export class FileStoreTxCollection { if (best) { return { type: 'process', entry: best }; } - if (earliestReadyAt < Infinity) { - return { type: 'sleep', promise: this.sleepOrWake(earliestReadyAt - now) }; - } - return { type: 'sleep', promise: this.waitForWake() }; + // earliestReadyAt is finite whenever there are surviving entries; if entries became empty, + // the outer worker loop will exit on its next iteration via entries.size === 0. + return { type: 'sleep', ms: earliestReadyAt === Infinity ? 0 : earliestReadyAt - now }; } /** Computes backoff for an entry. Backoff applies after a full cycle through all sources. */ @@ -183,20 +150,4 @@ export class FileStoreTxCollection { } return Math.min(this.config.backoffBaseMs * Math.pow(2, fullCycles - 1), this.config.backoffMaxMs); } - - /** Resolves the current wake signal and creates a new one. */ - private wake(): void { - this.wakeSignal.resolve(); - this.wakeSignal = promiseWithResolvers(); - } - - /** Waits until the wake signal is resolved. */ - private async waitForWake(): Promise { - await this.wakeSignal.promise; - } - - /** Sleeps for the given duration or until the wake signal is resolved. 
*/ - private async sleepOrWake(ms: number): Promise { - await Promise.race([sleep(ms), this.wakeSignal.promise]); - } } diff --git a/yarn-project/p2p/src/services/tx_collection/index.ts b/yarn-project/p2p/src/services/tx_collection/index.ts index 4f151c32e27f..293ebdde7ab3 100644 --- a/yarn-project/p2p/src/services/tx_collection/index.ts +++ b/yarn-project/p2p/src/services/tx_collection/index.ts @@ -1,4 +1,3 @@ -export { TxCollection, type FastCollectionRequestInput } from './tx_collection.js'; -export { type IReqRespTxsCollector } from './fast_tx_collection.js'; +export { TxCollection, type FastCollectionRequestInput, type IReqRespTxsCollector } from './tx_collection.js'; export { type TxSource, createNodeRpcTxSources, NodeRpcTxSource } from './tx_source.js'; export { FileStoreTxSource, createFileStoreTxSources } from './file_store_tx_source.js'; diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts index 750e09e34fb3..5cb61cbeedd9 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts @@ -16,9 +16,8 @@ import type { TxPoolV2, TxPoolV2Events } from '../../mem_pools/tx_pool_v2/interf import type { BatchTxRequesterLibP2PService } from '../reqresp/batch-tx-requester/interface.js'; import type { BlockTxsSource } from '../reqresp/protocols/block_txs/block_txs_reqresp.js'; import { type TxCollectionConfig, txCollectionConfigMappings } from './config.js'; -import { FastTxCollection, type IReqRespTxsCollector } from './fast_tx_collection.js'; import type { FileStoreTxSource } from './file_store_tx_source.js'; -import { type FastCollectionRequest, TxCollection } from './tx_collection.js'; +import { type FastCollectionRequest, type IReqRespTxsCollector, TxCollection } from './tx_collection.js'; import type { TxSource } from './tx_source.js'; describe('TxCollection', () => { @@ -95,7 +94,7 
@@ describe('TxCollection', () => { const setReqRespResponse = (promise: Promise) => { let lastArgs: Parameters | undefined; - txCollection.fastCollection.reqRespTxsCollector = jest.fn().mockImplementation((...x) => { + txCollection.reqRespTxsCollector = jest.fn().mockImplementation((...x) => { lastArgs = x; return promise; }); @@ -147,16 +146,16 @@ describe('TxCollection', () => { setReqRespTxs([]); }); - afterEach(async () => { - await txCollection.stop(); + afterEach(() => { + txCollection.stop(); }); - describe('fast collection', () => { + describe('fast tx collection', () => { it('collects txs from nodes only', async () => { setNodeTxs(nodes[0], txs); const collected = await txCollection.collectFastForBlock(block, txHashes, { deadline }); expect(nodes[0].getTxsByHash).toHaveBeenCalledWith(txHashes); - expect(txCollection.fastCollection.reqRespTxsCollector).not.toHaveBeenCalled(); + expect(txCollection.reqRespTxsCollector).not.toHaveBeenCalled(); expectTxsMinedInPool(txs); expect(collected).toEqual(txs); }); @@ -191,7 +190,7 @@ describe('TxCollection', () => { const collected = await txCollection.collectFastForBlock(block, txHashes, { deadline }); expect(nodes[0].getTxsByHash).toHaveBeenCalledWith(txHashes); expect(nodes[1].getTxsByHash).toHaveBeenCalledWith(txHashes); - expect(txCollection.fastCollection.reqRespTxsCollector).toHaveBeenCalledTimes(1); + expect(txCollection.reqRespTxsCollector).toHaveBeenCalledTimes(1); expectLastReqRespCollectorArgs(argsGetter); expectTxsMinedInPool([txs[0]]); expectTxsMinedInPool([txs[1]]); @@ -203,12 +202,26 @@ describe('TxCollection', () => { txCollection = new TestTxCollection(mockP2PService, [], constants, txPool, config, [], dateProvider); const argsGetter = setReqRespTxs(txs); const collected = await txCollection.collectFastForBlock(block, txHashes, { deadline }); - expect(txCollection.fastCollection.reqRespTxsCollector).toHaveBeenCalledTimes(1); + expect(txCollection.reqRespTxsCollector).toHaveBeenCalledTimes(1); 
expectLastReqRespCollectorArgs(argsGetter); expectTxsMinedInPool(txs); expect(collected).toEqual(txs); }); + it('starts reqresp immediately when no nodes are configured', async () => { + // Large initial wait — if reqresp were gated by it, the collection would take ~10s. + config = { ...config, txCollectionFastNodesTimeoutBeforeReqRespMs: 10_000 }; + txCollection = new TestTxCollection(mockP2PService, [], constants, txPool, config, [], dateProvider); + setReqRespTxs(txs); + + const startTime = dateProvider.now(); + const collected = await txCollection.collectFastForBlock(block, txHashes, { deadline }); + + expect(txCollection.reqRespTxsCollector).toHaveBeenCalledTimes(1); + expect(dateProvider.now() - startTime).toBeLessThan(1000); + expect(collected).toEqual(txs); + }); + it('keeps retrying txs not found until deadline', async () => { deadline = new Date(dateProvider.now() + 2000); setNodeTxs(nodes[0], [txs[0]]); @@ -219,7 +232,7 @@ describe('TxCollection', () => { expect(dateProvider.now()).toBeGreaterThanOrEqual(+deadline - 5); expect(nodes[0].getTxsByHash).toHaveBeenCalledWith(txHashes); expect(nodes[0].getTxsByHash).toHaveBeenCalledWith([txHashes[2]]); - expect(txCollection.fastCollection.reqRespTxsCollector).toHaveBeenCalledTimes(1); + expect(txCollection.reqRespTxsCollector).toHaveBeenCalledTimes(1); expectLastReqRespCollectorArgs(argsGetter); expectTxsMinedInPool([txs[0]]); expectTxsMinedInPool([txs[1]]); @@ -274,15 +287,15 @@ describe('TxCollection', () => { const collected = await txCollection.collectFastForBlock(block, txHashes, { deadline }); expect(collected).toEqual([]); expect(nodes[0].getTxsByHash).not.toHaveBeenCalled(); - expect(txCollection.fastCollection.reqRespTxsCollector).not.toHaveBeenCalled(); + expect(txCollection.reqRespTxsCollector).not.toHaveBeenCalled(); }); describe('cancellation signals', () => { /** Captures the FastCollectionRequest during collectFast, before it's removed in finally. 
*/ const captureRequest = () => { let captured: FastCollectionRequest | undefined; - const origCollectFast = txCollection.fastCollection.collectFast.bind(txCollection.fastCollection); - jest.spyOn(txCollection.fastCollection, 'collectFast').mockImplementation((request, opts) => { + const origCollectFast = txCollection.collectFast.bind(txCollection); + jest.spyOn(txCollection, 'collectFast').mockImplementation((request, opts) => { captured = request; return origCollectFast(request, opts); }); @@ -319,7 +332,7 @@ describe('TxCollection', () => { setReqRespTxs([]); const collected = await txCollection.collectFastForBlock(block, txHashes, { deadline }); - expect(txCollection.fastCollection.reqRespTxsCollector).not.toHaveBeenCalled(); + expect(txCollection.reqRespTxsCollector).not.toHaveBeenCalled(); expect(collected).toEqual(txs); }); @@ -332,7 +345,7 @@ describe('TxCollection', () => { const collected = await txCollection.collectFastForBlock(block, txHashes, { deadline }); - expect(txCollection.fastCollection.reqRespTxsCollector).not.toHaveBeenCalled(); + expect(txCollection.reqRespTxsCollector).not.toHaveBeenCalled(); expect(dateProvider.now()).toBeGreaterThanOrEqual(+deadline - 5); expect(collected).toEqual([]); }); @@ -382,13 +395,13 @@ describe('TxCollection', () => { const request = getRequest(); expect(request).toBeDefined(); // Reqresp should not have started yet — we're still in the initial wait - expect(txCollection.fastCollection.reqRespTxsCollector).not.toHaveBeenCalled(); + expect(txCollection.reqRespTxsCollector).not.toHaveBeenCalled(); request.requestTracker.cancel(); await collectionPromise; // Should have exited without ever starting reqresp - expect(txCollection.fastCollection.reqRespTxsCollector).not.toHaveBeenCalled(); + expect(txCollection.reqRespTxsCollector).not.toHaveBeenCalled(); expect(dateProvider.now()).toBeLessThan(+deadline); }); @@ -406,7 +419,7 @@ describe('TxCollection', () => { const collectionPromise = 
txCollection.collectFastForBlock(block, txHashes, { deadline }); await sleep(200); - expect(txCollection.fastCollection.reqRespTxsCollector).toHaveBeenCalled(); + expect(txCollection.reqRespTxsCollector).toHaveBeenCalled(); getRequest().requestTracker.cancel(); collectorPromise.resolve([]); @@ -439,7 +452,7 @@ describe('TxCollection', () => { expect(request).toBeDefined(); expect(request.requestTracker.checkCancelled()).toBe(false); - await txCollection.stop(); + txCollection.stop(); expect(request.requestTracker.checkCancelled()).toBe(true); collectorPromise.resolve([]); @@ -489,13 +502,13 @@ describe('TxCollection', () => { const collectionPromise = txCollection.collectFastForBlock(block, txHashes, { deadline }); await sleep(100); - expect(txCollection.fastCollection.requests.size).toBe(1); + expect(txCollection.requests.size).toBe(1); txCollection.stopCollectingForBlocksUpTo(block.number); collectorPromise.resolve([]); await collectionPromise; - expect(txCollection.fastCollection.requests.size).toBe(0); + expect(txCollection.requests.size).toBe(0); }); }); }); @@ -529,17 +542,15 @@ describe('TxCollection', () => { it('collects txs from file store after configured delay', async () => { setFileStoreTxs(fileStoreSources[0], txs); - await txCollection.start(); - deadline = new Date(dateProvider.now() + 500); + // Long deadline so the collection ends when file store finds the txs (not when deadline fires) + deadline = new Date(dateProvider.now() + 5000); const collectionPromise = txCollection.collectFastForBlock(block, txHashes, { deadline }); - // File store should not have been called yet (delay hasn't elapsed) + // File store should not have been called yet (delays haven't elapsed) expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled(); - // Advance time past the configured file store delay - dateProvider.setTime(dateProvider.now() + 200); - // Allow the async sleep resolution and worker processing to complete - await sleep(200); + // Wait for: node wait 
(200ms default) + file store delay (100ms) + worker processing + await sleep(500); await collectionPromise; // File store should now have been called for each tx @@ -549,34 +560,28 @@ describe('TxCollection', () => { it('does not download txs from file store if found via P2P before delay expires', async () => { setFileStoreTxs(fileStoreSources[0], txs); - await txCollection.start(); - deadline = new Date(dateProvider.now() + 500); + // Long deadline so the collection ends when all txs are found (not when deadline fires) + deadline = new Date(dateProvider.now() + 5000); const collectionPromise = txCollection.collectFastForBlock(block, txHashes, { deadline }); - // Simulate all txs found via P2P before delay expires + // Simulate all txs found via P2P before delay expires — this cancels the tracker immediately txCollection.handleTxsAddedToPool({ txs, source: 'test' }); - // Now advance time past the delay - dateProvider.setTime(dateProvider.now() + 200); await sleep(100); await collectionPromise; - // File store should not have downloaded any txs because they were all found + // File store should not have downloaded any txs because they were all found before the delay const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls); expect(allCalls.length).toBe(0); }); }); }); -class TestFastTxCollection extends FastTxCollection { +class TestTxCollection extends TxCollection { // eslint-disable-next-line aztec-custom/no-non-primitive-in-collections declare requests: Set; - declare collectFast: (request: FastCollectionRequest, opts: { pinnedPeer?: PeerId }) => Promise; - declare reqRespTxsCollector?: IReqRespTxsCollector; -} - -class TestTxCollection extends TxCollection { - declare fastCollection: TestFastTxCollection; declare fileStoreFastCollection: TxCollection['fileStoreFastCollection']; declare handleTxsAddedToPool: TxPoolV2Events['txs-added']; + declare collectFast: (request: FastCollectionRequest, opts: { pinnedPeer?: PeerId }) => Promise; + declare 
reqRespTxsCollector?: IReqRespTxsCollector; } diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection.ts index 9a609fb408a3..30814392650c 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection.ts @@ -1,7 +1,8 @@ import { BlockNumber } from '@aztec/foundation/branded-types'; +import { times } from '@aztec/foundation/collection'; import { type Logger, createLogger } from '@aztec/foundation/log'; import { sleep } from '@aztec/foundation/sleep'; -import { DateProvider } from '@aztec/foundation/timer'; +import { DateProvider, elapsed } from '@aztec/foundation/timer'; import type { L2Block, L2BlockInfo } from '@aztec/stdlib/block'; import type { L1RollupConstants } from '@aztec/stdlib/epoch-helpers'; import type { BlockProposal } from '@aztec/stdlib/p2p'; @@ -12,12 +13,13 @@ import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-clien import type { PeerId } from '@libp2p/interface'; import type { TxPoolV2, TxPoolV2Events } from '../../mem_pools/tx_pool_v2/interfaces.js'; +import { BatchTxRequester } from '../reqresp/batch-tx-requester/batch_tx_requester.js'; import type { BatchTxRequesterLibP2PService } from '../reqresp/batch-tx-requester/interface.js'; +import type { BlockTxsSource } from '../reqresp/index.js'; import type { TxCollectionConfig } from './config.js'; -import { FastTxCollection } from './fast_tx_collection.js'; import { FileStoreTxCollection } from './file_store_tx_collection.js'; import type { FileStoreTxSource } from './file_store_tx_source.js'; -import type { IRequestTracker } from './request_tracker.js'; +import { type IRequestTracker, RequestTracker } from './request_tracker.js'; import { type TxAddContext, TxCollectionSink } from './tx_collection_sink.js'; import type { TxSource } from './tx_source.js'; @@ -32,20 +34,36 @@ export type FastCollectionRequest = 
FastCollectionRequestInput & { blockInfo: L2BlockInfo; }; +/** + * Collect missing transactions for a block or proposal via reqresp. + * @param requestTracker - The missing transactions tracker + * @param blockTxsSource - The block or proposal containing the transactions + * @param pinnedPeer - Optional peer expected to have the transactions + * @returns The collected transactions + */ +export type IReqRespTxsCollector = ( + requestTracker: IRequestTracker, + blockTxsSource: BlockTxsSource, + pinnedPeer: PeerId | undefined, +) => Promise; + /** * Coordinates tx collection from remote RPC nodes, reqresp, and file store. * - * The fast collection methods quickly gather txs from RPC nodes and reqresp, usually for attesting - * to block proposals or preparing to prove an epoch. A delayed file-store fallback can also fetch - * txs if configured. Both paths send txs to the collection sink, which handles metrics and adds - * them to the tx pool. Whenever a tx is added to either the sink or the pool, this service is - * notified via events and stops collecting that tx across all in-flight requests. + * Runs a sequential pipeline: node RPC → reqresp → file store. Node collection starts immediately, + * reqresp starts after a configured delay, and file store (if configured) starts after a further + * delay. All paths send txs to the collection sink, which handles metrics and adds them to the + * tx pool. Whenever a tx is added to the sink or the pool, this service is notified and stops + * collecting that tx across all in-flight requests. 
*/ export class TxCollection { - /** Fast collection methods */ - protected readonly fastCollection: FastTxCollection; + // eslint-disable-next-line aztec-custom/no-non-primitive-in-collections + protected requests: Set = new Set(); - /** File store collection for fast (proposal/proving) path */ + /** The collector for txs via reqresp */ + protected reqRespTxsCollector?: IReqRespTxsCollector; + + /** File store collection for the fast (proposal/proving) path */ protected readonly fileStoreFastCollection: FileStoreTxCollection; /** Handles txs found by collection paths before adding to the pool */ @@ -57,12 +75,6 @@ export class TxCollection { /** Handler for the txs-added event from the tx collection sink */ protected readonly handleTxsFound: TxPoolV2Events['txs-added']; - /** Whether the service has been started. */ - private started = false; - - /** Whether file store sources are configured. */ - private readonly hasFileStoreSources: boolean; - constructor( private readonly p2pService: BatchTxRequesterLibP2PService, private readonly nodes: TxSource[], @@ -76,16 +88,18 @@ export class TxCollection { ) { this.txCollectionSink = new TxCollectionSink(this.txPool, telemetryClient, this.log); - this.fastCollection = new FastTxCollection( - this.p2pService, - this.nodes, - this.txCollectionSink, - this.config, - this.dateProvider, - this.log, - ); + this.reqRespTxsCollector = (requestTracker, blockTxsSource, pinnedPeer) => + BatchTxRequester.collectAllTxs( + new BatchTxRequester( + requestTracker, + blockTxsSource, + pinnedPeer, + this.p2pService, + this.log, + this.dateProvider, + ).run(), + ); - this.hasFileStoreSources = fileStoreSources.length > 0; this.fileStoreFastCollection = new FileStoreTxCollection( fileStoreSources, this.txCollectionSink, @@ -112,19 +126,11 @@ export class TxCollection { this.txPool.on('txs-added', this.handleTxsAddedToPool); } - /** Starts all collection loops. 
*/ - public start(): Promise { - this.started = true; - this.fileStoreFastCollection.start(); - - // TODO(palla/txs): Collect mined unproven tx hashes for txs we dont have in the pool and populate missingTxs on startup - return Promise.resolve(); - } - - /** Stops all activity. */ - public async stop() { - this.started = false; - await Promise.all([this.fastCollection.stop(), this.fileStoreFastCollection.stop()]); + /** Stops all activity. Cancels in-flight requests; file store workers self-terminate. */ + public stop() { + this.requests.forEach(request => { + request.requestTracker.cancel(); + }); this.txPool.removeListener('txs-added', this.handleTxsAddedToPool); this.txCollectionSink.removeListener('txs-added', this.handleTxsFound); @@ -145,48 +151,295 @@ export class TxCollection { } /** Collects the set of txs for the given proposal or block as fast as possible */ - public collectFastFor( + public async collectFastFor( input: FastCollectionRequestInput, txHashes: TxHash[] | string[], opts: { deadline: Date; pinnedPeer?: PeerId }, ) { + const timeout = opts.deadline.getTime() - this.dateProvider.now(); + if (timeout <= 0) { + this.log.warn(`Deadline for fast tx collection is in the past (${timeout}ms)`, { + deadline: opts.deadline.getTime(), + now: this.dateProvider.now(), + }); + return []; + } + const hashes = txHashes.map(h => (typeof h === 'string' ? TxHash.fromString(h) : h)); - // Delay file store collection to give P2P methods time to find txs first - if (this.hasFileStoreSources) { - const context = this.getAddContextForInput(input); - sleep(this.config.txCollectionFileStoreFastDelayMs) - .then(() => { - if (!this.started) { - return; - } + const blockInfo: L2BlockInfo = + input.type === 'proposal' + ? 
{ ...input.blockProposal.toBlockInfo(), blockNumber: input.blockNumber } + : { ...input.block.toBlockInfo() }; + + const request: FastCollectionRequest = { + ...input, + blockInfo, + requestTracker: RequestTracker.create(hashes, opts.deadline, this.dateProvider), + }; + + const [duration] = await elapsed(() => this.collectFast(request, { pinnedPeer: opts.pinnedPeer })); + + this.log.verbose( + `Collected ${request.requestTracker.collectedTxs.length} txs out of ${hashes.length} for ${input.type} at slot ${blockInfo.slotNumber}`, + { + ...blockInfo, + duration, + requestType: input.type, + missingTxs: [...request.requestTracker.missingTxHashes], + }, + ); + return request.requestTracker.collectedTxs; + } + + protected async collectFast(request: FastCollectionRequest, opts: { pinnedPeer?: PeerId }) { + this.requests.add(request); + const { blockInfo } = request; + + this.log.debug( + `Starting fast collection of ${request.requestTracker.numberOfMissingTxs} txs for ${request.type} at slot ${blockInfo.slotNumber}`, + { ...blockInfo, requestType: request.type, deadline: request.requestTracker.deadline }, + ); + + try { + // 1. Start node collection in the background. + // Note: this will be a noop if no nodes are configured. + const nodeCollectionPromise = this.collectFastFromNodes(request); + + // 2. Wait before starting reqresp, interruptible by cancellation or node exhaustion. + await Promise.race([ + request.requestTracker.cancellationToken, + sleep(this.config.txCollectionFastNodesTimeoutBeforeReqRespMs), + nodeCollectionPromise, // If node collection has finished (or if there are no nodes configured), we can exit early. + ]); + + // 3. Start reqresp in the background (runs in parallel with node collection). + // Note: this will be a noop if all TXs were already found. + const reqRespPromise = this.collectFastViaReqResp(request, opts); + + // 4. Wait before starting file store, interruptible by cancellation. 
+ await Promise.race([ + request.requestTracker.cancellationToken, + sleep(this.config.txCollectionFileStoreFastDelayMs), + reqRespPromise, // If reqresp has finished, we can exit early. + ]); + + // 5. Start file store collection in the background. Self-terminates on tracker cancel / all-found. + // Note: this will be a noop if all TXs were already found. + const fileStorePromise = this.fileStoreFastCollection.startCollecting( + request.requestTracker, + this.getAddContext(request), + ); + + // 6. Wait for all paths to settle. + // NOTE: The request will automatically be cancelled after `opts.deadline` is reached. + await Promise.allSettled([reqRespPromise, nodeCollectionPromise, fileStorePromise]); + } catch (err) { + this.log.error(`Error collecting txs for ${request.type} for slot ${blockInfo.slotNumber}`, err, { + ...blockInfo, + missingTxs: request.requestTracker.missingTxHashes.values().map(txHash => txHash.toString()), + }); + } finally { + request.requestTracker.cancel(); + this.requests.delete(request); + } + } - // Only queue txs that are still missing after the delay. - const missingTxHashStrings = new Set(this.fastCollection.getMissingTxHashes().map(hash => hash.toString())); - const missingTxHashesToCollect = hashes.filter(hash => missingTxHashStrings.has(hash.toString())); - if (missingTxHashesToCollect.length > 0) { - this.fileStoreFastCollection.startCollecting(missingTxHashesToCollect, context, opts.deadline); + /** + * Starts collecting txs from all configured nodes. We send `txCollectionFastMaxParallelRequestsPerNode` requests + * in parallel to each node. We keep track of the number of attempts made to collect each tx, so we can prioritize + * the txs that have been requested less often whenever we need to send a new batch of requests. We ensure that no + * tx is requested more than once at the same time to the same node. 
+ */ + private async collectFastFromNodes(request: FastCollectionRequest): Promise { + if (this.nodes.length === 0) { + return; + } + + // Keep a shared priority queue of all txs pending to be requested, sorted by the number of attempts made to collect them. + const attemptsPerTx = [...request.requestTracker.missingTxHashes].map(txHash => ({ + txHash, + attempts: 0, + found: false, + })); + + // Returns once we have finished all node loops. Each loop finishes when the deadline is hit, or all txs have been collected. + await Promise.allSettled(this.nodes.map(node => this.collectFastFromNode(request, node, attemptsPerTx))); + } + + private async collectFastFromNode( + request: FastCollectionRequest, + node: TxSource, + attemptsPerTx: { txHash: string; attempts: number; found: boolean }[], + ) { + const notFinished = () => !request.requestTracker.checkCancelled(); + + const maxParallelRequests = this.config.txCollectionFastMaxParallelRequestsPerNode; + const maxBatchSize = this.config.txCollectionNodeRpcMaxBatchSize; + const activeRequestsToThisNode = new Set(); // Track the txs being actively requested to this node + + const processBatch = async () => { + while (notFinished()) { + // Pull tx hashes from the attemptsPerTx array, which is sorted by attempts, + // so we prioritize txs that have been requested less often. + const batch = []; + let index = 0; + while (batch.length < maxBatchSize) { + const txToRequest = attemptsPerTx[index++]; + if (!txToRequest) { + // No more txs to process + break; + } else if (!request.requestTracker.isMissing(txToRequest.txHash)) { + // Mark as found if it was found somewhere else, we'll then remove it from the array. + // We don't delete it now since 'array.splice' is pretty expensive, so we do it after sorting. + txToRequest.found = true; + } else if (!activeRequestsToThisNode.has(txToRequest.txHash)) { + // If the tx is not already being requested to this node, add it to the current batch and increase attempts. 
+ // Note that we increase the attempts *before* making the request, so the next `collectFastFromNode` that + // needs to grab txs to send, will pick txs that have been requested less often, instead of all requesting + // the same txs at the same time. + batch.push(txToRequest); + activeRequestsToThisNode.add(txToRequest.txHash); + txToRequest.attempts++; } - }) - .catch(err => this.log.error('Error in file store fast delay', err)); + } + + // After modifying the array by removing txs or updating attempts, re-sort it and trim the found txs from the end. + attemptsPerTx.sort((a, b) => + a.found === b.found ? a.attempts - b.attempts : Number(a.found) - Number(b.found), + ); + const firstFoundTxIndex = attemptsPerTx.findIndex(tx => tx.found); + if (firstFoundTxIndex !== -1) { + attemptsPerTx.length = firstFoundTxIndex; + } + + // If we see no more txs to request, we can stop this "process" loop + if (batch.length === 0) { + return; + } + + const txHashes = batch.map(({ txHash }) => txHash); + // Collect this batch from the node + await this.txCollectionSink.collect( + async () => { + const result = await node.getTxsByHash(txHashes.map(TxHash.fromString)); + for (const tx of result.validTxs) { + request.requestTracker.markFetched(tx); + } + return result; + }, + txHashes, + { + description: `fast ${node.getInfo()}`, + node: node.getInfo(), + method: 'fast-node-rpc', + ...request.blockInfo, + }, + this.getAddContext(request), + ); + + // Clear from the active requests the txs we just requested + for (const requestedTx of batch) { + activeRequestsToThisNode.delete(requestedTx.txHash); + } + + // Sleep a bit until hitting the node again, but wake up immediately on cancellation + if (notFinished()) { + await Promise.race([ + sleep(this.config.txCollectionFastNodeIntervalMs), + request.requestTracker.cancellationToken, + ]); + } + } + }; + + // Kick off N parallel requests to the node, up to the maxParallelRequests limit + await Promise.all(times(maxParallelRequests, 
processBatch)); + } + + private async collectFastViaReqResp(request: FastCollectionRequest, opts: { pinnedPeer?: PeerId }) { + const pinnedPeer = opts.pinnedPeer; + const blockInfo = request.blockInfo; + const slotNumber = blockInfo.slotNumber; + if (request.requestTracker.timeoutMs < 100) { + this.log.warn( + `Not initiating fast reqresp for txs for ${request.type} at slot ${blockInfo.slotNumber} due to timeout`, + { timeoutMs: request.requestTracker.timeoutMs, ...blockInfo }, + ); + return; + } + + if (request.requestTracker.checkCancelled()) { + this.log.debug(`No txs to collect via reqresp for ${request.type} at slot ${blockInfo.slotNumber}`, { + ...blockInfo, + }); + return; } - return this.fastCollection.collectFastFor(input, txHashes, opts); + this.log.debug( + `Starting fast reqresp for ${request.requestTracker.numberOfMissingTxs} txs for ${request.type} at slot ${blockInfo.slotNumber}`, + { ...blockInfo, timeoutMs: request.requestTracker.timeoutMs, pinnedPeer }, + ); + + try { + await this.txCollectionSink.collect( + async () => { + let blockTxsSource: BlockTxsSource; + if (request.type === 'proposal') { + blockTxsSource = request.blockProposal; + } else if (request.type === 'block') { + blockTxsSource = { + txHashes: request.block.body.txEffects.map(e => e.txHash), + archive: request.block.archive.root, + }; + } else { + throw new Error(`Unknown request type: ${(request as { type: string }).type}`); + } + + const result = await this.reqRespTxsCollector!(request.requestTracker, blockTxsSource, pinnedPeer); + return { validTxs: result, invalidTxHashes: [] }; + }, + Array.from(request.requestTracker.missingTxHashes), + { description: `reqresp for slot ${slotNumber}`, method: 'fast-req-resp', ...opts, ...request.blockInfo }, + this.getAddContext(request), + ); + } catch (err) { + this.log.error(`Error sending fast reqresp request for txs`, err, { + txs: [...request.requestTracker.missingTxHashes], + ...blockInfo, + }); + } } - /** Returns the TxAddContext for 
the given fast collection request input */ - private getAddContextForInput(input: FastCollectionRequestInput): TxAddContext { - if (input.type === 'proposal') { - return { type: 'proposal', blockHeader: input.blockProposal.blockHeader }; + /** Returns the TxAddContext for the given request, used by the sink to add txs to the pool correctly. */ + private getAddContext(request: FastCollectionRequest): TxAddContext { + if (request.type === 'proposal') { + return { type: 'proposal', blockHeader: request.blockProposal.blockHeader }; } else { - return { type: 'mined', block: input.block }; + return { type: 'mined', block: request.block }; } } - /** Mark the given txs as found. Stops collecting them. */ + /** Mark the given txs as found. Stops collecting them across all in-flight requests. */ private foundTxs(txs: Tx[]) { - this.fastCollection.foundTxs(txs); - this.fileStoreFastCollection.foundTxs(txs); + for (const request of this.requests) { + for (const tx of txs) { + if (request.requestTracker.markFetched(tx)) { + this.log.trace(`Found tx ${tx.txHash.toString()} for fast collection request`, { + ...request.blockInfo, + txHash: tx.txHash.toString(), + type: request.type, + }); + if (request.requestTracker.allFetched()) { + this.log.trace(`All txs found for fast collection request`, { + ...request.blockInfo, + type: request.type, + }); + break; + } + } + } + } } /** @@ -194,8 +447,11 @@ export class TxCollection { * To be called when we no longer care about gathering txs up to a certain block, eg when they become proven or finalized. 
*/ public stopCollectingForBlocksUpTo(blockNumber: BlockNumber): void { - this.fastCollection.stopCollectingForBlocksUpTo(blockNumber); - this.fileStoreFastCollection.clearPending(); + for (const request of this.requests) { + if (request.blockInfo.blockNumber <= blockNumber) { + request.requestTracker.cancel(); + } + } } /** @@ -203,7 +459,10 @@ export class TxCollection { * To be called when there is a chain prune and previously mined txs are no longer relevant. */ public stopCollectingForBlocksAfter(blockNumber: BlockNumber): void { - this.fastCollection.stopCollectingForBlocksAfter(blockNumber); - this.fileStoreFastCollection.clearPending(); + for (const request of this.requests) { + if (request.blockInfo.blockNumber > blockNumber) { + request.requestTracker.cancel(); + } + } } }