diff --git a/Cargo.lock b/Cargo.lock index 2539eabb..fbaf5ea2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -476,7 +476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b59a3f7fbe678874fa34354097644a171276e02a49934c13b3d61c54610ddf39" dependencies = [ "bdk_core", - "electrum-client 0.24.1", + "electrum-client", ] [[package]] @@ -844,12 +844,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "chunked_transfer" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" - [[package]] name = "cipher" version = "0.4.4" @@ -1362,23 +1356,6 @@ dependencies = [ "serde", ] -[[package]] -name = "electrum-client" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c7b1f8783238bb18e6e137875b0a66f3dffe6c7ea84066e05d033cf180b150f" -dependencies = [ - "bitcoin", - "byteorder", - "libc", - "log", - "rustls 0.23.38", - "serde", - "serde_json", - "webpki-roots 0.25.4", - "winapi", -] - [[package]] name = "electrum-client" version = "0.24.1" @@ -2404,19 +2381,6 @@ dependencies = [ "possiblyrandom", ] -[[package]] -name = "lightning-block-sync" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee5069846b07a62aaecdaf25233e067bc69f245b7c8fd00cc9c217053221f875" -dependencies = [ - "bitcoin", - "chunked_transfer", - "lightning", - "serde_json", - "tokio", -] - [[package]] name = "lightning-dns-resolver" version = "0.3.0" @@ -2503,6 +2467,17 @@ dependencies = [ "lightning", ] +[[package]] +name = "lightning-transaction-sync" +version = "0.2.1" +dependencies = [ + "bitcoin", + "electrum-client", + "esplora-client", + "lightning", + "lightning-macros 0.2.1", +] + [[package]] name = "lightning-types" version = "0.3.1" @@ -3698,20 +3673,21 @@ dependencies = [ "clap", "dircmp", "dirs", - "electrum-client 0.20.0", + "electrum-client", + "esplora-client", "futures", "hex-conservative 0.3.2", "http", "lazy_static", "lightning", "lightning-background-processor", - "lightning-block-sync", "lightning-dns-resolver", "lightning-invoice", "lightning-macros 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "lightning-net-tokio", "lightning-persister", "lightning-rapid-gossip-sync", + "lightning-transaction-sync", "magic-crypt", "once_cell", "rand 0.8.6", @@ -3748,7 +3724,7 @@ dependencies = [ "baid64", "base85", "chrono", - "electrum-client 0.24.1", + "electrum-client", "esplora-client", "getrandom 0.3.4", "indexmap", diff --git a/Cargo.toml b/Cargo.toml index fed84e29..b7f49615 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,17 +22,19 @@ chacha20poly1305 = { version = "0.10.1", features = ["stream"] } chrono = { version = "0.4", default-features = false, features = ["clock"] } clap = "4.5.20" dirs = "5.0.1" +electrum-client = { version = "0.24.0", default-features = false, features = ["use-rustls"] } +esplora-client = { version = "0.12", default-features = false, features = ["blocking-https-rustls"] } futures = "0.3" hex = { package = "hex-conservative", version = "0.3.0", default-features = false } lightning = { version = "0.2.0", path = "./rust-lightning/lightning", features = ["dnssec"] } lightning-background-processor = { version = "0.2.0", path = "./rust-lightning/lightning-background-processor" } -lightning-block-sync = { version = "0.2.0", features = ["rpc-client", "tokio"] } lightning-dns-resolver = { version = "0.3.0", path = "./rust-lightning/lightning-dns-resolver" } lightning-invoice = { version = "0.34.0", features = ["std"], path = "./rust-lightning/lightning-invoice" } lightning-macros = { version = "0.2.0" } lightning-net-tokio = { version = "0.2.0" } lightning-persister = { version = "0.2.0", path = "./rust-lightning/lightning-persister", features = ["tokio"] } lightning-rapid-gossip-sync = { version = "0.2.0", path = "./rust-lightning/lightning-rapid-gossip-sync" } +lightning-transaction-sync = { version = "0.2.0", path = "./rust-lightning/lightning-transaction-sync", features = ["electrum", "esplora-blocking"] } magic-crypt = "4.0.1" rand = "0.8.5" regex = { version = "1.11", default-features = false } @@ -59,7 +61,6 @@ zip = { version = "2.2.0", default-features = false, features = ["time", "zstd"] [dev-dependencies] dircmp = "0.2.0" -electrum-client = "0.20.0" http = "1.4.0" lazy_static = { version = "1.5.0", default-features = false } lightning = { version = "0.2.0", path = "./rust-lightning/lightning", features = ["_rln_test_hooks"] } diff --git a/README.md b/README.md index 5a89e408..0d957bcd 100644 --- a/README.md +++ b/README.md @@ -46,14 +46,12 @@ docker build -t rgb-lightning-node . ## Run In order to operate, the node will need: -- a bitcoind node - an indexer instance (electrum or esplora) - an [RGB proxy server] instance Once services are running, daemons can be started. Each daemon needs to be started in a separate shell with `rgb-lightning-node`, specifying: -- bitcoind user, password, host and port - node data directory - node listening port - LN peer listening port @@ -129,18 +127,10 @@ For more info about regtest utility commands, run: ``` When unlocking regtest nodes use the following local services: -- bitcoind_rpc_username: user -- bitcoind_rpc_password: password -- bitcoind_rpc_host: localhost -- bitcoind_rpc_port: 18433 - indexer_url: 127.0.0.1:50001 - proxy_endpoint: rpc://127.0.0.1:3000/json-rpc To unlock a regtest nodes running in docker use the following local services: -- bitcoind_rpc_username: user -- bitcoind_rpc_password: password -- bitcoind_rpc_host: bitcoind -- bitcoind_rpc_port: 18433 - indexer_url: electrs:50001 - proxy_endpoint: rpc://proxy:3000/json-rpc @@ -172,10 +162,6 @@ rgb-lightning-node dataldk2/ --daemon-listening-port 3003 \ ``` When unlocking testnet3 nodes you can use the following services: -- bitcoind_rpc_username: user -- bitcoind_rpc_username: password -- bitcoind_rpc_host: electrum.iriswallet.com -- bitcoind_rpc_port: 18332 - indexer_url: ssl://electrum.iriswallet.com:50013 - proxy_endpoint: rpcs://proxy.iriswallet.com/0.2/json-rpc @@ -183,7 +169,6 @@ When unlocking testnet3 nodes you can use the following services: To run testnet4 use the same options as testnet3 except for: - CLI arg: `--network testnet4` -- bitcoind_rpc_port: 18443 - indexer_url: ssl://electrum.iriswallet.com:50053 ## Use diff --git a/openapi.yaml b/openapi.yaml index 32e674d7..21a0ecf9 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -3117,27 +3117,11 @@ components: type: object required: - password - - bitcoind_rpc_username - - bitcoind_rpc_password - - bitcoind_rpc_host - - bitcoind_rpc_port - announce_addresses properties: password: type: string example: nodepassword - bitcoind_rpc_username: - type: string - example: user - bitcoind_rpc_password: - type: string - example: password - bitcoind_rpc_host: - type: string - example: localhost - bitcoind_rpc_port: - type: integer - example: 18443 indexer_url: type: - string diff --git a/src/bitcoind.rs b/src/bitcoind.rs deleted file mode 100644 index 3150ae2b..00000000 --- a/src/bitcoind.rs +++ /dev/null @@ -1,358 +0,0 @@ -use base64::{engine::general_purpose, Engine as _}; -use bitcoin::blockdata::transaction::Transaction; -use bitcoin::consensus::encode; -use bitcoin::hash_types::BlockHash; -use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; -use lightning::log_warn; -use lightning::util::logger::Logger; -use lightning_block_sync::http::HttpEndpoint; -use lightning_block_sync::http::JsonResponse; -use lightning_block_sync::rpc::RpcClient; -use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource}; -use std::collections::HashMap; -use std::convert::TryInto; -use std::str::FromStr; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::Arc; -use std::time::Duration; - -use crate::disk::FilesystemLogger; -#[cfg(test)] -use crate::test::mock_fee; - -pub struct BitcoindClient { - pub(crate) bitcoind_rpc_client: Arc, - fees: Arc>, - handle: tokio::runtime::Handle, - logger: Arc, -} - -impl BlockSource for BitcoindClient { - fn get_header<'a>( - &'a self, - header_hash: &'a BlockHash, - height_hint: Option, - ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - Box::pin(async move { - self.bitcoind_rpc_client - .get_header(header_hash, height_hint) - .await - }) - } - - fn get_block<'a>( - &'a self, - header_hash: &'a BlockHash, - ) -> AsyncBlockSourceResult<'a, BlockData> { - Box::pin(async move { self.bitcoind_rpc_client.get_block(header_hash).await }) - } - - fn get_best_block(&self) -> AsyncBlockSourceResult<'_, (BlockHash, Option)> { - Box::pin(async move { self.bitcoind_rpc_client.get_best_block().await }) - } -} - -pub struct MempoolMinFeeResponse { - pub feerate_sat_per_kw: Option, - pub errored: bool, -} - -impl TryInto for JsonResponse { - type Error = std::io::Error; - fn try_into(self) -> std::io::Result { - let errored = !self.0["errors"].is_null(); - assert_eq!(self.0["maxmempool"].as_u64(), Some(300000000)); - Ok(MempoolMinFeeResponse { - errored, - feerate_sat_per_kw: self.0["mempoolminfee"] - .as_f64() - .map(|feerate_btc_per_kvbyte| { - (feerate_btc_per_kvbyte * 100_000_000.0 / 4.0).round() as u32 - }), - }) - } -} - -pub struct BlockchainInfo { - pub latest_height: usize, - pub latest_blockhash: BlockHash, - pub chain: String, -} - -impl TryInto for JsonResponse { - type Error = std::io::Error; - fn try_into(self) -> std::io::Result { - Ok(BlockchainInfo { - latest_height: self.0["blocks"].as_u64().unwrap() as usize, - latest_blockhash: BlockHash::from_str(self.0["bestblockhash"].as_str().unwrap()) - .unwrap(), - chain: self.0["chain"].as_str().unwrap().to_string(), - }) - } -} - -pub struct FeeResponse { - pub feerate_sat_per_kw: Option, - pub errored: bool, -} - -impl TryInto for JsonResponse { - type Error = std::io::Error; - fn try_into(self) -> std::io::Result { - let errored = !self.0["errors"].is_null(); - Ok(FeeResponse { - errored, - feerate_sat_per_kw: self.0["feerate"].as_f64().map(|feerate_btc_per_kvbyte| { - (feerate_btc_per_kvbyte * 100_000_000.0 / 4.0).round() as u32 - }), - }) - } -} - -/// The minimum feerate we are allowed to send, as specify by LDK. -const MIN_FEERATE: u32 = 253; - -impl BitcoindClient { - pub(crate) async fn new( - host: String, - port: u16, - rpc_user: String, - rpc_password: String, - handle: tokio::runtime::Handle, - logger: Arc, - ) -> std::io::Result { - let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); - let rpc_credentials = general_purpose::STANDARD.encode(format!( - "{}:{}", - rpc_user.clone(), - rpc_password.clone() - )); - let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint); - let _dummy = bitcoind_rpc_client - .call_method::("getblockchaininfo", &[]) - .await - .map_err(|_| { - std::io::Error::new(std::io::ErrorKind::PermissionDenied, - "failed to make initial call to bitcoind - please check your RPC user/password and access settings") - })?; - let mut fees: HashMap = HashMap::new(); - fees.insert( - ConfirmationTarget::MaximumFeeEstimate, - AtomicU32::new(50000), - ); - fees.insert(ConfirmationTarget::UrgentOnChainSweep, AtomicU32::new(5000)); - fees.insert( - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::AnchorChannelFee, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::NonAnchorChannelFee, - AtomicU32::new(2000), - ); - fees.insert( - ConfirmationTarget::ChannelCloseMinimum, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::OutputSpendingFee, - AtomicU32::new(MIN_FEERATE), - ); - - let client = Self { - bitcoind_rpc_client: Arc::new(bitcoind_rpc_client), - fees: Arc::new(fees), - handle: handle.clone(), - logger, - }; - BitcoindClient::poll_for_fee_estimates( - client.fees.clone(), - client.bitcoind_rpc_client.clone(), - client.logger.clone(), - handle, - ); - Ok(client) - } - - fn poll_for_fee_estimates( - fees: Arc>, - rpc_client: Arc, - logger: Arc, - handle: tokio::runtime::Handle, - ) { - handle.spawn(async move { - async fn get_estimate( - rpc_client: &Arc, - logger: &Arc, - params: &[serde_json::Value], - default: u32, - ) -> u32 { - match rpc_client - .call_method::("estimatesmartfee", params) - .await - { - Ok(res) => match res.feerate_sat_per_kw { - Some(feerate) => Some(std::cmp::max(feerate, MIN_FEERATE)), - None => { - log_warn!(logger, "Fee estimation unavailable"); - None - } - }, - Err(e) => { - log_warn!(logger, "Error getting fee estimate: {}", e); - None - } - } - .unwrap_or(default) - } - - loop { - let mempoolmin_estimate = { - match rpc_client - .call_method::("getmempoolinfo", &[]) - .await - { - Ok(res) => match res.feerate_sat_per_kw { - Some(feerate) => Some(std::cmp::max(feerate, MIN_FEERATE)), - None => { - log_warn!(logger, "Mempool info unavailable"); - None - } - }, - Err(e) => { - log_warn!(logger, "Error getting mepool info: {}", e); - None - } - } - .unwrap_or(MIN_FEERATE) - }; - let background_estimate = get_estimate( - &rpc_client, - &logger, - &[serde_json::json!(144), serde_json::json!("ECONOMICAL")], - MIN_FEERATE, - ) - .await; - - let normal_estimate = get_estimate( - &rpc_client, - &logger, - &[serde_json::json!(18), serde_json::json!("ECONOMICAL")], - 2000, - ) - .await; - - let high_prio_estimate = get_estimate( - &rpc_client, - &logger, - &[serde_json::json!(6), serde_json::json!("CONSERVATIVE")], - 5000, - ) - .await; - - let very_high_prio_estimate = get_estimate( - &rpc_client, - &logger, - &[serde_json::json!(2), serde_json::json!("CONSERVATIVE")], - 50000, - ) - .await; - - fees.get(&ConfirmationTarget::MaximumFeeEstimate) - .unwrap() - .store(very_high_prio_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::UrgentOnChainSweep) - .unwrap() - .store(high_prio_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::MinAllowedAnchorChannelRemoteFee) - .unwrap() - .store(mempoolmin_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee) - .unwrap() - .store(background_estimate - 250, Ordering::Release); - fees.get(&ConfirmationTarget::AnchorChannelFee) - .unwrap() - .store(background_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::NonAnchorChannelFee) - .unwrap() - .store(normal_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::ChannelCloseMinimum) - .unwrap() - .store(background_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::OutputSpendingFee) - .unwrap() - .store(background_estimate, Ordering::Release); - - tokio::time::sleep(Duration::from_secs(60)).await; - } - }); - } - - pub async fn get_blockchain_info(&self) -> BlockchainInfo { - self.bitcoind_rpc_client - .call_method::("getblockchaininfo", &[]) - .await - .unwrap() - } -} - -impl FeeEstimator for BitcoindClient { - fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { - let fee = self - .fees - .get(&confirmation_target) - .unwrap() - .load(Ordering::Acquire); - #[cfg(test)] - let fee = mock_fee(fee); - fee - } -} - -impl BroadcasterInterface for BitcoindClient { - fn broadcast_transactions(&self, txs: &[&Transaction]) { - // As of Bitcoin Core 28, using `submitpackage` allows us to broadcast multiple - // transactions at once and have them propagate through the network as a whole, avoiding - // some pitfalls with anchor channels where the first transaction doesn't make it into the - // mempool at all. Several older versions of Bitcoin Core also support `submitpackage`, - // however, so we just use it unconditionally here. - // Sadly, Bitcoin Core has an arbitrary restriction on `submitpackage` - it must actually - // contain a package (see https://github.com/bitcoin/bitcoin/issues/31085). - let txn = txs.iter().map(encode::serialize_hex).collect::>(); - let bitcoind_rpc_client = Arc::clone(&self.bitcoind_rpc_client); - let logger = Arc::clone(&self.logger); - self.handle.spawn(async move { - let res = if txn.len() == 1 { - let tx_json = serde_json::json!(txn[0]); - bitcoind_rpc_client - .call_method::("sendrawtransaction", &[tx_json]) - .await - } else { - let tx_json = serde_json::json!(txn); - bitcoind_rpc_client - .call_method::("submitpackage", &[tx_json]) - .await - }; - // This may error due to RL calling `broadcast_transactions` with the same transaction - // multiple times, but the error is safe to ignore. - match res { - Ok(_) => {} - Err(e) => { - let err_str = e.get_ref().unwrap().to_string(); - log_warn!(logger, - "Warning, failed to broadcast a transaction, this is likely okay but may indicate an error: {}\nTransactions: {:?}", - err_str, - txn); - print!("Warning, failed to broadcast a transaction, this is likely okay but may indicate an error: {err_str}\n> "); - } - } - }); - } -} diff --git a/src/error.rs b/src/error.rs index fd32d4af..03c5b908 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,7 +4,7 @@ use axum::{ response::{IntoResponse, Response}, Json, }; -use rgb_lib::{BitcoinNetwork, Error as RgbLibError}; +use rgb_lib::Error as RgbLibError; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize)] @@ -56,9 +56,6 @@ pub enum APIError { #[error("Failed to sync BDK: {0}")] FailedBdkSync(String), - #[error("Failed to connect to bitcoind client: {0}")] - FailedBitcoindConnection(String), - #[error("Failed broadcast: {0}")] FailedBroadcast(String), @@ -245,9 +242,6 @@ pub enum APIError { #[error("Network error: {0}")] Network(String), - #[error("The network of the given bitcoind ({0}) doesn't match the node's chain ({1})")] - NetworkMismatch(String, BitcoinNetwork), - #[error("No uncolored UTXOs are available (hint: call createutxos)")] NoAvailableUtxos, @@ -498,7 +492,6 @@ impl IntoResponse for APIError { | APIError::ChangingState | APIError::DuplicatePayment(_) | APIError::FailedBdkSync(_) - | APIError::FailedBitcoindConnection(_) | APIError::FailedBroadcast(_) | APIError::FailedPeerConnection | APIError::InsufficientAssets @@ -510,7 +503,6 @@ impl IntoResponse for APIError { | APIError::LockedNode | APIError::MaxFeeExceeded(_) | APIError::MinFeeNotMet(_) - | APIError::NetworkMismatch(_, _) | APIError::NoAvailableUtxos | APIError::NoRoute | APIError::NotInitialized diff --git a/src/indexer.rs b/src/indexer.rs new file mode 100644 index 00000000..46e82c37 --- /dev/null +++ b/src/indexer.rs @@ -0,0 +1,687 @@ +use bitcoin::block::Header; +use bitcoin::blockdata::transaction::Transaction; +use bitcoin::consensus::encode; +use bitcoin::constants::ChainHash; +use bitcoin::{BlockHash, Network, Script, ScriptBuf, TxOut, Txid}; +use electrum_client::utils::validate_merkle_proof; +use electrum_client::{Client as ElectrumClient, ElectrumApi, Param}; +use esplora_client::blocking::BlockingClient as EsploraBlockingClient; +use esplora_client::Builder as EsploraBuilder; +use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; +use lightning::chain::{BestBlock, Confirm, Filter, WatchedOutput}; +use lightning::log_warn; +use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; +use lightning::util::logger::Logger; +use lightning_transaction_sync::{ElectrumSyncClient, EsploraSyncClient}; +use rgb_lib::wallet::rust_only::IndexerProtocol as RgbLibIndexerProtocol; +use std::collections::{BTreeMap, HashMap}; +use std::io; +use std::str::FromStr; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use crate::disk::FilesystemLogger; +use crate::ldk::PeerGossipSync; +#[cfg(test)] +use crate::test::mock_fee; + +type Confirmable = Arc; + +const MIN_FEERATE: u32 = 253; + +enum IndexerBackend { + Electrum(Arc), + Esplora(Arc), +} + +pub(crate) struct IndexerClient { + backend: IndexerBackend, + fees: Arc>, + network: Network, + handle: tokio::runtime::Handle, + logger: Arc, +} + +pub(crate) struct IndexerGossipVerifier { + client: Arc, + gossiper: Arc, + peer_manager_wake: Arc, +} + +pub(crate) enum IndexerSyncClient { + Electrum { + client: ElectrumSyncClient>, + registered_txs: Mutex>, + }, + Esplora(EsploraSyncClient>), +} + +pub(crate) struct RegisteredTx { + script_pubkey: ScriptBuf, + confirmed: Option<(u32, BlockHash)>, +} + +struct ConfirmedRegisteredTx { + tx: Transaction, + header: Header, + height: u32, + pos: usize, +} + +impl IndexerClient { + pub(crate) fn new( + server_url: String, + protocol: RgbLibIndexerProtocol, + network: Network, + handle: tokio::runtime::Handle, + logger: Arc, + ) -> io::Result { + let fees = Arc::new(default_fee_buckets()); + let backend = match protocol { + RgbLibIndexerProtocol::Electrum => { + let client = Arc::new(ElectrumClient::new(&server_url).map_err(|e| { + io::Error::other(format!("failed to connect to electrum server: {e}")) + })?); + client.server_features().map_err(|e| { + io::Error::other(format!("failed to query electrum server features: {e}")) + })?; + poll_electrum_fee_estimates( + fees.clone(), + client.clone(), + logger.clone(), + handle.clone(), + ); + IndexerBackend::Electrum(client) + } + RgbLibIndexerProtocol::Esplora => { + let client = Arc::new(EsploraBuilder::new(&server_url).build_blocking()); + client.get_tip_hash().map_err(|e| { + io::Error::other(format!("failed to connect to esplora server: {e}")) + })?; + client.get_height().map_err(|e| { + io::Error::other(format!("failed to query esplora tip height: {e}")) + })?; + poll_esplora_fee_estimates( + fees.clone(), + client.clone(), + logger.clone(), + handle.clone(), + ); + IndexerBackend::Esplora(client) + } + }; + + Ok(Self { + backend, + fees, + network, + handle, + logger, + }) + } + + pub(crate) fn get_best_block(&self) -> io::Result { + match &self.backend { + IndexerBackend::Electrum(client) => { + let tip = client.block_headers_subscribe().map_err(|e| { + io::Error::other(format!("failed to fetch electrum tip header: {e}")) + })?; + Ok(BestBlock::new(tip.header.block_hash(), tip.height as u32)) + } + IndexerBackend::Esplora(client) => { + let tip_hash = client.get_tip_hash().map_err(|e| { + io::Error::other(format!("failed to fetch esplora tip hash: {e}")) + })?; + let tip_height = client.get_height().map_err(|e| { + io::Error::other(format!("failed to fetch esplora tip height: {e}")) + })?; + Ok(BestBlock::new(tip_hash, tip_height)) + } + } + } + + fn lookup_utxo( + &self, + chain_hash: ChainHash, + short_channel_id: u64, + ) -> Result { + if chain_hash != ChainHash::using_genesis_block(self.network) { + return Err(UtxoLookupError::UnknownChain); + } + + let height = (short_channel_id >> 40) as u32; + let tx_index = ((short_channel_id >> 16) & 0x00ff_ffff) as usize; + let vout = (short_channel_id & 0xffff) as usize; + + let txout = match &self.backend { + IndexerBackend::Electrum(client) => { + match electrum_txid_from_pos(client, height as usize, tx_index) + .and_then(|txid| client.transaction_get(&txid)) + { + Ok(tx) => tx.output.get(vout).cloned(), + Err(_) => None, + } + } + IndexerBackend::Esplora(client) => client + .get_block_hash(height) + .and_then(|block_hash| client.get_txid_at_block_index(&block_hash, tx_index)) + .and_then(|txid| match txid { + Some(txid) => client.get_tx_no_opt(&txid).map(Some), + None => Ok(None), + }) + .ok() + .flatten() + .and_then(|tx| tx.output.get(vout).cloned()), + }; + + match txout { + Some(txout) => Ok(txout), + None => Err(UtxoLookupError::UnknownTx), + } + } +} + +impl UtxoLookup for IndexerClient { + fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult { + UtxoResult::Sync(self.lookup_utxo(*chain_hash, short_channel_id)) + } +} + +impl IndexerGossipVerifier { + pub(crate) fn new( + client: Arc, + gossiper: Arc, + peer_manager_wake: Arc, + ) -> Self { + Self { + client, + gossiper, + peer_manager_wake, + } + } +} + +impl UtxoLookup for IndexerGossipVerifier { + fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult { + let result = UtxoFuture::new(); + let future = result.clone(); + let chain_hash = *chain_hash; + let client = self.client.clone(); + let gossiper = self.gossiper.clone(); + let peer_manager_wake = self.peer_manager_wake.clone(); + self.client.handle.spawn(async move { + let lookup = tokio::task::spawn_blocking(move || { + client.lookup_utxo(chain_hash, short_channel_id) + }) + .await + .unwrap_or(Err(UtxoLookupError::UnknownTx)); + future.resolve(gossiper.network_graph(), &*gossiper, lookup); + peer_manager_wake(); + }); + UtxoResult::Async(result) + } +} + +fn electrum_txid_from_pos( + client: &ElectrumClient, + height: usize, + tx_pos: usize, +) -> Result { + let value = client.raw_call( + "blockchain.transaction.id_from_pos", + [ + Param::Usize(height), + Param::Usize(tx_pos), + Param::Bool(true), + ], + )?; + let txid = value + .as_str() + .or_else(|| value.get("tx_hash").and_then(serde_json::Value::as_str)) + .or_else(|| value.get("txid").and_then(serde_json::Value::as_str)) + .or_else(|| value.get("tx_id").and_then(serde_json::Value::as_str)) + .map(str::to_owned) + .ok_or_else(|| electrum_client::Error::InvalidResponse(value.clone()))?; + + Txid::from_str(&txid).map_err(|_| electrum_client::Error::InvalidResponse(value)) +} + +impl IndexerSyncClient { + pub(crate) fn new( + server_url: String, + protocol: RgbLibIndexerProtocol, + logger: Arc, + ) -> io::Result { + match protocol { + RgbLibIndexerProtocol::Electrum => { + let client = ElectrumSyncClient::new(server_url, logger).map_err(|e| { + io::Error::other(format!("failed to initialize electrum sync client: {e}")) + })?; + Ok(Self::Electrum { + client, + registered_txs: Mutex::new(HashMap::new()), + }) + } + RgbLibIndexerProtocol::Esplora => { + Ok(Self::Esplora(EsploraSyncClient::new(server_url, logger))) + } + } + } + + pub(crate) fn sync( + &self, + confirmables: Vec, + ) -> Result<(), Box> { + match self { + Self::Electrum { + client, + registered_txs, + } => { + client + .sync(confirmables.clone()) + .map_err(|e| -> Box { Box::new(e) })?; + sync_electrum_registered_txs(client.client(), registered_txs, &confirmables) + } + Self::Esplora(client) => client + .sync(confirmables) + .map_err(|e| -> Box { Box::new(e) }), + } + } +} + +impl Filter for IndexerSyncClient { + fn register_tx(&self, txid: &Txid, script_pubkey: &Script) { + match self { + Self::Electrum { + client, + registered_txs, + } => { + registered_txs.lock().unwrap().insert( + *txid, + RegisteredTx { + script_pubkey: script_pubkey.to_owned(), + confirmed: None, + }, + ); + client.register_tx(txid, script_pubkey); + } + Self::Esplora(client) => client.register_tx(txid, script_pubkey), + } + } + + fn register_output(&self, output: WatchedOutput) { + match self { + Self::Electrum { client, .. } => client.register_output(output), + Self::Esplora(client) => client.register_output(output), + } + } +} + +fn sync_electrum_registered_txs( + client: Arc, + registered_txs: &Mutex>, + confirmables: &[Confirmable], +) -> Result<(), Box> { + let mut confirmed = Vec::new(); + let mut unconfirmed = Vec::new(); + + { + let mut registered_txs = registered_txs.lock().unwrap(); + for (txid, registered_tx) in registered_txs.iter_mut() { + let history = client.script_get_history(®istered_tx.script_pubkey)?; + let confirmed_history = history + .iter() + .find(|history| history.tx_hash == *txid && history.height > 0); + + let Some(confirmed_history) = confirmed_history else { + if registered_tx.confirmed.take().is_some() { + unconfirmed.push(*txid); + } + continue; + }; + + let height = confirmed_history.height as u32; + let tx = client.transaction_get(txid)?; + let merkle_res = client.transaction_get_merkle(txid, height as usize)?; + let header = client.block_header(height as usize)?; + if !validate_merkle_proof(txid, &header.merkle_root, &merkle_res) { + return Err(Box::new(io::Error::other(format!( + "invalid merkle proof for transaction {txid}" + )))); + } + + let block_hash = header.block_hash(); + if registered_tx.confirmed == Some((height, block_hash)) { + continue; + } + registered_tx.confirmed = Some((height, block_hash)); + confirmed.push(ConfirmedRegisteredTx { + tx, + header, + height, + pos: merkle_res.pos, + }); + } + for confirmed_tx in &confirmed { + registered_txs.remove(&confirmed_tx.tx.compute_txid()); + } + } + + for txid in unconfirmed { + for confirmable in confirmables { + confirmable.transaction_unconfirmed(&txid); + } + } + for confirmed_tx in confirmed { + for confirmable in confirmables { + confirmable.transactions_confirmed( + &confirmed_tx.header, + &[(confirmed_tx.pos, &confirmed_tx.tx)], + confirmed_tx.height, + ); + } + } + + Ok(()) +} + +impl FeeEstimator for IndexerClient { + fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { + let fee = self + .fees + .get(&confirmation_target) + .unwrap() + .load(Ordering::Acquire); + #[cfg(test)] + let fee = mock_fee(fee); + fee + } +} + +impl BroadcasterInterface for IndexerClient { + fn broadcast_transactions(&self, txs: &[&Transaction]) { + match &self.backend { + IndexerBackend::Electrum(client) => { + let txs = txs + .iter() + .map(|tx| encode::serialize(*tx)) + .collect::>(); + let client = client.clone(); + let logger = self.logger.clone(); + self.handle.spawn(async move { + let res = tokio::task::spawn_blocking(move || { + let mut last_error = None; + for tx in txs { + if let Err(e) = client.transaction_broadcast_raw(&tx) { + last_error = Some(e.to_string()); + } + } + last_error.map_or(Ok(()), Err) + }) + .await; + + match res { + Ok(Ok(())) => {} + Ok(Err(e)) => { + log_warn!( + logger, + "Warning, failed to broadcast transaction(s) via electrum: {}", + e + ); + } + Err(e) => { + log_warn!( + logger, + "Warning, failed to spawn electrum broadcaster task: {}", + e + ); + } + } + }); + } + IndexerBackend::Esplora(client) => { + let txs = txs.iter().map(|tx| (*tx).clone()).collect::>(); + let client = client.clone(); + let logger = self.logger.clone(); + self.handle.spawn(async move { + let res = tokio::task::spawn_blocking(move || { + let mut last_error = None; + for tx in txs { + if let Err(e) = client.broadcast(&tx) { + last_error = Some(e.to_string()); + } + } + last_error.map_or(Ok(()), Err) + }) + .await; + + match res { + Ok(Ok(())) => {} + Ok(Err(e)) => { + log_warn!( + logger, + "Warning, failed to broadcast transaction(s) via esplora: {}", + e + ); + } + Err(e) => { + log_warn!( + logger, + "Warning, failed to spawn esplora broadcaster task: {}", + e + ); + } + } + }); + } + } + } +} + +fn default_fee_buckets() -> HashMap { + let mut fees = HashMap::new(); + fees.insert( + ConfirmationTarget::MaximumFeeEstimate, + AtomicU32::new(50000), + ); + fees.insert(ConfirmationTarget::UrgentOnChainSweep, AtomicU32::new(5000)); + fees.insert( + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::AnchorChannelFee, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::NonAnchorChannelFee, + AtomicU32::new(2000), + ); + fees.insert( + ConfirmationTarget::ChannelCloseMinimum, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::OutputSpendingFee, + AtomicU32::new(MIN_FEERATE), + ); + fees +} + +fn poll_electrum_fee_estimates( + fees: Arc>, + client: Arc, + logger: Arc, + handle: tokio::runtime::Handle, +) { + handle.spawn(async move { + loop { + let res = tokio::task::spawn_blocking({ + let client = client.clone(); + move || { + Ok::<_, electrum_client::Error>(( + client.estimate_fee(144)?, + client.estimate_fee(18)?, + client.estimate_fee(6)?, + client.estimate_fee(2)?, + )) + } + }) + .await; + + match res { + Ok(Ok((background, normal, high_prio, very_high_prio))) => { + let background_estimate = + fee_rate_from_btc_per_kb(background, MIN_FEERATE).unwrap_or(MIN_FEERATE); + let normal_estimate = fee_rate_from_btc_per_kb(normal, 2000).unwrap_or(2000); + let high_prio_estimate = + fee_rate_from_btc_per_kb(high_prio, 5000).unwrap_or(5000); + let very_high_prio_estimate = + fee_rate_from_btc_per_kb(very_high_prio, 50000).unwrap_or(50000); + + fees.get(&ConfirmationTarget::MaximumFeeEstimate) + .unwrap() + .store(very_high_prio_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::UrgentOnChainSweep) + .unwrap() + .store(high_prio_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::MinAllowedAnchorChannelRemoteFee) + .unwrap() + .store(MIN_FEERATE, Ordering::Release); + fees.get(&ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee) + .unwrap() + .store(background_estimate.saturating_sub(250), Ordering::Release); + fees.get(&ConfirmationTarget::AnchorChannelFee) + .unwrap() + .store(background_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::NonAnchorChannelFee) + .unwrap() + .store(normal_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::ChannelCloseMinimum) + .unwrap() + .store(background_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::OutputSpendingFee) + .unwrap() + .store(background_estimate, Ordering::Release); + } + Ok(Err(e)) => { + log_warn!(logger, "Error getting fee estimate from electrum: {}", e); + } + Err(e) => { + log_warn!(logger, "Error polling electrum fee estimates: {}", e); + } + } + + tokio::time::sleep(Duration::from_secs(60)).await; + } + }); +} + +fn poll_esplora_fee_estimates( + fees: Arc>, + client: Arc, + logger: Arc, + handle: tokio::runtime::Handle, +) { + handle.spawn(async move { + loop { + let res = tokio::task::spawn_blocking({ + let client = client.clone(); + move || client.get_fee_estimates() + }) + .await; + + match res { + Ok(Ok(estimate_map)) => { + let background_estimate = + estimate_fee_rate_sat_per_kw(&estimate_map, 144, MIN_FEERATE); + let normal_estimate = estimate_fee_rate_sat_per_kw(&estimate_map, 18, 2000); + let high_prio_estimate = estimate_fee_rate_sat_per_kw(&estimate_map, 6, 5000); + let very_high_prio_estimate = + estimate_fee_rate_sat_per_kw(&estimate_map, 2, 50000); + + fees.get(&ConfirmationTarget::MaximumFeeEstimate) + .unwrap() + .store(very_high_prio_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::UrgentOnChainSweep) + .unwrap() + .store(high_prio_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::MinAllowedAnchorChannelRemoteFee) + .unwrap() + .store(MIN_FEERATE, Ordering::Release); + fees.get(&ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee) + .unwrap() + .store(background_estimate.saturating_sub(250), Ordering::Release); + fees.get(&ConfirmationTarget::AnchorChannelFee) + .unwrap() + .store(background_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::NonAnchorChannelFee) + .unwrap() + .store(normal_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::ChannelCloseMinimum) + .unwrap() + .store(background_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::OutputSpendingFee) + .unwrap() + .store(background_estimate, Ordering::Release); + } + Ok(Err(e)) => { + log_warn!(logger, "Error getting fee estimate from esplora: {}", e) + } + Err(e) => log_warn!(logger, "Error polling esplora fee estimates: {}", e), + } + + tokio::time::sleep(Duration::from_secs(60)).await; + } + }); +} + +fn estimate_fee_rate_sat_per_kw( + fee_estimates: &HashMap, + blocks: u16, + default: u32, +) -> u32 { + let Some(sat_per_vb) = interpolate_fee_rate(fee_estimates, blocks) else { + return default; + }; + std::cmp::max((sat_per_vb * 250.0).round() as u32, MIN_FEERATE) +} + +fn interpolate_fee_rate(fee_estimates: &HashMap, blocks: u16) -> Option { + if blocks == 0 || fee_estimates.is_empty() { + return None; + } + + let estimate_map = BTreeMap::from_iter(fee_estimates.iter().map(|(k, v)| (*k, *v))); + if let Some(estimate) = estimate_map.get(&blocks) { + return Some(*estimate); + } + + let lower_key = estimate_map.range(..blocks).next_back().map(|(k, _)| *k); + let upper_key = estimate_map.range(blocks..).next().map(|(k, _)| *k); + + match (lower_key, upper_key) { + (Some(x1), Some(x2)) if x1 != x2 => { + let y1 = estimate_map[&x1]; + let y2 = estimate_map[&x2]; + Some(y1 + (blocks as f64 - x1 as f64) / (x2 as f64 - x1 as f64) * (y2 - y1)) + } + (Some(x), _) | (_, Some(x)) => estimate_map.get(&x).copied(), + _ => None, + } +} + +fn fee_rate_from_btc_per_kb(feerate_btc_per_kb: f64, default: u32) -> Option { + if !feerate_btc_per_kb.is_finite() || feerate_btc_per_kb.is_sign_negative() { + return Some(default); + } + Some(std::cmp::max( + (feerate_btc_per_kb * 100_000_000.0 / 4.0).round() as u32, + MIN_FEERATE, + )) +} diff --git a/src/ldk.rs b/src/ldk.rs index 83f719c9..15922598 100644 --- a/src/ldk.rs +++ b/src/ldk.rs @@ -3,13 +3,14 @@ use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::hex::DisplayHex; use bitcoin::psbt::{ExtractTxError, Psbt}; use bitcoin::secp256k1::{All, PublicKey, Secp256k1}; +use bitcoin::TxOut; use bitcoin::{io, Amount, Network}; -use bitcoin::{BlockHash, TxOut}; use bitcoin_bech32::WitnessProgram; use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus}; -use lightning::chain::{BestBlock, Filter}; +use lightning::chain::{BestBlock, Confirm, Filter}; use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet}; use lightning::events::{Event, PaymentFailureReason, PaymentPurpose, ReplayEvent}; +use lightning::impl_writeable_tlv_based; use lightning::ln::channelmanager::{self, PaymentId, RecentPaymentDetails}; use lightning::ln::channelmanager::{ ChainParameters, ChannelManagerReadArgs, SimpleArcChannelManager, @@ -32,6 +33,7 @@ use lightning::routing::gossip; use lightning::routing::gossip::{NodeId, P2PGossipSync}; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; +use lightning::routing::utxo::UtxoLookup; use lightning::sign::{ EntropySource, InMemorySigner, KeysManager, NodeSigner, OutputSpender, SpendableOutputDescriptor, @@ -46,13 +48,7 @@ use lightning::util::persist::{ }; use lightning::util::ser::{ReadableArgs, Writeable}; use lightning::util::sweep as ldk_sweep; -use lightning::{chain, impl_writeable_tlv_based}; use lightning_background_processor::{process_events_async, GossipSync, NO_LIQUIDITY_MANAGER}; -use lightning_block_sync::gossip::TokioSpawner; -use lightning_block_sync::init; -use lightning_block_sync::poll; -use lightning_block_sync::SpvClient; -use lightning_block_sync::UnboundedCache; use lightning_dns_resolver::OMDomainResolver; use lightning_invoice::PaymentSecret; use lightning_net_tokio::SocketDescriptor; @@ -93,12 +89,12 @@ use tokio::runtime::Handle; use tokio::sync::watch::Sender; use tokio::task::JoinHandle; -use crate::bitcoind::BitcoindClient; use crate::disk::{ self, FilesystemLogger, CHANNEL_IDS_FNAME, CHANNEL_PEER_DATA, INBOUND_PAYMENTS_FNAME, MAKER_SWAPS_FNAME, OUTBOUND_PAYMENTS_FNAME, OUTPUT_SPENDER_TXES, TAKER_SWAPS_FNAME, }; use crate::error::APIError; +use crate::indexer::{IndexerClient, IndexerGossipVerifier, IndexerSyncClient}; use crate::rgb::{check_rgb_proxy_endpoint, get_rgb_channel_info_optional, RgbLibWalletWrapper}; use crate::routes::{HTLCStatus, SwapStatus, UnlockRequest, DUST_LIMIT_MSAT}; use crate::swap::SwapData; @@ -441,8 +437,8 @@ impl UnlockedAppState { pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc, - Arc, - Arc, + Arc, + Arc, Arc, Arc< MonitorUpdatingPersister< @@ -450,23 +446,22 @@ pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< Arc, Arc, Arc, - Arc, - Arc, + Arc, + Arc, >, >, Arc, >; -pub(crate) type GossipVerifier = lightning_block_sync::gossip::GossipVerifier< - TokioSpawner, - Arc, - Arc, ->; +pub(crate) type ChainSource = IndexerSyncClient; + +pub(crate) type PeerGossipSync = + P2PGossipSync, Arc, Arc>; pub(crate) type PeerManager = LdkPeerManager< SocketDescriptor, Arc, - Arc, Arc, Arc>>, + Arc, Arc, Arc, IgnoringMessageHandler, @@ -486,7 +481,7 @@ pub(crate) type Router = DefaultRouter< >; pub(crate) type ChannelManager = - SimpleArcChannelManager; + SimpleArcChannelManager; pub(crate) type NetworkGraph = gossip::NetworkGraph>; @@ -503,7 +498,7 @@ pub(crate) type OnionMessenger = LdkOnionMessenger< >; pub(crate) type BumpTxEventHandler = BumpTransactionEventHandler< - Arc, + Arc, Arc, Arc>>, Arc, Arc, @@ -521,9 +516,9 @@ pub(crate) struct RgbOutputSpender { } pub(crate) type OutputSweeper = ldk_sweep::OutputSweeper< - Arc, + Arc, Arc, - Arc, + Arc, Arc, Arc, Arc, @@ -1744,37 +1739,6 @@ pub(crate) async fn start_ldk( let network: Network = bitcoin_network.into(); let ldk_peer_listening_port = static_state.ldk_peer_listening_port; - // Initialize our bitcoind client. - let bitcoind_client = match BitcoindClient::new( - unlock_request.bitcoind_rpc_host.clone(), - unlock_request.bitcoind_rpc_port, - unlock_request.bitcoind_rpc_username.clone(), - unlock_request.bitcoind_rpc_password.clone(), - tokio::runtime::Handle::current(), - Arc::clone(&logger), - ) - .await - { - Ok(client) => Arc::new(client), - Err(e) => { - return Err(APIError::FailedBitcoindConnection(e.to_string())); - } - }; - - // Check that the bitcoind we've connected to is running the network we expect - let bitcoind_chain = bitcoind_client.get_blockchain_info().await.chain; - if bitcoind_chain - != match bitcoin_network { - BitcoinNetwork::Mainnet => "main", - BitcoinNetwork::Testnet => "test", - BitcoinNetwork::Testnet4 => "testnet4", - BitcoinNetwork::Regtest => "regtest", - BitcoinNetwork::Signet | BitcoinNetwork::SignetCustom => "signet", - } - { - return Err(APIError::NetworkMismatch(bitcoind_chain, bitcoin_network)); - } - // RGB setup let indexer_url = if let Some(indexer_url) = &unlock_request.indexer_url { let indexer_protocol = check_indexer_url(indexer_url, bitcoin_network)?; @@ -1821,14 +1785,35 @@ pub(crate) async fn start_ldk( ) .expect("able to write"); + let indexer_protocol = check_indexer_url(indexer_url, bitcoin_network)?; + let indexer_client = Arc::new( + IndexerClient::new( + indexer_url.to_string(), + indexer_protocol.clone(), + network, + tokio::runtime::Handle::current(), + Arc::clone(&logger), + ) + .map_err(|e| APIError::InvalidIndexer(e.to_string()))?, + ); + let tx_sync = Arc::new( + ChainSource::new( + indexer_url.to_string(), + indexer_protocol, + Arc::clone(&logger), + ) + .map_err(|e| APIError::InvalidIndexer(e.to_string()))?, + ); + let chain_source: Arc = tx_sync.clone(); + let chain_tip = indexer_client + .get_best_block() + .map_err(|e| APIError::InvalidIndexer(e.to_string()))?; + // Initialize the FeeEstimator - // BitcoindClient implements the FeeEstimator trait, so it'll act as our fee estimator. - let fee_estimator = bitcoind_client.clone(); + let fee_estimator = indexer_client.clone(); // Initialize the BroadcasterInterface - // BitcoindClient implements the BroadcasterInterface trait, so it'll act as our transaction - // broadcaster. - let broadcaster = bitcoind_client.clone(); + let broadcaster = indexer_client.clone(); // Initialize the KeysManager // The key seed that we use to derive the node privkey (that corresponds to the node pubkey) and @@ -1863,13 +1848,13 @@ pub(crate) async fn start_ldk( 1000, Arc::clone(&keys_manager), Arc::clone(&keys_manager), - Arc::clone(&bitcoind_client), - Arc::clone(&bitcoind_client), + Arc::clone(&indexer_client), + Arc::clone(&indexer_client), )); // Initialize the ChainMonitor let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( - None, + Some(chain_source.clone()), Arc::clone(&broadcaster), Arc::clone(&logger), Arc::clone(&fee_estimator), @@ -1879,12 +1864,7 @@ pub(crate) async fn start_ldk( )); // Read ChannelMonitor state from disk - let mut channelmonitors = persister.read_all_channel_monitors_with_updates().unwrap(); - - // Poll for the best chain tip, which may be used by the channel manager & spv client - let polled_chain_tip = init::validate_best_block_header(bitcoind_client.as_ref()) - .await - .expect("Failed to fetch best block header and best block"); + let channelmonitors = persister.read_all_channel_monitors_with_updates().unwrap(); // Initialize routing ProbabilisticScorer let network_graph_path = ldk_data_dir.join("network_graph"); @@ -1924,8 +1904,7 @@ pub(crate) async fn start_ldk( .channel_handshake_config .negotiate_anchors_zero_fee_htlc_tx = true; user_config.manually_accept_inbound_channels = true; - let mut restarting_node = true; - let (channel_manager_blockhash, channel_manager) = { + let channel_manager = { if let Ok(f) = fs::File::open(ldk_data_dir.join("manager")) { let mut channel_monitor_references = Vec::new(); for (_, channel_monitor) in channelmonitors.iter() { @@ -1945,18 +1924,17 @@ pub(crate) async fn start_ldk( channel_monitor_references, ldk_data_dir_path.clone(), ); - <(BlockHash, ChannelManager)>::read(&mut BufReader::new(f), read_args).unwrap() + let (_, channel_manager) = + <(bitcoin::BlockHash, ChannelManager)>::read(&mut BufReader::new(f), read_args) + .unwrap(); + channel_manager } else { // We're starting a fresh node. - restarting_node = false; - - let polled_best_block = polled_chain_tip.to_best_block(); - let polled_best_block_hash = polled_best_block.block_hash; let chain_params = ChainParameters { network, - best_block: polled_best_block, + best_block: chain_tip, }; - let fresh_channel_manager = channelmanager::ChannelManager::new( + channelmanager::ChannelManager::new( fee_estimator.clone(), chain_monitor.clone(), broadcaster.clone(), @@ -1970,8 +1948,7 @@ pub(crate) async fn start_ldk( chain_params, cur.as_secs() as u32, ldk_data_dir_path.clone(), - ); - (polled_best_block_hash, fresh_channel_manager) + ) } }; @@ -2060,7 +2037,7 @@ pub(crate) async fn start_ldk( txes, proxy_endpoint: proxy_endpoint.to_string(), }); - let (sweeper_best_block, output_sweeper) = match fs_store.read( + let (_sweeper_best_block, output_sweeper) = match fs_store.read( OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, @@ -2070,7 +2047,7 @@ pub(crate) async fn start_ldk( channel_manager.current_best_block(), broadcaster.clone(), fee_estimator.clone(), - None, + Some(chain_source.clone()), rgb_output_spender, rgb_wallet_wrapper.clone(), fs_store.clone(), @@ -2082,7 +2059,7 @@ pub(crate) async fn start_ldk( let read_args = ( broadcaster.clone(), fee_estimator.clone(), - None, + Some(chain_source.clone()), rgb_output_spender.clone(), rgb_wallet_wrapper.clone(), fs_store.clone(), @@ -2095,71 +2072,8 @@ pub(crate) async fn start_ldk( Err(e) => panic!("Failed to read OutputSweeper with {e}"), }; - // Sync ChannelMonitors, ChannelManager and OutputSweeper to chain tip - let mut chain_listener_channel_monitors = Vec::new(); - let mut cache = UnboundedCache::new(); - let chain_tip = if restarting_node { - let mut chain_listeners = vec![ - ( - channel_manager_blockhash, - &channel_manager as &(dyn chain::Listen + Send + Sync), - ), - ( - sweeper_best_block.block_hash, - &output_sweeper as &(dyn chain::Listen + Send + Sync), - ), - ]; - - for (blockhash, channel_monitor) in channelmonitors.drain(..) { - let outpoint = channel_monitor.get_funding_txo(); - chain_listener_channel_monitors.push(( - blockhash, - ( - channel_monitor, - broadcaster.clone(), - fee_estimator.clone(), - logger.clone(), - ), - outpoint, - )); - } - - for monitor_listener_info in chain_listener_channel_monitors.iter_mut() { - chain_listeners.push(( - monitor_listener_info.0, - &monitor_listener_info.1 as &(dyn chain::Listen + Send + Sync), - )); - } - - let mut attempts = 3; - loop { - match init::synchronize_listeners( - bitcoind_client.as_ref(), - network, - &mut cache, - chain_listeners.clone(), - ) - .await - { - Ok(res) => break res, - Err(e) => { - tracing::error!("Error synchronizing chain: {:?}", e); - attempts -= 1; - if attempts == 0 { - return Err(APIError::FailedBitcoindConnection( - e.into_inner().to_string(), - )); - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - } - } else { - polled_chain_tip - }; - // Give ChannelMonitors to ChainMonitor - for (_, (channel_monitor, _, _, _), _) in chain_listener_channel_monitors { + for (_, channel_monitor) in channelmonitors { let channel_id = channel_monitor.channel_id(); assert_eq!( chain_monitor.load_existing_monitor(channel_id, channel_monitor), @@ -2168,7 +2082,7 @@ pub(crate) async fn start_ldk( } // Optional: Initialize the P2PGossipSync - let gossip_sync = Arc::new(P2PGossipSync::new( + let gossip_sync: Arc = Arc::new(P2PGossipSync::new( Arc::clone(&network_graph), None, Arc::clone(&logger), @@ -2218,15 +2132,16 @@ pub(crate) async fn start_ldk( logger.clone(), Arc::clone(&keys_manager), )); - - // Install a GossipVerifier in in the P2PGossipSync - let utxo_lookup = GossipVerifier::new( - Arc::clone(&bitcoind_client.bitcoind_rpc_client), - TokioSpawner, + let peer_manager_wake = Arc::new({ + let peer_manager = Arc::clone(&peer_manager); + move || peer_manager.process_events() + }); + let utxo_lookup: Arc = Arc::new(IndexerGossipVerifier::new( + Arc::clone(&indexer_client), Arc::clone(&gossip_sync), - Arc::clone(&peer_manager), - ); - gossip_sync.add_utxo_lookup(Some(Arc::new(utxo_lookup))); + peer_manager_wake, + )); + gossip_sync.add_utxo_lookup(Some(utxo_lookup)); // ## Running LDK // Initialize networking @@ -2255,26 +2170,24 @@ pub(crate) async fn start_ldk( } }); - // Connect and Disconnect Blocks let output_sweeper: Arc = Arc::new(output_sweeper); - let channel_manager_listener = channel_manager.clone(); - let chain_monitor_listener = chain_monitor.clone(); - let output_sweeper_listener = output_sweeper.clone(); - let bitcoind_block_source = bitcoind_client.clone(); + let confirmables: Vec> = vec![ + channel_manager.clone(), + chain_monitor.clone(), + output_sweeper.clone(), + ]; + sync_chain_data(tx_sync.clone(), confirmables.clone()) + .await + .map_err(|e| APIError::InvalidIndexer(e.to_string()))?; + let stop_listen = Arc::clone(&stop_processing); tokio::spawn(async move { - let chain_poller = poll::ChainPoller::new(bitcoind_block_source.as_ref(), network); - let chain_listener = ( - chain_monitor_listener, - &(channel_manager_listener, output_sweeper_listener), - ); - let mut spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); loop { if stop_listen.load(Ordering::Acquire) { return; } - if let Err(e) = spv_client.poll_best_tip().await { - tracing::error!("Error while polling best tip: {:?}", e); + if let Err(e) = sync_chain_data(tx_sync.clone(), confirmables.clone()).await { + tracing::error!("Error while syncing via indexer: {:?}", e); } tokio::time::sleep(Duration::from_secs(1)).await; } @@ -2491,6 +2404,15 @@ pub(crate) async fn start_ldk( )) } +async fn sync_chain_data( + tx_sync: Arc, + confirmables: Vec>, +) -> Result<(), Box> { + tokio::task::spawn_blocking(move || tx_sync.sync(confirmables)) + .await + .map_err(|e| -> Box { Box::new(e) })? +} + impl AppState { fn stop_ldk(&self) -> Option>> { let mut ldk_background_services = self.get_ldk_background_services(); diff --git a/src/main.rs b/src/main.rs index 4fcc5bbd..d3fdfe64 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,9 @@ mod args; mod auth; mod backup; -mod bitcoind; mod disk; mod error; +mod indexer; mod ldk; mod rgb; mod routes; diff --git a/src/routes.rs b/src/routes.rs index 8beb586c..802a32ea 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -1258,10 +1258,6 @@ pub(crate) enum TransportType { #[derive(Deserialize, Serialize)] pub(crate) struct UnlockRequest { pub(crate) password: String, - pub(crate) bitcoind_rpc_username: String, - pub(crate) bitcoind_rpc_password: String, - pub(crate) bitcoind_rpc_host: String, - pub(crate) bitcoind_rpc_port: u16, pub(crate) indexer_url: Option, pub(crate) proxy_endpoint: Option, pub(crate) announce_addresses: Vec, diff --git a/src/test/mod.rs b/src/test/mod.rs index 4af5438f..fa792ad9 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -1731,10 +1731,6 @@ async fn taker(node_address: SocketAddr, swapstring: String) -> EmptyResponse { fn unlock_req(password: &str) -> UnlockRequest { UnlockRequest { password: password.to_string(), - bitcoind_rpc_username: s!("user"), - bitcoind_rpc_password: s!("password"), - bitcoind_rpc_host: s!("localhost"), - bitcoind_rpc_port: 18443, indexer_url: Some(ELECTRUM_URL_REGTEST.to_string()), proxy_endpoint: Some(PROXY_ENDPOINT_LOCAL.to_string()), announce_addresses: vec![], diff --git a/src/test/swap_roundtrip_multihop_asset_asset.rs b/src/test/swap_roundtrip_multihop_asset_asset.rs index dcac6f0e..3018d4b0 100644 --- a/src/test/swap_roundtrip_multihop_asset_asset.rs +++ b/src/test/swap_roundtrip_multihop_asset_asset.rs @@ -103,6 +103,10 @@ async fn swap_roundtrip_multihop_asset_asset() { ) .await; + wait_for_usable_channels(node1_addr, 2).await; + wait_for_usable_channels(node2_addr, 4).await; + wait_for_usable_channels(node3_addr, 2).await; + let channels_1_before = list_channels(node1_addr).await; let channels_2_before = list_channels(node2_addr).await; let channels_3_before = list_channels(node3_addr).await; diff --git a/src/test/upload_asset_media.rs b/src/test/upload_asset_media.rs index 641669ea..9b588341 100644 --- a/src/test/upload_asset_media.rs +++ b/src/test/upload_asset_media.rs @@ -101,13 +101,22 @@ async fn fail() { "file", reqwest::multipart::Part::bytes(file_bytes).headers([].into_iter().collect()), ); - let res = reqwest::Client::new() + // RequestBodyLimitLayer can either reply 413 or close the connection mid-stream + // depending on how much of the body the server has read; both prove the limit triggered. + let result = reqwest::Client::new() .post(format!("http://{node1_addr}/postassetmedia")) .multipart(form) .send() - .await - .unwrap(); - assert_eq!(res.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE); - let api_error_response = res.text().await.unwrap(); - assert_eq!(api_error_response, "length limit exceeded"); + .await; + match result { + Ok(res) => { + assert_eq!(res.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE); + let api_error_response = res.text().await.unwrap(); + assert_eq!(api_error_response, "length limit exceeded"); + } + Err(e) => assert!( + e.is_request() || e.is_body(), + "expected payload-too-large rejection, got: {e:?}" + ), + } }