Skip to content

Commit af183df

Browse files
committed
feat: add default timeout long requests
1 parent 676d135 commit af183df

2 files changed

Lines changed: 160 additions & 75 deletions

File tree

cli/v2/src/main.rs

Lines changed: 105 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use bdk_sp_oracles::{
3535
TrustedPeer, UnboundedReceiver, Warning,
3636
},
3737
filters::kyoto::{FilterEvent, FilterSubscriber},
38-
frigate::{FrigateClient, History, SubscribeRequest},
38+
frigate::{FrigateClient, History, SubscribeRequest, UnsubscribeRequest},
3939
tweaks::blindbit::{BlindbitSubscriber, TweakEvent},
4040
};
4141
use bdk_sp_wallet::{
@@ -583,7 +583,11 @@ async fn main() -> anyhow::Result<()> {
583583
// We are doing a one time scanning only. So instead of calling `blockchain.scripthash.subscribe` on each script from the wallet,
584584
// we just subscribe and read the scanning result from the stream. On each result received we update the wallet state and once scanning progress reaches 1.0 (100%) we stop.
585585

586-
let mut client = FrigateClient::connect(&rpc_args.url).await.unwrap();
586+
let mut client = FrigateClient::connect(&rpc_args.url)
587+
.await
588+
.unwrap()
589+
.with_timeout(tokio::time::Duration::from_secs(600));
590+
587591
let labels = wallet
588592
.indexer()
589593
.index()
@@ -597,88 +601,120 @@ async fn main() -> anyhow::Result<()> {
597601
None
598602
};
599603

600-
// send a subscribe request
601604
let subscribe_params = SubscribeRequest {
602605
scan_priv_key: *wallet.indexer().scan_sk(),
603606
spend_pub_key: *wallet.indexer().spend_pk(),
604607
start_height: start,
605608
labels,
606609
};
607610

608-
client.version().await.unwrap();
609-
client.subscribe(&subscribe_params).await.unwrap();
611+
// Attempt to subscribe; any timeout will trigger unsubscribe automatically.
612+
match client.subscribe_with_timeout(&subscribe_params).await {
613+
Ok(Some((histories, progress))) => {
614+
tracing::info!(
615+
"Initial subscription result: {} histories, progress {}",
616+
histories.len(),
617+
progress
618+
);
619+
}
620+
Ok(None) => {
621+
tracing::info!("Subscription acknowledged, awaiting notifications");
622+
}
623+
Err(e) => {
624+
tracing::error!("Subscribe failed: {}", e);
625+
return Err(e.into());
626+
}
627+
}
610628

611629
tracing::info!("Starting frigate scanning loop...");
612630
loop {
613-
let subscribe_result = client.read_from_stream(4096).await.unwrap();
614-
615-
if subscribe_result["params"].is_object() {
616-
let histories: Vec<History> =
617-
serde_json::from_value(subscribe_result["params"]["history"].clone())?;
618-
let progress = subscribe_result["params"]["progress"]
619-
.as_f64()
620-
.unwrap_or(0.0) as f32;
621-
622-
// Group by block height, then iterate over by fetching block details and apply
623-
let mut secrets_by_height: HashMap<u32, HashMap<Txid, PublicKey>> =
624-
HashMap::new();
625-
626-
tracing::debug!("Received history {:#?}", histories);
627-
628-
histories.iter().for_each(|h| {
629-
secrets_by_height
630-
.entry(h.height)
631-
.and_modify(|v| {
632-
v.insert(h.tx_hash, h.tweak_key);
633-
})
634-
.or_insert(HashMap::from([(h.tx_hash, h.tweak_key)]));
635-
});
636-
637-
// Filter when the height is 0, because that would mean mempool transaction
638-
for secret in secrets_by_height.into_iter().filter(|v| v.0 > 0) {
639-
// Since frigate doesn't provide a blockchain.getblock we will mimick that here
640-
// By constructing a block from the block header and the list of transactions
641-
// received from the scan request
642-
let mut raw_blk = client.get_block_header(secret.0).await.unwrap();
643-
raw_blk.push_str("00");
644-
645-
// Push dummy coinbase
646-
let dummy_coinbase = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000";
647-
let coinbase: Transaction =
648-
deserialize(&Vec::<u8>::from_hex(dummy_coinbase).unwrap()).unwrap();
649-
let mut block: Block =
650-
deserialize(&Vec::<u8>::from_hex(&raw_blk).unwrap()).unwrap();
651-
652-
let mut blockhash = BlockHash::all_zeros();
653-
654-
let mut txs: Vec<Transaction> = vec![coinbase];
655-
for key in secret.1.keys() {
656-
let tx_result = client.get_transaction(key.to_string()).await.unwrap();
657-
let tx: Transaction =
658-
deserialize(&Vec::<u8>::from_hex(&tx_result.1).unwrap()).unwrap();
659-
txs.push(tx);
660-
661-
blockhash = BlockHash::from_str(&tx_result.0)?;
662-
}
631+
match client.read_from_stream(4096).await {
632+
Ok(subscribe_result) => {
633+
if subscribe_result["params"].is_object() {
634+
let histories: Vec<History> = serde_json::from_value(
635+
subscribe_result["params"]["history"].clone(),
636+
)?;
637+
let progress = subscribe_result["params"]["progress"]
638+
.as_f64()
639+
.unwrap_or(0.0) as f32;
640+
641+
let mut secrets_by_height: HashMap<u32, HashMap<Txid, PublicKey>> =
642+
HashMap::new();
643+
644+
tracing::debug!("Received history {:#?}", histories);
645+
646+
histories.iter().for_each(|h| {
647+
secrets_by_height
648+
.entry(h.height)
649+
.and_modify(|v| {
650+
v.insert(h.tx_hash, h.tweak_key);
651+
})
652+
.or_insert(HashMap::from([(h.tx_hash, h.tweak_key)]));
653+
});
654+
655+
// Filter when the height is 0, because that would mean mempool transaction
656+
for secret in secrets_by_height.into_iter().filter(|v| v.0 > 0) {
657+
// Since frigate doesn't provide a blockchain.getblock we will mimick that here
658+
// By constructing a block from the block header and the list of transactions
659+
// received from the scan request
660+
let mut raw_blk = client.get_block_header(secret.0).await.unwrap();
661+
raw_blk.push_str("00");
662+
663+
// Push dummy coinbase
664+
let dummy_coinbase = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000";
665+
let coinbase: Transaction =
666+
deserialize(&Vec::<u8>::from_hex(dummy_coinbase).unwrap())
667+
.unwrap();
668+
let mut block: Block =
669+
deserialize(&Vec::<u8>::from_hex(&raw_blk).unwrap()).unwrap();
670+
671+
let mut blockhash = BlockHash::all_zeros();
672+
673+
let mut txs: Vec<Transaction> = vec![coinbase];
674+
for key in secret.1.keys() {
675+
let tx_result =
676+
client.get_transaction(key.to_string()).await.unwrap();
677+
let tx: Transaction =
678+
deserialize(&Vec::<u8>::from_hex(&tx_result.1).unwrap())
679+
.unwrap();
680+
txs.push(tx);
681+
682+
blockhash = BlockHash::from_str(&tx_result.0).unwrap();
683+
}
663684

664-
block.txdata = txs;
665-
tracing::debug!("Final block {:?}", block);
666-
wallet.apply_block_relevant(&block, secret.1, secret.0);
685+
block.txdata = txs;
686+
tracing::debug!("Final block {:?}", block);
687+
wallet.apply_block_relevant(&block, secret.1, secret.0);
667688

668-
tracing::debug!("Checkpoint hash {blockhash:?}");
669-
let checkpoint = wallet.chain().tip().insert(BlockId {
670-
height: secret.0,
671-
hash: blockhash,
672-
});
673-
wallet.update_chain(checkpoint);
674-
}
689+
tracing::debug!("Checkpoint hash {blockhash:?}");
690+
let checkpoint = wallet.chain().tip().insert(BlockId {
691+
height: secret.0,
692+
hash: blockhash,
693+
});
694+
wallet.update_chain(checkpoint);
695+
}
675696

676-
tracing::info!("Progress {progress}");
677-
// Check the progress
678-
if progress >= 1.0 {
679-
tracing::info!("Scanning completed");
697+
tracing::info!("Progress {progress}");
698+
// Check the progress
699+
if progress >= 1.0 {
700+
tracing::info!("Scanning completed");
701+
break;
702+
}
703+
}
704+
}
705+
Err(e) if e.to_string().contains("timed out") => {
706+
tracing::warn!("read_from_stream timeout, exiting scan");
707+
let unsubscribe_request = UnsubscribeRequest {
708+
scan_privkey: *wallet.indexer().scan_sk(),
709+
spend_pubkey: *wallet.indexer().spend_pk(),
710+
};
711+
let _ = client.unsubscribe(&unsubscribe_request).await;
680712
break;
681713
}
714+
Err(e) => {
715+
tracing::error!("read_from_stream error: {}", e);
716+
return Err(e.into());
717+
}
682718
}
683719
}
684720
}

oracles/src/frigate/mod.rs

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use bip157::tokio::io::{AsyncReadExt, AsyncWriteExt};
22
use bip157::tokio::net::TcpStream;
3+
use bip157::tokio::time::{timeout, Duration};
34
use bitcoin::secp256k1::{PublicKey, SecretKey};
45
use bitcoin::Txid;
56
use serde::{Deserialize, Serialize};
@@ -37,9 +38,20 @@ impl From<std::io::Error> for FrigateError {
3738
}
3839
}
3940

41+
impl std::fmt::Display for FrigateError {
42+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43+
match self {
44+
FrigateError::Generic(str) => write!(f, "{str}"),
45+
_ => write!(f, "Something wrong happened"),
46+
}
47+
}
48+
}
49+
impl std::error::Error for FrigateError {}
50+
4051
pub struct FrigateClient {
4152
pub host_url: String,
4253
client: Box<TcpStream>,
54+
pub request_timeout: Duration,
4355
}
4456

4557
#[derive(Debug, Serialize, Deserialize)]
@@ -102,9 +114,16 @@ impl FrigateClient {
102114
Ok(Self {
103115
host_url: host_url.to_string(),
104116
client: Box::new(stream),
117+
request_timeout: Duration::from_secs(10),
105118
})
106119
}
107120

121+
/// Sets a custom request timeout for this client.
122+
pub fn with_timeout(mut self, timeout: Duration) -> Self {
123+
self.request_timeout = timeout;
124+
self
125+
}
126+
108127
pub async fn read_from_stream(&mut self, size: usize) -> Result<Value, FrigateError> {
109128
let mut buffer = vec![0; size];
110129
let n = self.client.read(&mut buffer).await?;
@@ -123,12 +142,20 @@ impl FrigateClient {
123142
}
124143

125144
async fn send_request(&mut self, req_bytes: &[u8]) -> Result<Value, FrigateError> {
126-
self.client.write_all(req_bytes).await?;
127-
128-
self.client.write_all(b"\n").await?;
129-
self.client.flush().await?;
130-
131-
self.read_from_stream(4096).await
145+
match timeout(self.request_timeout, async {
146+
self.client.write_all(req_bytes).await?;
147+
self.client.write_all(b"\n").await?;
148+
self.client.flush().await?;
149+
self.read_from_stream(4096).await
150+
})
151+
.await
152+
{
153+
Ok(res) => res,
154+
Err(_) => Err(FrigateError::Generic(format!(
155+
"request timed out after {:?}",
156+
self.request_timeout
157+
))),
158+
}
132159
}
133160

134161
pub async fn get_block_header(&mut self, height: u32) -> Result<String, FrigateError> {
@@ -186,6 +213,7 @@ impl FrigateClient {
186213
&mut self,
187214
req: &SubscribeRequest,
188215
) -> Result<Option<(Vec<History>, f32)>, FrigateError> {
216+
self.version().await?;
189217
let mut params: Vec<Value> = vec![
190218
serde_json::json!(req.scan_priv_key),
191219
serde_json::json!(req.spend_pub_key),
@@ -232,6 +260,7 @@ impl FrigateClient {
232260
serde_json::json!(req.spend_pubkey),
233261
];
234262

263+
self.version().await?;
235264
let req = RequestPayload {
236265
method: UNSUBSCRIBE_RPC_METHOD.to_string(),
237266
params: serde_json::json!(params),
@@ -244,4 +273,24 @@ impl FrigateClient {
244273

245274
Ok(result["result"].to_string())
246275
}
276+
277+
pub async fn subscribe_with_timeout(
278+
&mut self,
279+
req: &SubscribeRequest,
280+
) -> Result<Option<(Vec<History>, f32)>, FrigateError> {
281+
match self.subscribe(req).await {
282+
Ok(res) => Ok(res),
283+
Err(e) => {
284+
if e.to_string().contains("timed out") {
285+
tracing::warn!("subscribe request timed out, attempting unsubscribe");
286+
let unsub = UnsubscribeRequest {
287+
scan_privkey: req.scan_priv_key,
288+
spend_pubkey: req.spend_pub_key,
289+
};
290+
let _ = self.unsubscribe(&unsub).await;
291+
}
292+
Err(e)
293+
}
294+
}
295+
}
247296
}

0 commit comments

Comments
 (0)