@@ -18,11 +18,14 @@ use serde::de::DeserializeOwned;
1818use serde:: Serialize ;
1919use serde_json:: Value ;
2020
21- use super :: DatabaseClient ;
21+ use super :: { AccountBalance , DatabaseClient } ;
2222
2323const BQ_EXE_RESULTS_TABLE_ID : & str = "exe_results" ;
2424const BQ_BLOCKS_TABLE_ID : & str = "blocks" ;
2525const BQ_TRANSACTIONS_TABLE_ID : & str = "transactions" ;
26+ const BQ_KEY_VALUE_TABLE_ID : & str = "key_value_data" ;
27+
28+ const GENESIS_BALANCES_KEY : & str = "genesis_balances" ;
2629
2730#[ derive( Clone ) ]
2831/// A client for BigQuery that can be used to query and insert data
@@ -67,7 +70,16 @@ impl BigQueryDbClient {
6770 } )
6871 }
6972
70- async fn execute_query < T : DeserializeOwned > ( & self , query : QueryRequest ) -> anyhow:: Result < T > {
73+ async fn query_one < T : DeserializeOwned > ( & self , query : QueryRequest ) -> anyhow:: Result < T > {
74+ self . query_one_optional ( query)
75+ . await ?
76+ . ok_or ( anyhow:: anyhow!( "No data found for the query" ) )
77+ }
78+
79+ async fn query_one_optional < T : DeserializeOwned > (
80+ & self ,
81+ query : QueryRequest ,
82+ ) -> anyhow:: Result < Option < T > > {
7183 let mut response = self . client . job ( ) . query ( & self . project_id , query) . await ?;
7284
7385 if response. next_row ( ) {
@@ -79,12 +91,11 @@ impl BigQueryDbClient {
7991
8092 let result: T = serde_json:: from_str ( & result_str) ?;
8193
82- Ok ( result)
94+ Ok ( Some ( result) )
8395 } else {
84- Err ( anyhow :: anyhow! ( "No data found for the query" ) )
96+ Ok ( None )
8597 }
8698 }
87-
8899 async fn insert_batch_data (
89100 & self ,
90101 table_id : & str ,
@@ -110,6 +121,8 @@ impl BigQueryDbClient {
110121 async fn create_tables_if_not_present ( & self ) -> anyhow:: Result < ( ) > {
111122 let dataset = Dataset :: new ( & self . project_id , & self . dataset_id ) ;
112123
124+ log:: info!( "Creating tables if not present" ) ;
125+
113126 // Make sure the dataset exists
114127 if self
115128 . client
@@ -145,6 +158,13 @@ impl BigQueryDbClient {
145158 TableFieldSchema :: integer( "block_number" ) ,
146159 ] ,
147160 ) ,
161+ (
162+ BQ_KEY_VALUE_TABLE_ID ,
163+ vec ! [
164+ TableFieldSchema :: string( "key" ) ,
165+ TableFieldSchema :: json( "data" ) ,
166+ ] ,
167+ ) ,
148168 ] ;
149169
150170 // Check each table and create if it does not exist
@@ -173,6 +193,51 @@ impl BigQueryDbClient {
173193
174194 Ok ( ( ) )
175195 }
196+
197+ async fn fetch_key_value_data < D : DeserializeOwned > (
198+ & self ,
199+ key : & str ,
200+ ) -> anyhow:: Result < Option < D > > {
201+ let query_request = QueryRequest {
202+ query_parameters : Some ( vec ! [ QueryParameter {
203+ name: Some ( "key" . to_string( ) ) ,
204+ parameter_type: Some ( QueryParameterType {
205+ r#type: "STRING" . to_string( ) ,
206+ ..Default :: default ( )
207+ } ) ,
208+ parameter_value: Some ( QueryParameterValue {
209+ value: Some ( key. to_string( ) ) ,
210+ ..Default :: default ( )
211+ } ) ,
212+ } ] ) ,
213+ query : format ! (
214+ "SELECT data FROM `{project_id}.{dataset_id}.{table_id}` WHERE key = @key" ,
215+ project_id = self . project_id,
216+ dataset_id = self . dataset_id,
217+ table_id = BQ_KEY_VALUE_TABLE_ID ,
218+ ) ,
219+ ..Default :: default ( )
220+ } ;
221+
222+ self . query_one_optional ( query_request) . await
223+ }
224+
225+ async fn insert_key_value_data < D : Serialize > ( & self , key : & str , data : D ) -> anyhow:: Result < ( ) > {
226+ let json = KeyValueDataRow {
227+ key : key. to_string ( ) ,
228+ data : serde_json:: to_value ( data) . expect ( "Failed to serialize data" ) ,
229+ } ;
230+
231+ let key_value_row = TableDataInsertAllRequestRows {
232+ insert_id : Some ( key. to_string ( ) ) ,
233+ json : serde_json:: to_value ( json) ?,
234+ } ;
235+
236+ log:: debug!( "Inserting key value data with key [{}]" , key) ;
237+
238+ self . insert_batch_data ( BQ_KEY_VALUE_TABLE_ID , vec ! [ key_value_row] )
239+ . await
240+ }
176241}
177242
178243#[ async_trait:: async_trait]
@@ -185,7 +250,6 @@ impl DatabaseClient for BigQueryDbClient {
185250 if !self . check_if_same_block_hash ( & block) . await ? {
186251 if reset_database {
187252 self . clear ( ) . await ?;
188- self . create_tables_if_not_present ( ) . await ?;
189253 } else {
190254 return Err ( anyhow:: anyhow!(
191255 "The block hash in the database is different from the one in the block"
@@ -210,8 +274,9 @@ impl DatabaseClient for BigQueryDbClient {
210274 delete_table ( BQ_BLOCKS_TABLE_ID . to_owned ( ) ) . await ?;
211275 delete_table ( BQ_EXE_RESULTS_TABLE_ID . to_owned ( ) ) . await ?;
212276 delete_table ( BQ_TRANSACTIONS_TABLE_ID . to_owned ( ) ) . await ?;
277+ delete_table ( BQ_KEY_VALUE_TABLE_ID . to_owned ( ) ) . await ?;
213278
214- Ok ( ( ) )
279+ self . create_tables_if_not_present ( ) . await
215280 }
216281
217282 async fn get_block_by_number ( & self , block_number : u64 ) -> anyhow:: Result < Block < H256 > > {
@@ -236,7 +301,7 @@ impl DatabaseClient for BigQueryDbClient {
236301 ..Default :: default ( )
237302 } ;
238303
239- self . execute_query ( query_request) . await
304+ self . query_one ( query_request) . await
240305 }
241306
242307 async fn get_full_block_by_number (
@@ -402,7 +467,7 @@ impl DatabaseClient for BigQueryDbClient {
402467 ..Default :: default ( )
403468 } ;
404469
405- let exe_result: StorableExecutionResult = self . execute_query ( query_request) . await ?;
470+ let exe_result: StorableExecutionResult = self . query_one ( query_request) . await ?;
406471
407472 Ok ( TransactionReceipt :: from ( exe_result) )
408473 }
@@ -458,6 +523,18 @@ impl DatabaseClient for BigQueryDbClient {
458523 ) )
459524 }
460525 }
526+
527+ async fn get_genesis_balances ( & self ) -> anyhow:: Result < Option < Vec < AccountBalance > > > {
528+ self . fetch_key_value_data ( GENESIS_BALANCES_KEY ) . await
529+ }
530+
531+ async fn insert_genesis_balances (
532+ & self ,
533+ genesis_balances : & [ AccountBalance ] ,
534+ ) -> anyhow:: Result < ( ) > {
535+ self . insert_key_value_data ( GENESIS_BALANCES_KEY , genesis_balances)
536+ . await
537+ }
461538}
462539
463540/// A row in the BigQuery table
@@ -482,3 +559,11 @@ pub struct TransactionRow {
482559 transaction : Value ,
483560 block_number : u64 ,
484561}
562+
563+ /// A row in the BigQuery table
564+ #[ derive( Debug , Serialize ) ]
565+ pub struct KeyValueDataRow {
566+ key : String ,
567+ #[ serde( serialize_with = "serialize_json_as_string" ) ]
568+ data : Value ,
569+ }
0 commit comments