Skip to content

Commit eb8d696

Browse files
committed
refactor(p2p): absorb fast tx collection into tx collection
1 parent 69bbcde commit eb8d696

8 files changed

Lines changed: 491 additions & 701 deletions

File tree

yarn-project/p2p/src/client/p2p_client.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,6 @@ export class P2PClient extends WithTracer implements P2P {
269269
throw new Error('Block stream not initialized');
270270
}
271271
this.blockStream.start();
272-
await this.txCollection.start();
273272
this.txFileStore?.start();
274273

275274
// Start slot monitor to call prepareForSlot when the slot changes

yarn-project/p2p/src/services/tx_collection/config.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export type TxCollectionConfig = {
1414
txCollectionNodeRpcMaxBatchSize: number;
1515
/** A comma-separated list of file store URLs (s3://, gs://, file://, http://) for tx collection */
1616
txCollectionFileStoreUrls: string[];
17-
/** Delay in ms before file store collection starts after fast collection is triggered */
17+
/** Delay in ms from reqresp start before file store collection begins */
1818
txCollectionFileStoreFastDelayMs: number;
1919
/** Number of concurrent workers for fast file store collection */
2020
txCollectionFileStoreFastWorkerCount: number;
@@ -68,7 +68,7 @@ export const txCollectionConfigMappings: ConfigMappingsType<TxCollectionConfig>
6868
},
6969
txCollectionFileStoreFastDelayMs: {
7070
env: 'TX_COLLECTION_FILE_STORE_FAST_DELAY_MS',
71-
description: 'Delay before file store collection starts after fast collection',
71+
description: 'Delay in ms from reqresp start before file store collection begins',
7272
...numberConfigHelper(2_000),
7373
},
7474
txCollectionFileStoreFastWorkerCount: {

yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts

Lines changed: 0 additions & 379 deletions
This file was deleted.

yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts

Lines changed: 61 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { type MockProxy, mock } from 'jest-mock-extended';
1212
import type { TxPoolV2 } from '../../mem_pools/tx_pool_v2/interfaces.js';
1313
import { type FileStoreCollectionConfig, FileStoreTxCollection } from './file_store_tx_collection.js';
1414
import type { FileStoreTxSource } from './file_store_tx_source.js';
15+
import { type IRequestTracker, RequestTracker } from './request_tracker.js';
1516
import { type TxAddContext, TxCollectionSink } from './tx_collection_sink.js';
1617

1718
describe('FileStoreTxCollection', () => {
@@ -26,6 +27,11 @@ describe('FileStoreTxCollection', () => {
2627

2728
let txs: Tx[];
2829
let txHashes: TxHash[];
30+
let requestTracker: IRequestTracker;
31+
32+
// Track in-flight startCollecting invocations so afterEach can shut them down cleanly.
33+
let activeTrackers: IRequestTracker[];
34+
let activePromises: Promise<void>[];
2935

3036
const makeFileStoreSource = (name: string) => {
3137
const source = mock<FileStoreTxSource>();
@@ -49,6 +55,14 @@ describe('FileStoreTxCollection', () => {
4955
});
5056
};
5157

58+
/** Spawns a collection run and registers it for afterEach cleanup. */
59+
const startCollecting = (tracker: IRequestTracker, ctx: TxAddContext): Promise<void> => {
60+
activeTrackers.push(tracker);
61+
const promise = fileStoreCollection.startCollecting(tracker, ctx);
62+
activePromises.push(promise);
63+
return promise;
64+
};
65+
5266
/** Waits for the sink to emit txs-added events for the expected number of txs. */
5367
const waitForTxsAdded = (expectedCount: number) => {
5468
const { promise, resolve } = promiseWithResolvers<void>();
@@ -102,33 +116,38 @@ describe('FileStoreTxCollection', () => {
102116
const block = await L2Block.random(BlockNumber(1));
103117
context = { type: 'mined', block };
104118
deadline = new Date(dateProvider.now() + 60 * 60 * 1000);
119+
requestTracker = RequestTracker.create(txHashes, deadline, dateProvider);
120+
121+
activeTrackers = [];
122+
activePromises = [];
105123
});
106124

107125
afterEach(async () => {
108-
await fileStoreCollection.stop();
126+
for (const t of activeTrackers) {
127+
t.cancel();
128+
}
129+
await Promise.allSettled(activePromises);
109130
jest.restoreAllMocks();
110131
});
111132

112133
it('downloads txs when startCollecting is called', async () => {
113134
setFileStoreTxs(fileStoreSources[0], txs);
114135

115-
fileStoreCollection.start();
116-
117136
const txsAddedPromise = waitForTxsAdded(txs.length);
118-
fileStoreCollection.startCollecting(txHashes, context, deadline);
137+
void startCollecting(requestTracker, context);
119138
await txsAddedPromise;
120139

121140
expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled();
122141
expect(txPool.addMinedTxs).toHaveBeenCalled();
123142
});
124143

125-
it('skips txs marked as found', async () => {
144+
it('skips txs already marked fetched on the tracker', async () => {
126145
setFileStoreTxs(fileStoreSources[0], txs);
127146

128-
fileStoreCollection.start();
147+
// Mark first tx as found before queueing so it's never queued in the first place
148+
requestTracker.markFetched(txs[0]);
129149

130-
fileStoreCollection.startCollecting(txHashes, context, deadline);
131-
fileStoreCollection.foundTxs([txs[0]]);
150+
void startCollecting(requestTracker, context);
132151

133152
const txsAddedPromise = waitForTxsAdded(2);
134153
await txsAddedPromise;
@@ -145,53 +164,25 @@ describe('FileStoreTxCollection', () => {
145164
// Pin random so we always start at source 0, ensuring we test the fallback to source 1
146165
jest.spyOn(Math, 'random').mockReturnValue(0);
147166

148-
fileStoreCollection.start();
149-
167+
const tracker = RequestTracker.create([txHashes[0]], deadline, dateProvider);
150168
const txsAddedPromise = waitForTxsAdded(1);
151-
fileStoreCollection.startCollecting([txHashes[0]], context, deadline);
169+
void startCollecting(tracker, context);
152170
await txsAddedPromise;
153171

154172
// Both stores should have been tried
155173
expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled();
156174
expect(fileStoreSources[1].getTxsByHash).toHaveBeenCalled();
157175
expect(txPool.addMinedTxs).toHaveBeenCalled();
158-
159-
jest.restoreAllMocks();
160176
});
161177

162-
it('does not start workers if no file store sources are configured', () => {
178+
it('does not start workers if no file store sources are configured', async () => {
163179
const log = createLogger('test');
164180
fileStoreCollection = new FileStoreTxCollection([], txCollectionSink, config, dateProvider, log);
165-
fileStoreCollection.start();
166-
fileStoreCollection.startCollecting(txHashes, context, deadline);
167-
168-
// With no sources, start() is a no-op (no workers spawned) and startCollecting() returns
169-
// immediately, so no calls should have been made synchronously.
170-
expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled();
171-
});
172-
173-
it('does not re-queue txs that are already pending', async () => {
174-
setFileStoreTxs(fileStoreSources[0], txs);
175-
setFileStoreTxs(fileStoreSources[1], txs);
176-
177-
// Use single worker for deterministic behavior
178-
const log = createLogger('test');
179-
config = { workerCount: 1, backoffBaseMs: 1000, backoffMaxMs: 5000 };
180-
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
181-
182-
fileStoreCollection.start();
183-
184-
const txsAddedPromise = waitForTxsAdded(txs.length);
185181

186-
fileStoreCollection.startCollecting(txHashes, context, deadline);
187-
fileStoreCollection.startCollecting(txHashes, context, deadline); // Duplicate call
182+
// With no sources, startCollecting resolves immediately without making any calls.
183+
await startCollecting(requestTracker, context);
188184

189-
await txsAddedPromise;
190-
191-
// With 1 worker processing sequentially, each tx should be found on the first source.
192-
// Duplicate startCollecting should not create extra entries.
193-
const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
194-
expect(allCalls.length).toBe(txHashes.length);
185+
expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled();
195186
});
196187

197188
it('retries across sources when tx is not found initially', async () => {
@@ -200,10 +191,9 @@ describe('FileStoreTxCollection', () => {
200191
config = { workerCount: 1, backoffBaseMs: 100, backoffMaxMs: 500 };
201192
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
202193

203-
fileStoreCollection.start();
204-
205194
// Initially both sources return empty
206-
fileStoreCollection.startCollecting([txHashes[0]], context, deadline);
195+
const tracker = RequestTracker.create([txHashes[0]], deadline, dateProvider);
196+
void startCollecting(tracker, context);
207197

208198
// Wait for first full cycle (2 sources = 2 calls)
209199
await waitForSourceCalls(fileStoreSources, 2);
@@ -220,88 +210,54 @@ describe('FileStoreTxCollection', () => {
220210
expect(txPool.addMinedTxs).toHaveBeenCalled();
221211
});
222212

223-
it('expires entries past deadline', async () => {
224-
const log = createLogger('test');
225-
config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 };
226-
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
227-
228-
// Set a very short deadline
229-
const shortDeadline = new Date(dateProvider.now() + 100);
230-
231-
fileStoreCollection.start();
232-
fileStoreCollection.startCollecting([txHashes[0]], context, shortDeadline);
233-
234-
// Wait for first full cycle (2 sources = 2 calls)
235-
await waitForSourceCalls(fileStoreSources, 2);
236-
237-
// Advance time past the deadline
238-
dateProvider.setTime(dateProvider.now() + 200);
239-
240-
// Clear mocks so we can distinguish new calls from old ones
241-
jest.clearAllMocks();
242-
243-
// Add a new entry with a valid deadline and set up source to return it.
244-
// This proves the worker is alive and the expired entry was cleaned up.
245-
setFileStoreTxs(fileStoreSources[0], [txs[1]]);
246-
const txsAddedPromise = waitForTxsAdded(1);
247-
fileStoreCollection.startCollecting([txHashes[1]], context, deadline);
248-
await txsAddedPromise;
249-
250-
// Only txHashes[1] should have been requested after clearing mocks
251-
const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
252-
const requestedHashes = allCalls.flat().flat();
253-
expect(requestedHashes).not.toContainEqual(txHashes[0]);
254-
expect(requestedHashes).toContainEqual(txHashes[1]);
255-
});
256-
257-
it('does not start collecting if deadline is in the past', () => {
258-
const pastDeadline = new Date(dateProvider.now() - 1000);
213+
it('does not start collecting if tracker is already cancelled', async () => {
214+
requestTracker.cancel();
259215

260-
fileStoreCollection.start();
261-
fileStoreCollection.startCollecting(txHashes, context, pastDeadline);
216+
await startCollecting(requestTracker, context);
262217

263-
// startCollecting returns immediately without adding entries when deadline is past
218+
// startCollecting returns immediately without spawning workers when tracker is cancelled
264219
expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled();
265220
});
266221

267-
it('foundTxs stops retry for found txs', async () => {
222+
it('stops trying for txs marked fetched on the tracker after queuing', async () => {
268223
const log = createLogger('test');
269224
config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 };
270225
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
271226

272227
setFileStoreTxs(fileStoreSources[0], [txs[1]]);
273228

274-
fileStoreCollection.start();
275-
fileStoreCollection.startCollecting(txHashes, context, deadline);
229+
void startCollecting(requestTracker, context);
276230

277-
// Mark first tx as found
278-
fileStoreCollection.foundTxs([txs[0]]);
231+
// Externally mark tx[0] as found via the tracker (simulating node/reqresp/gossip finding it).
232+
// startCollecting yields before spawning workers, so this runs before any source call is made.
233+
requestTracker.markFetched(txs[0]);
279234

280235
const txsAddedPromise = waitForTxsAdded(1);
281236
await txsAddedPromise;
282237

283-
// tx[0] should never have been attempted
238+
// tx[0] should never have been attempted by the file store
284239
const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
285240
const requestedHashes = allCalls.flat().flat();
286241
expect(requestedHashes).not.toContainEqual(txHashes[0]);
287242
});
288243

289-
it('clearPending removes all entries', async () => {
290-
fileStoreCollection.start();
291-
fileStoreCollection.startCollecting(txHashes, context, deadline);
292-
fileStoreCollection.clearPending();
244+
it('workers exit when tracker is cancelled', async () => {
245+
// Long backoff so workers spend most of their time sleeping after a single attempt
246+
const log = createLogger('test');
247+
config = { workerCount: 2, backoffBaseMs: 60_000, backoffMaxMs: 60_000 };
248+
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);
249+
250+
// Pre-set the tracker timer so a cancellation does not require real-time deadline expiry
251+
const tracker = RequestTracker.create(txHashes, deadline, dateProvider);
252+
const promise = startCollecting(tracker, context);
293253

294-
// Verify workers are alive but the cleared entries are gone by adding
295-
// a new entry and confirming only it gets processed.
296-
setFileStoreTxs(fileStoreSources[0], [txs[0]]);
297-
const txsAddedPromise = waitForTxsAdded(1);
298-
fileStoreCollection.startCollecting([txHashes[0]], context, deadline);
299-
await txsAddedPromise;
254+
// Let workers do at least one round of attempts
255+
await waitForSourceCalls(fileStoreSources, 2);
300256

301-
// Only the newly added tx[0] should have been requested, not all 3 original txs
302-
const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
303-
const requestedHashes = allCalls.flat().flat();
304-
expect(requestedHashes).not.toContainEqual(txHashes[1]);
305-
expect(requestedHashes).not.toContainEqual(txHashes[2]);
257+
tracker.cancel();
258+
259+
// The startCollecting promise resolves once all workers settle. Without this guarantee, the
260+
// test would either hang or leak workers — both are caught by Jest's default timeout.
261+
await promise;
306262
});
307263
});

0 commit comments

Comments
 (0)