From 4234fb831c42faa51a8232359f0ef91b265bce7f Mon Sep 17 00:00:00 2001 From: Nicolas Chamo Date: Thu, 7 May 2026 13:09:49 -0300 Subject: [PATCH] refactor(pxe): batch tagged private log queries across all secrets --- .../oracle/private_execution.test.ts | 2 +- yarn-project/pxe/src/logs/log_service.ts | 38 +-- .../pxe/src/tagging/get_all_logs_by_tags.ts | 4 + yarn-project/pxe/src/tagging/index.ts | 4 +- ...ate_logs_for_sender_recipient_pair.test.ts | 189 ------------ ..._private_logs_for_sender_recipient_pair.ts | 130 --------- .../sync_tagged_private_logs.test.ts | 275 ++++++++++++++++++ .../sync_tagged_private_logs.ts | 236 +++++++++++++++ .../utils/load_logs_for_range.test.ts | 223 -------------- .../utils/load_logs_for_range.ts | 44 --- 10 files changed, 530 insertions(+), 615 deletions(-) delete mode 100644 yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts delete mode 100644 yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts create mode 100644 yarn-project/pxe/src/tagging/recipient_sync/sync_tagged_private_logs.test.ts create mode 100644 yarn-project/pxe/src/tagging/recipient_sync/sync_tagged_private_logs.ts delete mode 100644 yarn-project/pxe/src/tagging/recipient_sync/utils/load_logs_for_range.test.ts delete mode 100644 yarn-project/pxe/src/tagging/recipient_sync/utils/load_logs_for_range.ts diff --git a/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution.test.ts b/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution.test.ts index 3abc14adf645..a29e0a35197f 100644 --- a/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution.test.ts +++ b/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution.test.ts @@ -325,7 +325,7 @@ describe('Private Execution test suite', () => { // on the input. 
aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => Promise.resolve(tags.map(() => []))); - // Mock getL2Tips and getBlockHeader for loadPrivateLogsForSenderRecipientPair + // Mock getL2Tips and getBlockHeader for syncTaggedPrivateLogs l2TipsStore.getL2Tips.mockResolvedValue(makeL2Tips(anchorBlockHeader.globalVariables.blockNumber)); // TODO: refactor. Maybe it's worth stubbing a key store diff --git a/yarn-project/pxe/src/logs/log_service.ts b/yarn-project/pxe/src/logs/log_service.ts index 6f8fe80bf49f..1b28eb5aed24 100644 --- a/yarn-project/pxe/src/logs/log_service.ts +++ b/yarn-project/pxe/src/logs/log_service.ts @@ -14,7 +14,7 @@ import type { SenderAddressBookStore } from '../storage/tagging_store/sender_add import { getAllPrivateLogsByTags, getAllPublicLogsByTagsFromContract, - loadPrivateLogsForSenderRecipientPair, + syncTaggedPrivateLogs, } from '../tagging/index.js'; export class LogService { @@ -116,37 +116,23 @@ export class LogService { public async fetchTaggedLogs(contractAddress: AztecAddress, recipient: AztecAddress): Promise { this.log.verbose(`Fetching tagged logs for ${contractAddress.toString()}`); - // We only load logs from block up to and including the anchor block number - const anchorBlockNumber = this.anchorBlockHeader.getBlockNumber(); - const anchorBlockHash = await this.anchorBlockHeader.hash(); - const l2Tips = await this.l2TipsStore.getL2Tips(); - const currentTimestamp = this.anchorBlockHeader.globalVariables.timestamp; // Get all secrets for this recipient (one per sender) const secrets = await this.#getSecretsForSenders(contractAddress, recipient); - // Load logs for all sender-recipient pairs in parallel - const logArrays = await Promise.all( - secrets.map(secret => - loadPrivateLogsForSenderRecipientPair( - secret, - this.aztecNode, - this.recipientTaggingStore, - anchorBlockNumber, - anchorBlockHash, - currentTimestamp, - l2Tips.finalized.block.number, - this.jobId, - ), - ), + const logs = await 
syncTaggedPrivateLogs( + secrets, + this.aztecNode, + this.recipientTaggingStore, + this.anchorBlockHeader, + l2Tips.finalized.block.number, + this.jobId, ); - return logArrays - .flat() - .map( - scopedLog => - new PendingTaggedLog(scopedLog.logData, scopedLog.txHash, scopedLog.noteHashes, scopedLog.firstNullifier), - ); + return logs.map( + scopedLog => + new PendingTaggedLog(scopedLog.logData, scopedLog.txHash, scopedLog.noteHashes, scopedLog.firstNullifier), + ); } async #getSecretsForSenders( diff --git a/yarn-project/pxe/src/tagging/get_all_logs_by_tags.ts b/yarn-project/pxe/src/tagging/get_all_logs_by_tags.ts index fca567621567..a3c832d2a25b 100644 --- a/yarn-project/pxe/src/tagging/get_all_logs_by_tags.ts +++ b/yarn-project/pxe/src/tagging/get_all_logs_by_tags.ts @@ -39,6 +39,10 @@ async function getAllPagesInBatches( tags: Tag[], fetchAllPagesForBatch: (batch: Tag[]) => Promise, ): Promise { + if (tags.length === 0) { + return []; + } + if (tags.length <= MAX_RPC_LEN) { return fetchAllPagesForBatch(tags); } diff --git a/yarn-project/pxe/src/tagging/index.ts b/yarn-project/pxe/src/tagging/index.ts index 6b812a8f0a47..7ac229dbaebf 100644 --- a/yarn-project/pxe/src/tagging/index.ts +++ b/yarn-project/pxe/src/tagging/index.ts @@ -4,12 +4,12 @@ * The objective of the sender sync algorithm is to determine which tags have already been used by a sender, thereby * deciding which tag should be used next. * - * The objective of the recipient sync algorithm is to load and process the corresponding logs. + * The objective of the recipient sync algorithm is to fetch and sync the corresponding logs. 
* * @module tagging */ -export { loadPrivateLogsForSenderRecipientPair } from './recipient_sync/load_private_logs_for_sender_recipient_pair.js'; +export { syncTaggedPrivateLogs } from './recipient_sync/sync_tagged_private_logs.js'; export { syncSenderTaggingIndexes } from './sender_sync/sync_sender_tagging_indexes.js'; export { UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN } from './constants.js'; export { getAllPrivateLogsByTags, getAllPublicLogsByTagsFromContract } from './get_all_logs_by_tags.js'; diff --git a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts deleted file mode 100644 index 5a8838e4b566..000000000000 --- a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts +++ /dev/null @@ -1,189 +0,0 @@ -import { MAX_TX_LIFETIME } from '@aztec/constants'; -import { BlockNumber } from '@aztec/foundation/branded-types'; -import { Fr } from '@aztec/foundation/curves/bn254'; -import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; -import { BlockHash } from '@aztec/stdlib/block'; -import type { AztecNode } from '@aztec/stdlib/interfaces/server'; -import { type ExtendedDirectionalAppTaggingSecret, SiloedTag } from '@aztec/stdlib/logs'; -import { randomExtendedDirectionalAppTaggingSecret, randomTxScopedPrivateL2Log } from '@aztec/stdlib/testing'; - -import { type MockProxy, mock } from 'jest-mock-extended'; - -import { RecipientTaggingStore } from '../../storage/tagging_store/recipient_tagging_store.js'; -import { UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN } from '../index.js'; -import { loadPrivateLogsForSenderRecipientPair } from './load_private_logs_for_sender_recipient_pair.js'; - -// In this test suite we don't care about the anchor block behavior as that is sufficiently tested by -// the loadLogsForRange test suite, so we use a high block number to ensure it occurs after all logs. 
-const FAR_FUTURE_BLOCK_NUMBER = BlockNumber(100); -const MOCK_ANCHOR_BLOCK_HASH = BlockHash.random(); - -describe('loadPrivateLogsForSenderRecipientPair', () => { - let secret: ExtendedDirectionalAppTaggingSecret; - - let aztecNode: MockProxy; - let taggingStore: RecipientTaggingStore; - - const currentTimestamp = BigInt(Math.floor(Date.now() / 1000)); - - function computeSiloedTagForIndex(index: number) { - return SiloedTag.compute({ extendedSecret: secret, index }); - } - - function makeLog(blockNumber: number, blockTimestamp: bigint, tag: Fr) { - return randomTxScopedPrivateL2Log({ blockNumber, blockTimestamp, tag }); - } - - beforeAll(async () => { - secret = await randomExtendedDirectionalAppTaggingSecret(); - aztecNode = mock(); - }); - - beforeEach(async () => { - aztecNode.getPrivateLogsByTags.mockReset(); - taggingStore = new RecipientTaggingStore(await openTmpStore('test')); - }); - - it('returns empty array when no logs found', async () => { - const finalizedBlockNumber = BlockNumber(10); - - // no logs found for any tag - aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { - return Promise.resolve(tags.map((_tag: SiloedTag) => [])); - }); - - const logs = await loadPrivateLogsForSenderRecipientPair( - secret, - aztecNode, - taggingStore, - FAR_FUTURE_BLOCK_NUMBER, - MOCK_ANCHOR_BLOCK_HASH, - currentTimestamp, - finalizedBlockNumber, - 'test', - ); - - expect(logs).toHaveLength(0); - expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBeUndefined(); - expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBeUndefined(); - }); - - it('loads log and updates highest finalized index but not highest aged index', async () => { - const finalizedBlockNumber = BlockNumber(10); - - const logBlockTimestamp = currentTimestamp - 5000n; // not aged - const logIndex = 5; - const logTag = await computeSiloedTagForIndex(logIndex); - - // The log is finalized - aztecNode.getPrivateLogsByTags.mockImplementation((tags: 
SiloedTag[]) => { - return Promise.resolve( - tags.map((t: SiloedTag) => - t.equals(logTag) ? [makeLog(Number(finalizedBlockNumber), logBlockTimestamp, logTag.value)] : [], - ), - ); - }); - - const logs = await loadPrivateLogsForSenderRecipientPair( - secret, - aztecNode, - taggingStore, - FAR_FUTURE_BLOCK_NUMBER, - MOCK_ANCHOR_BLOCK_HASH, - currentTimestamp, - finalizedBlockNumber, - 'test', - ); - - expect(logs).toHaveLength(1); - expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBe(logIndex); - expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBeUndefined(); - }); - - it('loads log and updates both highest aged and highest finalized indexes', async () => { - const finalizedBlockNumber = BlockNumber(10); - - const logBlockTimestamp = currentTimestamp - BigInt(MAX_TX_LIFETIME) - 1000n; // aged - const logIndex = 7; - const logTag = await computeSiloedTagForIndex(logIndex); - - // The log is finalized - aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { - return Promise.resolve( - tags.map((t: SiloedTag) => - t.equals(logTag) ? 
[makeLog(Number(finalizedBlockNumber), logBlockTimestamp, logTag.value)] : [], - ), - ); - }); - - const logs = await loadPrivateLogsForSenderRecipientPair( - secret, - aztecNode, - taggingStore, - FAR_FUTURE_BLOCK_NUMBER, - MOCK_ANCHOR_BLOCK_HASH, - currentTimestamp, - finalizedBlockNumber, - 'test', - ); - - expect(logs).toHaveLength(1); - expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBe(logIndex); - expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBe(logIndex); - }); - - it('logs at boundaries are properly loaded, window and highest indexes advance as expected', async () => { - const finalizedBlockNumber = BlockNumber(10); - - const log1BlockTimestamp = currentTimestamp - BigInt(MAX_TX_LIFETIME) - 1000n; // Aged - const log2BlockTimestamp = currentTimestamp - 5000n; // Not aged - const highestAgedIndex = 3; - const highestFinalizedIndex = 5; - const log1Index = highestAgedIndex + 1; // At the beginning of the range - const log2Index = highestFinalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN; // At the window boundary - const log1Tag = await computeSiloedTagForIndex(log1Index); - const log2Tag = await computeSiloedTagForIndex(log2Index); - - // Set existing highest aged index and highest finalized index - await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex, 'test'); - await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex, 'test'); - - // We record the number of queried tags to be able to verify that the window was moved forward correctly. 
- let numQueriedTags = 0; - - aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { - numQueriedTags += tags.length; - return Promise.resolve( - tags.map((t: SiloedTag) => { - if (t.equals(log1Tag)) { - return [makeLog(Number(finalizedBlockNumber), log1BlockTimestamp, log1Tag.value)]; - } else if (t.equals(log2Tag)) { - return [makeLog(Number(finalizedBlockNumber), log2BlockTimestamp, log2Tag.value)]; - } - return []; - }), - ); - }); - - const logs = await loadPrivateLogsForSenderRecipientPair( - secret, - aztecNode, - taggingStore, - FAR_FUTURE_BLOCK_NUMBER, - MOCK_ANCHOR_BLOCK_HASH, - currentTimestamp, - finalizedBlockNumber, - 'test', - ); - - // Verify that both logs at the boundaries of the range were found and processed - expect(logs).toHaveLength(2); - expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBe(log2Index); - expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBe(log1Index); - - // Verify that the window was moved forward correctly - // Total range queried: from (highestAgedIndex + 1) to (log2Index + WINDOW_LEN + 1) exclusive - const expectedNumQueriedTags = log2Index + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN - highestAgedIndex; - expect(numQueriedTags).toBe(expectedNumQueriedTags); - }); -}); diff --git a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts deleted file mode 100644 index feed4773fa54..000000000000 --- a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts +++ /dev/null @@ -1,130 +0,0 @@ -import type { BlockNumber } from '@aztec/foundation/branded-types'; -import type { BlockHash } from '@aztec/stdlib/block'; -import type { AztecNode } from '@aztec/stdlib/interfaces/client'; -import type { ExtendedDirectionalAppTaggingSecret, TxScopedL2Log } from '@aztec/stdlib/logs'; - -import type { RecipientTaggingStore } from 
'../../storage/tagging_store/recipient_tagging_store.js'; -import { UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN } from '../constants.js'; -import { findHighestIndexes } from './utils/find_highest_indexes.js'; -import { loadLogsForRange } from './utils/load_logs_for_range.js'; - -/** - * Loads private logs for the app-sender-recipient triplet defined by `secret` and updates the highest aged and - * finalized indexes in the db. At most load logs from blocks up to and including `anchorBlockNumber`. - * - * @dev This function can be safely executed "in parallel" for other sender-recipient pairs because the data in - * in the tagging data provider is indexed by the secret and hence completely disjoint. - */ -export async function loadPrivateLogsForSenderRecipientPair( - secret: ExtendedDirectionalAppTaggingSecret, - aztecNode: AztecNode, - taggingStore: RecipientTaggingStore, - anchorBlockNumber: BlockNumber, - anchorBlockHash: BlockHash, - currentTimestamp: bigint, - finalizedBlockNumber: BlockNumber, - jobId: string, -): Promise { - // # Explanation of how the algorithm works - // When we perform the sync we will look at logs that correspond to the tagging index range - // (highestAgedIndex, highestFinalizedIndex + WINDOW_LEN] - // - // highestAgedIndex is the highest index that was used in a tx that is included in a block at least - // `MAX_TX_LIFETIME` seconds ago. - // highestFinalizedIndex is the highest index that was used in a tx that is included in a finalized block. - // - // "(" denotes an open end of the range - the index is not included in the range. - // "]" denotes a closed end of the range - the index is included in the range. - // - // ## Explanation of highestAgedIndex - // - // highestAgedIndex is chosen such that for all tagging indexes `i <= highestAgedIndex` we know that no new logs can - // ever appear. 
- // - // This relies on the "maximum inclusion timestamp" rule enforced by the kernel and rollup circuits: - // - a transaction's maximum inclusion timestamp is at most `MAX_TX_LIFETIME` seconds after - // the timestamp of its anchor block; and - // - a rollup only includes transactions whose inclusion timestamp is >= the L2 block's timestamp. - // - // Suppose some device used index `I` in a transaction anchored to block `B_N` at time `N`, and that block is now at - // least `MAX_TX_LIFETIME` seconds in the past. Then there is no possibility of any *other* device - // trying to use an index <= `I` while anchoring to a *newer* block than `B_N` because if we were anchoring to - // a newer block than `B_N` then we would already have seen the log with index `I` and hence the device would have - // chosen a larger index. - // If that *other* device would anchor to a block older than `B_N` then that tx could never be included in a block - // because it would already have been expired. - // - // Therefore, once we see that index `I` has been used in a block that is at least `MAX_TX_LIFETIME` - // seconds old, we can safely stop syncing logs for all indexes <= `I` and set highestAgedIndex = `I`. - // - // ## Explanation of the upper bound `highestFinalizedIndex + WINDOW_LEN` - // - // When a sender chooses a tagging index, they will select an index that is at most `WINDOW_LEN` greater than - // the highest finalized index. If that index was already used, they will throw an error. For this reason we - // don't have to look further than `highestFinalizedIndex + WINDOW_LEN`. - - let start: number, end: number; - { - const currentHighestAgedIndex = await taggingStore.getHighestAgedIndex(secret, jobId); - const currentHighestFinalizedIndex = await taggingStore.getHighestFinalizedIndex(secret, jobId); - - // We don't want to include the highest aged index so we start from `currentHighestAgedIndex + 1` (or 0 if not set) - start = currentHighestAgedIndex === undefined ? 
0 : currentHighestAgedIndex + 1; - - // The highest index a sender can choose is "highest finalized index + window length" but given that - // `loadLogsForRange` expects an exclusive `end` we add 1. - end = (currentHighestFinalizedIndex ?? 0) + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN + 1; - } - - const logs: TxScopedL2Log[] = []; - - while (true) { - // Get private logs with their block timestamps and corresponding tagging indexes - const privateLogsWithIndexes = await loadLogsForRange( - secret, - aztecNode, - start, - end, - anchorBlockNumber, - anchorBlockHash, - ); - - if (privateLogsWithIndexes.length === 0) { - break; - } - - logs.push(...privateLogsWithIndexes.map(({ log }) => log)); - - const { highestAgedIndex, highestFinalizedIndex } = findHighestIndexes( - privateLogsWithIndexes, - currentTimestamp, - finalizedBlockNumber, - ); - - // Store updates in data provider and update local variables - if (highestAgedIndex !== undefined) { - await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex, jobId); - } - - if (highestFinalizedIndex === undefined) { - // We have not found a new highest finalized index, so there is no need to move the window forward. - break; - } - - if (highestAgedIndex !== undefined && highestAgedIndex > highestFinalizedIndex) { - // This is just a sanity check as this should never happen. - throw new Error( - `Highest aged index (${highestAgedIndex}) must not exceed highest finalized index (${highestFinalizedIndex})`, - ); - } - - await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex, jobId); - - // For the next iteration we want to look only at indexes for which we have not attempted to load logs yet while - // ensuring that we do not look further than WINDOW_LEN ahead of the highest finalized index. - start = end; - end = highestFinalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN + 1; // `end` is exclusive so we add 1. 
- } - - return logs; -} diff --git a/yarn-project/pxe/src/tagging/recipient_sync/sync_tagged_private_logs.test.ts b/yarn-project/pxe/src/tagging/recipient_sync/sync_tagged_private_logs.test.ts new file mode 100644 index 000000000000..1a17d2c552e0 --- /dev/null +++ b/yarn-project/pxe/src/tagging/recipient_sync/sync_tagged_private_logs.test.ts @@ -0,0 +1,275 @@ +import { MAX_TX_LIFETIME } from '@aztec/constants'; +import { BlockNumber } from '@aztec/foundation/branded-types'; +import { Fr } from '@aztec/foundation/curves/bn254'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; +import type { AztecNode } from '@aztec/stdlib/interfaces/server'; +import { type ExtendedDirectionalAppTaggingSecret, SiloedTag } from '@aztec/stdlib/logs'; +import { randomExtendedDirectionalAppTaggingSecret, randomTxScopedPrivateL2Log } from '@aztec/stdlib/testing'; +import { BlockHeader } from '@aztec/stdlib/tx'; + +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { RecipientTaggingStore } from '../../storage/tagging_store/recipient_tagging_store.js'; +import { UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN, syncTaggedPrivateLogs } from '../index.js'; + +const FAR_FUTURE_BLOCK_NUMBER = BlockNumber(100); +const CURRENT_TIMESTAMP = BigInt(Math.floor(Date.now() / 1000)); +const ANCHOR_BLOCK_HEADER = BlockHeader.random({ blockNumber: FAR_FUTURE_BLOCK_NUMBER, timestamp: CURRENT_TIMESTAMP }); +const JOB_ID = 'test-job'; + +describe('syncTaggedPrivateLogs', () => { + const aztecNode: MockProxy = mock(); + let taggingStore: RecipientTaggingStore; + + function computeSiloedTagForIndex(secret: ExtendedDirectionalAppTaggingSecret, index: number) { + return SiloedTag.compute({ extendedSecret: secret, index }); + } + + function makeLog(blockNumber: number, blockTimestamp: bigint, tag: Fr) { + return randomTxScopedPrivateL2Log({ blockNumber, blockTimestamp, tag }); + } + + beforeEach(async () => { + aztecNode.getPrivateLogsByTags.mockReset(); + taggingStore = new 
RecipientTaggingStore(await openTmpStore('test')); + }); + + it('returns empty array when given no secrets', async () => { + const logs = await syncTaggedPrivateLogs([], aztecNode, taggingStore, ANCHOR_BLOCK_HEADER, BlockNumber(10), JOB_ID); + + expect(logs).toHaveLength(0); + expect(aztecNode.getPrivateLogsByTags).not.toHaveBeenCalled(); + }); + + it('returns empty array when no logs found for any secret', async () => { + const secrets = await makeSecrets(3); + aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { + return Promise.resolve(tags.map(() => [])); + }); + + const logs = await syncTaggedPrivateLogs( + secrets, + aztecNode, + taggingStore, + ANCHOR_BLOCK_HEADER, + BlockNumber(10), + JOB_ID, + ); + + expect(logs).toHaveLength(0); + }); + + it('batches tags from multiple secrets into a single RPC call', async () => { + const secrets = await makeSecrets(3); + const finalizedBlockNumber = BlockNumber(10); + + aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { + return Promise.resolve(tags.map(() => [])); + }); + + await syncTaggedPrivateLogs(secrets, aztecNode, taggingStore, ANCHOR_BLOCK_HEADER, finalizedBlockNumber, JOB_ID); + + expect(aztecNode.getPrivateLogsByTags).toHaveBeenCalledTimes(1); + }); + + it('syncs logs and updates store independently per secret', async () => { + const secrets = await makeSecrets(3); + const finalizedBlockNumber = BlockNumber(10); + const logBlockTimestamp = CURRENT_TIMESTAMP - BigInt(MAX_TX_LIFETIME) - 1000n; + + const log1Index = 3; + const log2Index = 7; + const log1Tag = await computeSiloedTagForIndex(secrets[0], log1Index); + const log2Tag = await computeSiloedTagForIndex(secrets[1], log2Index); + + aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { + return Promise.resolve( + tags.map((t: SiloedTag) => { + if (t.equals(log1Tag)) { + return [makeLog(Number(finalizedBlockNumber), logBlockTimestamp, log1Tag.value)]; + } else if (t.equals(log2Tag)) { + 
return [makeLog(Number(finalizedBlockNumber), logBlockTimestamp, log2Tag.value)]; + } + return []; + }), + ); + }); + + const logs = await syncTaggedPrivateLogs( + secrets, + aztecNode, + taggingStore, + ANCHOR_BLOCK_HEADER, + finalizedBlockNumber, + JOB_ID, + ); + + expect(logs).toHaveLength(2); + expect(await taggingStore.getHighestAgedIndex(secrets[0], JOB_ID)).toBe(log1Index); + expect(await taggingStore.getHighestFinalizedIndex(secrets[0], JOB_ID)).toBe(log1Index); + expect(await taggingStore.getHighestAgedIndex(secrets[1], JOB_ID)).toBe(log2Index); + expect(await taggingStore.getHighestFinalizedIndex(secrets[1], JOB_ID)).toBe(log2Index); + // secrets[2] found nothing, so its store must be untouched + expect(await taggingStore.getHighestAgedIndex(secrets[2], JOB_ID)).toBeUndefined(); + expect(await taggingStore.getHighestFinalizedIndex(secrets[2], JOB_ID)).toBeUndefined(); + }); + + it('does not advance aged index for recent logs', async () => { + const [secret] = await makeSecrets(1); + const finalizedBlockNumber = BlockNumber(10); + const logBlockTimestamp = CURRENT_TIMESTAMP - 5000n; // not aged + + const logIndex = 5; + const logTag = await computeSiloedTagForIndex(secret, logIndex); + + aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { + return Promise.resolve( + tags.map((t: SiloedTag) => + t.equals(logTag) ? 
[makeLog(Number(finalizedBlockNumber), logBlockTimestamp, logTag.value)] : [], + ), + ); + }); + + await syncTaggedPrivateLogs([secret], aztecNode, taggingStore, ANCHOR_BLOCK_HEADER, finalizedBlockNumber, JOB_ID); + + expect(await taggingStore.getHighestFinalizedIndex(secret, JOB_ID)).toBe(logIndex); + expect(await taggingStore.getHighestAgedIndex(secret, JOB_ID)).toBeUndefined(); + }); + + it('updates store correctly when multiple iterations are needed', async () => { + const [secret] = await makeSecrets(1); + const finalizedBlockNumber = BlockNumber(10); + const agedBlockTimestamp = CURRENT_TIMESTAMP - BigInt(MAX_TX_LIFETIME) - 1000n; + + // A log at the last index of the initial window [0, WINDOW_LEN] moves the finalized index to WINDOW_LEN, + // which shifts the next window forward and triggers a second iteration. + const lastIndexInInitialWindow = UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN; + const log1Tag = await computeSiloedTagForIndex(secret, lastIndexInInitialWindow); + + // A second log sits in the advanced window, only reachable in the second iteration. 
+ const newWindowIndex = lastIndexInInitialWindow + 3; + const log2Tag = await computeSiloedTagForIndex(secret, newWindowIndex); + + aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { + return Promise.resolve( + tags.map((t: SiloedTag) => { + if (t.equals(log1Tag)) { + return [makeLog(Number(finalizedBlockNumber), agedBlockTimestamp, log1Tag.value)]; + } else if (t.equals(log2Tag)) { + return [makeLog(Number(finalizedBlockNumber), agedBlockTimestamp, log2Tag.value)]; + } + return []; + }), + ); + }); + + const logs = await syncTaggedPrivateLogs( + [secret], + aztecNode, + taggingStore, + ANCHOR_BLOCK_HEADER, + finalizedBlockNumber, + JOB_ID, + ); + + expect(logs).toHaveLength(2); + expect(await taggingStore.getHighestAgedIndex(secret, JOB_ID)).toBe(newWindowIndex); + expect(await taggingStore.getHighestFinalizedIndex(secret, JOB_ID)).toBe(newWindowIndex); + }); + + it('respects pre-existing store indexes', async () => { + const [secret] = await makeSecrets(1); + const finalizedBlockNumber = BlockNumber(10); + + const existingAgedIndex = 5; + const existingFinalizedIndex = 8; + await taggingStore.updateHighestAgedIndex(secret, existingAgedIndex, JOB_ID); + await taggingStore.updateHighestFinalizedIndex(secret, existingFinalizedIndex, JOB_ID); + + aztecNode.getPrivateLogsByTags.mockResolvedValue([]); + + await syncTaggedPrivateLogs([secret], aztecNode, taggingStore, ANCHOR_BLOCK_HEADER, finalizedBlockNumber, JOB_ID); + + const calledTags = aztecNode.getPrivateLogsByTags.mock.calls[0][0]; + + // The query window must start at existingAgedIndex+1 and end at existingFinalizedIndex+WINDOW_LEN (inclusive). 
+ const expectedStart = existingAgedIndex + 1; + const expectedEnd = existingFinalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN; + const expectedTags = await Promise.all( + Array.from({ length: expectedEnd - expectedStart + 1 }, (_, i) => + computeSiloedTagForIndex(secret, expectedStart + i), + ), + ); + + expect(calledTags).toEqual(expectedTags); + }); + + it('handles multiple logs at the same tag index', async () => { + const [secret] = await makeSecrets(1); + const finalizedBlockNumber = BlockNumber(10); + const logBlockTimestamp = CURRENT_TIMESTAMP - BigInt(MAX_TX_LIFETIME) - 1000n; + + const logIndex = 3; + const logTag = await computeSiloedTagForIndex(secret, logIndex); + + // Two logs returned for the same tag + aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { + return Promise.resolve( + tags.map((t: SiloedTag) => + t.equals(logTag) + ? [ + makeLog(Number(finalizedBlockNumber), logBlockTimestamp, logTag.value), + makeLog(Number(finalizedBlockNumber), logBlockTimestamp, logTag.value), + ] + : [], + ), + ); + }); + + const logs = await syncTaggedPrivateLogs( + [secret], + aztecNode, + taggingStore, + ANCHOR_BLOCK_HEADER, + finalizedBlockNumber, + JOB_ID, + ); + + expect(logs).toHaveLength(2); + }); + + it('filters out logs from blocks after the anchor block', async () => { + const [secret] = await makeSecrets(1); + const anchorBlock = BlockNumber(10); + const header = BlockHeader.random({ blockNumber: anchorBlock, timestamp: CURRENT_TIMESTAMP }); + const finalizedBlockNumber = BlockNumber(10); + const logBlockTimestamp = CURRENT_TIMESTAMP - BigInt(MAX_TX_LIFETIME) - 1000n; + + const logIndex = 3; + const logTag = await computeSiloedTagForIndex(secret, logIndex); + + // Three logs: one before anchor, one at anchor, one after anchor + aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { + return Promise.resolve( + tags.map((t: SiloedTag) => + t.equals(logTag) + ? 
[ + makeLog(Number(anchorBlock) - 1, logBlockTimestamp, logTag.value), + makeLog(Number(anchorBlock), logBlockTimestamp, logTag.value), + makeLog(Number(anchorBlock) + 1, logBlockTimestamp, logTag.value), + ] + : [], + ), + ); + }); + + const logs = await syncTaggedPrivateLogs([secret], aztecNode, taggingStore, header, finalizedBlockNumber, JOB_ID); + + // Only logs at or before the anchor block should be included + expect(logs).toHaveLength(2); + }); +}); + +function makeSecrets(count: number): Promise { + return Promise.all(Array.from({ length: count }, () => randomExtendedDirectionalAppTaggingSecret())); +} diff --git a/yarn-project/pxe/src/tagging/recipient_sync/sync_tagged_private_logs.ts b/yarn-project/pxe/src/tagging/recipient_sync/sync_tagged_private_logs.ts new file mode 100644 index 000000000000..6e904ad6c012 --- /dev/null +++ b/yarn-project/pxe/src/tagging/recipient_sync/sync_tagged_private_logs.ts @@ -0,0 +1,236 @@ +import type { BlockNumber } from '@aztec/foundation/branded-types'; +import { isDefined } from '@aztec/foundation/types'; +import type { BlockHash } from '@aztec/stdlib/block'; +import type { AztecNode } from '@aztec/stdlib/interfaces/client'; +import type { ExtendedDirectionalAppTaggingSecret, TxScopedL2Log } from '@aztec/stdlib/logs'; +import { SiloedTag } from '@aztec/stdlib/logs'; +import type { BlockHeader } from '@aztec/stdlib/tx'; + +import type { RecipientTaggingStore } from '../../storage/tagging_store/recipient_tagging_store.js'; +import { UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN } from '../constants.js'; +import { getAllPrivateLogsByTags } from '../get_all_logs_by_tags.js'; +import { findHighestIndexes } from './utils/find_highest_indexes.js'; + +/** + * Fetches and syncs tagged private logs for multiple sender-recipient pairs, batching tag queries across all secrets + * into shared RPC calls. 
+ * + * # Explanation of how the algorithm works + * + * For each secret we sync logs that correspond to the tagging index range + * (highestAgedIndex, highestFinalizedIndex + WINDOW_LEN] + * + * highestAgedIndex is the highest index that was used in a tx that is included in a block at least + * `MAX_TX_LIFETIME` seconds ago. + * highestFinalizedIndex is the highest index that was used in a tx that is included in a finalized block. + * + * "(" denotes an open end of the range - the index is not included in the range. + * "]" denotes a closed end of the range - the index is included in the range. + * + * ## Explanation of highestAgedIndex + * + * highestAgedIndex is chosen such that for all tagging indexes `i <= highestAgedIndex` we know that no new logs can + * ever appear. + * + * This relies on the "maximum inclusion timestamp" rule enforced by the kernel and rollup circuits: + * - a transaction's maximum inclusion timestamp is at most `MAX_TX_LIFETIME` seconds after + * the timestamp of its anchor block; and + * - a rollup only includes transactions whose inclusion timestamp is >= the L2 block's timestamp. + * + * Suppose some device used index `I` in a transaction anchored to block `B_N` at time `N`, and that block is now at + * least `MAX_TX_LIFETIME` seconds in the past. Then there is no possibility of any *other* device + * trying to use an index <= `I` while anchoring to a *newer* block than `B_N` because if we were anchoring to + * a newer block than `B_N` then we would already have seen the log with index `I` and hence the device would have + * chosen a larger index. + * If that *other* device would anchor to a block older than `B_N` then that tx could never be included in a block + * because it would already have been expired. + * + * Therefore, once we see that index `I` has been used in a block that is at least `MAX_TX_LIFETIME` + * seconds old, we can safely stop syncing logs for all indexes <= `I` and set highestAgedIndex = `I`. 
+ *
+ * ## Explanation of the upper bound `highestFinalizedIndex + WINDOW_LEN`
+ *
+ * When a sender chooses a tagging index, they will select an index that is at most `WINDOW_LEN` greater than
+ * the highest finalized index. If that index was already used, they will throw an error. For this reason we
+ * don't have to look further than `highestFinalizedIndex + WINDOW_LEN`.
+ *
+ * ## Batching across secrets
+ *
+ * Instead of running one RPC call per secret, we merge tags from all pending secrets into a single flat array,
+ * make one batched `getAllPrivateLogsByTags` call (which internally chunks at MAX_RPC_LEN), then split results
+ * back per secret using tracked offsets. Only secrets whose window advanced are kept for the next iteration.
+ */
+export async function syncTaggedPrivateLogs(
+  secrets: ExtendedDirectionalAppTaggingSecret[],
+  aztecNode: AztecNode,
+  taggingStore: RecipientTaggingStore,
+  anchorBlockHeader: BlockHeader,
+  finalizedBlockNumber: BlockNumber,
+  jobId: string,
+): Promise<TxScopedL2Log[]> {
+  if (secrets.length === 0) {
+    return [];
+  }
+
+  const anchorBlockNumber = anchorBlockHeader.getBlockNumber();
+  const anchorBlockHash = await anchorBlockHeader.hash();
+  const currentTimestamp = anchorBlockHeader.globalVariables.timestamp;
+
+  // Read stored indexes from the db and compute the initial [start, end) range for each secret
+  let pending = await getIndexRangesForSecrets(secrets, taggingStore, jobId);
+  const allLogs: TxScopedL2Log[] = [];
+
+  while (pending.length > 0) {
+    // Compute tags for all pending secrets and fetch logs in batched RPC calls
+    const logsPerSecret = await fetchLogsForSecrets(pending, aztecNode, anchorBlockNumber, anchorBlockHash);
+
+    const nextRound = await Promise.all(
+      pending.map(async (pendingSecret, i) => {
+        const logsFoundWithSecret = logsPerSecret[i];
+        if (logsFoundWithSecret.length === 0) {
+          // No logs found, no need to update indexes or advance window.
+          return undefined;
+        }
+
+        allLogs.push(...logsFoundWithSecret.map(({ log }) => log));
+
+        // Persist new indexes. If the finalized index moved forward, the window advances
+        // and we need another round for this secret.
+        return await updateIndexesAndAdvanceWindow(
+          pendingSecret,
+          logsFoundWithSecret,
+          taggingStore,
+          currentTimestamp,
+          finalizedBlockNumber,
+          jobId,
+        );
+      }),
+    );
+
+    pending = nextRound.filter(isDefined);
+  }
+
+  return allLogs;
+}
+
+/** Reads stored indexes for each secret and computes the initial index range to query. */
+function getIndexRangesForSecrets(
+  secrets: ExtendedDirectionalAppTaggingSecret[],
+  taggingStore: RecipientTaggingStore,
+  jobId: string,
+): Promise<PendingSecret[]> {
+  return Promise.all(
+    secrets.map(async secret => {
+      const [currentHighestAgedIndex, currentHighestFinalizedIndex] = await Promise.all([
+        taggingStore.getHighestAgedIndex(secret, jobId),
+        taggingStore.getHighestFinalizedIndex(secret, jobId),
+      ]);
+
+      const start = currentHighestAgedIndex === undefined ? 0 : currentHighestAgedIndex + 1;
+      const end = (currentHighestFinalizedIndex ?? 0) + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN + 1;
+
+      return { secret, start, end };
+    }),
+  );
+}
+
+/**
+ * Computes siloed tags for all pending secrets' index ranges, fetches logs in one batched RPC call,
+ * and returns the results grouped back per secret.
+ */
+async function fetchLogsForSecrets(
+  pending: PendingSecret[],
+  aztecNode: AztecNode,
+  anchorBlockNumber: BlockNumber,
+  anchorBlockHash: BlockHash,
+): Promise<LogWithIndex[][]> {
+  // Determine the index range for each secret
+  const indexesPerSecret = pending.map(({ start, end }) => Array.from({ length: end - start }, (_, i) => start + i));
+
+  // Compute siloed tags for all indexes
+  const tagsPerSecret = await Promise.all(
+    pending.map(({ secret }, i) =>
+      Promise.all(indexesPerSecret[i].map(index => SiloedTag.compute({ extendedSecret: secret, index }))),
+    ),
+  );
+
+  const allTags = tagsPerSecret.flat();
+
+  // getAllPrivateLogsByTags handles MAX_RPC_LEN chunking internally
+  const allResults = await getAllPrivateLogsByTags(aztecNode, allTags, anchorBlockHash);
+
+  // Split flat results back per secret using the known lengths
+  const logsPerSecret: LogWithIndex[][] = [];
+  let offset = 0;
+  for (const indexes of indexesPerSecret) {
+    const logsForSecret: LogWithIndex[] = [];
+    for (let i = 0; i < indexes.length; i++) {
+      for (const log of allResults[offset + i]) {
+        if (log.blockNumber <= anchorBlockNumber) {
+          logsForSecret.push({ log, taggingIndex: indexes[i] });
+        }
+      }
+    }
+    logsPerSecret.push(logsForSecret);
+    offset += indexes.length;
+  }
+
+  return logsPerSecret;
+}
+
+/**
+ * Processes a single secret's fetched logs: updates stored indexes and returns a new PendingSecret
+ * if the window needs to advance, or undefined if this secret is done.
+ */
+async function updateIndexesAndAdvanceWindow(
+  pending: PendingSecret,
+  logsWithIndexes: LogWithIndex[],
+  taggingStore: RecipientTaggingStore,
+  currentTimestamp: bigint,
+  finalizedBlockNumber: BlockNumber,
+  jobId: string,
+): Promise<PendingSecret | undefined> {
+  const { highestAgedIndex, highestFinalizedIndex } = findHighestIndexes(
+    logsWithIndexes,
+    currentTimestamp,
+    finalizedBlockNumber,
+  );
+
+  // Store updates in data provider and update local variables
+  if (highestAgedIndex !== undefined) {
+    await taggingStore.updateHighestAgedIndex(pending.secret, highestAgedIndex, jobId);
+  }
+
+  if (highestFinalizedIndex === undefined) {
+    // We have not found a new highest finalized index, so there is no need to move the window forward.
+    return undefined;
+  }
+
+  if (highestAgedIndex !== undefined && highestAgedIndex > highestFinalizedIndex) {
+    // This is just a sanity check as this should never happen.
+    throw new Error(
+      `Highest aged index (${highestAgedIndex}) must not exceed highest finalized index (${highestFinalizedIndex})`,
+    );
+  }
+
+  await taggingStore.updateHighestFinalizedIndex(pending.secret, highestFinalizedIndex, jobId);
+
+  // For the next iteration we want to look only at indexes for which we have not yet fetched logs while
+  // ensuring that we do not look further than WINDOW_LEN ahead of the highest finalized index.
+ return { + secret: pending.secret, + start: pending.end, + end: highestFinalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN + 1, + }; +} + +type PendingSecret = { + secret: ExtendedDirectionalAppTaggingSecret; + start: number; + end: number; +}; + +type LogWithIndex = { + log: TxScopedL2Log; + taggingIndex: number; +}; diff --git a/yarn-project/pxe/src/tagging/recipient_sync/utils/load_logs_for_range.test.ts b/yarn-project/pxe/src/tagging/recipient_sync/utils/load_logs_for_range.test.ts deleted file mode 100644 index 89134335968d..000000000000 --- a/yarn-project/pxe/src/tagging/recipient_sync/utils/load_logs_for_range.test.ts +++ /dev/null @@ -1,223 +0,0 @@ -import { BlockNumber } from '@aztec/foundation/branded-types'; -import { BlockHash } from '@aztec/stdlib/block'; -import type { AztecNode } from '@aztec/stdlib/interfaces/server'; -import { type ExtendedDirectionalAppTaggingSecret, SiloedTag } from '@aztec/stdlib/logs'; -import { randomExtendedDirectionalAppTaggingSecret, randomTxScopedPrivateL2Log } from '@aztec/stdlib/testing'; -import { TxHash } from '@aztec/stdlib/tx'; - -import { type MockProxy, mock } from 'jest-mock-extended'; - -import { loadLogsForRange } from './load_logs_for_range.js'; - -// In tests where the anchor block behavior is not under examination, we use a high block number to ensure it occurs -// after all logs. -const FAR_FUTURE_BLOCK_NUMBER = BlockNumber(100); -const MOCK_ANCHOR_BLOCK_HASH = BlockHash.random(); - -describe('loadLogsForRange', () => { - // App contract address and secret to be used on the input of the loadLogsForRange function. 
- let secret: ExtendedDirectionalAppTaggingSecret; - - let aztecNode: MockProxy; - - function computeSiloedTagForIndex(index: number) { - return SiloedTag.compute({ extendedSecret: secret, index }); - } - - function makeLog(txHash: TxHash, blockNumber: number, blockTimestamp: bigint, tag: SiloedTag) { - return randomTxScopedPrivateL2Log({ txHash, blockNumber, blockTimestamp, tag: tag.value }); - } - - beforeAll(async () => { - secret = await randomExtendedDirectionalAppTaggingSecret(); - aztecNode = mock(); - }); - - beforeEach(() => { - aztecNode.getPrivateLogsByTags.mockReset(); - }); - - it('returns empty array when no logs found for the given window', async () => { - aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { - // No log found for any tag - return Promise.resolve(tags.map((_tag: SiloedTag) => [])); - }); - - expect( - await loadLogsForRange(secret, aztecNode, 0, 10, FAR_FUTURE_BLOCK_NUMBER, MOCK_ANCHOR_BLOCK_HASH), - ).toHaveLength(0); - }); - - it('handles multiple logs at different indexes', async () => { - const txHash1 = TxHash.random(); - const txHash2 = TxHash.random(); - const blockNumber1 = 5; - const blockNumber2 = 6; - const index1 = 2; - const index2 = 7; - const timestamp1 = 1000n; - const timestamp2 = 2000n; - const tag1 = await computeSiloedTagForIndex(index1); - const tag2 = await computeSiloedTagForIndex(index2); - - aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { - return Promise.all( - tags.map((t: SiloedTag) => { - if (t.equals(tag1)) { - return [makeLog(txHash1, blockNumber1, timestamp1, tag1)]; - } else if (t.equals(tag2)) { - return [makeLog(txHash2, blockNumber2, timestamp2, tag2)]; - } - return []; - }), - ); - }); - - const result = await loadLogsForRange(secret, aztecNode, 0, 10, FAR_FUTURE_BLOCK_NUMBER, MOCK_ANCHOR_BLOCK_HASH); - - expect(result).toHaveLength(2); - const resultByIndex = result.sort((a, b) => a.taggingIndex - b.taggingIndex); - 
expect(resultByIndex[0].taggingIndex).toBe(index1); - expect(resultByIndex[0].log.blockTimestamp).toBe(timestamp1); - expect(resultByIndex[0].log.txHash.equals(txHash1)).toBe(true); - expect(resultByIndex[1].taggingIndex).toBe(index2); - expect(resultByIndex[1].log.blockTimestamp).toBe(timestamp2); - expect(resultByIndex[1].log.txHash.equals(txHash2)).toBe(true); - }); - - it('handles multiple logs at the same index', async () => { - const txHash1 = TxHash.random(); - const txHash2 = TxHash.random(); - const blockNumber1 = 5; - const blockNumber2 = 6; - const index = 4; - const timestamp1 = 1000n; - const timestamp2 = 2000n; - const tag = await computeSiloedTagForIndex(index); - - aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { - return Promise.resolve( - tags.map((t: SiloedTag) => - t.equals(tag) - ? [makeLog(txHash1, blockNumber1, timestamp1, tag), makeLog(txHash2, blockNumber2, timestamp2, tag)] - : [], - ), - ); - }); - - const result = await loadLogsForRange(secret, aztecNode, 0, 10, FAR_FUTURE_BLOCK_NUMBER, MOCK_ANCHOR_BLOCK_HASH); - - expect(result).toHaveLength(2); - expect(result[0].taggingIndex).toBe(index); - expect(result[1].taggingIndex).toBe(index); - const txHashes = result.map(r => r.log.txHash.toString()); - expect(txHashes).toContain(txHash1.toString()); - expect(txHashes).toContain(txHash2.toString()); - }); - - it('handles multiple logs in the same block', async () => { - const txHash1 = TxHash.random(); - const txHash2 = TxHash.random(); - const blockNumber = 5; - const index1 = 2; - const index2 = 3; - const timestamp = 1000n; - const tag1 = await computeSiloedTagForIndex(index1); - const tag2 = await computeSiloedTagForIndex(index2); - - aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { - return Promise.resolve( - tags.map((t: SiloedTag) => { - if (t.equals(tag1)) { - return [makeLog(txHash1, blockNumber, timestamp, tag1)]; - } else if (t.equals(tag2)) { - return [makeLog(txHash2, 
blockNumber, timestamp, tag2)]; - } - return []; - }), - ); - }); - - const result = await loadLogsForRange(secret, aztecNode, 0, 10, FAR_FUTURE_BLOCK_NUMBER, MOCK_ANCHOR_BLOCK_HASH); - - expect(result).toHaveLength(2); - - const resultByIndex = result.sort((a, b) => a.taggingIndex - b.taggingIndex); - expect(resultByIndex[0].taggingIndex).toBe(index1); - expect(resultByIndex[0].log.blockTimestamp).toBe(timestamp); - expect(resultByIndex[1].taggingIndex).toBe(index2); - expect(resultByIndex[1].log.blockTimestamp).toBe(timestamp); - }); - - it('respects start (inclusive) and end (exclusive) boundaries', async () => { - const start = 5; - const end = 10; - - const txHashAtStart = TxHash.random(); - const txHashAtEnd = TxHash.random(); - const timestamp = 1000n; - const tagAtStart = await computeSiloedTagForIndex(start); - const tagAtEnd = await computeSiloedTagForIndex(end); - - aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { - return Promise.resolve( - tags.map((t: SiloedTag) => { - if (t.equals(tagAtStart)) { - return [makeLog(txHashAtStart, 5, timestamp, tagAtStart)]; - } else if (t.equals(tagAtEnd)) { - return [makeLog(txHashAtEnd, 6, timestamp, tagAtEnd)]; - } - return []; - }), - ); - }); - - const result = await loadLogsForRange( - secret, - aztecNode, - start, - end, - FAR_FUTURE_BLOCK_NUMBER, - MOCK_ANCHOR_BLOCK_HASH, - ); - - // Should only include log at start (inclusive), not at end (exclusive) - expect(result).toHaveLength(1); - expect(result[0].taggingIndex).toBe(start); - expect(result[0].log.txHash.equals(txHashAtStart)).toBe(true); - }); - - it('filters out logs from blocks after anchor block', async () => { - const anchorBlockNumber = 10; - - const index = 3; - const timestamp = 1000n; - const tag = await computeSiloedTagForIndex(index); - - aztecNode.getPrivateLogsByTags.mockImplementation((tags: SiloedTag[]) => { - return Promise.resolve( - tags.map((t: SiloedTag) => - t.equals(tag) - ? 
[ - makeLog(TxHash.random(), anchorBlockNumber - 1, timestamp, tag), - makeLog(TxHash.random(), anchorBlockNumber, timestamp, tag), - makeLog(TxHash.random(), anchorBlockNumber + 1, timestamp, tag), - ] - : [], - ), - ); - }); - - const result = await loadLogsForRange( - secret, - aztecNode, - 0, - 10, - BlockNumber(anchorBlockNumber), - MOCK_ANCHOR_BLOCK_HASH, - ); - - // Should only include logs from blocks at or before the anchor block number - expect(result).toHaveLength(2); - }); -}); diff --git a/yarn-project/pxe/src/tagging/recipient_sync/utils/load_logs_for_range.ts b/yarn-project/pxe/src/tagging/recipient_sync/utils/load_logs_for_range.ts deleted file mode 100644 index c8e3bfa575b7..000000000000 --- a/yarn-project/pxe/src/tagging/recipient_sync/utils/load_logs_for_range.ts +++ /dev/null @@ -1,44 +0,0 @@ -import type { BlockNumber } from '@aztec/foundation/branded-types'; -import type { BlockHash } from '@aztec/stdlib/block'; -import type { AztecNode } from '@aztec/stdlib/interfaces/client'; -import type { ExtendedDirectionalAppTaggingSecret, TxScopedL2Log } from '@aztec/stdlib/logs'; -import { SiloedTag } from '@aztec/stdlib/logs'; - -import { getAllPrivateLogsByTags } from '../../get_all_logs_by_tags.js'; - -/** - * Gets private logs with their corresponding block timestamps and tagging indexes for the given index range and - * `extendedSecret`. At most load logs from blocks up to and including `anchorBlockNumber`. `start` is inclusive and - * `end` is exclusive. 
- */ -export async function loadLogsForRange( - extendedSecret: ExtendedDirectionalAppTaggingSecret, - aztecNode: AztecNode, - start: number, - end: number, - anchorBlockNumber: BlockNumber, - anchorBlockHash: BlockHash, -): Promise> { - // Derive siloed tags for the window - const siloedTags = await Promise.all( - Array.from({ length: end - start }, (_, i) => SiloedTag.compute({ extendedSecret, index: start + i })), - ); - - // We use the utility function below to retrieve all logs for the tags across all pages, so we don't need to handle - // pagination here. - const logs = await getAllPrivateLogsByTags(aztecNode, siloedTags, anchorBlockHash); - - // Pair logs with their corresponding tagging indexes - const logsWithIndexes: Array<{ log: TxScopedL2Log; taggingIndex: number }> = []; - for (let i = 0; i < logs.length; i++) { - const logsForTag = logs[i]; - const taggingIndex = start + i; - for (const log of logsForTag) { - if (log.blockNumber <= anchorBlockNumber) { - logsWithIndexes.push({ log, taggingIndex }); - } - } - } - - return logsWithIndexes; -}