
Commit 566d7f5

refactor(p2p): merge FastTxCollection into TxCollection with sequential pipeline (#23245)
## Summary

- Removes `FastTxCollection` as a separate class and absorbs all its logic directly into `TxCollection`
- Replaces the old parallel file-store delay with a single sequential pipeline: node RPC → reqresp → file store, where each phase blocks on the previous (cancellation-aware)
- File store collection is now driven by `IRequestTracker` — the same synchronization primitive used by the node and reqresp paths. The tracker is the single source of truth for "is this tx still missing?" and "is this request still alive?"
- `FileStoreTxCollection` simplified: dropped `start()`/`stop()`, the persistent worker pool, and `wakeSignal`. `startCollecting(requestTracker, context)` returns `Promise<void>`, spins up its own per-call worker pool, and workers self-terminate when the tracker is cancelled (all-fetched / deadline / external)

## Collection flow inside `collectFast`

1. Start node RPC collection in the background
2. Wait `txCollectionFastNodesTimeoutBeforeReqRespMs` — interruptible by cancellation **or by node exhaustion** (so when no nodes are configured, reqresp starts immediately)
3. Start reqresp in the background (parallel with nodes)
4. Wait `txCollectionFileStoreFastDelayMs` — interruptible by cancellation or reqresp completion
5. Start file store collection in the background (its workers self-terminate)
6. `Promise.allSettled` on node + reqresp + file store

The `txCollectionFileStoreFastDelayMs` description was updated to reflect that it is now anchored to reqresp start, not collection start.
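The six-step flow above can be sketched as a small TypeScript program. Everything here is illustrative: `waitInterruptible`, the stub `collect` phases, and the hard-coded delays are stand-ins for the real `TxCollection` internals and config values, not the actual API.

```typescript
// Hypothetical sketch of the sequential collectFast pipeline.
// Each bounded wait races a timer against "interrupt" promises (cancellation,
// a previous phase finishing), mirroring the cancellation-aware waits above.
async function waitInterruptible(ms: number, ...interrupts: Promise<unknown>[]): Promise<void> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const sleep = new Promise<void>(resolve => (timer = setTimeout(resolve, ms)));
  try {
    await Promise.race([sleep, ...interrupts]);
  } finally {
    clearTimeout(timer); // don't leave a stray timer if an interrupt won the race
  }
}

async function collectFast(cancelled: Promise<void>): Promise<string[]> {
  const started: string[] = [];
  // Stub for a collection phase: record that it started, then "work" for `ms`.
  const collect = (phase: string, ms: number) =>
    new Promise<void>(resolve => {
      started.push(phase);
      setTimeout(resolve, ms);
    });

  const node = collect('node', 50);                     // 1. node RPC in the background
  await waitInterruptible(10, cancelled, node);         // 2. wait; cancel or node exhaustion cuts it short
  const reqresp = collect('reqresp', 50);               // 3. reqresp, in parallel with nodes
  await waitInterruptible(10, cancelled, reqresp);      // 4. wait; cancel or reqresp completion cuts it short
  const fileStore = collect('fileStore', 50);           // 5. file store workers (self-terminating)
  await Promise.allSettled([node, reqresp, fileStore]); // 6. settle all three paths
  return started;
}

void collectFast(new Promise<void>(() => {})).then(order => console.log(order.join(' -> ')));
// prints "node -> reqresp -> fileStore"
```

Because step 2 also races against the node phase itself, a deployment with no nodes (the node promise settling immediately) moves straight to reqresp, which is the shortcut the new test covers.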
## File store / tracker integration

- `FileStoreTxCollection.startCollecting` no longer takes `(txHashes, context, deadline)`; it takes `(requestTracker, context)` and reads the missing txs and the deadline from the tracker
- Workers check `requestTracker.isMissing(hash)` on each scan — if the tx was found via another path (node/reqresp/gossipsub), the entry is dropped without an extra fetch
- Workers race their backoff sleeps against `requestTracker.cancellationToken` — cancelling a request (deadline, `stopCollectingForBlocksUpTo/After`, or `stop()`) propagates to file store workers immediately
- Removed the `foundTxs`/`clearPending` plumbing on `FileStoreTxCollection` — the tracker handles both implicitly
- `startCollecting` yields once after building its entry set, so a synchronous follow-up call (e.g. `markFetched` in tests, or the gossipsub-found path in production) lands before workers begin scanning

## Tests

- `tx_collection.test.ts`: collapsed the `TestFastTxCollection` subclass; all accesses go directly through `TxCollection`. Added "starts reqresp immediately when no nodes are configured" covering the node-exhaustion shortcut
- `file_store_tx_collection.test.ts`: rewritten for the new shape — no `start()`/`stop()`; the lifecycle is driven by the tracker (cancel to terminate workers). New "workers exit when tracker is cancelled" covers the per-call worker-pool teardown

Closes https://linear.app/aztec-labs/issue/A-933/tx-collection-dont-retrieve-transactions-that-have-already-been via the new synchronization with the request tracker.
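The tracker-driven worker behavior in the bullets above can be sketched as follows. `TrackerLike`, `sleepUnlessCancelled`, and `worker` are simplified stand-ins for `IRequestTracker` and the file store worker loop, not the actual interfaces.

```typescript
// Simplified stand-in for the parts of IRequestTracker a worker needs.
interface TrackerLike {
  isMissing(hash: string): boolean;
  isCancelled(): boolean;
  onCancel(cb: () => void): void;
  markFetched(hash: string): void;
}

// Backoff sleep that races against cancellation, so a cancelled request
// wakes workers immediately instead of after the full backoff.
function sleepUnlessCancelled(ms: number, tracker: TrackerLike): Promise<void> {
  return new Promise(resolve => {
    const timer = setTimeout(resolve, ms);
    tracker.onCancel(() => {
      clearTimeout(timer);
      resolve();
    });
  });
}

async function worker(
  hashes: string[],
  tracker: TrackerLike,
  fetchTx: (hash: string) => Promise<boolean>,
  backoffMs: number,
): Promise<void> {
  while (!tracker.isCancelled()) {
    // Each scan re-checks the tracker: entries another path (node/reqresp/
    // gossipsub) already found are dropped without an extra fetch.
    const missing = hashes.filter(h => tracker.isMissing(h));
    if (missing.length === 0) return; // all fetched: self-terminate
    for (const hash of missing) {
      if (tracker.isCancelled()) return;
      if (await fetchTx(hash)) tracker.markFetched(hash);
    }
    await sleepUnlessCancelled(backoffMs, tracker);
  }
}
```

Racing the backoff sleep against cancellation is what lets `stopCollectingForBlocksUpTo/After` or a deadline tear workers down mid-backoff rather than after the next scan.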
1 parent 25fa2cc commit 566d7f5

8 files changed

Lines changed: 491 additions & 701 deletions


yarn-project/p2p/src/client/p2p_client.ts

Lines changed: 0 additions & 1 deletion
```diff
@@ -269,7 +269,6 @@ export class P2PClient extends WithTracer implements P2P {
       throw new Error('Block stream not initialized');
     }
     this.blockStream.start();
-    await this.txCollection.start();
     this.txFileStore?.start();
 
     // Start slot monitor to call prepareForSlot when the slot changes
```

yarn-project/p2p/src/services/tx_collection/config.ts

Lines changed: 2 additions & 2 deletions
```diff
@@ -14,7 +14,7 @@ export type TxCollectionConfig = {
   txCollectionNodeRpcMaxBatchSize: number;
   /** A comma-separated list of file store URLs (s3://, gs://, file://, http://) for tx collection */
   txCollectionFileStoreUrls: string[];
-  /** Delay in ms before file store collection starts after fast collection is triggered */
+  /** Delay in ms from reqresp start before file store collection begins */
   txCollectionFileStoreFastDelayMs: number;
   /** Number of concurrent workers for fast file store collection */
   txCollectionFileStoreFastWorkerCount: number;
@@ -68,7 +68,7 @@ export const txCollectionConfigMappings: ConfigMappingsType<TxCollectionConfig>
   },
   txCollectionFileStoreFastDelayMs: {
     env: 'TX_COLLECTION_FILE_STORE_FAST_DELAY_MS',
-    description: 'Delay before file store collection starts after fast collection',
+    description: 'Delay in ms from reqresp start before file store collection begins',
     ...numberConfigHelper(2_000),
   },
   txCollectionFileStoreFastWorkerCount: {
```

yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts

Lines changed: 0 additions & 379 deletions
This file was deleted.

yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts

Lines changed: 61 additions & 105 deletions
```diff
@@ -12,6 +12,7 @@ import { type MockProxy, mock } from 'jest-mock-extended';
 import type { TxPoolV2 } from '../../mem_pools/tx_pool_v2/interfaces.js';
 import { type FileStoreCollectionConfig, FileStoreTxCollection } from './file_store_tx_collection.js';
 import type { FileStoreTxSource } from './file_store_tx_source.js';
+import { type IRequestTracker, RequestTracker } from './request_tracker.js';
 import { type TxAddContext, TxCollectionSink } from './tx_collection_sink.js';
 
 describe('FileStoreTxCollection', () => {
@@ -26,6 +27,11 @@ describe('FileStoreTxCollection', () => {
 
   let txs: Tx[];
   let txHashes: TxHash[];
+  let requestTracker: IRequestTracker;
+
+  // Track in-flight startCollecting invocations so afterEach can shut them down cleanly.
+  let activeTrackers: IRequestTracker[];
+  let activePromises: Promise<void>[];
 
   const makeFileStoreSource = (name: string) => {
     const source = mock<FileStoreTxSource>();
@@ -49,6 +55,14 @@ describe('FileStoreTxCollection', () => {
     });
   };
 
+  /** Spawns a collection run and registers it for afterEach cleanup. */
+  const startCollecting = (tracker: IRequestTracker, ctx: TxAddContext): Promise<void> => {
+    activeTrackers.push(tracker);
+    const promise = fileStoreCollection.startCollecting(tracker, ctx);
+    activePromises.push(promise);
+    return promise;
+  };
+
   /** Waits for the sink to emit txs-added events for the expected number of txs. */
   const waitForTxsAdded = (expectedCount: number) => {
     const { promise, resolve } = promiseWithResolvers<void>();
@@ -102,33 +116,38 @@ describe('FileStoreTxCollection', () => {
     const block = await L2Block.random(BlockNumber(1));
     context = { type: 'mined', block };
     deadline = new Date(dateProvider.now() + 60 * 60 * 1000);
+    requestTracker = RequestTracker.create(txHashes, deadline, dateProvider);
+
+    activeTrackers = [];
+    activePromises = [];
   });
 
   afterEach(async () => {
-    await fileStoreCollection.stop();
+    for (const t of activeTrackers) {
+      t.cancel();
+    }
+    await Promise.allSettled(activePromises);
     jest.restoreAllMocks();
   });
 
   it('downloads txs when startCollecting is called', async () => {
     setFileStoreTxs(fileStoreSources[0], txs);
 
-    fileStoreCollection.start();
-
     const txsAddedPromise = waitForTxsAdded(txs.length);
-    fileStoreCollection.startCollecting(txHashes, context, deadline);
+    void startCollecting(requestTracker, context);
     await txsAddedPromise;
 
     expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled();
     expect(txPool.addMinedTxs).toHaveBeenCalled();
   });
 
-  it('skips txs marked as found', async () => {
+  it('skips txs already marked fetched on the tracker', async () => {
     setFileStoreTxs(fileStoreSources[0], txs);
 
-    fileStoreCollection.start();
+    // Mark first tx as found before queueing so it's never queued in the first place
+    requestTracker.markFetched(txs[0]);
 
-    fileStoreCollection.startCollecting(txHashes, context, deadline);
-    fileStoreCollection.foundTxs([txs[0]]);
+    void startCollecting(requestTracker, context);
 
     const txsAddedPromise = waitForTxsAdded(2);
     await txsAddedPromise;
@@ -145,53 +164,25 @@ describe('FileStoreTxCollection', () => {
     // Pin random so we always start at source 0, ensuring we test the fallback to source 1
     jest.spyOn(Math, 'random').mockReturnValue(0);
 
-    fileStoreCollection.start();
-
+    const tracker = RequestTracker.create([txHashes[0]], deadline, dateProvider);
     const txsAddedPromise = waitForTxsAdded(1);
-    fileStoreCollection.startCollecting([txHashes[0]], context, deadline);
+    void startCollecting(tracker, context);
     await txsAddedPromise;
 
     // Both stores should have been tried
     expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled();
     expect(fileStoreSources[1].getTxsByHash).toHaveBeenCalled();
     expect(txPool.addMinedTxs).toHaveBeenCalled();
-
-    jest.restoreAllMocks();
   });
 
-  it('does not start workers if no file store sources are configured', () => {
+  it('does not start workers if no file store sources are configured', async () => {
     const log = createLogger('test');
     fileStoreCollection = new FileStoreTxCollection([], txCollectionSink, config, dateProvider, log);
-    fileStoreCollection.start();
-    fileStoreCollection.startCollecting(txHashes, context, deadline);
-
-    // With no sources, start() is a no-op (no workers spawned) and startCollecting() returns
-    // immediately, so no calls should have been made synchronously.
-    expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled();
-  });
-
-  it('does not re-queue txs that are already pending', async () => {
-    setFileStoreTxs(fileStoreSources[0], txs);
-    setFileStoreTxs(fileStoreSources[1], txs);
-
-    // Use single worker for deterministic behavior
-    const log = createLogger('test');
-    config = { workerCount: 1, backoffBaseMs: 1000, backoffMaxMs: 5000 };
-    fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
-
-    fileStoreCollection.start();
-
-    const txsAddedPromise = waitForTxsAdded(txs.length);
 
-    fileStoreCollection.startCollecting(txHashes, context, deadline);
-    fileStoreCollection.startCollecting(txHashes, context, deadline); // Duplicate call
+    // With no sources, startCollecting resolves immediately without making any calls.
+    await startCollecting(requestTracker, context);
 
-    await txsAddedPromise;
-
-    // With 1 worker processing sequentially, each tx should be found on the first source.
-    // Duplicate startCollecting should not create extra entries.
-    const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
-    expect(allCalls.length).toBe(txHashes.length);
+    expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled();
   });
 
   it('retries across sources when tx is not found initially', async () => {
@@ -200,10 +191,9 @@ describe('FileStoreTxCollection', () => {
     config = { workerCount: 1, backoffBaseMs: 100, backoffMaxMs: 500 };
     fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
 
-    fileStoreCollection.start();
-
     // Initially both sources return empty
-    fileStoreCollection.startCollecting([txHashes[0]], context, deadline);
+    const tracker = RequestTracker.create([txHashes[0]], deadline, dateProvider);
+    void startCollecting(tracker, context);
 
     // Wait for first full cycle (2 sources = 2 calls)
     await waitForSourceCalls(fileStoreSources, 2);
@@ -220,88 +210,54 @@ describe('FileStoreTxCollection', () => {
     expect(txPool.addMinedTxs).toHaveBeenCalled();
   });
 
-  it('expires entries past deadline', async () => {
-    const log = createLogger('test');
-    config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 };
-    fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
-
-    // Set a very short deadline
-    const shortDeadline = new Date(dateProvider.now() + 100);
-
-    fileStoreCollection.start();
-    fileStoreCollection.startCollecting([txHashes[0]], context, shortDeadline);
-
-    // Wait for first full cycle (2 sources = 2 calls)
-    await waitForSourceCalls(fileStoreSources, 2);
-
-    // Advance time past the deadline
-    dateProvider.setTime(dateProvider.now() + 200);
-
-    // Clear mocks so we can distinguish new calls from old ones
-    jest.clearAllMocks();
-
-    // Add a new entry with a valid deadline and set up source to return it.
-    // This proves the worker is alive and the expired entry was cleaned up.
-    setFileStoreTxs(fileStoreSources[0], [txs[1]]);
-    const txsAddedPromise = waitForTxsAdded(1);
-    fileStoreCollection.startCollecting([txHashes[1]], context, deadline);
-    await txsAddedPromise;
-
-    // Only txHashes[1] should have been requested after clearing mocks
-    const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
-    const requestedHashes = allCalls.flat().flat();
-    expect(requestedHashes).not.toContainEqual(txHashes[0]);
-    expect(requestedHashes).toContainEqual(txHashes[1]);
-  });
-
-  it('does not start collecting if deadline is in the past', () => {
-    const pastDeadline = new Date(dateProvider.now() - 1000);
+  it('does not start collecting if tracker is already cancelled', async () => {
+    requestTracker.cancel();
 
-    fileStoreCollection.start();
-    fileStoreCollection.startCollecting(txHashes, context, pastDeadline);
+    await startCollecting(requestTracker, context);
 
-    // startCollecting returns immediately without adding entries when deadline is past
+    // startCollecting returns immediately without spawning workers when tracker is cancelled
     expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled();
   });
 
-  it('foundTxs stops retry for found txs', async () => {
+  it('stops trying for txs marked fetched on the tracker after queuing', async () => {
     const log = createLogger('test');
     config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 };
     fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
 
     setFileStoreTxs(fileStoreSources[0], [txs[1]]);
 
-    fileStoreCollection.start();
-    fileStoreCollection.startCollecting(txHashes, context, deadline);
+    void startCollecting(requestTracker, context);
 
-    // Mark first tx as found
-    fileStoreCollection.foundTxs([txs[0]]);
+    // Externally mark tx[0] as found via the tracker (simulating node/reqresp/gossip finding it).
+    // startCollecting yields before spawning workers, so this runs before any source call is made.
+    requestTracker.markFetched(txs[0]);
 
     const txsAddedPromise = waitForTxsAdded(1);
     await txsAddedPromise;
 
-    // tx[0] should never have been attempted
+    // tx[0] should never have been attempted by the file store
     const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
     const requestedHashes = allCalls.flat().flat();
     expect(requestedHashes).not.toContainEqual(txHashes[0]);
   });
 
-  it('clearPending removes all entries', async () => {
-    fileStoreCollection.start();
-    fileStoreCollection.startCollecting(txHashes, context, deadline);
-    fileStoreCollection.clearPending();
+  it('workers exit when tracker is cancelled', async () => {
+    // Long backoff so workers spend most of their time sleeping after a single attempt
+    const log = createLogger('test');
+    config = { workerCount: 2, backoffBaseMs: 60_000, backoffMaxMs: 60_000 };
+    fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
+
+    // Pre-set the tracker timer so a cancellation does not require real-time deadline expiry
+    const tracker = RequestTracker.create(txHashes, deadline, dateProvider);
+    const promise = startCollecting(tracker, context);
 
-    // Verify workers are alive but the cleared entries are gone by adding
-    // a new entry and confirming only it gets processed.
-    setFileStoreTxs(fileStoreSources[0], [txs[0]]);
-    const txsAddedPromise = waitForTxsAdded(1);
-    fileStoreCollection.startCollecting([txHashes[0]], context, deadline);
-    await txsAddedPromise;
+    // Let workers do at least one round of attempts
+    await waitForSourceCalls(fileStoreSources, 2);
 
-    // Only the newly added tx[0] should have been requested, not all 3 original txs
-    const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
-    const requestedHashes = allCalls.flat().flat();
-    expect(requestedHashes).not.toContainEqual(txHashes[1]);
-    expect(requestedHashes).not.toContainEqual(txHashes[2]);
+    tracker.cancel();
+
+    // The startCollecting promise resolves once all workers settle. Without this guarantee, the
+    // test would either hang or leak workers — both are caught by Jest's default timeout.
+    await promise;
   });
 });
```
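One subtle piece the rewritten tests rely on: `startCollecting` yields once before spawning workers, so a synchronous `markFetched` call issued right after it is observed before the first scan. A minimal stand-alone illustration of that microtask-yield trick (all names here are illustrative, not the real API):

```typescript
// Illustrative sketch of the "yield once before scanning" trick: awaiting an
// already-resolved promise defers the rest of the function to a later microtask,
// so synchronous callers get a chance to mark txs as fetched first.
const fetched = new Set<string>();

async function startCollecting(hashes: string[], scan: (missing: string[]) => void): Promise<void> {
  const entries = new Set(hashes);
  await Promise.resolve(); // yield: a follow-up markFetched in the same tick lands before the scan
  scan([...entries].filter(h => !fetched.has(h)));
}

const seen: string[][] = [];
const run = startCollecting(['a', 'b'], missing => seen.push(missing));
fetched.add('a'); // runs before the first scan thanks to the yield above
void run.then(() => console.log(seen[0].join(','))); // prints "b"
```

Without the yield, a test calling `markFetched` immediately after `startCollecting` would race the first worker scan; with it, the ordering is deterministic.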
