Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ members = [
"src/evm-log-extractor",
"src/signature-verification-canister-client",
]
resolver = "2"
resolver = "3"

[workspace.package]
authors = ["Bitfinity Network"]
Expand Down Expand Up @@ -43,7 +43,6 @@ alloy = { version = "0.15", default-features = false, features = [
"serde",
] }
anyhow = "1.0"
async-trait = "0.1"
bincode = "1.3"
bytes = "1"
candid = "0.10"
Expand All @@ -59,7 +58,7 @@ ic-log = { git = "https://github.com/bitfinity-network/canister-sdk", package =
ic-stable-structures = { git = "https://github.com/bitfinity-network/canister-sdk", package = "ic-stable-structures", tag = "v0.24.x" }
itertools = "0.14"
jsonrpsee = { version = "0.25", features = ["server", "macros"] }
lightspeed_scheduler = "0.63"
lightspeed_scheduler = "0.64"
log = "0.4"
num = "0.4"
port_check = "0.2"
Expand All @@ -80,7 +79,7 @@ sqlx = { version = "0.8.1", default-features = false, features = [
"runtime-tokio",
] }
tempfile = "3"
testcontainers = { package = "testcontainers-modules", version = "0.11", features = [
testcontainers = { package = "testcontainers-modules", version = "0.12", features = [
"postgres",
] }
thiserror = "2.0"
Expand Down
1 change: 0 additions & 1 deletion src/evm-block-extractor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ repository.workspace = true
[dependencies]
alloy = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
did = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions src/evm-block-extractor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use clap::{Parser, Subcommand};
use sqlx::PgPool;
use sqlx::postgres::{PgConnectOptions, PgSslMode};

use crate::database::DatabaseClient;
use crate::database::postgres_db_client::PostgresDbClient;

const VERSION: &str = env!("CARGO_PKG_VERSION");
Expand Down Expand Up @@ -78,7 +77,7 @@ pub enum Database {

impl Database {
/// Build a database client based on the database type
pub async fn build_client(self) -> anyhow::Result<Arc<dyn DatabaseClient>> {
pub async fn build_client(self) -> anyhow::Result<Arc<PostgresDbClient>> {
match self {
Database::Postgres {
username,
Expand Down
86 changes: 59 additions & 27 deletions src/evm-block-extractor/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,75 +35,102 @@ const BLOCKCHAIN_BLOCK_INFO_KEY: &str = "blockchain_block_info";
pub type CertifiedBlock = CertifiedResult<Block<H256>>;

/// A trait for interacting with a blockchain database
#[async_trait::async_trait]
pub trait DatabaseClient: Send + Sync {
/// Initialize the database
async fn init(&self, block: Option<Block<H256>>, reset_database: bool) -> anyhow::Result<()>;
fn init(
&self,
block: Option<Block<H256>>,
reset_database: bool,
) -> impl Future<Output = anyhow::Result<()>> + Send;

/// Delete/clear the tables
async fn clear(&self) -> anyhow::Result<()>;
fn clear(&self) -> impl Future<Output = anyhow::Result<()>> + Send;

/// Returns whether the block hash corresponds to the one in the db
async fn check_if_same_block_hash(&self, block: &Block<H256>) -> anyhow::Result<bool> {
let block_number = block.number.0.to();
let block_in_db = self.get_block_by_number(block_number).await?;
Ok(block.hash == block_in_db.hash)
fn check_if_same_block_hash(
&self,
block: &Block<H256>,
) -> impl Future<Output = anyhow::Result<bool>> + Send {
async {
let block_number = block.number.0.to();
let block_in_db = self.get_block_by_number(block_number).await?;
Ok(block.hash == block_in_db.hash)
}
}

/// Get a block from the database
async fn get_block_by_number(&self, block_number: u64) -> anyhow::Result<Block<H256>>;
fn get_block_by_number(
&self,
block_number: u64,
) -> impl Future<Output = anyhow::Result<Block<H256>>> + Send;

/// Get a block from the database
async fn get_full_block_by_number(
fn get_full_block_by_number(
&self,
block_number: u64,
) -> anyhow::Result<Block<Transaction>>;
) -> impl Future<Output = anyhow::Result<Block<Transaction>>> + Send;

/// Insert block data; this includes transactions and the blocks
async fn insert_block_data(
fn insert_block_data(
&self,
blocks: &[Block<H256>],
transactions: &[Transaction],
) -> anyhow::Result<()>;
) -> impl Future<Output = anyhow::Result<()>> + Send;

/// Insert certified block data
async fn insert_certified_block_data(&self, response: CertifiedBlock) -> anyhow::Result<()>;
fn insert_certified_block_data(
&self,
response: CertifiedBlock,
) -> impl Future<Output = anyhow::Result<()>> + Send;

/// Returns certified response for the last block
async fn get_last_certified_block_data(&self) -> anyhow::Result<CertifiedBlock>;
fn get_last_certified_block_data(
&self,
) -> impl Future<Output = anyhow::Result<CertifiedBlock>> + Send;

/// Get genesis balances
async fn get_genesis_balances(&self) -> anyhow::Result<Option<Vec<AccountBalance>>>;
fn get_genesis_balances(
&self,
) -> impl Future<Output = anyhow::Result<Option<Vec<AccountBalance>>>> + Send;

/// Insert genesis balances
async fn insert_genesis_balances(
fn insert_genesis_balances(
&self,
genesis_balances: &[AccountBalance],
) -> anyhow::Result<()>;
) -> impl Future<Output = anyhow::Result<()>> + Send;

/// Get chain id
async fn get_chain_id(&self) -> anyhow::Result<Option<u64>>;
fn get_chain_id(&self) -> impl Future<Output = anyhow::Result<Option<u64>>> + Send;

/// Insert chain_id
async fn insert_chain_id(&self, chain_id: u64) -> anyhow::Result<()>;
fn insert_chain_id(&self, chain_id: u64) -> impl Future<Output = anyhow::Result<()>> + Send;

/// Get a transaction from the database
async fn get_transaction(&self, tx_hash: H256) -> anyhow::Result<Transaction>;
fn get_transaction(
&self,
tx_hash: H256,
) -> impl Future<Output = anyhow::Result<Transaction>> + Send;

/// Get the latest block number
async fn get_latest_block_number(&self) -> anyhow::Result<Option<u64>>;
fn get_latest_block_number(&self) -> impl Future<Output = anyhow::Result<Option<u64>>> + Send;

/// Get earliest block number
async fn get_earliest_block_number(&self) -> anyhow::Result<u64>;
fn get_earliest_block_number(&self) -> impl Future<Output = anyhow::Result<u64>> + Send;

/// Delete latest blocks starting with `start_from`, and related transactions.
/// Deleted blocks and transactions will be preserved in 'discarded' table with
/// the given 'reason' and timestamp.
async fn discard_blocks_from(&self, start_from: u64, reason: &str) -> anyhow::Result<()>;
fn discard_blocks_from(
&self,
start_from: u64,
reason: &str,
) -> impl Future<Output = anyhow::Result<()>> + Send;

/// Returns a discarded block by its hash.
async fn get_discarded_block_by_hash(&self, block_hash: H256)
-> anyhow::Result<DiscardedBlock>;
fn get_discarded_block_by_hash(
&self,
block_hash: H256,
) -> impl Future<Output = anyhow::Result<DiscardedBlock>> + Send;

/// Returns block info from storage.
///
Expand All @@ -115,10 +142,15 @@ pub trait DatabaseClient: Send + Sync {
/// - safe_block_number: u64,
/// - finalized_block_number: u64,
/// - pending_block_number: u64,
async fn get_block_info(&self) -> anyhow::Result<Option<BlockchainBlockInfo>>;
fn get_block_info(
&self,
) -> impl Future<Output = anyhow::Result<Option<BlockchainBlockInfo>>> + Send;

/// Stores blockchain block info.
async fn set_block_info(&self, info: BlockchainBlockInfo) -> anyhow::Result<()>;
fn set_block_info(
&self,
info: BlockchainBlockInfo,
) -> impl Future<Output = anyhow::Result<()>> + Send;
}

/// Discarded block with metadata.
Expand Down
1 change: 0 additions & 1 deletion src/evm-block-extractor/src/database/postgres_db_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ impl PostgresDbClient {
}
}

#[async_trait::async_trait]
impl DatabaseClient for PostgresDbClient {
async fn init(&self, block: Option<Block<H256>>, reset_database: bool) -> anyhow::Result<()> {
MIGRATOR.run(&self.pool).await?;
Expand Down
37 changes: 25 additions & 12 deletions src/evm-block-extractor/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,34 @@ use jsonrpsee::types::{ErrorCode, ErrorObject};

use crate::database::{CertifiedBlock, DatabaseClient};

#[derive(Clone)]
pub struct EthImpl<C>
pub struct EthImpl<C, DB>
where
DB: DatabaseClient,
C: Client + Send + Sync + 'static,
{
pub blockchain: Arc<dyn DatabaseClient + 'static>,
pub blockchain: Arc<DB>,
pub evm_client: Arc<EthJsonRpcClient<C>>,
}

impl<C> EthImpl<C>
impl<C, DB> Clone for EthImpl<C, DB>
where
DB: DatabaseClient,
C: Client + Send + Sync + 'static,
{
pub fn new(
db: Arc<dyn DatabaseClient + 'static>,
evm_client: Arc<EthJsonRpcClient<C>>,
) -> Self {
fn clone(&self) -> Self {
Self {
blockchain: self.blockchain.clone(),
evm_client: self.evm_client.clone(),
}
}
}

impl<C, DB> EthImpl<C, DB>
where
DB: DatabaseClient,
C: Client + Send + Sync + 'static,
{
pub fn new(db: Arc<DB>, evm_client: Arc<EthJsonRpcClient<C>>) -> Self {
Self {
blockchain: db,
evm_client,
Expand Down Expand Up @@ -74,9 +85,10 @@ pub trait IC {
) -> RpcResult<BlockConfirmationResult>;
}

#[async_trait::async_trait]
impl<C> ICServer for EthImpl<C>
#[jsonrpsee::core::async_trait]
impl<C, DB> ICServer for EthImpl<C, DB>
where
DB: DatabaseClient + Send + Sync + 'static,
C: Client + Send + Sync + 'static,
{
async fn get_genesis_balances(&self) -> RpcResult<Vec<(Address, U256)>> {
Expand Down Expand Up @@ -143,9 +155,10 @@ where
}
}

#[async_trait::async_trait]
impl<C> EthServer for EthImpl<C>
#[jsonrpsee::core::async_trait]
impl<C, DB> EthServer for EthImpl<C, DB>
where
DB: DatabaseClient + Send + Sync + 'static,
C: Client + Send + Sync + 'static,
{
async fn get_block_by_number(
Expand Down
4 changes: 2 additions & 2 deletions src/evm-block-extractor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use crate::database::DatabaseClient;
use crate::rpc::{EthImpl, EthServer, ICServer};

/// Start the RPC server
pub async fn server_start(
pub async fn server_start<DB: DatabaseClient + Send + Sync + 'static>(
server_address: &str,
db_client: Arc<dyn DatabaseClient>,
db_client: Arc<DB>,
evm_client: Arc<EthJsonRpcClient<impl Client + 'static>>,
) -> anyhow::Result<ServerHandle> {
info!("Start server");
Expand Down
43 changes: 27 additions & 16 deletions src/evm-block-extractor/src/task/block_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::config::ExtractorArgs;
use crate::database::{AccountBalance, CertifiedBlock, DatabaseClient};

/// Starts the block extractor process
pub async fn start_extractor<C: Client>(
pub async fn start_extractor<C: Client, DB: DatabaseClient>(
config: ExtractorArgs,
db_client: Arc<dyn DatabaseClient>,
db_client: Arc<DB>,
evm_client: Arc<EthJsonRpcClient<C>>,
) -> anyhow::Result<()> {
let earliest_block = evm_client
Expand Down Expand Up @@ -44,11 +44,11 @@ pub async fn start_extractor<C: Client>(
}

/// Extracts blocks from an EVMC and stores them in a database
pub struct BlockExtractor<C: Client> {
pub struct BlockExtractor<C: Client, DB: DatabaseClient> {
client: Arc<EthJsonRpcClient<C>>,
request_time_out_secs: u64,
rpc_batch_size: usize,
blockchain: Arc<dyn DatabaseClient>,
blockchain: Arc<DB>,
}

/// Outcome of the block extraction process
Expand All @@ -60,12 +60,12 @@ pub enum BlockExtractCollectOutcome {
BlocksExtracted { from_block: u64, to_block: u64 },
}

impl<C: Client> BlockExtractor<C> {
impl<C: Client, DB: DatabaseClient> BlockExtractor<C, DB> {
pub fn new(
client: Arc<EthJsonRpcClient<C>>,
request_time_out_secs: u64,
rpc_batch_size: usize,
blockchain: Arc<dyn DatabaseClient>,
blockchain: Arc<DB>,
) -> Self {
Self {
client,
Expand Down Expand Up @@ -367,13 +367,17 @@ mod tests {
use ethereum_json_rpc_client::reqwest::ReqwestClient;

use super::*;
use crate::database::postgres_db_client::PostgresDbClient;

#[test]
fn test_validate_chain_without_blocks_in_storage() {
let latest_block_in_storage = Option::<did::Block<did::H256>>::None;
let sequence = generate_valid_blocks_sequence(10, did::H256::default());
BlockExtractor::<ReqwestClient>::validate_chain(latest_block_in_storage, &sequence)
.unwrap();
BlockExtractor::<ReqwestClient, PostgresDbClient>::validate_chain(
latest_block_in_storage,
&sequence,
)
.unwrap();
}

#[test]
Expand All @@ -384,8 +388,11 @@ mod tests {
let hash = block.hash.clone();
let latest_block_in_storage = Some(block);
let sequence = generate_valid_blocks_sequence(10, hash);
BlockExtractor::<ReqwestClient>::validate_chain(latest_block_in_storage, &sequence)
.unwrap();
BlockExtractor::<ReqwestClient, PostgresDbClient>::validate_chain(
latest_block_in_storage,
&sequence,
)
.unwrap();
}

#[test]
Expand All @@ -396,9 +403,11 @@ mod tests {
let latest_block_in_storage = Some(block);
let invalid_parent_hash = keccak::keccak_hash(&[1, 2, 3]);
let sequence = generate_valid_blocks_sequence(10, invalid_parent_hash);
let err =
BlockExtractor::<ReqwestClient>::validate_chain(latest_block_in_storage, &sequence)
.unwrap_err();
let err = BlockExtractor::<ReqwestClient, PostgresDbClient>::validate_chain(
latest_block_in_storage,
&sequence,
)
.unwrap_err();
assert!(matches!(err, ChainError::InconsistentStorage))
}

Expand All @@ -414,9 +423,11 @@ mod tests {
// break the sequnce
sequence[5].parent_hash = keccak::keccak_hash(&[1, 2, 3, 4]);

let err =
BlockExtractor::<ReqwestClient>::validate_chain(latest_block_in_storage, &sequence)
.unwrap_err();
let err = BlockExtractor::<ReqwestClient, PostgresDbClient>::validate_chain(
latest_block_in_storage,
&sequence,
)
.unwrap_err();
assert!(matches!(err, ChainError::InconsistentSequence))
}

Expand Down
Loading