diff --git a/.gitignore b/.gitignore index 01626769f..969341414 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ Cargo.lock **/target +.vscode/ diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..14e16cf43 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "rust-analyzer.cargo.features": [ + "client-async" + ], + "rust-analyzer.check.features": [ + "client-async" + ] +} diff --git a/client/Cargo.toml b/client/Cargo.toml index b7c13e367..6758a20d4 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -18,7 +18,9 @@ rustdoc-args = ["--cfg", "docsrs"] [features] # Enable this feature to get a blocking JSON-RPC client. -client-sync = ["jsonrpc"] +client-sync = ["jsonrpc", "jsonrpc/bitreq_http"] +# Enable this feature to get an async JSON-RPC client. +client-async = ["jsonrpc", "jsonrpc/bitreq_http_async", "jsonrpc/client_async"] [dependencies] bitcoin = { version = "0.32.0", default-features = false, features = ["std", "serde"] } @@ -27,6 +29,6 @@ serde = { version = "1.0.103", default-features = false, features = [ "derive", serde_json = { version = "1.0.117" } types = { package = "corepc-types", version = "0.11.0", path = "../types", default-features = false, features = ["std"] } -jsonrpc = { version = "0.19.0", path = "../jsonrpc", features = ["bitreq_http"], optional = true } +jsonrpc = { version = "0.19.0", path = "../jsonrpc", optional = true } [dev-dependencies] diff --git a/client/README.md b/client/README.md index 0c09dedd8..a842b1555 100644 --- a/client/README.md +++ b/client/README.md @@ -1,7 +1,16 @@ # corepc-client -Rust client for the Bitcoin Core daemon's JSON-RPC API. Currently this -is only a blocking client and is intended to be used in integration testing. +Rust client for the Bitcoin Core daemon's JSON-RPC API. + +This crate provides: + +- A blocking client intended for integration testing (`client-sync`). +- An async client intended for production (`client-async`). + +## Features + +- `client-sync`: Blocking JSON-RPC client. +- `client-async`: Async JSON-RPC client. ## Minimum Supported Rust Version (MSRV) diff --git a/client/src/bdk_client/error.rs b/client/src/bdk_client/error.rs new file mode 100644 index 000000000..27ba7c93d --- /dev/null +++ b/client/src/bdk_client/error.rs @@ -0,0 +1,149 @@ +// SPDX-License-Identifier: CC0-1.0 + +use std::{error, fmt, io}; + +use bitcoin::hex; +use types::v17::{ + GetBlockHeaderError, GetBlockHeaderVerboseError, GetBlockVerboseOneError, + GetRawTransactionVerboseError, +}; +use types::v19::GetBlockFilterError; +use types::v29::{ + GetBlockHeaderVerboseError as GetBlockHeaderVerboseErrorV29, + GetBlockVerboseOneError as GetBlockVerboseOneErrorV29, +}; + +/// The error type for errors produced in this library. +#[derive(Debug)] +pub enum Error { + JsonRpc(jsonrpc::error::Error), + HexToArray(hex::HexToArrayError), + HexToBytes(hex::HexToBytesError), + Json(serde_json::error::Error), + BitcoinSerialization(bitcoin::consensus::encode::FromHexError), + Io(io::Error), + InvalidCookieFile, + /// The JSON result had an unexpected structure. + UnexpectedStructure, + /// The daemon returned an error string. + Returned(String), + /// The server version did not match what was expected. + ServerVersion(UnexpectedServerVersionError), + /// Missing user/password. + MissingUserPassword, +} + +impl From for Error { + fn from(e: jsonrpc::error::Error) -> Error { Error::JsonRpc(e) } +} + +impl From for Error { + fn from(e: hex::HexToArrayError) -> Self { Self::HexToArray(e) } +} + +impl From for Error { + fn from(e: hex::HexToBytesError) -> Self { Self::HexToBytes(e) } +} + +impl From for Error { + fn from(e: serde_json::error::Error) -> Error { Error::Json(e) } +} + +impl From for Error { + fn from(e: bitcoin::consensus::encode::FromHexError) -> Error { Error::BitcoinSerialization(e) } +} + +impl From for Error { + fn from(e: io::Error) -> Error { Error::Io(e) } +} + +impl From for Error { + fn from(e: GetBlockHeaderError) -> Self { Self::Returned(e.to_string()) } +} + +impl From for Error { + fn from(e: GetBlockHeaderVerboseError) -> Self { Self::Returned(e.to_string()) } +} + +impl From for Error { + fn from(e: GetBlockVerboseOneError) -> Self { Self::Returned(e.to_string()) } +} + +impl From for Error { + fn from(e: GetRawTransactionVerboseError) -> Self { Self::Returned(e.to_string()) } +} + +impl From for Error { + fn from(e: GetBlockHeaderVerboseErrorV29) -> Self { Self::Returned(e.to_string()) } +} + +impl From for Error { + fn from(e: GetBlockVerboseOneErrorV29) -> Self { Self::Returned(e.to_string()) } +} + +impl From for Error { + fn from(e: GetBlockFilterError) -> Self { Self::Returned(e.to_string()) } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use Error::*; + + match *self { + JsonRpc(ref e) => write!(f, "JSON-RPC error: {}", e), + HexToArray(ref e) => write!(f, "hex to array decode error: {}", e), + HexToBytes(ref e) => write!(f, "hex to bytes decode error: {}", e), + Json(ref e) => write!(f, "JSON error: {}", e), + BitcoinSerialization(ref e) => write!(f, "Bitcoin serialization error: {}", e), + Io(ref e) => write!(f, "I/O error: {}", e), + InvalidCookieFile => write!(f, "invalid cookie file"), + UnexpectedStructure => write!(f, "the JSON result had an unexpected structure"), + Returned(ref s) => write!(f, "the daemon returned an error string: {}", s), + ServerVersion(ref e) => write!(f, "server version: {}", e), + MissingUserPassword => write!(f, "missing user and/or password"), + } + } +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + use Error::*; + + match *self { + JsonRpc(ref e) => Some(e), + HexToArray(ref e) => Some(e), + HexToBytes(ref e) => Some(e), + Json(ref e) => Some(e), + BitcoinSerialization(ref e) => Some(e), + Io(ref e) => Some(e), + ServerVersion(ref e) => Some(e), + InvalidCookieFile | UnexpectedStructure | Returned(_) | MissingUserPassword => None, + } + } +} + +/// Error returned when RPC client expects a different version than bitcoind reports. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UnexpectedServerVersionError { + /// Version from server. + pub got: usize, + /// Expected server version. + pub expected: Vec, +} + +impl fmt::Display for UnexpectedServerVersionError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut expected = String::new(); + for version in &self.expected { + let v = format!(" {} ", version); + expected.push_str(&v); + } + write!(f, "unexpected bitcoind version, got: {} expected one of: {}", self.got, expected) + } +} + +impl error::Error for UnexpectedServerVersionError {} + +impl From for Error { + fn from(e: UnexpectedServerVersionError) -> Self { Self::ServerVersion(e) } +} diff --git a/client/src/bdk_client/mod.rs b/client/src/bdk_client/mod.rs new file mode 100644 index 000000000..5aebc1738 --- /dev/null +++ b/client/src/bdk_client/mod.rs @@ -0,0 +1,166 @@ +// SPDX-License-Identifier: CC0-1.0 + +//! Async JSON-RPC clients for Bitcoin Core v25 to v30. + +mod error; +mod rpcs; + +use std::fmt; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::path::PathBuf; + +pub use crate::bdk_client::error::Error; + +/// Crate-specific Result type. +/// +/// Shorthand for `std::result::Result` with our crate-specific [`Error`] type. +pub type Result = std::result::Result; + +/// The different authentication methods for the client. +#[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)] +pub enum Auth { + None, + UserPass(String, String), + CookieFile(PathBuf), +} + +impl Auth { + /// Convert into the arguments that jsonrpc::Client needs. + pub fn get_user_pass(self) -> Result<(Option, Option)> { + match self { + Auth::None => Ok((None, None)), + Auth::UserPass(u, p) => Ok((Some(u), Some(p))), + Auth::CookieFile(path) => { + let line = BufReader::new(File::open(path)?) + .lines() + .next() + .ok_or(Error::InvalidCookieFile)??; + let colon = line.find(':').ok_or(Error::InvalidCookieFile)?; + Ok((Some(line[..colon].into()), Some(line[colon + 1..].into()))) + } + } + } +} + +/// Client implements an async JSON-RPC client for the Bitcoin Core daemon or compatible APIs. +pub struct Client { + pub(crate) inner: jsonrpc::client_async::Client, +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter) -> core::fmt::Result { + write!(f, "corepc_client::client_async::Client({:?})", self.inner) + } +} + +impl Client { + /// Creates a client to a bitcoind JSON-RPC server without authentication. + pub fn new(url: &str) -> Self { + let transport = jsonrpc::bitreq_http_async::Builder::new() + .url(url) + .expect("jsonrpc v0.19, this function does not error") + .timeout(std::time::Duration::from_secs(60)) + .build(); + let inner = jsonrpc::client_async::Client::with_transport(transport); + + Self { inner } + } + + /// Creates a client to a bitcoind JSON-RPC server with authentication. + pub fn new_with_auth(url: &str, auth: Auth) -> Result { + if matches!(auth, Auth::None) { + return Err(Error::MissingUserPassword); + } + let (user, pass) = auth.get_user_pass()?; + let user = user.ok_or(Error::MissingUserPassword)?; + let transport = jsonrpc::bitreq_http_async::Builder::new() + .url(url) + .expect("jsonrpc v0.19, this function does not error") + .timeout(std::time::Duration::from_secs(60)) + .basic_auth(user, pass) + .build(); + let inner = jsonrpc::client_async::Client::with_transport(transport); + + Ok(Self { inner }) + } + + /// Call an RPC `method` with given `args` list. + pub async fn call serde::de::Deserialize<'a>>( + &self, + method: &str, + args: &[serde_json::Value], + ) -> Result { + let raw = serde_json::value::to_raw_value(args)?; + let req = self.inner.build_request(method, Some(&*raw)); + if log::log_enabled!(log::Level::Debug) { + log::debug!(target: "corepc", "request: {} {}", method, serde_json::Value::from(args)); + } + + let resp = self.inner.send_request(req).await.map_err(Error::from); + log_response(method, &resp); + Ok(resp?.result()?) + } +} + +/// Implements the `check_expected_server_version()` on `Client`. +/// +/// Requires `Client` to be in scope and implement `server_version()`. +/// See and/or use `impl_client_v17__getnetworkinfo`. +/// +/// # Parameters +/// +/// - `$expected_versions`: An vector of expected server versions e.g., `[230100, 230200]`. +#[macro_export] +macro_rules! impl_async_client_check_expected_server_version { + ($expected_versions:expr) => { + impl Client { + /// Checks that the JSON-RPC endpoint is for a `bitcoind` instance with the expected version. + pub async fn check_expected_server_version(&self) -> Result<()> { + let server_version = self.server_version().await?; + if !$expected_versions.contains(&server_version) { + return Err($crate::bdk_client::error::UnexpectedServerVersionError { + got: server_version, + expected: $expected_versions.to_vec(), + })?; + } + Ok(()) + } + } + }; +} + +/// Shorthand for converting a variable into a `serde_json::Value`. +pub(crate) fn into_json(val: T) -> Result +where + T: serde::ser::Serialize, +{ + Ok(serde_json::to_value(val)?) +} + +/// Helper to log an RPC response. +fn log_response(method: &str, resp: &Result) { + use log::Level::{Debug, Trace, Warn}; + + if log::log_enabled!(Warn) || log::log_enabled!(Debug) || log::log_enabled!(Trace) { + match resp { + Err(ref e) => + if log::log_enabled!(Debug) { + log::debug!(target: "corepc", "error: {}: {:?}", method, e); + }, + Ok(ref resp) => + if let Some(ref e) = resp.error { + if log::log_enabled!(Debug) { + log::debug!(target: "corepc", "response error for {}: {:?}", method, e); + } + } else if log::log_enabled!(Trace) { + if let Ok(def) = + serde_json::value::to_raw_value(&serde_json::value::Value::Null) + { + let result = resp.result.as_ref().unwrap_or(&def); + log::trace!(target: "corepc", "response for {}: {}", method, result); + } + }, + } + } +} diff --git a/client/src/bdk_client/rpcs.rs b/client/src/bdk_client/rpcs.rs new file mode 100644 index 000000000..11e1d7dd7 --- /dev/null +++ b/client/src/bdk_client/rpcs.rs @@ -0,0 +1,97 @@ +// SPDX-License-Identifier: CC0-1.0 + +//! RPC set used by BDK. +//! All functions return the version nonspecific, strongly typed types. + +use bitcoin::{block, Block, BlockHash, Transaction, Txid}; + +use crate::bdk_client::{into_json, Client, Result}; +use crate::types::model::{GetBlockFilter, GetBlockHeaderVerbose, GetBlockVerboseOne}; + +impl Client { + /// Gets a block by blockhash. + pub async fn get_block(&self, hash: &BlockHash) -> Result { + let json: crate::types::v25::GetBlockVerboseZero = + self.call("getblock", &[into_json(hash)?, into_json(0)?]).await?; + Ok(json.into_model()?.0) + } + + /// Gets the block count. + pub async fn get_block_count(&self) -> Result { + let json: crate::types::v25::GetBlockCount = self.call("getblockcount", &[]).await?; + Ok(json.into_model().0) + } + + /// Gets the block hash for a height. + pub async fn get_block_hash(&self, height: u32) -> Result { + let json: crate::types::v25::GetBlockHash = + self.call("getblockhash", &[into_json(height)?]).await?; + Ok(json.into_model()?.0) + } + + /// Gets the hash of the chain tip. + pub async fn get_best_block_hash(&self) -> Result { + let json: crate::types::v25::GetBestBlockHash = self.call("getbestblockhash", &[]).await?; + Ok(json.into_model()?.0) + } + + /// Gets the block header by blockhash. + pub async fn get_block_header(&self, hash: &BlockHash) -> Result { + let json: crate::types::v25::GetBlockHeader = + self.call("getblockheader", &[into_json(hash)?, into_json(false)?]).await?; + Ok(json.into_model()?.0) + } + + /// Gets the block header with verbose output. + pub async fn get_block_header_verbose( + &self, + hash: &BlockHash, + ) -> Result { + let response: serde_json::Value = + self.call("getblockheader", &[into_json(hash)?, into_json(true)?]).await?; + + if let Ok(json) = + serde_json::from_value::(response.clone()) + { + Ok(json.into_model()?) + } else { + let json: crate::types::v25::GetBlockHeaderVerbose = serde_json::from_value(response)?; + Ok(json.into_model()?) + } + } + + /// Gets a block by blockhash with verbose set to 1. + pub async fn get_block_verbose(&self, hash: &BlockHash) -> Result { + let response: serde_json::Value = + self.call("getblock", &[into_json(hash)?, into_json(1)?]).await?; + + if let Ok(json) = + serde_json::from_value::(response.clone()) + { + Ok(json.into_model()?) + } else { + let json: crate::types::v25::GetBlockVerboseOne = serde_json::from_value(response)?; + Ok(json.into_model()?) + } + } + + /// Gets the block filter for a blockhash. + pub async fn get_block_filter(&self, hash: &BlockHash) -> Result { + let json: crate::types::v25::GetBlockFilter = + self.call("getblockfilter", &[into_json(hash)?]).await?; + Ok(json.into_model()?) + } + + /// Gets the transaction IDs currently in the mempool. + pub async fn get_raw_mempool(&self) -> Result> { + let json: crate::types::v25::GetRawMempool = self.call("getrawmempool", &[]).await?; + Ok(json.into_model()?.0) + } + + /// Gets the raw transaction by txid. + pub async fn get_raw_transaction(&self, txid: &Txid) -> Result { + let json: crate::types::v25::GetRawTransaction = + self.call("getrawtransaction", &[into_json(txid)?]).await?; + Ok(json.into_model()?.0) + } +} diff --git a/client/src/lib.rs b/client/src/lib.rs index 4b34271e3..3efc5d00f 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -11,3 +11,6 @@ pub extern crate types; #[cfg(feature = "client-sync")] #[macro_use] pub mod client_sync; + +#[cfg(feature = "client-async")] +pub mod bdk_client; diff --git a/integration_test/Cargo.toml b/integration_test/Cargo.toml index 05421c435..0a3ec8c9d 100644 --- a/integration_test/Cargo.toml +++ b/integration_test/Cargo.toml @@ -62,6 +62,7 @@ TODO = [] # This is a dirty hack while writing the tests. [dependencies] bitcoin = { version = "0.32.0", default-features = false, features = ["std", "serde"] } +corepc-client = { version = "0.11.0", path = "../client", default-features = false, features = ["client-async"] } env_logger = "0.9.0" node = { package = "corepc-node", version = "0.11.0", path = "../node", default-features = false } rand = "0.8.5" @@ -69,6 +70,7 @@ rand = "0.8.5" types = { package = "corepc-types", version = "0.11.0", path = "../types", features = ["serde-deny-unknown-fields"] } [dev-dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } [patch.crates-io.corepc-client] diff --git a/integration_test/tests/bdk_client.rs b/integration_test/tests/bdk_client.rs new file mode 100644 index 000000000..1abe8ae73 --- /dev/null +++ b/integration_test/tests/bdk_client.rs @@ -0,0 +1,131 @@ +// SPDX-License-Identifier: CC0-1.0 + +//! Tests for the async client. + +#![cfg(not(feature = "v24_and_below"))] +#![allow(non_snake_case)] // Test names intentionally use double underscore. + +use bitcoin::address::KnownHrp; +use bitcoin::{Address, CompressedPublicKey, PrivateKey}; +use corepc_client::bdk_client::{Auth, Client, Error as AsyncClientError}; +use integration_test::{Node, NodeExt as _, Wallet}; +use node::mtype; + +fn async_client_for(node: &Node) -> Client { + Client::new_with_auth(&node.rpc_url(), auth_for(node)).expect("async client") +} + +#[tokio::test] +async fn async__get_best_block_hash__modelled() { + let node = Node::with_wallet(Wallet::None, &[]); + let client = async_client_for(&node); + + let model: Result = client.get_best_block_hash().await; + let model = model.unwrap(); + let expected = node.client.best_block_hash().expect("best_block_hash"); + assert_eq!(model, expected); +} + +#[tokio::test] +async fn async__get_block__modelled() { + let node = Node::with_wallet(Wallet::None, &[]); + let client = async_client_for(&node); + + let best_hash = node.client.best_block_hash().expect("best_block_hash"); + + let model: Result = client.get_block(&best_hash).await; + let model = model.unwrap(); + assert_eq!(model.block_hash(), best_hash); + + let model: Result = + client.get_block_verbose(&best_hash).await; + let model = model.unwrap(); + assert_eq!(model.hash, best_hash); +} + +#[tokio::test] +async fn async__get_block_count__modelled() { + let node = Node::with_wallet(Wallet::None, &[]); + let client = async_client_for(&node); + + let model: Result = client.get_block_count().await; + let model = model.unwrap(); + assert_eq!(model, 0); +} + +#[tokio::test] +#[cfg(not(feature = "v18_and_below"))] +async fn async__get_block_filter__modelled() { + let node = Node::with_wallet(Wallet::None, &["-blockfilterindex"]); + let client = async_client_for(&node); + + let best_hash = node.client.best_block_hash().expect("best_block_hash"); + let model: Result = + client.get_block_filter(&best_hash).await; + let model = model.unwrap(); + + assert!(!model.filter.is_empty()); + assert_eq!(model.header.to_string().len(), 64); +} + +#[tokio::test] +async fn async__get_block_hash__modelled() { + let node = Node::with_wallet(Wallet::None, &[]); + let client = async_client_for(&node); + + let model: Result = client.get_block_hash(0).await; + let model = model.unwrap(); + let expected = node.client.best_block_hash().expect("best_block_hash"); + assert_eq!(model, expected); +} + +#[tokio::test] +async fn async__get_block_header__modelled() { + let node = Node::with_wallet(Wallet::None, &[]); + let client = async_client_for(&node); + + let best_hash = node.client.best_block_hash().expect("best_block_hash"); + let model: Result = + client.get_block_header(&best_hash).await; + let model = model.unwrap(); + assert_eq!(model.block_hash(), best_hash); + + let model: Result = + client.get_block_header_verbose(&best_hash).await; + let model = model.unwrap(); + assert_eq!(model.hash, best_hash); + assert_eq!(model.height, 0); +} + +#[tokio::test] +async fn async__get_raw_mempool__modelled() { + let node = Node::with_wallet(Wallet::None, &[]); + let client = async_client_for(&node); + + let model: Result, AsyncClientError> = client.get_raw_mempool().await; + let model = model.unwrap(); + assert!(model.is_empty()); +} + +#[tokio::test] +async fn async__get_raw_transaction__modelled() { + let node = Node::with_wallet(Wallet::None, &["-txindex"]); + let privkey = + PrivateKey::from_wif("cVt4o7BGAig1UXywgGSmARhxMdzP5qvQsxKkSsc1XEkw3tDTQFpy").expect("wif"); + let secp = bitcoin::secp256k1::Secp256k1::new(); + let pubkey = privkey.public_key(&secp); + let address = Address::p2wpkh(&CompressedPublicKey(pubkey.inner), KnownHrp::Regtest); + node.client.generate_to_address(1, &address).expect("generatetoaddress"); + + let client = async_client_for(&node); + let best_hash = node.client.best_block_hash().expect("best_block_hash"); + let block = client.get_block(&best_hash).await.expect("getblock"); + let txid = block.txdata[0].compute_txid(); + + let model: Result = + client.get_raw_transaction(&txid).await; + let model = model.unwrap(); + assert_eq!(model.compute_txid(), txid); +} + +fn auth_for(node: &Node) -> Auth { Auth::CookieFile(node.params.cookie_file.clone()) } diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml index 7c06709e7..ca818f1bc 100644 --- a/jsonrpc/Cargo.toml +++ b/jsonrpc/Cargo.toml @@ -22,6 +22,10 @@ default = [ "simple_http", "simple_tcp" ] simple_http = [ "base64" ] # A transport that uses `bitreq` as the HTTP client. bitreq_http = [ "base64", "bitreq" ] +# A transport that uses `bitreq` as the async HTTP client. +bitreq_http_async = [ "base64", "bitreq", "bitreq/async", "client_async" ] +# An async JSON-RPC client implementation. +client_async = [] # Basic transport over a raw TcpListener simple_tcp = [] # Basic transport over a raw UnixStream diff --git a/jsonrpc/src/client_async.rs b/jsonrpc/src/client_async.rs new file mode 100644 index 000000000..e3b6d4d4a --- /dev/null +++ b/jsonrpc/src/client_async.rs @@ -0,0 +1,280 @@ +// SPDX-License-Identifier: CC0-1.0 + +//! # Async client support +//! +//! Support for connecting to JSONRPC servers over HTTP, sending requests, +//! and parsing responses. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::fmt; +use std::future::Future; +use std::hash::{Hash, Hasher}; +use std::pin::Pin; +use std::sync::atomic; + +use serde_json::value::RawValue; +use serde_json::Value; + +use crate::error::Error; +use crate::{Request, Response}; + +const JSONRPC_VERSION: &str = "2.0"; + +/// Boxed future type used by async transports. +pub type BoxFuture<'a, T> = Pin + Send + 'a>>; + +/// An interface for an async transport over which to use the JSONRPC protocol. +pub trait Transport: Send + Sync + 'static { + /// Sends an RPC request over the transport. + fn send_request<'a>(&'a self, req: Request<'a>) -> BoxFuture<'a, Result>; + /// Sends a batch of RPC requests over the transport. + fn send_batch<'a>( + &'a self, + reqs: &'a [Request<'a>], + ) -> BoxFuture<'a, Result, Error>>; + /// Formats the target of this transport. I.e. the URL/socket/... + fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result; +} + +/// An async JSON-RPC client. +/// +/// Creates a new Client using one of the transport-specific constructors. +pub struct Client { + pub(crate) transport: Box, + nonce: atomic::AtomicUsize, +} + +impl Client { + /// Creates a new client with the given transport. + pub fn with_transport(transport: T) -> Client { + Client { transport: Box::new(transport), nonce: atomic::AtomicUsize::new(1) } + } + + /// Builds a request. + /// + /// To construct the arguments, one can use one of the shorthand methods. + /// [`crate::arg`] or [`crate::try_arg`]. + pub fn build_request<'a>(&self, method: &'a str, params: Option<&'a RawValue>) -> Request<'a> { + let nonce = self.nonce.fetch_add(1, atomic::Ordering::Relaxed); + Request { + method, + params, + id: serde_json::Value::from(nonce), + jsonrpc: Some(JSONRPC_VERSION), + } + } + + /// Sends a request to a client. + pub async fn send_request(&self, request: Request<'_>) -> Result { + self.transport.send_request(request).await + } + + /// Sends a batch of requests to the client. + /// + /// Note that the requests need to have valid IDs, so it is advised to create the requests + /// with [`Client::build_request`]. + /// + /// # Returns + /// + /// The return vector holds the response for the request at the corresponding index. If no + /// response was provided, it's [`None`]. + pub async fn send_batch( + &self, + requests: &[Request<'_>], + ) -> Result>, Error> { + if requests.is_empty() { + return Err(Error::EmptyBatch); + } + + // If the request body is invalid JSON, the response is a single response object. + // We ignore this case since we are confident we are producing valid JSON. + let responses = self.transport.send_batch(requests).await?; + if responses.len() > requests.len() { + return Err(Error::WrongBatchResponseSize); + } + + //TODO(stevenroose) check if the server preserved order to avoid doing the mapping + + // First index responses by ID and catch duplicate IDs. + let mut by_id = HashMap::with_capacity(requests.len()); + for resp in responses.into_iter() { + if resp.jsonrpc.is_some() && resp.jsonrpc.as_deref() != Some(JSONRPC_VERSION) { + return Err(Error::VersionMismatch); + } + let id = HashableValue(Cow::Owned(resp.id.clone())); + if let Some(dup) = by_id.insert(id, resp) { + return Err(Error::BatchDuplicateResponseId(dup.id)); + } + } + // Match responses to the requests. + let results = + requests.iter().map(|r| by_id.remove(&HashableValue(Cow::Borrowed(&r.id)))).collect(); + + // Since we're also just producing the first duplicate ID, we can also just produce the + // first incorrect ID in case there are multiple. + if let Some(id) = by_id.keys().next() { + return Err(Error::WrongBatchResponseId((*id.0).clone())); + } + + Ok(results) + } + + /// Makes a request and deserializes the response. + /// + /// To construct the arguments, one can use one of the shorthand methods + /// [`crate::arg`] or [`crate::try_arg`]. + pub async fn call serde::de::Deserialize<'a>>( + &self, + method: &str, + args: Option<&RawValue>, + ) -> Result { + let request = self.build_request(method, args); + let id = request.id.clone(); + + let response = self.send_request(request).await?; + if response.jsonrpc.is_some() && response.jsonrpc.as_deref() != Some(JSONRPC_VERSION) { + return Err(Error::VersionMismatch); + } + if response.id != id { + return Err(Error::NonceMismatch); + } + + response.result() + } +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "jsonrpc::Client(")?; + self.transport.fmt_target(f)?; + write!(f, ")") + } +} + +impl From for Client { + fn from(t: T) -> Client { Client::with_transport(t) } +} + +/// Newtype around `Value` which allows hashing for use as hashmap keys, +/// this is needed for batch requests. +/// +/// The reason `Value` does not support `Hash` or `Eq` by itself +/// is that it supports `f64` values; but for batch requests we +/// will only be hashing the "id" field of the request/response +/// pair, which should never need decimal precision and therefore +/// never use `f64`. +#[derive(Clone, PartialEq, Debug)] +struct HashableValue<'a>(pub Cow<'a, Value>); + +impl Eq for HashableValue<'_> {} + +impl Hash for HashableValue<'_> { + fn hash(&self, state: &mut H) { + match *self.0.as_ref() { + Value::Null => "null".hash(state), + Value::Bool(false) => "false".hash(state), + Value::Bool(true) => "true".hash(state), + Value::Number(ref n) => { + "number".hash(state); + if let Some(n) = n.as_i64() { + n.hash(state); + } else if let Some(n) = n.as_u64() { + n.hash(state); + } else { + n.to_string().hash(state); + } + } + Value::String(ref s) => { + "string".hash(state); + s.hash(state); + } + Value::Array(ref v) => { + "array".hash(state); + v.len().hash(state); + for obj in v { + HashableValue(Cow::Borrowed(obj)).hash(state); + } + } + Value::Object(ref m) => { + "object".hash(state); + m.len().hash(state); + for (key, val) in m { + key.hash(state); + HashableValue(Cow::Borrowed(val)).hash(state); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::borrow::Cow; + use std::collections::HashSet; + use std::str::FromStr; + use std::sync; + + use super::*; + + struct DummyTransport; + impl Transport for DummyTransport { + fn send_request<'a>(&'a self, _: Request<'a>) -> BoxFuture<'a, Result> { + Box::pin(async { Err(Error::NonceMismatch) }) + } + + fn send_batch<'a>( + &'a self, + _: &'a [Request<'a>], + ) -> BoxFuture<'a, Result, Error>> { + Box::pin(async { Ok(vec![]) }) + } + + fn fmt_target(&self, _: &mut fmt::Formatter) -> fmt::Result { Ok(()) } + } + + #[test] + fn sanity() { + let client = Client::with_transport(DummyTransport); + assert_eq!(client.nonce.load(sync::atomic::Ordering::Relaxed), 1); + let req1 = client.build_request("test", None); + assert_eq!(client.nonce.load(sync::atomic::Ordering::Relaxed), 2); + let req2 = client.build_request("test", None); + assert_eq!(client.nonce.load(sync::atomic::Ordering::Relaxed), 3); + assert!(req1.id != req2.id); + } + + #[test] + fn hash_value() { + let val = HashableValue(Cow::Owned(Value::from_str("null").unwrap())); + let t = HashableValue(Cow::Owned(Value::from_str("true").unwrap())); + let f = HashableValue(Cow::Owned(Value::from_str("false").unwrap())); + let ns = + HashableValue(Cow::Owned(Value::from_str("[0, -0, 123.4567, -100000000]").unwrap())); + let m = + HashableValue(Cow::Owned(Value::from_str("{ \"field\": 0, \"field\": -0 }").unwrap())); + + let mut coll = HashSet::new(); + + assert!(!coll.contains(&val)); + coll.insert(val.clone()); + assert!(coll.contains(&val)); + + assert!(!coll.contains(&t)); + assert!(!coll.contains(&f)); + coll.insert(t.clone()); + assert!(coll.contains(&t)); + assert!(!coll.contains(&f)); + coll.insert(f.clone()); + assert!(coll.contains(&t)); + assert!(coll.contains(&f)); + + assert!(!coll.contains(&ns)); + coll.insert(ns.clone()); + assert!(coll.contains(&ns)); + + assert!(!coll.contains(&m)); + coll.insert(m.clone()); + assert!(coll.contains(&m)); + } +} diff --git a/jsonrpc/src/http/bitreq_http_async.rs b/jsonrpc/src/http/bitreq_http_async.rs new file mode 100644 index 000000000..046c47a3d --- /dev/null +++ b/jsonrpc/src/http/bitreq_http_async.rs @@ -0,0 +1,247 @@ +//! This module implements the [`crate::client_async::Transport`] trait using [`bitreq`] +//! as the underlying HTTP transport. +//! +//! [bitreq]: + +use std::time::Duration; +use std::{error, fmt}; + +use base64::engine::general_purpose::STANDARD as BASE64; +use base64::Engine; + +use crate::client_async::{BoxFuture, Transport}; +use crate::{Request, Response}; + +const DEFAULT_URL: &str = "http://localhost"; +const DEFAULT_PORT: u16 = 8332; // the default RPC port for bitcoind. +const DEFAULT_TIMEOUT_SECONDS: u64 = 15; + +/// An HTTP transport that uses [`bitreq`] and is useful for running a bitcoind RPC client. +#[derive(Clone, Debug)] +pub struct BitreqHttpTransport { + /// URL of the RPC server. + url: String, + /// Timeout only supports second granularity. + timeout: Duration, + /// The value of the `Authorization` HTTP header, i.e., a base64 encoding of 'user:password'. + basic_auth: Option, +} + +impl Default for BitreqHttpTransport { + fn default() -> Self { + BitreqHttpTransport { + url: format!("{}:{}", DEFAULT_URL, DEFAULT_PORT), + timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECONDS), + basic_auth: None, + } + } +} + +impl BitreqHttpTransport { + /// Constructs a new [`BitreqHttpTransport`] with default parameters. + pub fn new() -> Self { BitreqHttpTransport::default() } + + /// Returns a builder for [`BitreqHttpTransport`]. + pub fn builder() -> Builder { Builder::new() } + + async fn request(&self, req: impl serde::Serialize) -> Result + where + R: for<'a> serde::de::Deserialize<'a>, + { + let req = match &self.basic_auth { + Some(auth) => bitreq::Request::new(bitreq::Method::Post, &self.url) + .with_timeout(self.timeout.as_secs()) + .with_header("Authorization", auth) + .with_json(&req)?, + None => bitreq::Request::new(bitreq::Method::Post, &self.url) + .with_timeout(self.timeout.as_secs()) + .with_json(&req)?, + }; + + // Send the request and parse the response. If the response is an error that does not + // contain valid JSON in its body (for instance if the bitcoind HTTP server work queue + // depth is exceeded), return the raw HTTP error so users can match against it. + let resp = req.send_async().await?; + match resp.json() { + Ok(json) => Ok(json), + Err(bitreq_err) => + if resp.status_code != 200 { + Err(Error::Http(HttpError { + status_code: resp.status_code, + body: resp.as_str().unwrap_or("").to_string(), + })) + } else { + Err(Error::Bitreq(bitreq_err)) + }, + } + } +} + +impl Transport for BitreqHttpTransport { + fn send_request<'a>( + &'a self, + req: Request<'a>, + ) -> BoxFuture<'a, Result> { + Box::pin(async move { Ok(self.request(req).await?) }) + } + + fn send_batch<'a>( + &'a self, + reqs: &'a [Request<'a>], + ) -> BoxFuture<'a, Result, crate::Error>> { + Box::pin(async move { Ok(self.request(reqs).await?) }) + } + + fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.url) } +} + +/// Builder for async bitcoind [`BitreqHttpTransport`]. +#[derive(Clone, Debug)] +pub struct Builder { + tp: BitreqHttpTransport, +} + +impl Builder { + /// Constructs a new [`Builder`] with default configuration and the URL to use. + pub fn new() -> Builder { Builder { tp: BitreqHttpTransport::new() } } + + /// Sets the timeout after which requests will abort if they aren't finished. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.tp.timeout = timeout; + self + } + + /// Sets the URL of the server to the transport. + #[allow(clippy::assigning_clones)] // clone_into is only available in Rust 1.63 + pub fn url(mut self, url: &str) -> Result { + self.tp.url = url.to_owned(); + Ok(self) + } + + /// Adds authentication information to the transport. + pub fn basic_auth(mut self, user: String, pass: Option) -> Self { + let mut s = user; + s.push(':'); + if let Some(ref pass) = pass { + s.push_str(pass.as_ref()); + } + self.tp.basic_auth = Some(format!("Basic {}", &BASE64.encode(s.as_bytes()))); + self + } + + /// Adds authentication information to the transport using a cookie string ('user:pass'). + /// + /// Does no checking on the format of the cookie string, just base64 encodes whatever is passed in. + /// + /// # Examples + /// + /// ```no_run + /// # use jsonrpc::bitreq_http_async::BitreqHttpTransport; + /// # use std::fs::{self, File}; + /// # use std::path::Path; + /// # let cookie_file = Path::new("~/.bitcoind/.cookie"); + /// let mut file = File::open(cookie_file).expect("couldn't open cookie file"); + /// let mut cookie = String::new(); + /// fs::read_to_string(&mut cookie).expect("couldn't read cookie file"); + /// let client = BitreqHttpTransport::builder().cookie_auth(cookie); + /// ``` + pub fn cookie_auth>(mut self, cookie: S) -> Self { + self.tp.basic_auth = Some(format!("Basic {}", &BASE64.encode(cookie.as_ref().as_bytes()))); + self + } + + /// Builds the final [`BitreqHttpTransport`]. + pub fn build(self) -> BitreqHttpTransport { self.tp } +} + +impl Default for Builder { + fn default() -> Self { Builder::new() } +} + +/// An HTTP error. +#[derive(Debug)] +pub struct HttpError { + /// Status code of the error response. + pub status_code: i32, + /// Raw body of the error response. + pub body: String, +} + +impl fmt::Display for HttpError { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "status: {}, body: {}", self.status_code, self.body) + } +} + +impl error::Error for HttpError {} + +/// Error that can happen when sending requests. +/// +/// In case of error, a JSON error is returned if the body of the response could be parsed as such. +/// Otherwise, an HTTP error is returned containing the status code and the raw body. +#[non_exhaustive] +#[derive(Debug)] +pub enum Error { + /// JSON parsing error. + Json(serde_json::Error), + /// Bitreq error. + Bitreq(bitreq::Error), + /// HTTP error that does not contain valid JSON as body. + Http(HttpError), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match *self { + Error::Json(ref e) => write!(f, "parsing JSON failed: {}", e), + Error::Bitreq(ref e) => write!(f, "bitreq: {}", e), + Error::Http(ref e) => write!(f, "http ({})", e), + } + } +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + use self::Error::*; + + match *self { + Json(ref e) => Some(e), + Bitreq(ref e) => Some(e), + Http(ref e) => Some(e), + } + } +} + +impl From for Error { + fn from(e: serde_json::Error) -> Self { Error::Json(e) } +} + +impl From for Error { + fn from(e: bitreq::Error) -> Self { Error::Bitreq(e) } +} + +impl From for crate::Error { + fn from(e: Error) -> crate::Error { + match e { + Error::Json(e) => crate::Error::Json(e), + e => crate::Error::Transport(Box::new(e)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client_async::Client; + + #[test] + fn construct() { + let tp = Builder::new() + .timeout(Duration::from_millis(100)) + .url("http://localhost:22") + .unwrap() + .basic_auth("user".to_string(), None) + .build(); + let _ = Client::with_transport(tp); + } +} diff --git a/jsonrpc/src/http/mod.rs b/jsonrpc/src/http/mod.rs index f6221f388..d6f7f5a73 100644 --- a/jsonrpc/src/http/mod.rs +++ b/jsonrpc/src/http/mod.rs @@ -6,6 +6,9 @@ pub mod simple_http; #[cfg(feature = "bitreq_http")] pub mod bitreq_http; +#[cfg(feature = "bitreq_http_async")] +pub mod bitreq_http_async; + /// The default TCP port to use for connections. /// Set to 8332, the default RPC port for bitcoind. pub const DEFAULT_PORT: u16 = 8332; diff --git a/jsonrpc/src/lib.rs b/jsonrpc/src/lib.rs index ee2953ee7..c06178510 100644 --- a/jsonrpc/src/lib.rs +++ b/jsonrpc/src/lib.rs @@ -21,11 +21,15 @@ pub extern crate base64; pub extern crate bitreq; pub mod client; +#[cfg(feature = "client_async")] +pub mod client_async; pub mod error; pub mod http; #[cfg(feature = "bitreq_http")] pub use http::bitreq_http; +#[cfg(feature = "bitreq_http_async")] +pub use http::bitreq_http_async; #[cfg(feature = "simple_http")] pub use http::simple_http;