Skip to content

Commit df877a1

Browse files
authored
Merge pull request #122 from bitfinity-network/add_certification
Add lest certified block data to block extractor
2 parents d6af444 + 562386c commit df877a1

11 files changed

Lines changed: 253 additions & 13 deletions

File tree

src/evm-block-extractor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ ethers-core = { workspace = true }
1717
futures = { workspace = true }
1818
gcp-bigquery-client = { workspace = true }
1919
hex = { workspace = true }
20+
jsonrpc-core = { workspace = true }
2021
jsonrpsee = { workspace = true }
2122
lightspeed_scheduler = { workspace = true }
2223
log = { workspace = true }

src/evm-block-extractor/src/database/big_query_db_client.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@ use serde::de::DeserializeOwned;
1717
use serde::Serialize;
1818
use serde_json::Value;
1919

20-
use super::{AccountBalance, DataContainer, DatabaseClient, CHAIN_ID_KEY, GENESIS_BALANCES_KEY};
20+
use super::{
21+
AccountBalance, CertifiedBlock, DataContainer, DatabaseClient, CHAIN_ID_KEY,
22+
GENESIS_BALANCES_KEY,
23+
};
2124

2225
const BQ_BLOCKS_TABLE_ID: &str = "blocks";
26+
const BQ_CERTIFIED_BLOCKS_TABLE_ID: &str = "certified_blocks";
2327
const BQ_TRANSACTIONS_TABLE_ID: &str = "transactions";
2428
const BQ_KEY_VALUE_TABLE_ID: &str = "key_value_data";
2529

