11use std:: {
22 collections:: HashMap ,
3- sync:: { atomic:: AtomicU64 , Arc } ,
3+ fmt:: { self , Display } ,
4+ sync:: {
5+ atomic:: { AtomicU64 , Ordering } ,
6+ Arc ,
7+ } ,
48} ;
59
610use bitcoin:: { FeeRate , Transaction , Txid } ;
7- use lightning_block_sync:: { rest:: RestClient , rpc:: RpcClient } ;
11+ use lightning_block_sync:: {
12+ http:: JsonResponse ,
13+ rest:: RestClient ,
14+ rpc:: { RpcClient , RpcError } ,
15+ BlockSource ,
16+ } ;
817
9- use crate :: chain:: bitcoind_rpc:: rpc_credentials;
18+ use crate :: chain:: bitcoind_rpc:: { rpc_credentials, GetMempoolEntryResponse } ;
1019
1120use super :: bitcoind_rpc:: {
12- endpoint, FeeRateEstimationMode , FeeResponse , MempoolEntry , MempoolMinFeeResponse ,
21+ endpoint, FeeRateEstimationMode , FeeResponse , GetRawMempoolResponse , GetRawTransactionResponse ,
22+ MempoolEntry , MempoolMinFeeResponse ,
1323} ;
1424
1525pub struct BitcoindRestClient {
1626 rest_client : Arc < RestClient > ,
1727 rpc_client : Arc < RpcClient > ,
1828 latest_mempool_timestamp : AtomicU64 ,
19- // TODO(@enigbe): Move MempoolEntry to mod.rs
2029 mempool_entries_cache : tokio:: sync:: Mutex < HashMap < Txid , MempoolEntry > > ,
2130 mempool_txs_cache : tokio:: sync:: Mutex < HashMap < Txid , ( Transaction , u64 ) > > ,
2231}
2332
2433impl BitcoindRestClient {
34+ /// Creates a new, primarily REST client for the chain interactions
35+ /// with Bitcoin Core.
36+ ///
37+ /// Aside the required REST host and port, we must provide RPC
38+ /// configuration options for necessary calls not supported by the
39+ /// REST interface.
2540 pub ( crate ) fn new (
2641 rest_host : String , rest_port : u16 , rpc_host : String , rpc_port : u16 , rpc_user : String ,
2742 rpc_password : String ,
2843 ) -> Self {
29- let rest_endpoint = endpoint ( rest_host, rest_port) ;
44+ let rest_endpoint = endpoint ( rest_host, rest_port) . with_path ( "/rest" . to_string ( ) ) ;
3045 let rest_client = Arc :: new ( RestClient :: new ( rest_endpoint) ) ;
3146
3247 let rpc_endpoint = endpoint ( rpc_host, rpc_port) ;
@@ -47,20 +62,25 @@ impl BitcoindRestClient {
4762 }
4863 }
4964
50- pub ( crate ) fn rpc_client ( & self ) -> Arc < RpcClient > {
51- Arc :: clone ( & self . rpc_client )
52- }
53-
5465 pub ( crate ) fn rest_client ( & self ) -> Arc < RestClient > {
5566 Arc :: clone ( & self . rest_client )
5667 }
5768
69+ /// Broadcasts the provided transaction.
70+ ///
71+ /// We rely on the internal RPC client to make this call, as this
72+ /// operation is not supported by Bitcoin Core's REST interface.
5873 pub ( crate ) async fn broadcast_transaction ( & self , tx : & Transaction ) -> std:: io:: Result < Txid > {
5974 let tx_serialized = bitcoin:: consensus:: encode:: serialize_hex ( tx) ;
6075 let tx_json = serde_json:: json!( tx_serialized) ;
6176 self . rpc_client . call_method :: < Txid > ( "sendrawtransaction" , & [ tx_json] ) . await
6277 }
6378
79+ /// Retrieve the fee estimate needed for a transaction to begin
80+ /// confirmation within the provided `num_blocks`.
81+ ///
82+ /// We rely on the internal RPC client to make this call, as this
83+ /// operation is not supported by Bitcoin Core's REST interface.
6484 pub ( crate ) async fn get_fee_estimate_for_target (
6585 & self , num_blocks : usize , estimation_mode : FeeRateEstimationMode ,
6686 ) -> std:: io:: Result < FeeRate > {
@@ -76,15 +96,225 @@ impl BitcoindRestClient {
7696 . map ( |resp| resp. 0 )
7797 }
7898
79- pub ( crate ) async fn get_mempool_minimum_fee_rate ( & self ) -> std:: io:: Result < FeeRate > {
80- // TODO(@enigbe): Move MempoolMinFeeResponse to mod.rs
99+ /// Retrieves the minimum mempool fee rate.
100+ pub async fn get_mempool_minimum_fee_rate ( & self ) -> std:: io:: Result < FeeRate > {
101+ // TODO(@enigbe): Move MempoolMinFeeResponse to common.
102+ self . rest_client
103+ . request_resource :: < JsonResponse , MempoolMinFeeResponse > ( "mempool/info.json" )
104+ . await
105+ . map ( |resp| resp. 0 )
106+ }
107+
108+ /// Retrieves the raw transaction for the provided transaction ID.
109+ pub ( crate ) async fn get_raw_transaction (
110+ & self , txid : & Txid ,
111+ ) -> std:: io:: Result < Option < Transaction > > {
112+ let txid_hex = bitcoin:: consensus:: encode:: serialize_hex ( txid) ;
113+ let tx_path = format ! ( "tx/{}.json" , txid_hex) ;
114+
115+ match self
116+ . rest_client
117+ . request_resource :: < JsonResponse , GetRawTransactionResponse > ( & tx_path)
118+ . await
119+ {
120+ Ok ( resp) => Ok ( Some ( resp. 0 ) ) ,
121+ Err ( e) => match e. kind ( ) {
122+ std:: io:: ErrorKind :: Other => {
123+ let http_error_res: Result < Box < HttpError > , _ > = e. downcast ( ) ;
124+ match http_error_res {
125+ Ok ( http_error) => {
126+ if & http_error. status_code == "404" {
127+ Ok ( None )
128+ } else {
129+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , http_error) )
130+ }
131+ } ,
132+ Err ( _) => {
133+ let error_msg = format ! ( "Failed to process {} response." , tx_path) ;
134+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , error_msg. as_str ( ) ) )
135+ } ,
136+ }
137+ } ,
138+ _ => {
139+ let error_msg = format ! ( "Failed to process {} response." , tx_path) ;
140+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , error_msg. as_str ( ) ) )
141+ } ,
142+ } ,
143+ }
144+ }
145+
146+ /// Retrieves the transaction IDs in the mempool.
147+ pub ( crate ) async fn get_raw_mempool ( & self ) -> std:: io:: Result < Vec < Txid > > {
81148 self . rest_client
82- . request_resource :: < _ , MempoolMinFeeResponse > ( "rest/mempool/info.json" )
149+ . request_resource :: < JsonResponse , GetRawMempoolResponse > (
150+ "mempool/contents.json?verbose=false" ,
151+ )
83152 . await
84- . map ( |resp| resp. mempoolminfee . into ( ) )
153+ . map ( |resp| resp. 0 )
154+ }
155+
156+ /// Retrieves a mempool entry if it exists.
157+ ///
158+ /// We rely on the internal RPC client to make this call because
159+ /// using the REST interface is not as efficient. With REST, we have
160+ /// to retrieve the list of entries in the mempool and then filter
161+ /// for the txid of interest.
162+ pub ( crate ) async fn get_mempool_entry (
163+ & self , txid : Txid ,
164+ ) -> std:: io:: Result < Option < MempoolEntry > > {
165+ let txid_hex = bitcoin:: consensus:: encode:: serialize_hex ( & txid) ;
166+ let txid_json = serde_json:: json!( txid_hex) ;
167+
168+ match self
169+ . rpc_client
170+ . call_method :: < GetMempoolEntryResponse > ( "getmempoolentry" , & [ txid_json] )
171+ . await
172+ {
173+ Ok ( resp) => Ok ( Some ( MempoolEntry { txid, time : resp. time , height : resp. height } ) ) ,
174+ Err ( e) => match e. into_inner ( ) {
175+ Some ( inner) => {
176+ let rpc_error_res: Result < Box < RpcError > , _ > = inner. downcast ( ) ;
177+
178+ match rpc_error_res {
179+ Ok ( rpc_error) => {
180+ // Check if it's the 'not found' error code.
181+ if rpc_error. code == -5 {
182+ Ok ( None )
183+ } else {
184+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , rpc_error) )
185+ }
186+ } ,
187+ Err ( _) => Err ( std:: io:: Error :: new (
188+ std:: io:: ErrorKind :: Other ,
189+ "Failed to process getmempoolentry response" ,
190+ ) ) ,
191+ }
192+ } ,
193+ None => Err ( std:: io:: Error :: new (
194+ std:: io:: ErrorKind :: Other ,
195+ "Failed to process getmempoolentry response" ,
196+ ) ) ,
197+ } ,
198+ }
199+ }
200+
201+ pub ( crate ) async fn update_mempool_entries_cache ( & self ) -> std:: io:: Result < ( ) > {
202+ let mempool_txids = self . get_raw_mempool ( ) . await ?;
203+
204+ let mut mempool_entries_cache = self . mempool_entries_cache . lock ( ) . await ;
205+ mempool_entries_cache. retain ( |txid, _entry| mempool_txids. contains ( txid) ) ;
206+
207+ if let Some ( difference) = mempool_txids. len ( ) . checked_sub ( mempool_entries_cache. capacity ( ) )
208+ {
209+ mempool_entries_cache. reserve ( difference) ;
210+ }
211+
212+ for txid in mempool_txids {
213+ if mempool_entries_cache. contains_key ( & txid) {
214+ continue ;
215+ }
216+
217+ if let Some ( entry) = self . get_mempool_entry ( txid) . await ? {
218+ mempool_entries_cache. insert ( txid, entry. clone ( ) ) ;
219+ }
220+ }
221+
222+ mempool_entries_cache. shrink_to_fit ( ) ;
223+
224+ Ok ( ( ) )
225+ }
226+
227+ pub ( crate ) async fn get_mempool_transactions_and_timestamp_at_height (
228+ & self , best_processed_height : u32 ,
229+ ) -> std:: io:: Result < Vec < ( Transaction , u64 ) > > {
230+ let prev_mempool_time = self . latest_mempool_timestamp . load ( Ordering :: Relaxed ) ;
231+ let mut latest_time = prev_mempool_time;
232+
233+ self . update_mempool_entries_cache ( ) . await ?;
234+
235+ let mempool_entries_cache = self . mempool_entries_cache . lock ( ) . await ;
236+ let mut mempool_txs_cache = self . mempool_txs_cache . lock ( ) . await ;
237+ mempool_txs_cache. retain ( |txid, _entry| mempool_entries_cache. contains_key ( txid) ) ;
238+
239+ if let Some ( difference) =
240+ mempool_entries_cache. len ( ) . checked_sub ( mempool_txs_cache. capacity ( ) )
241+ {
242+ mempool_txs_cache. reserve ( difference) ;
243+ }
244+
245+ let mut txs_to_emit = Vec :: with_capacity ( mempool_entries_cache. len ( ) ) ;
246+ for ( txid, entry) in mempool_entries_cache. iter ( ) {
247+ if entry. time > latest_time {
248+ latest_time = entry. time ;
249+ }
250+
251+ // Avoid emitting transactions that are already emitted if we can guarantee
252+ // blocks containing ancestors are already emitted. The bitcoind rpc interface
253+ // provides us with the block height that the tx is introduces to the mempool.
254+ // If we have already emitted the block of height, we can assume that all
255+ // ancestor txs have been processed by the receiver.
256+ let ancestor_within_height = entry. height <= best_processed_height;
257+ let is_already_emitted = entry. time <= prev_mempool_time;
258+ if is_already_emitted && ancestor_within_height {
259+ continue ;
260+ }
261+
262+ if let Some ( ( cached_tx, cached_time) ) = mempool_txs_cache. get ( txid) {
263+ txs_to_emit. push ( ( cached_tx. clone ( ) , * cached_time) ) ;
264+ continue ;
265+ }
266+
267+ match self . get_raw_transaction ( & entry. txid ) . await {
268+ Ok ( Some ( tx) ) => {
269+ mempool_txs_cache. insert ( entry. txid , ( tx. clone ( ) , entry. time ) ) ;
270+ txs_to_emit. push ( ( tx, entry. time ) ) ;
271+ } ,
272+ Ok ( None ) => {
273+ continue ;
274+ } ,
275+ Err ( e) => return Err ( e) ,
276+ }
277+ }
278+
279+ if !txs_to_emit. is_empty ( ) {
280+ self . latest_mempool_timestamp . store ( latest_time, Ordering :: Release ) ;
281+ }
282+
283+ Ok ( txs_to_emit)
284+ }
285+ }
286+
287+ impl BlockSource for BitcoindRestClient {
288+ fn get_header < ' a > (
289+ & ' a self , header_hash : & ' a bitcoin:: BlockHash , height_hint : Option < u32 > ,
290+ ) -> lightning_block_sync:: AsyncBlockSourceResult < ' a , lightning_block_sync:: BlockHeaderData > {
291+ Box :: pin ( async move { self . rest_client . get_header ( header_hash, height_hint) . await } )
292+ }
293+
294+ fn get_block < ' a > (
295+ & ' a self , header_hash : & ' a bitcoin:: BlockHash ,
296+ ) -> lightning_block_sync:: AsyncBlockSourceResult < ' a , lightning_block_sync:: BlockData > {
297+ Box :: pin ( async move { self . rest_client . get_block ( header_hash) . await } )
298+ }
299+
300+ fn get_best_block (
301+ & self ,
302+ ) -> lightning_block_sync:: AsyncBlockSourceResult < ( bitcoin:: BlockHash , Option < u32 > ) > {
303+ Box :: pin ( async move { self . rest_client . get_best_block ( ) . await } )
85304 }
305+ }
306+
307+ #[ derive( Debug ) ]
308+ struct HttpError {
309+ pub ( crate ) status_code : String ,
310+ pub ( crate ) contents : Vec < u8 > ,
311+ }
312+
313+ impl std:: error:: Error for HttpError { }
86314
87- pub ( crate ) async fn get_raw_transaction ( & self , txid : & Txid ) -> std:: io:: Result < Transaction > {
88- todo ! ( )
315+ impl Display for HttpError {
316+ fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
317+ let contents = String :: from_utf8_lossy ( & self . contents ) ;
318+ write ! ( f, "status_code: {}, contents: {}" , self . status_code, contents)
89319 }
90320}
0 commit comments