Skip to content

Commit 9af2d86

Browse files
committed
Add ElectrumSyncActor struct
We add an `ElectrumSyncActor` that follows the actor pattern (hence the name) to isolate the blocking/threaded API of `rust-electrum-client` from our async internals.
1 parent 7db0ba2 commit 9af2d86

File tree

2 files changed

+98
-0
lines changed

2 files changed

+98
-0
lines changed

src/chain/electrum.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
use crate::error::Error;
9+
use crate::logger::{log_error, LdkLogger, Logger};
10+
11+
use bitcoin::{FeeRate, Transaction};
12+
13+
use tokio::sync::oneshot;
14+
15+
use std::collections::VecDeque;
16+
use std::sync::{Arc, Condvar, Mutex};
17+
use std::thread;
18+
19+
type SyncCommandResultSender<T> = oneshot::Sender<Result<T, Error>>;
20+
type SyncCommandResultReceiver<T> = oneshot::Receiver<Result<T, Error>>;
21+
type SyncCommandQueue = Mutex<VecDeque<SyncCommand>>;
22+
23+
const NUM_WORKER_THREADS: usize = 2;
24+
25+
enum SyncCommand {
26+
Stop,
27+
}
28+
29+
struct ElectrumSyncActor {
30+
command_queue: Arc<SyncCommandQueue>,
31+
command_queue_condvar: Arc<Condvar>,
32+
worker_threads: Mutex<Vec<thread::JoinHandle<()>>>,
33+
logger: Arc<Logger>,
34+
}
35+
36+
impl ElectrumSyncActor {
37+
pub(crate) fn new(logger: Arc<Logger>) -> Self {
38+
let command_queue = Arc::new(Mutex::new(VecDeque::new()));
39+
let command_queue_condvar = Arc::new(Condvar::new());
40+
let worker_threads = Mutex::new(Vec::with_capacity(NUM_WORKER_THREADS));
41+
Self { command_queue, command_queue_condvar, worker_threads, logger }
42+
}
43+
44+
pub(crate) fn start(&self) -> Result<(), Error> {
45+
let mut worker_threads = self.worker_threads.lock().unwrap();
46+
for _ in 0..NUM_WORKER_THREADS {
47+
let command_queue = Arc::clone(&self.command_queue);
48+
let command_queue_condvar = Arc::clone(&self.command_queue_condvar);
49+
50+
let handle = std::thread::spawn(move || {
51+
queue_processing_loop(command_queue, command_queue_condvar);
52+
});
53+
worker_threads.push(handle);
54+
}
55+
56+
Ok(())
57+
}
58+
59+
pub(crate) async fn stop(&self) -> Result<(), Error> {
60+
{
61+
let mut cmd_queue = self.command_queue.lock().unwrap();
62+
let cmd = SyncCommand::Stop;
63+
cmd_queue.push_back(cmd);
64+
}
65+
self.command_queue_condvar.notify_one();
66+
67+
{
68+
let mut worker_threads = self.worker_threads.lock().unwrap();
69+
for handle in worker_threads.drain(..) {
70+
handle.join().expect("TODO");
71+
}
72+
}
73+
Ok(())
74+
}
75+
}
76+
77+
fn queue_processing_loop(
78+
command_queue: Arc<SyncCommandQueue>, command_queue_condvar: Arc<Condvar>,
79+
) {
80+
loop {
81+
let mut queue_lock = command_queue_condvar
82+
.wait_while(command_queue.lock().unwrap(), |queue| queue.is_empty())
83+
.unwrap();
84+
let next_cmd = match queue_lock.pop_front() {
85+
Some(cmd) => cmd,
86+
None => {
87+
debug_assert!(false, "We just checked the queue is never empty, so we should always have an entry to process.");
88+
continue;
89+
},
90+
};
91+
92+
match next_cmd {
93+
SyncCommand::Stop => break,
94+
_ => {},
95+
}
96+
}
97+
}

src/chain/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
// accordance with one or both of these licenses.
77

88
mod bitcoind_rpc;
9+
mod electrum;
910

1011
use crate::chain::bitcoind_rpc::{
1112
BitcoindRpcClient, BoundedHeaderCache, ChainListener, FeeRateEstimationMode,

0 commit comments

Comments
 (0)