Skip to content

Commit 72227ed

Browse files
committed
feat: integrate rocksdb to persist btc payments
Signed-off-by: Gregory Hill <gregorydhill@outlook.com>
1 parent 1f41767 commit 72227ed

7 files changed

Lines changed: 63 additions & 42 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vault/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ futures = "0.3.5"
2222
async-trait = "0.1.40"
2323
sha2 = "0.8.2"
2424
git-version = "0.3.4"
25+
rocksdb = { version = "0.17", features = ["snappy"], default-features = false }
2526

2627
tracing = { version = "0.1", features = ["log"] }
2728
tracing-subscriber = { version = "0.2.12", features = ["registry", "env-filter", "fmt"] }

vault/src/execution.rs

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{error::Error, VaultIdManager};
22
use bitcoin::{
3-
BitcoinCoreApi, Transaction, TransactionExt, TransactionMetadata, BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL,
3+
BitcoinCoreApi, Hash, TransactionExt, TransactionMetadata, Txid, BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL,
44
};
55
use futures::{
66
stream::{self, StreamExt},
@@ -11,7 +11,7 @@ use runtime::{
1111
InterBtcReplaceRequest, IssuePallet, RedeemPallet, RedeemRequestStatus, RefundPallet, ReplacePallet,
1212
ReplaceRequestStatus, RequestRefundEvent, SecurityPallet, UtilFuncs, VaultId, VaultRegistryPallet, H256,
1313
};
14-
use std::{collections::HashMap, convert::TryInto, time::Duration};
14+
use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration};
1515
use tokio::time::sleep;
1616

