Skip to content

Commit 7f523b3

Browse files
spalladinoclaude
andauthored
fix(archiver): filter tagged log queries by block number (#21388)
Resolves the referenceBlock hash to a block number in the AztecNode and passes it down as upToBlockNumber so the LogStore stops returning logs from blocks beyond the client's sync point. Also adds an ordering check on log insertion to guard against out-of-order appends. Fixes F-417 Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6689d32 commit 7f523b3

7 files changed

Lines changed: 283 additions & 26 deletions

File tree

yarn-project/archiver/src/errors.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,22 @@ export class BlockAlreadyCheckpointedError extends Error {
7474
}
7575
}
7676

77+
/** Thrown when logs are added for a tag whose last stored log has a higher block number than the new log. */
78+
export class OutOfOrderLogInsertionError extends Error {
79+
constructor(
80+
public readonly logType: 'private' | 'public',
81+
public readonly tag: string,
82+
public readonly lastBlockNumber: number,
83+
public readonly newBlockNumber: number,
84+
) {
85+
super(
86+
`Out-of-order ${logType} log insertion for tag ${tag}: ` +
87+
`last existing log is from block ${lastBlockNumber} but new log is from block ${newBlockNumber}`,
88+
);
89+
this.name = 'OutOfOrderLogInsertionError';
90+
}
91+
}
92+
7793
/** Thrown when a proposed block conflicts with an already checkpointed block (different content). */
7894
export class CannotOverwriteCheckpointedBlockError extends Error {
7995
constructor(

yarn-project/archiver/src/modules/data_source_base.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,16 +165,21 @@ export abstract class ArchiverDataSourceBase
165165
return (await this.store.getPendingChainValidationStatus()) ?? { valid: true };
166166
}
167167

168-
public getPrivateLogsByTags(tags: SiloedTag[], page?: number): Promise<TxScopedL2Log[][]> {
169-
return this.store.getPrivateLogsByTags(tags, page);
168+
public getPrivateLogsByTags(
169+
tags: SiloedTag[],
170+
page?: number,
171+
upToBlockNumber?: BlockNumber,
172+
): Promise<TxScopedL2Log[][]> {
173+
return this.store.getPrivateLogsByTags(tags, page, upToBlockNumber);
170174
}
171175

172176
public getPublicLogsByTagsFromContract(
173177
contractAddress: AztecAddress,
174178
tags: Tag[],
175179
page?: number,
180+
upToBlockNumber?: BlockNumber,
176181
): Promise<TxScopedL2Log[][]> {
177-
return this.store.getPublicLogsByTagsFromContract(contractAddress, tags, page);
182+
return this.store.getPublicLogsByTagsFromContract(contractAddress, tags, page, upToBlockNumber);
178183
}
179184

180185
public getPublicLogs(filter: LogFilter): Promise<GetPublicLogsResponse> {

yarn-project/archiver/src/store/kv_archiver_store.test.ts

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import {
4949
CannotOverwriteCheckpointedBlockError,
5050
CheckpointNumberNotSequentialError,
5151
InitialCheckpointNumberNotSequentialError,
52+
OutOfOrderLogInsertionError,
5253
} from '../errors.js';
5354
import { MessageStoreError } from '../store/message_store.js';
5455
import type { InboxMessage } from '../structs/inbox_message.js';
@@ -2329,6 +2330,32 @@ describe('KVArchiverDataStore', () => {
23292330
]);
23302331
});
23312332

2333+
it('throws on out-of-order private log insertion', async () => {
2334+
const sharedTag = makePrivateLogTag(99, 0, 0);
2335+
2336+
// Create blocks 4 and 5 with the same shared tag
2337+
const prevArchive1 = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive;
2338+
const checkpoint4 = await makeCheckpointWithLogs(numBlocksForLogs + 1, {
2339+
previousArchive: prevArchive1,
2340+
numTxsPerBlock,
2341+
privateLogs: { numLogsPerTx: numPrivateLogsPerTx },
2342+
});
2343+
checkpoint4.checkpoint.blocks[0].body.txEffects[0].privateLogs[0] = makePrivateLog(sharedTag);
2344+
2345+
const prevArchive2 = checkpoint4.checkpoint.blocks[0].archive;
2346+
const checkpoint5 = await makeCheckpointWithLogs(numBlocksForLogs + 2, {
2347+
previousArchive: prevArchive2,
2348+
numTxsPerBlock,
2349+
privateLogs: { numLogsPerTx: numPrivateLogsPerTx },
2350+
});
2351+
checkpoint5.checkpoint.blocks[0].body.txEffects[0].privateLogs[0] = makePrivateLog(sharedTag);
2352+
2353+
// Store block 5's logs first (higher block number), then try to store block 4's logs
2354+
// (lower block number) — this should fail.
2355+
await store.addLogs([checkpoint5.checkpoint.blocks[0]]);
2356+
await expect(store.addLogs([checkpoint4.checkpoint.blocks[0]])).rejects.toThrow(OutOfOrderLogInsertionError);
2357+
});
2358+
23322359
it('is possible to request logs for non-existing tags and determine their position', async () => {
23332360
const tags = [makePrivateLogTag(99, 88, 77), makePrivateLogTag(1, 1, 1)];
23342361

@@ -2347,6 +2374,48 @@ describe('KVArchiverDataStore', () => {
23472374
]);
23482375
});
23492376

2377+
it('filters logs up to specified block number', async () => {
2378+
// Tags are unique per block, so create a shared tag across blocks by adding logs with the same tag
2379+
const sharedTag = makePrivateLogTag(1, 2, 1);
2380+
2381+
// Add extra blocks with logs sharing the same tag
2382+
for (let blockNum = numBlocksForLogs + 1; blockNum <= numBlocksForLogs + 2; blockNum++) {
2383+
const previousArchive = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive;
2384+
const newCheckpoint = await makeCheckpointWithLogs(blockNum, {
2385+
previousArchive,
2386+
numTxsPerBlock,
2387+
privateLogs: { numLogsPerTx: numPrivateLogsPerTx },
2388+
});
2389+
const newLog = newCheckpoint.checkpoint.blocks[0].body.txEffects[1].privateLogs[1];
2390+
newLog.fields[0] = sharedTag.value;
2391+
newCheckpoint.checkpoint.blocks[0].body.txEffects[1].privateLogs[1] = newLog;
2392+
await store.addCheckpoints([newCheckpoint]);
2393+
await store.addLogs([newCheckpoint.checkpoint.blocks[0]]);
2394+
logsCheckpoints.push(newCheckpoint);
2395+
}
2396+
2397+
// Without filter, should return logs from block 1 and the extra blocks
2398+
const allLogs = await store.getPrivateLogsByTags([sharedTag]);
2399+
expect(allLogs[0].some(log => log.blockNumber > numBlocksForLogs)).toBe(true);
2400+
2401+
// With upToBlockNumber=numBlocksForLogs, should only return the original log from block 1
2402+
const filteredLogs = await store.getPrivateLogsByTags([sharedTag], 0, BlockNumber(numBlocksForLogs));
2403+
expect(filteredLogs[0].length).toBeGreaterThan(0);
2404+
for (const log of filteredLogs[0]) {
2405+
expect(log.blockNumber).toBeLessThanOrEqual(numBlocksForLogs);
2406+
}
2407+
expect(filteredLogs[0].length).toBeLessThan(allLogs[0].length);
2408+
});
2409+
2410+
it('returns all logs when upToBlockNumber is not set', async () => {
2411+
const tag = makePrivateLogTag(1, 2, 1);
2412+
2413+
const logsWithoutFilter = await store.getPrivateLogsByTags([tag]);
2414+
const logsWithUndefined = await store.getPrivateLogsByTags([tag], 0, undefined);
2415+
2416+
expect(logsWithoutFilter).toEqual(logsWithUndefined);
2417+
});
2418+
23502419
describe('pagination', () => {
23512420
const paginationTag = makePrivateLogTag(1, 2, 1);
23522421

@@ -2368,6 +2437,20 @@ describe('KVArchiverDataStore', () => {
23682437
}
23692438
});
23702439

2440+
it('pagination works correctly with upToBlockNumber', async () => {
2441+
// With a low upToBlockNumber, the filtered set should be smaller than MAX_LOGS_PER_TAG
2442+
const filteredPage0 = await store.getPrivateLogsByTags([paginationTag], 0, BlockNumber(5));
2443+
for (const log of filteredPage0[0]) {
2444+
expect(log.blockNumber).toBeLessThanOrEqual(5);
2445+
}
2446+
2447+
// Page 1 with the same filter should only contain remaining filtered logs
2448+
const filteredPage1 = await store.getPrivateLogsByTags([paginationTag], 1, BlockNumber(5));
2449+
for (const log of filteredPage1[0]) {
2450+
expect(log.blockNumber).toBeLessThanOrEqual(5);
2451+
}
2452+
});
2453+
23712454
it('returns first page of logs when page=0', async () => {
23722455
const logsByTags = await store.getPrivateLogsByTags([paginationTag], 0);
23732456

@@ -2535,6 +2618,32 @@ describe('KVArchiverDataStore', () => {
25352618
]);
25362619
});
25372620

2621+
it('throws on out-of-order public log insertion', async () => {
2622+
const sharedTag = makePublicLogTag(99, 0, 0);
2623+
2624+
// Create blocks 4 and 5 with the same shared tag
2625+
const prevArchive1 = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive;
2626+
const checkpoint4 = await makeCheckpointWithLogs(numBlocksForLogs + 1, {
2627+
previousArchive: prevArchive1,
2628+
numTxsPerBlock,
2629+
publicLogs: { numLogsPerTx: numPublicLogsPerTx, contractAddress },
2630+
});
2631+
checkpoint4.checkpoint.blocks[0].body.txEffects[0].publicLogs[0] = makePublicLog(sharedTag, contractAddress);
2632+
2633+
const prevArchive2 = checkpoint4.checkpoint.blocks[0].archive;
2634+
const checkpoint5 = await makeCheckpointWithLogs(numBlocksForLogs + 2, {
2635+
previousArchive: prevArchive2,
2636+
numTxsPerBlock,
2637+
publicLogs: { numLogsPerTx: numPublicLogsPerTx, contractAddress },
2638+
});
2639+
checkpoint5.checkpoint.blocks[0].body.txEffects[0].publicLogs[0] = makePublicLog(sharedTag, contractAddress);
2640+
2641+
// Store block 5's logs first (higher block number), then try to store block 4's logs
2642+
// (lower block number) — this should fail.
2643+
await store.addLogs([checkpoint5.checkpoint.blocks[0]]);
2644+
await expect(store.addLogs([checkpoint4.checkpoint.blocks[0]])).rejects.toThrow(OutOfOrderLogInsertionError);
2645+
});
2646+
25382647
it('is possible to request logs for non-existing tags and determine their position', async () => {
25392648
const tags = [makePublicLogTag(99, 88, 77), makePublicLogTag(1, 1, 0)];
25402649

@@ -2553,6 +2662,52 @@ describe('KVArchiverDataStore', () => {
25532662
]);
25542663
});
25552664

2665+
it('filters logs up to specified block number', async () => {
2666+
const sharedTag = makePublicLogTag(1, 2, 1);
2667+
2668+
// Add extra blocks with logs sharing the same tag
2669+
for (let blockNum = numBlocksForLogs + 1; blockNum <= numBlocksForLogs + 2; blockNum++) {
2670+
const previousArchive = logsCheckpoints[logsCheckpoints.length - 1].checkpoint.blocks[0].archive;
2671+
const newCheckpoint = await makeCheckpointWithLogs(blockNum, {
2672+
previousArchive,
2673+
numTxsPerBlock,
2674+
publicLogs: { numLogsPerTx: numPublicLogsPerTx, contractAddress },
2675+
});
2676+
const newLog = newCheckpoint.checkpoint.blocks[0].body.txEffects[1].publicLogs[1];
2677+
newLog.fields[0] = sharedTag.value;
2678+
newCheckpoint.checkpoint.blocks[0].body.txEffects[1].publicLogs[1] = newLog;
2679+
await store.addCheckpoints([newCheckpoint]);
2680+
await store.addLogs([newCheckpoint.checkpoint.blocks[0]]);
2681+
logsCheckpoints.push(newCheckpoint);
2682+
}
2683+
2684+
// Without filter, should return logs from block 1 and the extra blocks
2685+
const allLogs = await store.getPublicLogsByTagsFromContract(contractAddress, [sharedTag]);
2686+
expect(allLogs[0].some(log => log.blockNumber > numBlocksForLogs)).toBe(true);
2687+
2688+
// With upToBlockNumber=numBlocksForLogs, should only return the original log from block 1
2689+
const filteredLogs = await store.getPublicLogsByTagsFromContract(
2690+
contractAddress,
2691+
[sharedTag],
2692+
0,
2693+
BlockNumber(numBlocksForLogs),
2694+
);
2695+
expect(filteredLogs[0].length).toBeGreaterThan(0);
2696+
for (const log of filteredLogs[0]) {
2697+
expect(log.blockNumber).toBeLessThanOrEqual(numBlocksForLogs);
2698+
}
2699+
expect(filteredLogs[0].length).toBeLessThan(allLogs[0].length);
2700+
});
2701+
2702+
it('returns all logs when upToBlockNumber is not set', async () => {
2703+
const tag = makePublicLogTag(1, 2, 1);
2704+
2705+
const logsWithoutFilter = await store.getPublicLogsByTagsFromContract(contractAddress, [tag]);
2706+
const logsWithUndefined = await store.getPublicLogsByTagsFromContract(contractAddress, [tag], 0, undefined);
2707+
2708+
expect(logsWithoutFilter).toEqual(logsWithUndefined);
2709+
});
2710+
25562711
describe('pagination', () => {
25572712
const paginationTag = makePublicLogTag(1, 2, 1);
25582713

@@ -2574,6 +2729,28 @@ describe('KVArchiverDataStore', () => {
25742729
}
25752730
});
25762731

2732+
it('pagination works correctly with upToBlockNumber', async () => {
2733+
const filteredPage0 = await store.getPublicLogsByTagsFromContract(
2734+
contractAddress,
2735+
[paginationTag],
2736+
0,
2737+
BlockNumber(5),
2738+
);
2739+
for (const log of filteredPage0[0]) {
2740+
expect(log.blockNumber).toBeLessThanOrEqual(5);
2741+
}
2742+
2743+
const filteredPage1 = await store.getPublicLogsByTagsFromContract(
2744+
contractAddress,
2745+
[paginationTag],
2746+
1,
2747+
BlockNumber(5),
2748+
);
2749+
for (const log of filteredPage1[0]) {
2750+
expect(log.blockNumber).toBeLessThanOrEqual(5);
2751+
}
2752+
});
2753+
25772754
it('returns first page of logs when page=0', async () => {
25782755
const logsByTags = await store.getPublicLogsByTagsFromContract(contractAddress, [paginationTag], 0);
25792756

yarn-project/archiver/src/store/kv_archiver_store.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -470,10 +470,11 @@ export class KVArchiverDataStore implements ContractDataSource {
470470
* array implies no logs match that tag.
471471
* @param tags - The tags to search for.
472472
* @param page - The page number (0-indexed) for pagination. Returns at most 10 logs per tag per page.
473+
* @param upToBlockNumber - If set, only return logs from blocks up to and including this block number.
473474
*/
474-
getPrivateLogsByTags(tags: SiloedTag[], page?: number): Promise<TxScopedL2Log[][]> {
475+
getPrivateLogsByTags(tags: SiloedTag[], page?: number, upToBlockNumber?: BlockNumber): Promise<TxScopedL2Log[][]> {
475476
try {
476-
return this.#logStore.getPrivateLogsByTags(tags, page);
477+
return this.#logStore.getPrivateLogsByTags(tags, page, upToBlockNumber);
477478
} catch (err) {
478479
return Promise.reject(err);
479480
}
@@ -485,14 +486,16 @@ export class KVArchiverDataStore implements ContractDataSource {
485486
* @param contractAddress - The contract address to search logs for.
486487
* @param tags - The tags to search for.
487488
* @param page - The page number (0-indexed) for pagination. Returns at most 10 logs per tag per page.
489+
* @param upToBlockNumber - If set, only return logs from blocks up to and including this block number.
488490
*/
489491
getPublicLogsByTagsFromContract(
490492
contractAddress: AztecAddress,
491493
tags: Tag[],
492494
page?: number,
495+
upToBlockNumber?: BlockNumber,
493496
): Promise<TxScopedL2Log[][]> {
494497
try {
495-
return this.#logStore.getPublicLogsByTagsFromContract(contractAddress, tags, page);
498+
return this.#logStore.getPublicLogsByTagsFromContract(contractAddress, tags, page, upToBlockNumber);
496499
} catch (err) {
497500
return Promise.reject(err);
498501
}

0 commit comments

Comments
 (0)