11use std:: {
22 collections:: HashMap ,
3- sync:: { atomic:: AtomicU64 , Arc } ,
3+ fmt:: { self , Display } ,
4+ sync:: {
5+ atomic:: { AtomicU64 , Ordering } ,
6+ Arc ,
7+ } ,
48} ;
59
610use bitcoin:: { FeeRate , Transaction , Txid } ;
7- use lightning_block_sync:: { rest:: RestClient , rpc:: RpcClient } ;
11+ use lightning_block_sync:: {
12+ http:: JsonResponse ,
13+ rest:: RestClient ,
14+ rpc:: { RpcClient , RpcError } ,
15+ BlockSource ,
16+ } ;
817
9- use crate :: chain:: bitcoind_rpc:: rpc_credentials;
18+ use crate :: chain:: bitcoind_rpc:: { rpc_credentials, GetMempoolEntryResponse } ;
1019
1120use super :: bitcoind_rpc:: {
12- endpoint, FeeRateEstimationMode , FeeResponse , MempoolEntry , MempoolMinFeeResponse ,
21+ endpoint, FeeRateEstimationMode , FeeResponse , GetRawMempoolResponse , GetRawTransactionResponse ,
22+ MempoolEntry , MempoolMinFeeResponse ,
1323} ;
1424
1525pub struct BitcoindRestClient {
@@ -22,6 +32,12 @@ pub struct BitcoindRestClient {
2232}
2333
2434impl BitcoindRestClient {
35+ /// Creates a new, primarily REST client for the chain interactions
36+ /// with Bitcoin Core.
37+ ///
38+ /// Aside the required REST host and port, we must provide RPC
39+ /// configuration options for necessary calls not supported by the
40+ /// REST interface.
2541 pub ( crate ) fn new (
2642 rest_host : String , rest_port : u16 , rpc_host : String , rpc_port : u16 , rpc_user : String ,
2743 rpc_password : String ,
@@ -47,20 +63,25 @@ impl BitcoindRestClient {
4763 }
4864 }
4965
50- pub ( crate ) fn rpc_client ( & self ) -> Arc < RpcClient > {
51- Arc :: clone ( & self . rpc_client )
52- }
53-
5466 pub ( crate ) fn rest_client ( & self ) -> Arc < RestClient > {
5567 Arc :: clone ( & self . rest_client )
5668 }
5769
70+ /// Broadcasts the provided transaction.
71+ ///
72+ /// We rely on the internal RPC client to make this call, as this
73+ /// operation is not supported by Bitcoin Core's REST interface.
5874 pub ( crate ) async fn broadcast_transaction ( & self , tx : & Transaction ) -> std:: io:: Result < Txid > {
5975 let tx_serialized = bitcoin:: consensus:: encode:: serialize_hex ( tx) ;
6076 let tx_json = serde_json:: json!( tx_serialized) ;
6177 self . rpc_client . call_method :: < Txid > ( "sendrawtransaction" , & [ tx_json] ) . await
6278 }
6379
80+ /// Retrieve the fee estimate needed for a transaction to begin
81+ /// confirmation within the provided `num_blocks`.
82+ ///
83+ /// We rely on the internal RPC client to make this call, as this
84+ /// operation is not supported by Bitcoin Core's REST interface.
6485 pub ( crate ) async fn get_fee_estimate_for_target (
6586 & self , num_blocks : usize , estimation_mode : FeeRateEstimationMode ,
6687 ) -> std:: io:: Result < FeeRate > {
@@ -76,15 +97,218 @@ impl BitcoindRestClient {
7697 . map ( |resp| resp. 0 )
7798 }
7899
100+ /// Retrieves the minimum mempool fee rate.
79101 pub ( crate ) async fn get_mempool_minimum_fee_rate ( & self ) -> std:: io:: Result < FeeRate > {
80102 // TODO(@enigbe): Move MempoolMinFeeResponse to mod.rs
81103 self . rest_client
82- . request_resource :: < _ , MempoolMinFeeResponse > ( "rest/mempool/info.json" )
104+ . request_resource :: < JsonResponse , MempoolMinFeeResponse > ( "rest/mempool/info.json" )
105+ . await
106+ . map ( |resp| resp. 0 )
107+ }
108+
109+ /// Get the raw transaction for the provided transaction ID.
110+ pub ( crate ) async fn get_raw_transaction (
111+ & self , txid : & Txid ,
112+ ) -> std:: io:: Result < Option < Transaction > > {
113+ let txid_hex = bitcoin:: consensus:: encode:: serialize_hex ( txid) ;
114+ let tx_path = format ! ( "rest/tx/{}.json" , txid_hex) ;
115+
116+ match self
117+ . rest_client
118+ . request_resource :: < JsonResponse , GetRawTransactionResponse > ( & tx_path)
119+ . await
120+ {
121+ Ok ( resp) => Ok ( Some ( resp. 0 ) ) ,
122+ Err ( e) => match e. kind ( ) {
123+ std:: io:: ErrorKind :: Other => {
124+ let http_error_res: Result < Box < HttpError > , _ > = e. downcast ( ) ;
125+ match http_error_res {
126+ Ok ( http_error) => {
127+ if & http_error. status_code == "404" {
128+ Ok ( None )
129+ } else {
130+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , http_error) )
131+ }
132+ } ,
133+ Err ( _) => {
134+ let error_msg = format ! ( "Failed to process {} response." , tx_path) ;
135+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , error_msg. as_str ( ) ) )
136+ } ,
137+ }
138+ } ,
139+ _ => {
140+ let error_msg = format ! ( "Failed to process {} response." , tx_path) ;
141+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , error_msg. as_str ( ) ) )
142+ } ,
143+ } ,
144+ }
145+ }
146+
147+ pub ( crate ) async fn get_raw_mempool ( & self ) -> std:: io:: Result < Vec < Txid > > {
148+ self . rest_client
149+ . request_resource :: < JsonResponse , GetRawMempoolResponse > (
150+ "rest/mempool/contents.json?verbose=false" ,
151+ )
83152 . await
84- . map ( |resp| resp. mempoolminfee . into ( ) )
153+ . map ( |resp| resp. 0 )
154+ }
155+
156+ pub ( crate ) async fn get_mempool_entry (
157+ & self , txid : Txid ,
158+ ) -> std:: io:: Result < Option < MempoolEntry > > {
159+ let txid_hex = bitcoin:: consensus:: encode:: serialize_hex ( & txid) ;
160+ let txid_json = serde_json:: json!( txid_hex) ;
161+
162+ match self
163+ . rpc_client
164+ . call_method :: < GetMempoolEntryResponse > ( "getmempoolentry" , & [ txid_json] )
165+ . await
166+ {
167+ Ok ( resp) => Ok ( Some ( MempoolEntry { txid, time : resp. time , height : resp. height } ) ) ,
168+ Err ( e) => match e. into_inner ( ) {
169+ Some ( inner) => {
170+ let rpc_error_res: Result < Box < RpcError > , _ > = inner. downcast ( ) ;
171+
172+ match rpc_error_res {
173+ Ok ( rpc_error) => {
174+ // Check if it's the 'not found' error code.
175+ if rpc_error. code == -5 {
176+ Ok ( None )
177+ } else {
178+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , rpc_error) )
179+ }
180+ } ,
181+ Err ( _) => Err ( std:: io:: Error :: new (
182+ std:: io:: ErrorKind :: Other ,
183+ "Failed to process getmempoolentry response" ,
184+ ) ) ,
185+ }
186+ } ,
187+ None => Err ( std:: io:: Error :: new (
188+ std:: io:: ErrorKind :: Other ,
189+ "Failed to process getmempoolentry response" ,
190+ ) ) ,
191+ } ,
192+ }
193+ }
194+
195+ pub ( crate ) async fn update_mempool_entries_cache ( & self ) -> std:: io:: Result < ( ) > {
196+ let mempool_txids = self . get_raw_mempool ( ) . await ?;
197+
198+ let mut mempool_entries_cache = self . mempool_entries_cache . lock ( ) . await ;
199+ mempool_entries_cache. retain ( |txid, _entry| mempool_txids. contains ( txid) ) ;
200+
201+ if let Some ( difference) = mempool_txids. len ( ) . checked_sub ( mempool_entries_cache. capacity ( ) )
202+ {
203+ mempool_entries_cache. reserve ( difference) ;
204+ }
205+
206+ for txid in mempool_txids {
207+ if mempool_entries_cache. contains_key ( & txid) {
208+ continue ;
209+ }
210+
211+ if let Some ( entry) = self . get_mempool_entry ( txid) . await ? {
212+ mempool_entries_cache. insert ( txid, entry. clone ( ) ) ;
213+ }
214+ }
215+
216+ mempool_entries_cache. shrink_to_fit ( ) ;
217+
218+ Ok ( ( ) )
219+ }
220+
221+ pub ( crate ) async fn get_mempool_transactions_and_timestamp_at_height (
222+ & self , best_processed_height : u32 ,
223+ ) -> std:: io:: Result < Vec < ( Transaction , u64 ) > > {
224+ let prev_mempool_time = self . latest_mempool_timestamp . load ( Ordering :: Relaxed ) ;
225+ let mut latest_time = prev_mempool_time;
226+
227+ self . update_mempool_entries_cache ( ) . await ?;
228+
229+ let mempool_entries_cache = self . mempool_entries_cache . lock ( ) . await ;
230+ let mut mempool_txs_cache = self . mempool_txs_cache . lock ( ) . await ;
231+ mempool_txs_cache. retain ( |txid, _entry| mempool_entries_cache. contains_key ( txid) ) ;
232+
233+ if let Some ( difference) =
234+ mempool_entries_cache. len ( ) . checked_sub ( mempool_txs_cache. capacity ( ) )
235+ {
236+ mempool_txs_cache. reserve ( difference) ;
237+ }
238+
239+ let mut txs_to_emit = Vec :: with_capacity ( mempool_entries_cache. len ( ) ) ;
240+ for ( txid, entry) in mempool_entries_cache. iter ( ) {
241+ if entry. time > latest_time {
242+ latest_time = entry. time ;
243+ }
244+
245+ // Avoid emitting transactions that are already emitted if we can guarantee
246+ // blocks containing ancestors are already emitted. The bitcoind rpc interface
247+ // provides us with the block height that the tx is introduces to the mempool.
248+ // If we have already emitted the block of height, we can assume that all
249+ // ancestor txs have been processed by the receiver.
250+ let ancestor_within_height = entry. height <= best_processed_height;
251+ let is_already_emitted = entry. time <= prev_mempool_time;
252+ if is_already_emitted && ancestor_within_height {
253+ continue ;
254+ }
255+
256+ if let Some ( ( cached_tx, cached_time) ) = mempool_txs_cache. get ( txid) {
257+ txs_to_emit. push ( ( cached_tx. clone ( ) , * cached_time) ) ;
258+ continue ;
259+ }
260+
261+ match self . get_raw_transaction ( & entry. txid ) . await {
262+ Ok ( Some ( tx) ) => {
263+ mempool_txs_cache. insert ( entry. txid , ( tx. clone ( ) , entry. time ) ) ;
264+ txs_to_emit. push ( ( tx, entry. time ) ) ;
265+ } ,
266+ Ok ( None ) => {
267+ continue ;
268+ } ,
269+ Err ( e) => return Err ( e) ,
270+ }
271+ }
272+
273+ if !txs_to_emit. is_empty ( ) {
274+ self . latest_mempool_timestamp . store ( latest_time, Ordering :: Release ) ;
275+ }
276+
277+ Ok ( txs_to_emit)
278+ }
279+ }
280+
281+ impl BlockSource for BitcoindRestClient {
282+ fn get_header < ' a > (
283+ & ' a self , header_hash : & ' a bitcoin:: BlockHash , height_hint : Option < u32 > ,
284+ ) -> lightning_block_sync:: AsyncBlockSourceResult < ' a , lightning_block_sync:: BlockHeaderData > {
285+ Box :: pin ( async move { self . rest_client . get_header ( header_hash, height_hint) . await } )
286+ }
287+
288+ fn get_block < ' a > (
289+ & ' a self , header_hash : & ' a bitcoin:: BlockHash ,
290+ ) -> lightning_block_sync:: AsyncBlockSourceResult < ' a , lightning_block_sync:: BlockData > {
291+ Box :: pin ( async move { self . rest_client . get_block ( header_hash) . await } )
292+ }
293+
294+ fn get_best_block (
295+ & self ,
296+ ) -> lightning_block_sync:: AsyncBlockSourceResult < ( bitcoin:: BlockHash , Option < u32 > ) > {
297+ Box :: pin ( async move { self . rest_client . get_best_block ( ) . await } )
85298 }
299+ }
300+
301+ #[ derive( Debug ) ]
302+ struct HttpError {
303+ pub ( crate ) status_code : String ,
304+ pub ( crate ) contents : Vec < u8 > ,
305+ }
306+
307+ impl std:: error:: Error for HttpError { }
86308
87- pub ( crate ) async fn get_raw_transaction ( & self , txid : & Txid ) -> std:: io:: Result < Transaction > {
88- todo ! ( )
309+ impl Display for HttpError {
310+ fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
311+ let contents = String :: from_utf8_lossy ( & self . contents ) ;
312+ write ! ( f, "status_code: {}, contents: {}" , self . status_code, contents)
89313 }
90314}
0 commit comments