@@ -139,6 +143,13 @@ impl BigQueryDbClient {
139143
TableFieldSchema::json("body"),
140144
],
141145
),
146+
(
147+
BQ_CERTIFIED_BLOCKS_TABLE_ID,
148+
vec![
149+
TableFieldSchema::integer("id"),
150+
TableFieldSchema::json("certified_response"),
151+
],
152+
),
142153
(
143154
BQ_TRANSACTIONS_TABLE_ID,
144155
vec![
@@ -481,6 +492,36 @@ impl DatabaseClient for BigQueryDbClient {
481492
.await
482493
}
483494

495+
async fn insert_certified_block_data(&self, block: CertifiedBlock) -> anyhow::Result<()> {
496+
let hash = block.data.hash.to_hex_str();
497+
let block_row = CertifiedBlockRow {
498+
id: block.data.number.0.as_u64(),
499+
certified_response: serde_json::to_value(block).expect("Failed to serialize block"),
500+
};
501+
502+
let rows = TableDataInsertAllRequestRows {
503+
insert_id: Some(hash),
504+
json: serde_json::to_value(block_row).expect("Failed to serialize block"),
505+
};
506+
507+
self.insert_batch_data(BQ_CERTIFIED_BLOCKS_TABLE_ID, vec![rows])
508+
.await
509+
}
510+
511+
async fn get_last_certified_block_data(&self) -> anyhow::Result<CertifiedBlock> {
512+
let query_request = QueryRequest {
513+
query_parameters: None,
514+
query: format!(
515+
"SELECT certified_response FROM `{project_id}.{dataset_id}.{table_id}` ORDER BY id DESC LIMIT 1",
516+
project_id = self.project_id,
517+
dataset_id = self.dataset_id,
518+
table_id = BQ_CERTIFIED_BLOCKS_TABLE_ID,),
519+
..Default::default()
520+
};
521+
522+
self.query_one(query_request).await
523+
}
524+
484525
async fn get_transaction(&self, tx_hash: H256) -> anyhow::Result<Transaction> {
485526
let query_request = QueryRequest {
486527
query_parameters: Some(vec![
@@ -517,6 +558,13 @@ struct BlockRow {
517558
body: Value,
518559
}
519560

561+
#[derive(Debug, Serialize)]
562+
pub struct CertifiedBlockRow {
563+
id: u64,
564+
#[serde(serialize_with = "serialize_json_as_string")]
565+
certified_response: Value,
566+
}
567+
520568
#[derive(Debug, Serialize, Clone)]
521569
struct ExeResultRow {
522570
tx_hash: String,

src/evm-block-extractor/src/database/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod big_query_db_client;
22
pub mod postgres_db_client;
33

4+
use did::certified::CertifiedResult;
45
use did::{Block, Transaction, H160, H256, U256};
56
use serde::{Deserialize, Serialize};
67

@@ -28,6 +29,9 @@ const GENESIS_BALANCES_KEY: &str = "genesis_balances";
2829
/// The chain id key in the key value store
2930
const CHAIN_ID_KEY: &str = "chain_id";
3031

32+
/// Certified block data
33+
pub type CertifiedBlock = CertifiedResult<Block<H256>>;
34+
3135
/// A trait for interacting with a blockchain database
3236
#[async_trait::async_trait]
3337
pub trait DatabaseClient: Send + Sync {
@@ -60,6 +64,12 @@ pub trait DatabaseClient: Send + Sync {
6064
transactions: &[Transaction],
6165
) -> anyhow::Result<()>;
6266

67+
/// Insert certified block data
68+
async fn insert_certified_block_data(&self, response: CertifiedBlock) -> anyhow::Result<()>;
69+
70+
/// Returns certified response for the last block
71+
async fn get_last_certified_block_data(&self) -> anyhow::Result<CertifiedBlock>;
72+
6373
/// Get genesis balances
6474
async fn get_genesis_balances(&self) -> anyhow::Result<Option<Vec<AccountBalance>>>;
6575

src/evm-block-extractor/src/database/postgres_db_client.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use serde::de::DeserializeOwned;
55
use serde::Serialize;
66
use sqlx::postgres::PgRow;
77

8-
use super::{AccountBalance, DataContainer, DatabaseClient, CHAIN_ID_KEY, GENESIS_BALANCES_KEY};
8+
use super::{
9+
AccountBalance, CertifiedBlock, DataContainer, DatabaseClient, CHAIN_ID_KEY,
10+
GENESIS_BALANCES_KEY,
11+
};
912

1013
static MIGRATOR: Migrator = ::sqlx::migrate!("src_resources/db/postgres/migrations");
1114

@@ -150,6 +153,30 @@ impl DatabaseClient for PostgresDbClient {
150153
Ok(())
151154
}
152155

156+
async fn insert_certified_block_data(&self, response: CertifiedBlock) -> anyhow::Result<()> {
157+
let block_id = response.data.number.0.as_u64();
158+
159+
let mut tx = self.pool.begin().await?;
160+
sqlx::query("INSERT INTO CERTIFIED_EVM_BLOCK (id, certified_response) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET certified_response = $2")
161+
.bind(block_id as i64)
162+
.bind(serde_json::to_value(response)?)
163+
.execute(&mut *tx)
164+
.await
165+
.map_err(|e| anyhow::anyhow!("Error inserting certified block {}: {:?}", block_id, e))
166+
.map(|_| ())?;
167+
tx.commit().await?;
168+
169+
Ok(())
170+
}
171+
172+
async fn get_last_certified_block_data(&self) -> anyhow::Result<CertifiedBlock> {
173+
sqlx::query("SELECT certified_response FROM CERTIFIED_EVM_BLOCK ORDER BY id DESC LIMIT 1")
174+
.fetch_one(&self.pool)
175+
.await
176+
.map_err(|e| anyhow::anyhow!("Error getting last certified block: {:?}", e))
177+
.and_then(|row| from_row_value(&row, 0))
178+
}
179+
153180
/// Get the latest block number
154181
async fn get_latest_block_number(&self) -> anyhow::Result<Option<u64>> {
155182
sqlx::query("SELECT MAX(id) FROM EVM_BLOCK")

src/evm-block-extractor/src/rpc.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use ethers_core::utils::rlp::{RlpStream, EMPTY_LIST_RLP};
55
use jsonrpsee::core::RpcResult;
66
use jsonrpsee::proc_macros::rpc;
77

8-
use crate::database::DatabaseClient;
8+
use crate::database::{CertifiedBlock, DatabaseClient};
99

1010
#[derive(Clone)]
1111
pub struct EthImpl {
@@ -46,6 +46,9 @@ pub trait IC {
4646

4747
#[method(name = "getGenesisBalances")]
4848
async fn get_genesis_balances(&self) -> RpcResult<Vec<(H160, U256)>>;
49+
50+
#[method(name = "getLastBlockCertifiedData")]
51+
async fn get_last_block_certified_data(&self) -> RpcResult<CertifiedBlock>;
4952
}
5053

5154
#[async_trait::async_trait]
@@ -114,6 +117,19 @@ impl ICServer for EthImpl {
114117
.map(|account| (account.address.into(), account.balance.into()))
115118
.collect())
116119
}
120+
121+
async fn get_last_block_certified_data(&self) -> RpcResult<CertifiedBlock> {
122+
let certified_data = self
123+
.blockchain
124+
.get_last_certified_block_data()
125+
.await
126+
.map_err(|e| {
127+
log::error!("Error getting last block certified data: {:?}", e);
128+
jsonrpsee::types::error::ErrorCode::InternalError
129+
})?;
130+
131+
Ok(certified_data)
132+
}
117133
}
118134

119135
#[async_trait::async_trait]

src/evm-block-extractor/src/task/block_extractor.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use log::*;
77
use tokio::time::Duration;
88

99
use crate::config::ExtractorArgs;
10-
use crate::database::{AccountBalance, DatabaseClient};
10+
use crate::database::{AccountBalance, CertifiedBlock, DatabaseClient};
1111

1212
/// Starts the block extractor process
1313
pub async fn start_extractor(
@@ -37,7 +37,7 @@ pub async fn start_extractor(
3737
debug!("latest block number stored: {:?}", start_block);
3838

3939
extractor
40-
.collect_blocks(start_block.map(|b| b + 1).unwrap_or_default(), end_block)
40+
.collect_all(start_block.map(|b| b + 1).unwrap_or_default(), end_block)
4141
.await?;
4242

4343
Ok(())
@@ -69,13 +69,14 @@ impl BlockExtractor {
6969
/// Collects blocks from the EVMC and stores them in the database.
7070
/// Returns the inclusive range of blocks that were collected.
7171
/// This collects also the genesis accounts if needed.
72-
pub async fn collect_blocks(
72+
pub async fn collect_all(
7373
&mut self,
7474
from_block_inclusive: u64,
7575
to_block_inclusive: u64,
7676
) -> anyhow::Result<(u64, u64)> {
7777
self.collect_chain_id().await?;
7878
self.collect_genesis_balances().await?;
79+
self.collect_last_certified_block().await?;
7980

8081
info!(
8182
"Getting blocks from {:?} to {}",
@@ -132,6 +133,25 @@ impl BlockExtractor {
132133
Ok((from_block_inclusive, to_block_inclusive))
133134
}
134135

136+
/// Collects last certified block
137+
async fn collect_last_certified_block(&self) -> anyhow::Result<()> {
138+
const JSON_RPC_METHOD_LAST_CERTIFIED_BLOCK: &str = "ic_getLastCertifiedBlock";
139+
140+
let certified_block = self
141+
.client
142+
.single_request::<CertifiedBlock>(
143+
JSON_RPC_METHOD_LAST_CERTIFIED_BLOCK.to_string(),
144+
jsonrpc_core::Params::Array(vec![]),
145+
jsonrpc_core::Id::Null,
146+
)
147+
.await?;
148+
self.blockchain
149+
.insert_certified_block_data(certified_block)
150+
.await?;
151+
152+
Ok(())
153+
}
154+
135155
/// Collects the genesis accounts if needed.
136156
async fn collect_genesis_balances(&self) -> anyhow::Result<()> {
137157
if self.blockchain.get_genesis_balances().await?.is_some() {

src/evm-block-extractor/src_resources/db/postgres/migrations/00001_create_schema.sql

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ create table EVM_BLOCK (
1010

1111
-- End - EVM_BLOCK -
1212

13-
1413
-----------------------------------------
1514
-- Begin - EVM_TRANSACTION_EXE_RESULT -
1615
-----------------------------------------
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-----------------------------
2+
-- Begin - CERTIFIED_EVM_BLOCK -
3+
-----------------------------
4+
5+
create table CERTIFIED_EVM_BLOCK (
6+
ID bigint primary key,
7+
CERTIFIED_RESPONSE JSONB
8+
);
9+
10+
-- End - CERTIFIED_EVM_BLOCK -

src/evm-block-extractor/tests/tests/block_extractor_it.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@ async fn test_extractor_collect_blocks() {
2929

3030
println!("Getting blocks from {:?} to {}", start_block, end_block);
3131

32-
let result = extractor
33-
.collect_blocks(start_block, end_block)
34-
.await
35-
.unwrap();
32+
let result = extractor.collect_all(start_block, end_block).await.unwrap();
3633

3734
assert_eq!(result.0, start_block);
3835
assert_eq!(result.1, end_block);
@@ -63,6 +60,17 @@ async fn test_extractor_collect_blocks() {
6360
assert_eq!(evmc_chain_id, db_chain_id);
6461
}
6562

63+
// Check last certified block
64+
{
65+
let certified_data = db_client.get_last_certified_block_data().await.unwrap();
66+
assert!(!certified_data.certificate.is_empty());
67+
assert!(!certified_data.witness.is_empty());
68+
69+
// Check that it is more or less last block
70+
assert!(end_block - 10 <= certified_data.data.number.0.as_u64());
71+
assert!(end_block + 10 >= certified_data.data.number.0.as_u64());
72+
}
73+
6674
for block_num in start_block..=end_block {
6775
let block = db_client.get_block_by_number(block_num).await.unwrap();
6876

src/evm-block-extractor/tests/tests/database_client_it.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use did::{Block, Transaction, H160, H256, U256, U64};
2-
use evm_block_extractor::database::AccountBalance;
2+
use evm_block_extractor::database::{AccountBalance, CertifiedBlock};
33
use rand::random;
44

55
use crate::test_with_clients;
@@ -533,3 +533,50 @@ async fn test_insert_and_fetch_chain_id() {
533533
})
534534
.await;
535535
}
536+
537+
#[tokio::test]
538+
async fn test_insert_and_fetch_last_block_certified_data() {
539+
test_with_clients(|db_client| async move {
540+
// Arrange
541+
db_client.init(None, false).await.unwrap();
542+
543+
let block_1 = Block::<H256> {
544+
number: 1u64.into(),
545+
..Default::default()
546+
};
547+
let block_2 = Block::<H256> {
548+
number: 2u64.into(),
549+
..Default::default()
550+
};
551+
552+
let result_1 = CertifiedBlock {
553+
certificate: vec![1, 2, 3],
554+
witness: vec![5, 6, 7],
555+
data: block_1,
556+
};
557+
let result_2 = CertifiedBlock {
558+
certificate: vec![11, 12, 13],
559+
witness: vec![15, 16, 17],
560+
data: block_2,
561+
};
562+
563+
db_client
564+
.insert_certified_block_data(result_1.clone())
565+
.await
566+
.unwrap();
567+
assert_eq!(
568+
result_1,
569+
db_client.get_last_certified_block_data().await.unwrap()
570+
);
571+
572+
db_client
573+
.insert_certified_block_data(result_2.clone())
574+
.await
575+
.unwrap();
576+
assert_eq!(
577+
result_2,
578+
db_client.get_last_certified_block_data().await.unwrap()
579+
);
580+
})
581+
.await;
582+
}

0 commit comments

Comments
 (0)