Skip to content

Commit 195f51b

Browse files
committed
refactor: add fifo set
1 parent 42107cd commit 195f51b

9 files changed

Lines changed: 149 additions & 85 deletions

File tree

yarn-project/foundation/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
"./eth-signature": "./dest/eth-signature/index.js",
6363
"./queue": "./dest/queue/index.js",
6464
"./fifo": "./dest/fifo/index.js",
65+
"./fifo-set": "./dest/fifo_set/index.js",
6566
"./fs": "./dest/fs/index.js",
6667
"./buffer": "./dest/buffer/index.js",
6768
"./json-rpc": "./dest/json-rpc/index.js",
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { FifoSet } from './fifo_set.js';
2+
3+
describe('FifoSet', () => {
4+
it('keeps entries up to the limit', () => {
5+
const set = FifoSet.withLimit<string>(3);
6+
7+
set.add('a');
8+
set.add('b');
9+
set.add('c');
10+
11+
expect([...set]).toEqual(['a', 'b', 'c']);
12+
expect(set.size).toBe(3);
13+
expect(set.limit).toBe(3);
14+
});
15+
16+
it('evicts the oldest entry when adding past the limit', () => {
17+
const set = FifoSet.withLimit<string>(2);
18+
19+
set.add('a');
20+
set.add('b');
21+
set.add('c');
22+
23+
expect([...set]).toEqual(['b', 'c']);
24+
});
25+
26+
it('compacts initial values to the newest entries', () => {
27+
const set = FifoSet.withLimit(2, ['a', 'b', 'c']);
28+
29+
expect([...set]).toEqual(['b', 'c']);
30+
});
31+
32+
it('does not evict when adding a duplicate', () => {
33+
const set = FifoSet.withLimit(2, ['a', 'b']);
34+
35+
set.add('a');
36+
37+
expect([...set]).toEqual(['a', 'b']);
38+
});
39+
40+
it('adds absent values and reports whether the set changed', () => {
41+
const set = FifoSet.withLimit(2, ['a']);
42+
43+
expect(set.addIfAbsent('a')).toBe(false);
44+
expect(set.addIfAbsent('b')).toBe(true);
45+
expect(set.addIfAbsent('c')).toBe(true);
46+
expect([...set]).toEqual(['b', 'c']);
47+
});
48+
49+
it('can evict undefined values', () => {
50+
const set = FifoSet.withLimit<string | undefined>(1, [undefined, 'a']);
51+
52+
expect([...set]).toEqual(['a']);
53+
54+
set.add(undefined);
55+
56+
expect([...set]).toEqual([undefined]);
57+
});
58+
59+
it.each([0, -1, 1.5, Number.NaN, Number.POSITIVE_INFINITY])('throws for invalid limit %s', limit => {
60+
expect(() => FifoSet.withLimit(limit)).toThrow('FifoSet limit must be a positive safe integer');
61+
});
62+
});
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/** A Set capped to a fixed number of entries, evicting the oldest inserted value when full. */
2+
export class FifoSet<T> extends Set<T> {
3+
private _limit: number | undefined;
4+
5+
private constructor(values?: Iterable<T>) {
6+
super(values);
7+
}
8+
9+
/** Creates a bounded set with a positive integer limit. */
10+
static withLimit<T>(limit: number, values?: Iterable<T>): FifoSet<T> {
11+
if (!Number.isSafeInteger(limit) || limit <= 0) {
12+
throw new TypeError(`FifoSet limit must be a positive safe integer: ${limit}`);
13+
}
14+
15+
const set = new FifoSet(values);
16+
set._limit = limit;
17+
set.compact();
18+
19+
return set;
20+
}
21+
22+
override add(value: T): this {
23+
super.add(value);
24+
this.compact();
25+
return this;
26+
}
27+
28+
/** Maximum number of entries retained by this set. */
29+
public get limit(): number {
30+
return this._limit ?? Infinity;
31+
}
32+
33+
/** Evicts oldest entries until the set is within its limit. */
34+
public compact(): void {
35+
while (this.size > this.limit) {
36+
const head = this.values().next();
37+
if (head.done) {
38+
return;
39+
}
40+
this.delete(head.value);
41+
}
42+
}
43+
44+
/** Adds a value only if it is absent, returning whether the set changed. */
45+
public addIfAbsent(value: T): boolean {
46+
if (this.has(value)) {
47+
return false;
48+
}
49+
this.add(value);
50+
return true;
51+
}
52+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './fifo_set.js';

yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { BlockNumber, SlotNumber } from '@aztec/foundation/branded-types';
2+
import { FifoSet } from '@aztec/foundation/fifo-set';
23
import type { Logger } from '@aztec/foundation/log';
34
import type { DateProvider } from '@aztec/foundation/timer';
45
import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store';
@@ -82,7 +83,7 @@ export class TxPoolV2Impl {
8283
#evictionManager: EvictionManager;
8384
#dateProvider: DateProvider;
8485
#instrumentation: TxPoolV2Instrumentation;
85-
#evictedTxHashes: Set<string> = new Set();
86+
#evictedTxHashes: FifoSet<string>;
8687
#log: Logger;
8788
#callbacks: TxPoolV2Callbacks;
8889

@@ -105,6 +106,7 @@ export class TxPoolV2Impl {
105106
this.#checkAllowedSetupCalls = deps.checkAllowedSetupCalls;
106107

107108
this.#config = { ...DEFAULT_TX_POOL_V2_CONFIG, ...config };
109+
this.#evictedTxHashes = FifoSet.withLimit<string>(this.#config.evictedTxCacheSize);
108110
this.#archive = new TxArchive(archiveStore, this.#config.archivedTxLimit, log);
109111
this.#deletedPool = new DeletedPool(store, this.#txsDB, log);
110112
this.#dateProvider = dateProvider;
@@ -903,21 +905,11 @@ export class TxPoolV2Impl {
903905
this.#instrumentation.recordEvictions(txHashes.length, reason);
904906
for (const txHashStr of txHashes) {
905907
this.#log.debug(`Evicting tx ${txHashStr}`, { txHash: txHashStr, reason });
906-
this.#addToEvictedCache(txHashStr);
908+
this.#evictedTxHashes.add(txHashStr);
907909
}
908910
await this.#deleteTxsBatch(txHashes);
909911
}
910912

911-
/** Adds a tx hash to the bounded evicted cache, evicting the oldest entry if at capacity. */
912-
#addToEvictedCache(txHashStr: string): void {
913-
if (this.#evictedTxHashes.size >= this.#config.evictedTxCacheSize) {
914-
// FIFO eviction: remove the first (oldest) entry
915-
const oldest = this.#evictedTxHashes.values().next().value!;
916-
this.#evictedTxHashes.delete(oldest);
917-
}
918-
this.#evictedTxHashes.add(txHashStr);
919-
}
920-
921913
// ============================================================================
922914
// PRIVATE HELPERS - Validation & Conflict Resolution
923915
// ============================================================================

yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { FifoSet } from '@aztec/foundation/fifo-set';
12
import { type Logger, createLogger } from '@aztec/foundation/log';
23
import { RunningPromise } from '@aztec/foundation/promise';
34
import { makeBackoff, retry } from '@aztec/foundation/retry';
@@ -10,6 +11,8 @@ import type { TxPoolV2 } from '../../mem_pools/index.js';
1011
import type { TxFileStoreConfig } from './config.js';
1112
import { TxFileStoreInstrumentation } from './instrumentation.js';
1213

14+
const MAX_RECENT_UPLOADS = 1000;
15+
1316
/**
1417
* Uploads validated transactions to a file store as a fallback retrieval mechanism.
1518
* Listens to TxPool txs-added events and uploads txs asynchronously with bounded concurrency.
@@ -21,9 +24,7 @@ export class TxFileStore {
2124
private readonly handleTxsAdded: (args: { txs: Tx[]; source?: string }) => void;
2225

2326
/** Recently uploaded tx hashes for deduplication. */
24-
private recentUploads: Set<string> = new Set();
25-
private recentUploadsOrder: string[] = [];
26-
private readonly maxRecentUploads = 1000;
27+
private recentUploads = FifoSet.withLimit<string>(MAX_RECENT_UPLOADS);
2728

2829
private constructor(
2930
private readonly fileStore: FileStore,
@@ -127,24 +128,11 @@ export class TxFileStore {
127128
const path = `${this.basePath}/txs/${txHash}.bin`;
128129
const timer = new Timer();
129130

130-
if (this.recentUploads.has(txHash)) {
131+
if (!this.recentUploads.addIfAbsent(txHash)) {
131132
return;
132133
}
133134

134135
try {
135-
this.recentUploads.add(txHash);
136-
this.recentUploadsOrder.push(txHash);
137-
138-
if (this.recentUploadsOrder.length > this.maxRecentUploads) {
139-
// delete old entries in recentUploads
140-
for (const txHashToRemove of this.recentUploadsOrder.splice(
141-
0,
142-
this.recentUploadsOrder.length - this.maxRecentUploads,
143-
)) {
144-
this.recentUploads.delete(txHashToRemove);
145-
}
146-
}
147-
148136
await retry(
149137
() => this.fileStore.save(path, tx.toBuffer(), { compress: true }),
150138
`Uploading tx ${txHash}`,

yarn-project/slasher/src/watchers/attestations_block_watcher.ts

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import { EpochCache } from '@aztec/epoch-cache';
22
import { SlotNumber } from '@aztec/foundation/branded-types';
33
import { merge, pick } from '@aztec/foundation/collection';
4+
import { FifoSet } from '@aztec/foundation/fifo-set';
45
import { type Logger, createLogger } from '@aztec/foundation/log';
56
import {
67
type InvalidCheckpointDetectedEvent,
78
type L2BlockSourceEventEmitter,
89
L2BlockSourceEvents,
910
type ValidateCheckpointNegativeResult,
1011
} from '@aztec/stdlib/block';
11-
import type { CheckpointInfo } from '@aztec/stdlib/checkpoint';
1212
import { OffenseType } from '@aztec/stdlib/slashing';
1313

1414
import EventEmitter from 'node:events';
@@ -20,6 +20,7 @@ const AttestationsBlockWatcherConfigKeys = [
2020
'slashAttestDescendantOfInvalidPenalty',
2121
'slashProposeInvalidAttestationsPenalty',
2222
] as const;
23+
const MAX_INVALID_CHECKPOINTS = 100;
2324

2425
type AttestationsBlockWatcherConfig = Pick<SlasherConfig, (typeof AttestationsBlockWatcherConfigKeys)[number]>;
2526

@@ -32,11 +33,8 @@ type AttestationsBlockWatcherConfig = Pick<SlasherConfig, (typeof AttestationsBl
3233
export class AttestationsBlockWatcher extends (EventEmitter as new () => WatcherEmitter) implements Watcher {
3334
private log: Logger = createLogger('attestations-block-watcher');
3435

35-
// Only keep track of the last N invalid checkpoints
36-
private maxInvalidCheckpoints = 100;
37-
38-
// All invalid archive roots seen
39-
private invalidArchiveRoots: Set<string> = new Set();
36+
// Recently seen invalid archive roots.
37+
private invalidArchiveRoots = FifoSet.withLimit<string>(MAX_INVALID_CHECKPOINTS);
4038

4139
private config: AttestationsBlockWatcherConfig;
4240

@@ -98,8 +96,7 @@ export class AttestationsBlockWatcher extends (EventEmitter as new () => Watcher
9896
reason: validationResult.valid === false ? validationResult.reason : 'unknown',
9997
});
10098

101-
// Store the invalid checkpoint
102-
this.addInvalidCheckpoint(event.validationResult.checkpoint);
99+
this.invalidArchiveRoots.add(checkpoint.archive.toString());
103100

104101
// Slash the proposer of the invalid checkpoint
105102
this.slashProposer(event.validationResult);
@@ -181,14 +178,4 @@ export class AttestationsBlockWatcher extends (EventEmitter as new () => Watcher
181178
}
182179
}
183180
}
184-
185-
private addInvalidCheckpoint(checkpoint: CheckpointInfo) {
186-
this.invalidArchiveRoots.add(checkpoint.archive.toString());
187-
188-
// Prune old entries if we exceed the maximum
189-
if (this.invalidArchiveRoots.size > this.maxInvalidCheckpoints) {
190-
const oldestKey = this.invalidArchiveRoots.keys().next().value!;
191-
this.invalidArchiveRoots.delete(oldestKey);
192-
}
193-
}
194181
}

yarn-project/slasher/src/watchers/broadcasted_invalid_checkpoint_proposal_watcher.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { EpochCacheInterface } from '@aztec/epoch-cache';
22
import { SlotNumber } from '@aztec/foundation/branded-types';
33
import { merge, pick } from '@aztec/foundation/collection';
44
import type { EthAddress } from '@aztec/foundation/eth-address';
5+
import { FifoSet } from '@aztec/foundation/fifo-set';
56
import { type Logger, createLogger } from '@aztec/foundation/log';
67
import { RunningPromise } from '@aztec/foundation/running-promise';
78
import type { P2PClient, SlasherConfig } from '@aztec/stdlib/interfaces/server';
@@ -39,7 +40,7 @@ export class BroadcastedInvalidCheckpointProposalWatcher
3940
{
4041
private readonly log: Logger = createLogger('broadcasted-invalid-checkpoint-proposal-watcher');
4142
private readonly runningPromise: RunningPromise;
42-
private readonly emittedOffenses = new Set<string>();
43+
private readonly emittedOffenses: FifoSet<string>;
4344
private readonly scanSlotLookback: number;
4445
private config: BroadcastedInvalidCheckpointProposalWatcherConfig;
4546
private lastScannedSlot: SlotNumber | undefined;
@@ -54,6 +55,12 @@ export class BroadcastedInvalidCheckpointProposalWatcher
5455
const constants = epochCache.getL1Constants();
5556
this.config = pick(config, ...BroadcastedInvalidCheckpointProposalWatcherConfigKeys);
5657
this.scanSlotLookback = Math.max(1, scanSlotLookback);
58+
59+
// Bound emitted offenses to the number of slots we rescan. This watcher currently tracks one offense type,
60+
// and at most one offense of that type can be emitted per slot.
61+
const offenseTypes = 1;
62+
this.emittedOffenses = FifoSet.withLimit<string>(offenseTypes * this.scanSlotLookback);
63+
5764
const intervalMs = Math.max(1000, (constants.ethereumSlotDuration * 1000) / 4);
5865
this.runningPromise = new RunningPromise(() => this.scan(), this.log, intervalMs);
5966
this.log.info('BroadcastedInvalidCheckpointProposalWatcher initialized', {
@@ -182,10 +189,6 @@ export class BroadcastedInvalidCheckpointProposalWatcher
182189

183190
private markAsNewOffense(args: WantToSlashArgs): boolean {
184191
const key = `${args.validator.toString()}-${args.offenseType}-${args.epochOrSlot}`;
185-
if (this.emittedOffenses.has(key)) {
186-
return false;
187-
}
188-
this.emittedOffenses.add(key);
189-
return true;
192+
return this.emittedOffenses.addIfAbsent(key);
190193
}
191194
}

0 commit comments

Comments
 (0)