Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ export class P2PClient extends WithTracer implements P2P {
throw new Error('Block stream not initialized');
}
this.blockStream.start();
await this.txCollection.start();
this.txFileStore?.start();

// Start slot monitor to call prepareForSlot when the slot changes
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/p2p/src/services/tx_collection/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export type TxCollectionConfig = {
txCollectionNodeRpcMaxBatchSize: number;
/** A comma-separated list of file store URLs (s3://, gs://, file://, http://) for tx collection */
txCollectionFileStoreUrls: string[];
/** Delay in ms before file store collection starts after fast collection is triggered */
/** Delay in ms from reqresp start before file store collection begins */
txCollectionFileStoreFastDelayMs: number;
/** Number of concurrent workers for fast file store collection */
txCollectionFileStoreFastWorkerCount: number;
Expand Down Expand Up @@ -68,7 +68,7 @@ export const txCollectionConfigMappings: ConfigMappingsType<TxCollectionConfig>
},
txCollectionFileStoreFastDelayMs: {
env: 'TX_COLLECTION_FILE_STORE_FAST_DELAY_MS',
description: 'Delay before file store collection starts after fast collection',
description: 'Delay in ms from reqresp start before file store collection begins',
...numberConfigHelper(2_000),
},
txCollectionFileStoreFastWorkerCount: {
Expand Down
379 changes: 0 additions & 379 deletions yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { type MockProxy, mock } from 'jest-mock-extended';
import type { TxPoolV2 } from '../../mem_pools/tx_pool_v2/interfaces.js';
import { type FileStoreCollectionConfig, FileStoreTxCollection } from './file_store_tx_collection.js';
import type { FileStoreTxSource } from './file_store_tx_source.js';
import { type IRequestTracker, RequestTracker } from './request_tracker.js';
import { type TxAddContext, TxCollectionSink } from './tx_collection_sink.js';

describe('FileStoreTxCollection', () => {
Expand All @@ -26,6 +27,11 @@ describe('FileStoreTxCollection', () => {

let txs: Tx[];
let txHashes: TxHash[];
let requestTracker: IRequestTracker;

// Track in-flight startCollecting invocations so afterEach can shut them down cleanly.
let activeTrackers: IRequestTracker[];
let activePromises: Promise<void>[];

const makeFileStoreSource = (name: string) => {
const source = mock<FileStoreTxSource>();
Expand All @@ -49,6 +55,14 @@ describe('FileStoreTxCollection', () => {
});
};

/** Spawns a collection run and registers it for afterEach cleanup. */
const startCollecting = (tracker: IRequestTracker, ctx: TxAddContext): Promise<void> => {
activeTrackers.push(tracker);
const promise = fileStoreCollection.startCollecting(tracker, ctx);
activePromises.push(promise);
return promise;
};

/** Waits for the sink to emit txs-added events for the expected number of txs. */
const waitForTxsAdded = (expectedCount: number) => {
const { promise, resolve } = promiseWithResolvers<void>();
Expand Down Expand Up @@ -102,33 +116,38 @@ describe('FileStoreTxCollection', () => {
const block = await L2Block.random(BlockNumber(1));
context = { type: 'mined', block };
deadline = new Date(dateProvider.now() + 60 * 60 * 1000);
requestTracker = RequestTracker.create(txHashes, deadline, dateProvider);

activeTrackers = [];
activePromises = [];
});

afterEach(async () => {
await fileStoreCollection.stop();
for (const t of activeTrackers) {
t.cancel();
}
await Promise.allSettled(activePromises);
jest.restoreAllMocks();
});

it('downloads txs when startCollecting is called', async () => {
setFileStoreTxs(fileStoreSources[0], txs);

fileStoreCollection.start();

const txsAddedPromise = waitForTxsAdded(txs.length);
fileStoreCollection.startCollecting(txHashes, context, deadline);
void startCollecting(requestTracker, context);
await txsAddedPromise;

expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled();
expect(txPool.addMinedTxs).toHaveBeenCalled();
});

it('skips txs marked as found', async () => {
it('skips txs already marked fetched on the tracker', async () => {
setFileStoreTxs(fileStoreSources[0], txs);

fileStoreCollection.start();
// Mark first tx as found before queueing so it's never queued in the first place
requestTracker.markFetched(txs[0]);

fileStoreCollection.startCollecting(txHashes, context, deadline);
fileStoreCollection.foundTxs([txs[0]]);
void startCollecting(requestTracker, context);

const txsAddedPromise = waitForTxsAdded(2);
await txsAddedPromise;
Expand All @@ -145,53 +164,25 @@ describe('FileStoreTxCollection', () => {
// Pin random so we always start at source 0, ensuring we test the fallback to source 1
jest.spyOn(Math, 'random').mockReturnValue(0);

fileStoreCollection.start();

const tracker = RequestTracker.create([txHashes[0]], deadline, dateProvider);
const txsAddedPromise = waitForTxsAdded(1);
fileStoreCollection.startCollecting([txHashes[0]], context, deadline);
void startCollecting(tracker, context);
await txsAddedPromise;

// Both stores should have been tried
expect(fileStoreSources[0].getTxsByHash).toHaveBeenCalled();
expect(fileStoreSources[1].getTxsByHash).toHaveBeenCalled();
expect(txPool.addMinedTxs).toHaveBeenCalled();

jest.restoreAllMocks();
});

it('does not start workers if no file store sources are configured', () => {
it('does not start workers if no file store sources are configured', async () => {
const log = createLogger('test');
fileStoreCollection = new FileStoreTxCollection([], txCollectionSink, config, dateProvider, log);
fileStoreCollection.start();
fileStoreCollection.startCollecting(txHashes, context, deadline);

// With no sources, start() is a no-op (no workers spawned) and startCollecting() returns
// immediately, so no calls should have been made synchronously.
expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled();
});

it('does not re-queue txs that are already pending', async () => {
setFileStoreTxs(fileStoreSources[0], txs);
setFileStoreTxs(fileStoreSources[1], txs);

// Use single worker for deterministic behavior
const log = createLogger('test');
config = { workerCount: 1, backoffBaseMs: 1000, backoffMaxMs: 5000 };
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);

fileStoreCollection.start();

const txsAddedPromise = waitForTxsAdded(txs.length);

fileStoreCollection.startCollecting(txHashes, context, deadline);
fileStoreCollection.startCollecting(txHashes, context, deadline); // Duplicate call
// With no sources, startCollecting resolves immediately without making any calls.
await startCollecting(requestTracker, context);

await txsAddedPromise;

// With 1 worker processing sequentially, each tx should be found on the first source.
// Duplicate startCollecting should not create extra entries.
const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
expect(allCalls.length).toBe(txHashes.length);
expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled();
});

it('retries across sources when tx is not found initially', async () => {
Expand All @@ -200,10 +191,9 @@ describe('FileStoreTxCollection', () => {
config = { workerCount: 1, backoffBaseMs: 100, backoffMaxMs: 500 };
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);

fileStoreCollection.start();

// Initially both sources return empty
fileStoreCollection.startCollecting([txHashes[0]], context, deadline);
const tracker = RequestTracker.create([txHashes[0]], deadline, dateProvider);
void startCollecting(tracker, context);

// Wait for first full cycle (2 sources = 2 calls)
await waitForSourceCalls(fileStoreSources, 2);
Expand All @@ -220,88 +210,54 @@ describe('FileStoreTxCollection', () => {
expect(txPool.addMinedTxs).toHaveBeenCalled();
});

it('expires entries past deadline', async () => {
const log = createLogger('test');
config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 };
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);

// Set a very short deadline
const shortDeadline = new Date(dateProvider.now() + 100);

fileStoreCollection.start();
fileStoreCollection.startCollecting([txHashes[0]], context, shortDeadline);

// Wait for first full cycle (2 sources = 2 calls)
await waitForSourceCalls(fileStoreSources, 2);

// Advance time past the deadline
dateProvider.setTime(dateProvider.now() + 200);

// Clear mocks so we can distinguish new calls from old ones
jest.clearAllMocks();

// Add a new entry with a valid deadline and set up source to return it.
// This proves the worker is alive and the expired entry was cleaned up.
setFileStoreTxs(fileStoreSources[0], [txs[1]]);
const txsAddedPromise = waitForTxsAdded(1);
fileStoreCollection.startCollecting([txHashes[1]], context, deadline);
await txsAddedPromise;

// Only txHashes[1] should have been requested after clearing mocks
const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
const requestedHashes = allCalls.flat().flat();
expect(requestedHashes).not.toContainEqual(txHashes[0]);
expect(requestedHashes).toContainEqual(txHashes[1]);
});

it('does not start collecting if deadline is in the past', () => {
const pastDeadline = new Date(dateProvider.now() - 1000);
it('does not start collecting if tracker is already cancelled', async () => {
requestTracker.cancel();

fileStoreCollection.start();
fileStoreCollection.startCollecting(txHashes, context, pastDeadline);
await startCollecting(requestTracker, context);

// startCollecting returns immediately without adding entries when deadline is past
// startCollecting returns immediately without spawning workers when tracker is cancelled
expect(fileStoreSources[0].getTxsByHash).not.toHaveBeenCalled();
});

it('foundTxs stops retry for found txs', async () => {
it('stops trying for txs marked fetched on the tracker after queuing', async () => {
const log = createLogger('test');
config = { workerCount: 1, backoffBaseMs: 50, backoffMaxMs: 100 };
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);

setFileStoreTxs(fileStoreSources[0], [txs[1]]);

fileStoreCollection.start();
fileStoreCollection.startCollecting(txHashes, context, deadline);
void startCollecting(requestTracker, context);

// Mark first tx as found
fileStoreCollection.foundTxs([txs[0]]);
// Externally mark tx[0] as found via the tracker (simulating node/reqresp/gossip finding it).
// startCollecting yields before spawning workers, so this runs before any source call is made.
requestTracker.markFetched(txs[0]);

const txsAddedPromise = waitForTxsAdded(1);
await txsAddedPromise;

// tx[0] should never have been attempted
// tx[0] should never have been attempted by the file store
const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
const requestedHashes = allCalls.flat().flat();
expect(requestedHashes).not.toContainEqual(txHashes[0]);
});

it('clearPending removes all entries', async () => {
fileStoreCollection.start();
fileStoreCollection.startCollecting(txHashes, context, deadline);
fileStoreCollection.clearPending();
it('workers exit when tracker is cancelled', async () => {
// Long backoff so workers spend most of their time sleeping after a single attempt
const log = createLogger('test');
config = { workerCount: 2, backoffBaseMs: 60_000, backoffMaxMs: 60_000 };
fileStoreCollection = new FileStoreTxCollection(fileStoreSources, txCollectionSink, config, dateProvider, log);

// Pre-set the tracker timer so a cancellation does not require real-time deadline expiry
const tracker = RequestTracker.create(txHashes, deadline, dateProvider);
const promise = startCollecting(tracker, context);

// Verify workers are alive but the cleared entries are gone by adding
// a new entry and confirming only it gets processed.
setFileStoreTxs(fileStoreSources[0], [txs[0]]);
const txsAddedPromise = waitForTxsAdded(1);
fileStoreCollection.startCollecting([txHashes[0]], context, deadline);
await txsAddedPromise;
// Let workers do at least one round of attempts
await waitForSourceCalls(fileStoreSources, 2);

// Only the newly added tx[0] should have been requested, not all 3 original txs
const allCalls = fileStoreSources.flatMap(s => s.getTxsByHash.mock.calls);
const requestedHashes = allCalls.flat().flat();
expect(requestedHashes).not.toContainEqual(txHashes[1]);
expect(requestedHashes).not.toContainEqual(txHashes[2]);
tracker.cancel();

// The startCollecting promise resolves once all workers settle. Without this guarantee, the
// test would either hang or leak workers — both are caught by Jest's default timeout.
await promise;
});
});
Loading
Loading