diff --git a/packages/api-sync/source/restore.ts b/packages/api-sync/source/restore.ts index bc80f15e7..9716283bc 100644 --- a/packages/api-sync/source/restore.ts +++ b/packages/api-sync/source/restore.ts @@ -324,7 +324,7 @@ export class Restore { const fromBlockNumber = Math.min(currentBlockNumber, mostRecentCommit.block.number); const toBlockNumber = Math.min(currentBlockNumber + BATCH_SIZE - 1, mostRecentCommit.block.number); - const commits = this.databaseService.readCommits(fromBlockNumber, toBlockNumber); + const commits = this.databaseService.readCommits(fromBlockNumber, toBlockNumber, Number.MAX_SAFE_INTEGER); const blocks: Models.Block[] = []; const transactions: Models.Transaction[] = []; diff --git a/packages/contracts/source/contracts/database.ts b/packages/contracts/source/contracts/database.ts index dd5ba7264..0c2abfa0b 100644 --- a/packages/contracts/source/contracts/database.ts +++ b/packages/contracts/source/contracts/database.ts @@ -14,8 +14,8 @@ export interface DatabaseService extends CommitHandler { getLastCommit(): Promise; hasCommitByHash(blockHash: string): Promise; - findCommitBuffers(start: number, end: number): Promise; - readCommits(start: number, end: number): AsyncGenerator; + findCommitBuffers(start: number, end: number, maxBytes: number): Promise; + readCommits(start: number, end: number, maxBytes: number): AsyncGenerator; getBlock(blockNumber: number): Promise; getBlockByHash(blockHash: string): Promise; diff --git a/packages/contracts/source/contracts/evm/storage.ts b/packages/contracts/source/contracts/evm/storage.ts index 3f4ae4de0..1957a5433 100644 --- a/packages/contracts/source/contracts/evm/storage.ts +++ b/packages/contracts/source/contracts/evm/storage.ts @@ -55,6 +55,11 @@ export interface Storage { getBlockHeaderData(blockNumber: number): Promise; getBlockNumberByHash(blockHash: string): Promise; getCommitData(blockNumber: number): Promise; + getCommitsByBlockRange( + fromBlockNumber: number, + toBlockNumber: number, + maxBytes: number, + ): Promise; getTransactionData(key: string): Promise; getTransactionKeyByHash(txHash: string): Promise; isEmpty(): Promise; diff --git a/packages/database/source/database-service.test.ts b/packages/database/source/database-service.test.ts index fa5f9e1ca..633074a36 100644 --- a/packages/database/source/database-service.test.ts +++ b/packages/database/source/database-service.test.ts @@ -83,13 +83,13 @@ describe<{ }); it("findCommitBuffers - should be ok", async ({ databaseService }) => { - const commits = await databaseService.findCommitBuffers(1, 2); + const commits = await databaseService.findCommitBuffers(1, 2, Number.MAX_SAFE_INTEGER); assert.empty(commits); }); it("readCommits - should be ok", async ({ databaseService }) => { const commits = []; - for await (const commit of databaseService.readCommits(1, 2)) { + for await (const commit of databaseService.readCommits(1, 2, Number.MAX_SAFE_INTEGER)) { commits.push(commit); } @@ -197,7 +197,6 @@ describe<{ from: transaction.from, gasLimit: BigInt(transaction.gasLimit), gasPrice: BigInt(transaction.gasPrice), - index: transaction.transactionIndex, nonce: transaction.nonce, specId: Enums.Evm.SpecId.LATEST, to: transaction.to, @@ -237,7 +236,9 @@ describe<{ }); it("#findCommitBuffers - should return commit buffer", async ({ databaseService, genesisCommit }) => { - assert.equal(await databaseService.findCommitBuffers(0, 1), [Buffer.from(genesisCommit.serialized, "hex")]); + assert.equal(await databaseService.findCommitBuffers(0, 1, Number.MAX_SAFE_INTEGER), [ + Buffer.from(genesisCommit.serialized, "hex"), + ]); }); it("#getBlock - should return block", async ({ databaseService, genesisCommit }) => { @@ -269,7 +270,7 @@ describe<{ it("#readCommits - should return commits", async ({ databaseService, genesisCommit }) => { const commits = []; - for await (const commit of databaseService.readCommits(0, 1)) { + for await (const commit of databaseService.readCommits(0, 1, Number.MAX_SAFE_INTEGER)) { commits.push(commit); } assert.equal( diff --git a/packages/database/source/database-service.ts b/packages/database/source/database-service.ts index 9abf635ba..60f0da971 100644 --- a/packages/database/source/database-service.ts +++ b/packages/database/source/database-service.ts @@ -38,26 +38,14 @@ export class DatabaseService implements Contracts.Database.DatabaseService { return blockNumber !== undefined; } - public async findCommitBuffers(start: number, end: number): Promise { - const blockNumbers: number[] = []; + public async findCommitBuffers(start: number, end: number, maxBytes: number): Promise { + const buffers: Buffer[] = []; - for (const blockNumber of this.#range(start, end)) { - blockNumbers.push(blockNumber); + for await (const commit of this.readCommits(start, end, maxBytes)) { + buffers.push(Buffer.from(commit.serialized, "hex")); } - const buffers = await Promise.all( - blockNumbers.map(async (blockNumber: number) => { - const commitStorage = await this.#readCommitStorage(blockNumber); - if (!commitStorage) { - return; - } - - const commit = await this.commitFactory.fromStorage(commitStorage); - return Buffer.from(commit.serialized, "hex"); - }), - ); - - return buffers.filter((commit) => !!commit); + return buffers; } public async getBlock(blockNumber: number): Promise { @@ -106,12 +94,13 @@ export class DatabaseService implements Contracts.Database.DatabaseService { } public async findBlocks(start: number, end: number): Promise { - const commitBuffers = await this.findCommitBuffers(start, end); + const blocks: Contracts.Crypto.Block[] = []; + + for await (const commit of this.readCommits(start, end, Number.MAX_SAFE_INTEGER)) { + blocks.push(commit.block); + } - return await this.#map( - commitBuffers, - async (buffer: Buffer) => (await this.commitFactory.fromBytes(buffer)).block, - ); + return blocks; } public async getTransactionByHash(transactionHash: string): Promise { @@ -143,16 +132,29 @@ export class DatabaseService implements Contracts.Database.DatabaseService { return this.#readTransaction(`${blockNumber}-${index}`); } - public async *readCommits(start: number, end: number): AsyncGenerator { - for (let blockNumber = start; blockNumber <= end; blockNumber++) { - const data = await this.#readCommitStorage(blockNumber); + public async *readCommits(start: number, end: number, maxBytes: number): AsyncGenerator { + let from = Math.max(0, start); + let remainingBytes = maxBytes; - if (!data) { + while (from <= end) { + const commitsData = await this.storage.getCommitsByBlockRange(from, end, remainingBytes); + if (commitsData.length === 0) { return; } - const commit = await this.commitFactory.fromStorage(data); - yield commit; + let lastBlockNumber = from; + for (const data of commitsData) { + const commit = await this.commitFactory.fromStorage(data); + lastBlockNumber = commit.block.number; + yield commit; + + remainingBytes -= commit.serialized.length / 2; + if (remainingBytes <= 0) { + return; + } + } + + from = lastBlockNumber + 1; } } @@ -201,19 +203,4 @@ export class DatabaseService implements Contracts.Database.DatabaseService { return this.transactionFactory.fromStorage({ ...transactionStorageData, blockHash: blockHeaderData.hash }); } - - async #map(data: U[], callback: (...arguments_: U[]) => Promise): Promise { - const result: T[] = []; - for (const [index, datum] of data.entries()) { - result[index] = await callback(datum); - } - - return result; - } - - *#range(start: number, end: number): Generator { - for (let index = start; index <= end; index++) { - yield index; - } - } } diff --git a/packages/evm-service/source/instances/evm.ts b/packages/evm-service/source/instances/evm.ts index 34be839bb..a716ccfb5 100644 --- a/packages/evm-service/source/instances/evm.ts +++ b/packages/evm-service/source/instances/evm.ts @@ -208,6 +208,14 @@ export class EvmInstance implements Contracts.Evm.Instance, Contracts.Evm.Storag return result; } + public async getCommitsByBlockRange( + fromBlockNumber: number, + toBlockNumber: number, + maxBytes: number, + ): Promise { + return this.#evm.getCommitsByBlockRange(BigInt(fromBlockNumber), BigInt(toBlockNumber), BigInt(maxBytes)); + } + public async getTransactionData(key: string): Promise { const result = await this.#evm.getTransactionData(key); if (result === null || result === undefined) { diff --git a/packages/evm/bindings/src/lib.rs b/packages/evm/bindings/src/lib.rs index f4936d034..5c21753cb 100644 --- a/packages/evm/bindings/src/lib.rs +++ b/packages/evm/bindings/src/lib.rs @@ -915,6 +915,27 @@ impl EvmInner { Ok(Some((proof, header, txs))) } + pub fn get_commits_by_block_range( + &mut self, + from_block_number: u64, + to_block_number: u64, + max_bytes: u64, + ) -> std::result::Result< + Vec<(ProofData, BlockHeaderData, Vec)>, + EVMError, + > { + match self.persistent_db.get_commits_by_block_range( + from_block_number, + to_block_number, + max_bytes, + ) { + Ok(commits) => Ok(commits), + Err(err) => Err(EVMError::Database( + format!("failed reading commits by block range: {}", err).into(), + )), + } + } + pub fn get_transaction_data( &mut self, key: String, @@ -1625,6 +1646,33 @@ impl JsEvmWrapper { ) } + #[napi] + pub fn get_commits_by_block_range<'env>( + &mut self, + env: &'env Env, + from_block_number: BigInt, + to_block_number: BigInt, + max_bytes: BigInt, + ) -> Result>> { + let from_block_number = from_block_number.get_u64().1; + let to_block_number = to_block_number.get_u64().1; + let max_bytes = max_bytes.get_u64().1; + env.spawn_future_with_callback( + Self::get_commits_by_block_range_async( + self.evm.clone(), + from_block_number, + to_block_number, + max_bytes, + ), + |_, result| { + Ok(result + .into_iter() + .map(|(proof, header, txs)| JsCommitData::new(proof, header, txs)) + .collect()) + }, + ) + } + #[napi] pub fn get_transaction_data<'env>( &mut self, @@ -2050,6 +2098,21 @@ impl JsEvmWrapper { } } + async fn get_commits_by_block_range_async( + evm: Arc>, + from_block_number: u64, + to_block_number: u64, + max_bytes: u64, + ) -> Result)>> { + let mut lock = evm.lock().await; + let result = lock.get_commits_by_block_range(from_block_number, to_block_number, max_bytes); + + match result { + Ok(result) => Result::Ok(result), + Err(err) => Result::Err(serde::de::Error::custom(err)), + } + } + async fn get_transaction_data_async( evm: Arc>, key: String, diff --git a/packages/evm/core/src/db.rs b/packages/evm/core/src/db.rs index 354492d79..294310e7b 100644 --- a/packages/evm/core/src/db.rs +++ b/packages/evm/core/src/db.rs @@ -642,6 +642,57 @@ impl PersistentDB { } } + pub fn get_commits_by_block_range( + &self, + from_block_number: u64, + to_block_number: u64, + max_bytes: u64, + ) -> Result)>, Error> { + // Per-commit fixed cost charged against the budget on top of the block's transaction payload, + // so that a long run of (near-)empty blocks is still bounded by block count, not just bytes. + const PER_COMMIT_OVERHEAD_BYTES: u64 = 1024; + + let tx_env = self.env.read_txn()?; + let inner = self.inner.borrow(); + + let capacity = to_block_number.saturating_sub(from_block_number).min(512) as usize; + let mut commits = Vec::with_capacity(capacity); + let mut accumulated_bytes: u64 = 0; + + for item in inner + .blocks + .range(&tx_env, &(from_block_number..=to_block_number))? + { + let (block_number, header) = item?; + + // Headers and proofs are written together per commit; a missing proof means the end of + // the available data has been reached. + let Some(proof) = inner.proofs.get(&tx_env, &block_number)? else { + break; + }; + + // Collect this block's transactions via a single range scan over its key prefix; the + // keys sort by (block_number, index), so they arrive in index order. + let mut transactions = Vec::with_capacity(header.0.transactions_count as usize); + let tx_from = TransactionKey::new(block_number, 0); + let tx_to = TransactionKey::new(block_number, u16::MAX); + for tx_item in inner.transactions.range(&tx_env, &(tx_from..=tx_to))? { + let (_, transaction) = tx_item?; + transactions.push(transaction.0); + } + + let estimated_bytes = header.0.payload_size as u64 + PER_COMMIT_OVERHEAD_BYTES; + commits.push((proof.0, header.0, transactions)); + + accumulated_bytes += estimated_bytes; + if accumulated_bytes >= max_bytes { + break; + } + } + + Ok(commits) + } + pub fn get_historical_account_info( &self, block_number: u64, @@ -2094,6 +2145,103 @@ mod tests { assert_eq!(db.genesis_info, Some(Default::default())); } + #[test] + fn test_get_commits_by_block_range() { + let db = create_temp_database(); + + // Empty range before anything is written. + assert!( + db.get_commits_by_block_range(1, 3, u64::MAX) + .unwrap() + .is_empty() + ); + + // Write blocks 1..=3; block N has N transactions, inserted in reverse sequence order to + // prove the reader returns them ordered by (block_number, sequence). + { + let mut wtxn = db.env.write_txn().unwrap(); + let inner = db.inner.borrow(); + + for block_number in 1u64..=3 { + inner + .blocks + .put( + &mut wtxn, + &block_number, + &CompactBincode(&BlockHeaderData { + number: block_number as u32, + transactions_count: block_number as u16, + ..Default::default() + }), + ) + .unwrap(); + + inner + .proofs + .put( + &mut wtxn, + &block_number, + &CompactBincode(&ProofData { + round: block_number as u32, + ..Default::default() + }), + ) + .unwrap(); + + for sequence in (0..block_number).rev() { + inner + .transactions + .put( + &mut wtxn, + &TransactionKey::new(block_number, sequence as u16), + &CompactBincode(&TransactionData { + block_number: block_number as u32, + index: sequence as u32, + tx_hash: B256::from(U256::from(block_number * 100 + sequence)), + ..Default::default() + }), + ) + .unwrap(); + } + } + + wtxn.commit().unwrap(); + } + + // Full range (unbounded budget): blocks ascending, transactions per block in sequence order. + let commits = db.get_commits_by_block_range(1, 3, u64::MAX).unwrap(); + assert_eq!(commits.len(), 3); + + for (index, (proof, header, transactions)) in commits.iter().enumerate() { + let block_number = (index + 1) as u64; + + assert_eq!(header.number, block_number as u32); + assert_eq!(proof.round, block_number as u32); + assert_eq!(transactions.len(), block_number as usize); + + for (sequence, transaction) in transactions.iter().enumerate() { + assert_eq!(transaction.index, sequence as u32); + assert_eq!(transaction.block_number, block_number as u32); + } + } + + // Sub-range returns only the requested block. + let commits = db.get_commits_by_block_range(2, 2, u64::MAX).unwrap(); + assert_eq!(commits.len(), 1); + assert_eq!(commits[0].1.number, 2); + assert_eq!(commits[0].2.len(), 2); + + // Range extending past the tip stops at the last available block. + let commits = db.get_commits_by_block_range(2, 99, u64::MAX).unwrap(); + assert_eq!(commits.len(), 2); + + // A tiny byte budget stops early but always makes progress (returns at least one commit), + // so callers can resume from the last returned block. + let commits = db.get_commits_by_block_range(1, 3, 1).unwrap(); + assert_eq!(commits.len(), 1); + assert_eq!(commits[0].1.number, 1); + } + #[test] fn test_get_receipts() { let db = create_temp_database(); diff --git a/packages/p2p/source/socket-server/controllers/get-blocks.ts b/packages/p2p/source/socket-server/controllers/get-blocks.ts index 0d8f6bb26..bd6f0dcd3 100644 --- a/packages/p2p/source/socket-server/controllers/get-blocks.ts +++ b/packages/p2p/source/socket-server/controllers/get-blocks.ts @@ -31,14 +31,15 @@ export class GetBlocksController implements Contracts.P2P.Controller { return { blocks: [] }; } + const maxPayload = constants.MAX_PAYLOAD_CLIENT; const commits: Buffer[] = await this.database.findCommitBuffers( requestBlockNumber, requestBlockNumber + requestBlockLimit - 1, + maxPayload, ); // Only return the blocks fetched while we are below the p2p maxPayload limit const blocksToReturn: Buffer[] = []; - const maxPayload = constants.MAX_PAYLOAD_CLIENT; let totalSize = 0; for (const commit of commits) { diff --git a/tests/functional/consensus/source/utilities.ts b/tests/functional/consensus/source/utilities.ts index a81210e21..dc405df30 100644 --- a/tests/functional/consensus/source/utilities.ts +++ b/tests/functional/consensus/source/utilities.ts @@ -222,9 +222,8 @@ export const getLastCommit = async (app: Contracts.Kernel.Application): Promise< const [serialized] = await databaseService.findCommitBuffers( lasCommit.block.number, lasCommit.block.number, + Number.MAX_SAFE_INTEGER, ); - return app - .get(Identifiers.Cryptography.Commit.Factory) - .fromBytes(serialized); + return app.get(Identifiers.Cryptography.Commit.Factory).fromBytes(serialized); };