Skip to content

Commit 0c8c672

Browse files
committed
feat: use electrum_streaming_client for handling requests
1 parent 9905dc1 commit 0c8c672

5 files changed

Lines changed: 146 additions & 238 deletions

File tree

Cargo.lock

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/v2/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ rand = "0.9.0"
2121
indexer = { version = "0.1.0", path = "../../indexer", features = ["serde"]}
2222
bdk_sp_wallet = { version = "0.1.0", path = "../../wallet", features = ["serde"]}
2323
bdk_sp_oracles = { version = "0.1.0", path = "../../oracles" }
24+
electrum_streaming_client = { git = "https://github.com/sdmg15/electrum_streaming_client", branch = "master"}
2425
tracing = "0.1.41"
2526
tracing-subscriber = "0.3.19"

cli/v2/src/main.rs

Lines changed: 26 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ use bdk_sp::{
1010
self,
1111
address::NetworkUnchecked,
1212
bip32,
13-
consensus::{deserialize, Decodable},
14-
hashes::Hash,
13+
consensus::{self, deserialize, Decodable},
1514
hex::{DisplayHex, FromHex},
1615
key::Secp256k1,
1716
script::PushBytesBuf,
@@ -35,7 +34,7 @@ use bdk_sp_oracles::{
3534
TrustedPeer, UnboundedReceiver, Warning,
3635
},
3736
filters::kyoto::{FilterEvent, FilterSubscriber},
38-
frigate::{FrigateClient, History, SubscribeRequest, UnsubscribeRequest, DUMMY_COINBASE},
37+
frigate::{FrigateClient, StreamExt, SubscribeRequest, UnsubscribeRequest, DUMMY_COINBASE},
3938
tweaks::blindbit::{BlindbitSubscriber, TweakEvent},
4039
};
4140
use bdk_sp_wallet::{
@@ -47,6 +46,7 @@ use bdk_sp_wallet::{
4746
ChangeSet, SpWallet,
4847
};
4948
use clap::{self, ArgGroup, Args, Parser, Subcommand};
49+
use electrum_streaming_client::{notification::Notification, Event};
5050
use indexer::bdk_chain::BlockId;
5151
use rand::RngCore;
5252
use serde_json::json;
@@ -163,7 +163,6 @@ pub enum Commands {
163163
#[clap(long)]
164164
hash: Option<BlockHash>,
165165
},
166-
167166
ScanFrigate {
168167
#[clap(flatten)]
169168
rpc_args: RpcArgs,
@@ -172,7 +171,6 @@ pub enum Commands {
172171
#[clap(long)]
173172
hash: Option<BlockHash>,
174173
},
175-
176174
Create {
177175
/// Network
178176
#[clap(long, short, default_value = "signet")]
@@ -625,35 +623,15 @@ async fn main() -> anyhow::Result<()> {
625623
labels,
626624
};
627625

628-
// Attempt to subscribe; any timeout will trigger unsubscribe automatically.
629-
match client.subscribe_with_timeout(&subscribe_params).await {
630-
Ok(Some((histories, progress))) => {
631-
tracing::info!(
632-
"Initial subscription result: {} histories, progress {}",
633-
histories.len(),
634-
progress
635-
);
636-
}
637-
Ok(None) => {
638-
tracing::info!("Subscription acknowledged, awaiting notifications");
639-
}
640-
Err(e) => {
641-
tracing::error!("Subscribe failed: {}", e);
642-
return Err(e.into());
643-
}
644-
}
626+
client.version().await?;
627+
client.subscribe(&subscribe_params).await?;
645628

646-
tracing::info!("Starting frigate scanning loop...");
647-
loop {
648-
match client.read_from_stream(4096).await {
649-
Ok(subscribe_result) => {
650-
if subscribe_result["params"].is_object() {
651-
let histories: Vec<History> = serde_json::from_value(
652-
subscribe_result["params"]["history"].clone(),
653-
)?;
654-
let progress = subscribe_result["params"]["progress"]
655-
.as_f64()
656-
.unwrap_or(0.0) as f32;
629+
while let Some(event) = client.events.next().await {
630+
if let Event::Notification(notification) = event {
631+
match notification {
632+
Notification::SpSubscribe(sp_subscribe_notification) => {
633+
let histories = sp_subscribe_notification.history().clone();
634+
let progress = sp_subscribe_notification.progress();
657635

658636
let mut secrets_by_height: HashMap<u32, HashMap<Txid, PublicKey>> =
659637
HashMap::new();
@@ -674,65 +652,50 @@ async fn main() -> anyhow::Result<()> {
674652
// Since frigate doesn't provide a blockchain.getblock we will mimick that here
675653
// By constructing a block from the block header and the list of transactions
676654
// received from the scan request
677-
let mut raw_blk = client.get_block_header(secret.0).await.unwrap();
678-
raw_blk.push_str("00");
655+
let header = client.get_block_header(secret.0).await?.header;
656+
let mut raw_blk = consensus::serialize(&header);
657+
raw_blk.push(0);
679658

680659
// Push dummy coinbase
681660
let coinbase: Transaction =
682661
deserialize(&Vec::<u8>::from_hex(DUMMY_COINBASE).unwrap())
683662
.unwrap();
684-
let mut block: Block =
685-
deserialize(&Vec::<u8>::from_hex(&raw_blk).unwrap()).unwrap();
686-
687-
let mut blockhash = BlockHash::all_zeros();
663+
let mut block: Block = deserialize(&raw_blk).unwrap();
688664

665+
let blockhash = header.block_hash();
689666
let mut txs: Vec<Transaction> = vec![coinbase];
667+
690668
for key in secret.1.keys() {
691-
let tx_result =
692-
client.get_transaction(key.to_string()).await.unwrap();
693-
let tx: Transaction =
694-
deserialize(&Vec::<u8>::from_hex(&tx_result.1).unwrap())
695-
.unwrap();
696-
txs.push(tx);
697-
698-
blockhash = BlockHash::from_str(&tx_result.0).unwrap();
669+
let tx_result = client.get_transaction(*key).await?;
670+
txs.push(tx_result.tx);
699671
}
700672

701673
block.txdata = txs;
702674
tracing::debug!("Final block {:?}", block);
703675
wallet.apply_block_relevant(&block, secret.1, secret.0);
704676

705-
tracing::debug!("Checkpoint hash {blockhash:?}");
706677
let checkpoint = wallet.chain().tip().insert(BlockId {
707678
height: secret.0,
708679
hash: blockhash,
709680
});
710681
wallet.update_chain(checkpoint);
711682
}
712683

713-
tracing::info!("Progress {progress}");
714-
// Check the progress
715684
if progress >= 1.0 {
716685
tracing::info!("Scanning completed");
717686
break;
718687
}
719688
}
720-
}
721-
Err(e) if e.to_string().contains("timed out") => {
722-
tracing::warn!("read_from_stream timeout, exiting scan");
723-
let unsubscribe_request = UnsubscribeRequest {
724-
scan_privkey: *wallet.indexer().scan_sk(),
725-
spend_pubkey: *wallet.indexer().spend_pk(),
726-
};
727-
let _ = client.unsubscribe(&unsubscribe_request).await;
728-
break;
729-
}
730-
Err(e) => {
731-
tracing::error!("read_from_stream error: {}", e);
732-
return Err(e.into());
689+
_ => tracing::error!("Notification event not supported"),
733690
}
734691
}
735692
}
693+
// Unsubscribe once scanning is done
694+
let unsub_req = UnsubscribeRequest {
695+
scan_priv_key: *wallet.indexer().scan_sk(),
696+
spend_pub_key: *wallet.indexer().spend_pk(),
697+
};
698+
client.unsubscribe(&unsub_req).await?;
736699
}
737700
Commands::Balance => {
738701
fn print_balances<'a>(

oracles/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ rayon = "1.11.0"
1414
reqwest = { version = "0.12.23", features = ["json", "rustls-tls", "http2", "charset"], default-features = false }
1515
serde = { version = "1.0.219", features = ["serde_derive"] }
1616
serde_json = { version = "1.0.142", features = ["raw_value"]}
17+
electrum_streaming_client = { path = "/home/sdmg15/workspace/electrum_streaming_client"}
1718
url = "2.5.4"
1819
tracing = "0.1.41"
1920
jsonrpc = "=0.18.0"

0 commit comments

Comments
 (0)