1717
const ON_FORK_RETRY_DELAY: Duration = Duration::from_secs(10);
@@ -180,6 +180,7 @@ impl Request {
180180
&self,
181181
parachain_rpc: P,
182182
btc_rpc: B,
183+
rocksdb: Arc<rocksdb::DB>,
183184
num_confirmations: u32,
184185
) -> Result<(), Error> {
185186
// ensure the deadline has not expired yet
@@ -192,7 +193,13 @@ impl Request {
192193
}
193194

194195
let tx_metadata = self
195-
.transfer_btc(&parachain_rpc, btc_rpc, num_confirmations, self.vault_id.clone())
196+
.transfer_btc(
197+
&parachain_rpc,
198+
btc_rpc,
199+
rocksdb,
200+
num_confirmations,
201+
self.vault_id.clone(),
202+
)
196203
.await?;
197204
self.execute(parachain_rpc, tx_metadata).await
198205
}
@@ -213,6 +220,7 @@ impl Request {
213220
&self,
214221
parachain_rpc: &P,
215222
btc_rpc: B,
223+
rocksdb: Arc<rocksdb::DB>,
216224
num_confirmations: u32,
217225
vault_id: VaultId,
218226
) -> Result<TransactionMetadata, Error> {
@@ -244,6 +252,7 @@ impl Request {
244252
};
245253

246254
let txid = btc_rpc.send_transaction(tx).await?;
255+
rocksdb.put(self.hash, txid).expect("could not write to db");
247256

248257
loop {
249258
let tx_metadata = btc_rpc.wait_for_transaction_metadata(txid, num_confirmations).await?;
@@ -310,7 +319,7 @@ impl Request {
310319
pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'static>(
311320
parachain_rpc: InterBtcParachain,
312321
btc_rpc: VaultIdManager<B>,
313-
read_only_btc_rpc: B,
322+
rocksdb: Arc<rocksdb::DB>,
314323
num_confirmations: u32,
315324
payment_margin: Duration,
316325
process_refunds: bool,
@@ -364,25 +373,19 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
364373
.map(|x| (x.hash, x))
365374
.collect::<HashMap<_, _>>();
366375

367-
// find the height of bitcoin chain corresponding to the earliest btc_height
368-
let btc_start_height = match open_requests
369-
.iter()
370-
.map(|(_, request)| request.btc_height.unwrap_or(u32::MAX))
371-
.min()
372-
{
373-
Some(x) => x,
374-
None => return Ok(()), // the iterator is empty so we have nothing to do
375-
};
376-
377-
// iterate through transactions in reverse order, starting from those in the mempool
378-
let mut transaction_stream = bitcoin::reverse_stream_transactions(&read_only_btc_rpc, btc_start_height).await?;
379-
while let Some(result) = transaction_stream.next().await {
380-
let tx = result?;
381-
376+
for (hash, request) in open_requests.clone().into_iter() {
382377
// get the request this transaction corresponds to, if any
383-
if let Some(request) = get_request_for_btc_tx(&tx, &open_requests) {
378+
if let Ok(Some(raw_txid)) = rocksdb.get(&hash) {
384379
// remove request from the hashmap
385-
open_requests.retain(|&key, _| key != request.hash);
380+
open_requests.remove(&request.hash);
381+
382+
let txid = match Txid::from_slice(&raw_txid) {
383+
Ok(txid) => txid,
384+
Err(err) => {
385+
tracing::error!("Could not decode txid: {}", err);
386+
continue;
387+
}
388+
};
386389

387390
tracing::info!(
388391
"{:?} request #{:?} has valid bitcoin payment - processing...",
@@ -409,7 +412,7 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
409412
// Payment has been made, but it might not have been confirmed enough times yet
410413
let tx_metadata = btc_rpc
411414
.clone()
412-
.wait_for_transaction_metadata(tx.txid(), num_confirmations)
415+
.wait_for_transaction_metadata(txid, num_confirmations)
413416
.await;
414417

415418
match tx_metadata {
@@ -456,6 +459,7 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
456459
// make copies of the variables we move into the task
457460
let parachain_rpc = parachain_rpc.clone();
458461
let btc_rpc = btc_rpc.clone();
462+
let rocksdb = rocksdb.clone();
459463
tokio::spawn(async move {
460464
let btc_rpc = match btc_rpc.get_bitcoin_rpc(&request.vault_id).await {
461465
Some(x) => x,
@@ -474,7 +478,10 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
474478
request.hash
475479
);
476480

477-
match request.pay_and_execute(parachain_rpc, btc_rpc, num_confirmations).await {
481+
match request
482+
.pay_and_execute(parachain_rpc, btc_rpc, rocksdb, num_confirmations)
483+
.await
484+
{
478485
Ok(_) => tracing::info!(
479486
"{:?} request #{:?} successfully executed",
480487
request.request_type,
@@ -493,19 +500,6 @@ pub async fn execute_open_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'st
493500
Ok(())
494501
}
495502

496-
/// Get the Request from the hashmap that the given Transaction satisfies, based
497-
/// on the OP_RETURN and the amount of btc that is transfered to the address
498-
fn get_request_for_btc_tx(tx: &Transaction, hash_map: &HashMap<H256, Request>) -> Option<Request> {
499-
let hash = tx.get_op_return()?;
500-
let request = hash_map.get(&hash)?;
501-
let paid_amount = tx.get_payment_amount_to(request.btc_address)?;
502-
if paid_amount as u128 >= request.amount {
503-
Some(request.clone())
504-
} else {
505-
None
506-
}
507-
}
508-
509503
#[cfg(all(test, feature = "standalone-metadata"))]
510504
mod tests {
511505
use super::*;

vault/src/redeem.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{execution::*, system::VaultIdManager};
22
use bitcoin::BitcoinCoreApi;
33
use runtime::{InterBtcParachain, RedeemPallet, RequestRedeemEvent};
44
use service::Error as ServiceError;
5-
use std::time::Duration;
5+
use std::{sync::Arc, time::Duration};
66

77
/// Listen for RequestRedeemEvent directed at this vault; upon reception, transfer
88
/// bitcoin and call execute_redeem
@@ -16,6 +16,7 @@ use std::time::Duration;
1616
pub async fn listen_for_redeem_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'static>(
1717
parachain_rpc: InterBtcParachain,
1818
btc_rpc: VaultIdManager<B>,
19+
rocksdb: Arc<rocksdb::DB>,
1920
num_confirmations: u32,
2021
payment_margin: Duration,
2122
) -> Result<(), ServiceError> {
@@ -33,6 +34,7 @@ pub async fn listen_for_redeem_requests<B: BitcoinCoreApi + Clone + Send + Sync
3334
// by reference. Since spawn requires static lifetimes, we will need to capture the
3435
// arguments by value rather than by reference, so clone these:
3536
let parachain_rpc = parachain_rpc.clone();
37+
let rocksdb = rocksdb.clone();
3638
// Spawn a new task so that we handle these events concurrently
3739
tokio::spawn(async move {
3840
tracing::info!("Executing redeem #{:?}", event.redeem_id);
@@ -42,7 +44,9 @@ pub async fn listen_for_redeem_requests<B: BitcoinCoreApi + Clone + Send + Sync
4244
parachain_rpc.get_redeem_request(event.redeem_id).await?,
4345
payment_margin,
4446
)?;
45-
request.pay_and_execute(parachain_rpc, btc_rpc, num_confirmations).await
47+
request
48+
.pay_and_execute(parachain_rpc, btc_rpc, rocksdb, num_confirmations)
49+
.await
4650
}
4751
.await;
4852

vault/src/refund.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{execution::*, system::VaultIdManager};
22
use bitcoin::BitcoinCoreApi;
33
use runtime::{InterBtcParachain, RequestRefundEvent};
44
use service::Error as ServiceError;
5+
use std::sync::Arc;
56

67
/// Listen for RequestRefundEvent directed at this vault; upon reception, transfer
78
/// bitcoin and call execute_refund
@@ -16,6 +17,7 @@ use service::Error as ServiceError;
1617
pub async fn listen_for_refund_requests<B: BitcoinCoreApi + Clone + Send + Sync + 'static>(
1718
parachain_rpc: InterBtcParachain,
1819
btc_rpc: VaultIdManager<B>,
20+
rocksdb: Arc<rocksdb::DB>,
1921
num_confirmations: u32,
2022
process_refunds: bool,
2123
) -> Result<(), ServiceError> {
@@ -37,12 +39,15 @@ pub async fn listen_for_refund_requests<B: BitcoinCoreApi + Clone + Send + Sync
3739
// by reference. Since spawn requires static lifetimes, we will need to capture the
3840
// arguments by value rather than by reference, so clone these:
3941
let parachain_rpc = parachain_rpc.clone();
42+
let rocksdb = rocksdb.clone();
4043
// Spawn a new task so that we handle these events concurrently
4144
tokio::spawn(async move {
4245
tracing::info!("Executing refund #{:?}", event.refund_id);
4346
// prepare the action that will be executed after the bitcoin transfer
4447
let request = Request::from_refund_request_event(&event);
45-
let result = request.pay_and_execute(parachain_rpc, btc_rpc, num_confirmations).await;
48+
let result = request
49+
.pay_and_execute(parachain_rpc, btc_rpc, rocksdb, num_confirmations)
50+
.await;
4651

4752
match result {
4853
Ok(_) => tracing::info!(

vault/src/replace.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use runtime::{
66
RequestReplaceEvent, UtilFuncs, VaultId, VaultRegistryPallet,
77
};
88
use service::Error as ServiceError;
9-
use std::time::Duration;
9+
use std::{sync::Arc, time::Duration};
1010

1111
/// Listen for AcceptReplaceEvent directed at this vault and continue the replacement
1212
/// procedure by transferring bitcoin and calling execute_replace
@@ -19,11 +19,13 @@ use std::time::Duration;
1919
pub async fn listen_for_accept_replace<B: BitcoinCoreApi + Clone + Send + Sync + 'static>(
2020
parachain_rpc: InterBtcParachain,
2121
btc_rpc: VaultIdManager<B>,
22+
rocksdb: Arc<rocksdb::DB>,
2223
num_confirmations: u32,
2324
payment_margin: Duration,
2425
) -> Result<(), ServiceError> {
2526
let parachain_rpc = &parachain_rpc;
2627
let btc_rpc = &btc_rpc;
28+
let rocksdb = &rocksdb;
2729
parachain_rpc
2830
.on_event::<AcceptReplaceEvent, _, _, _>(
2931
|event| async move {
@@ -37,6 +39,7 @@ pub async fn listen_for_accept_replace<B: BitcoinCoreApi + Clone + Send + Sync +
3739
// by reference. Since spawn requires static lifetimes, we will need to capture the
3840
// arguments by value rather than by reference, so clone these:
3941
let parachain_rpc = parachain_rpc.clone();
42+
let rocksdb = rocksdb.clone();
4043
// Spawn a new task so that we handle these events concurrently
4144
tokio::spawn(async move {
4245
tracing::info!("Executing accept replace #{:?}", event.replace_id);
@@ -47,7 +50,9 @@ pub async fn listen_for_accept_replace<B: BitcoinCoreApi + Clone + Send + Sync +
4750
parachain_rpc.get_replace_request(event.replace_id).await?,
4851
payment_margin,
4952
)?;
50-
request.pay_and_execute(parachain_rpc, btc_rpc, num_confirmations).await
53+
request
54+
.pay_and_execute(parachain_rpc, btc_rpc, rocksdb, num_confirmations)
55+
.await
5156
}
5257
.await;
5358

vault/src/system.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ pub struct VaultServiceConfig {
100100
/// Defaults to the relay chain currency if not set.
101101
#[clap(long, parse(try_from_str = parse_collateral_currency))]
102102
pub collateral_currency_id: Option<CurrencyId>,
103+
104+
#[clap(long, default_value = ".vault_db")]
105+
pub rocksdb_path: String,
103106
}
104107

105108
async fn active_block_listener(
@@ -324,10 +327,15 @@ impl VaultService {
324327

325328
let startup_height = self.await_parachain_block().await?;
326329

330+
// persist request payments since checking mempool is unreliable
331+
// we could use the `listtransactions` bitcoin rpc to fetch txs
332+
// but this requires paging and may eventually be very large
333+
let rocksdb = Arc::new(rocksdb::DB::open_default(self.config.rocksdb_path.clone()).expect("could not open db"));
334+
327335
let open_request_executor = execute_open_requests(
328336
self.btc_parachain.clone(),
329337
self.vault_id_manager.clone(),
330-
walletless_btc_rpc.clone(),
338+
rocksdb.clone(),
331339
num_confirmations,
332340
self.config.payment_margin_minutes,
333341
!self.config.no_auto_refund,
@@ -416,6 +424,7 @@ impl VaultService {
416424
listen_for_accept_replace(
417425
self.btc_parachain.clone(),
418426
self.vault_id_manager.clone(),
427+
rocksdb.clone(),
419428
num_confirmations,
420429
self.config.payment_margin_minutes,
421430
),
@@ -466,6 +475,7 @@ impl VaultService {
466475
listen_for_redeem_requests(
467476
self.btc_parachain.clone(),
468477
self.vault_id_manager.clone(),
478+
rocksdb.clone(),
469479
num_confirmations,
470480
self.config.payment_margin_minutes,
471481
),
@@ -477,6 +487,7 @@ impl VaultService {
477487
listen_for_refund_requests(
478488
self.btc_parachain.clone(),
479489
self.vault_id_manager.clone(),
490+
rocksdb.clone(),
480491
num_confirmations,
481492
!self.config.no_auto_refund,
482493
),

0 commit comments

Comments
 (0)