Skip to content

Commit dbc9469

Browse files
committed
f WIP
1 parent 9af2d86 commit dbc9469

File tree

2 files changed

+61
-15
lines changed

2 files changed

+61
-15
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ rand = "0.8.5"
7777
chrono = { version = "0.4", default-features = false, features = ["clock"] }
7878
tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }
7979
esplora-client = { version = "0.11", default-features = false, features = ["tokio", "async-https-rustls"] }
80+
electrum-client = { version = "0.22.0", default-features = true }
8081
libc = "0.2"
8182
uniffi = { version = "0.27.3", features = ["build"], optional = true }
8283
serde = { version = "1.0.210", default-features = false, features = ["std", "derive"] }

src/chain/electrum.rs

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,39 +6,44 @@
66
// accordance with one or both of these licenses.
77

88
use crate::error::Error;
9-
use crate::logger::{log_error, LdkLogger, Logger};
9+
use crate::logger::Logger;
1010

11-
use bitcoin::{FeeRate, Transaction};
11+
use electrum_client::{Batch, Client, ElectrumApi};
1212

1313
use tokio::sync::oneshot;
1414

1515
use std::collections::VecDeque;
16+
use std::mem;
1617
use std::sync::{Arc, Condvar, Mutex};
1718
use std::thread;
19+
use std::time::{Duration, SystemTime};
1820

1921
type SyncCommandResultSender<T> = oneshot::Sender<Result<T, Error>>;
2022
type SyncCommandResultReceiver<T> = oneshot::Receiver<Result<T, Error>>;
2123
type SyncCommandQueue = Mutex<VecDeque<SyncCommand>>;
2224

2325
const NUM_WORKER_THREADS: usize = 2;
26+
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
2427

2528
enum SyncCommand {
2629
Stop,
30+
EstimateFees { targets: Vec<usize>, result_sender: SyncCommandResultSender<Vec<f64>> },
2731
}
2832

2933
struct ElectrumSyncActor {
34+
server_url: String,
3035
command_queue: Arc<SyncCommandQueue>,
3136
command_queue_condvar: Arc<Condvar>,
32-
worker_threads: Mutex<Vec<thread::JoinHandle<()>>>,
37+
worker_threads: Mutex<Vec<thread::JoinHandle<Result<(), Error>>>>,
3338
logger: Arc<Logger>,
3439
}
3540

3641
impl ElectrumSyncActor {
37-
pub(crate) fn new(logger: Arc<Logger>) -> Self {
42+
pub(crate) fn new(server_url: String, logger: Arc<Logger>) -> Self {
3843
let command_queue = Arc::new(Mutex::new(VecDeque::new()));
3944
let command_queue_condvar = Arc::new(Condvar::new());
4045
let worker_threads = Mutex::new(Vec::with_capacity(NUM_WORKER_THREADS));
41-
Self { command_queue, command_queue_condvar, worker_threads, logger }
46+
Self { server_url, command_queue, command_queue_condvar, worker_threads, logger }
4247
}
4348

4449
pub(crate) fn start(&self) -> Result<(), Error> {
@@ -47,8 +52,9 @@ impl ElectrumSyncActor {
4752
let command_queue = Arc::clone(&self.command_queue);
4853
let command_queue_condvar = Arc::clone(&self.command_queue_condvar);
4954

55+
let server_url = self.server_url.clone();
5056
let handle = std::thread::spawn(move || {
51-
queue_processing_loop(command_queue, command_queue_condvar);
57+
queue_processing_loop(server_url, command_queue, command_queue_condvar)
5258
});
5359
worker_threads.push(handle);
5460
}
@@ -62,26 +68,48 @@ impl ElectrumSyncActor {
6268
let cmd = SyncCommand::Stop;
6369
cmd_queue.push_back(cmd);
6470
}
65-
self.command_queue_condvar.notify_one();
71+
self.command_queue_condvar.notify_all();
6672

6773
{
68-
let mut worker_threads = self.worker_threads.lock().unwrap();
69-
for handle in worker_threads.drain(..) {
70-
handle.join().expect("TODO");
74+
let mut worker_threads_lock = self.worker_threads.lock().unwrap();
75+
let mut worker_threads = mem::take(&mut *worker_threads_lock);
76+
let all_finished = worker_threads.iter().all(|h| h.is_finished());
77+
78+
if all_finished {
79+
for handle in worker_threads.drain(..) {
80+
if handle.is_finished() {
81+
handle.join().expect("TODO");
82+
}
83+
}
7184
}
7285
}
7386
Ok(())
7487
}
88+
89+
pub(crate) async fn estimate_fees(&self, targets: Vec<usize>) -> Result<Vec<f64>, Error> {
90+
let (result_sender, result_receiver) = oneshot::channel();
91+
{
92+
let mut cmd_queue = self.command_queue.lock().unwrap();
93+
let cmd = SyncCommand::EstimateFees { targets, result_sender };
94+
cmd_queue.push_back(cmd);
95+
}
96+
self.command_queue_condvar.notify_all();
97+
// TODO:
98+
result_receiver.await.unwrap()
99+
}
75100
}
76101

77102
fn queue_processing_loop(
78-
command_queue: Arc<SyncCommandQueue>, command_queue_condvar: Arc<Condvar>,
79-
) {
80-
loop {
103+
server_url: String, command_queue: Arc<SyncCommandQueue>, command_queue_condvar: Arc<Condvar>,
104+
) -> Result<(), Error> {
105+
let client = Client::new(&server_url).map_err(|e| Error::TxSyncFailed)?;
106+
107+
'worker_loop: loop {
81108
let mut queue_lock = command_queue_condvar
82109
.wait_while(command_queue.lock().unwrap(), |queue| queue.is_empty())
83110
.unwrap();
84-
let next_cmd = match queue_lock.pop_front() {
111+
112+
let next_cmd = match queue_lock.front() {
85113
Some(cmd) => cmd,
86114
None => {
87115
debug_assert!(false, "We just checked the queue is never empty, so we should always have an entry to process.");
@@ -90,8 +118,25 @@ fn queue_processing_loop(
90118
};
91119

92120
match next_cmd {
93-
SyncCommand::Stop => break,
121+
SyncCommand::Stop => {
122+
// Early-abort on `Stop` leaving it the front of the queue so all other workers
123+
// will also try to handle it.
124+
break 'worker_loop;
125+
},
126+
SyncCommand::EstimateFees { targets, result_sender } => {
127+
let mut batch = Batch::default();
128+
for t in targets {
129+
batch.estimate_fee(*t)
130+
}
131+
let res = client
132+
.batch_call(&batch)
133+
.map(|r| r.into_iter().map(|v| v.as_f64()))
134+
.map_err(|_| Error::TxSyncFailed);
135+
},
94136
_ => {},
95137
}
138+
139+
queue_lock.pop_front();
96140
}
141+
Ok(())
97142
}

0 commit comments

Comments
 (0)