Skip to content

Commit f55718f

Browse files
committed
Add ChannelScheduler and PayjoinHandler to Node
`ChannelSchedulder` is used by `PayjoinHandler` in order to manage scheduled channels for opening a channel through incoming payjoin requests. `PayjoinHandler` starts an http server in the background and allows the node to accept payjoin transactions on the new config arguemnt `payjoin_server_port` utilising `lightning_payjoin` crate.
1 parent 6afc7d0 commit f55718f

File tree

7 files changed

+433
-51
lines changed

7 files changed

+433
-51
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,14 @@ winapi = { version = "0.3", features = ["winbase"] }
8686

8787
[dev-dependencies]
8888
lightning = { git = "https://github.com/jbesraa/rust-lightning.git", rev = "d3e2d5a", features = ["std", "_test_utils"] }
89+
reqwest = { version = "0.11", default-features = false, features = ["blocking"] }
8990
# lightning = { version = "0.0.121", features = ["std", "_test_utils"] }
9091
#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] }
9192
electrum-client = { version = "0.15.1", default-features = true }
9293
bitcoincore-rpc = { version = "0.17.0", default-features = false }
9394
proptest = "1.0.0"
9495
regex = "1.5.6"
96+
payjoin = { git = "https://github.com/jbesraa/rust-payjoin.git", rev = "9e4f454", features = ["send"] }
9597

9698
[target.'cfg(not(no_download))'.dev-dependencies]
9799
electrsd = { version = "0.26.0", features = ["legacy", "esplora_a33e97e1", "bitcoind_25_0"] }

src/builder.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::io::sqlite_store::SqliteStore;
1010
use crate::liquidity::LiquiditySource;
1111
use crate::logger::{log_error, FilesystemLogger, Logger};
1212
use crate::message_handler::NodeCustomMessageHandler;
13+
use crate::payjoin_handler::PayjoinHandler;
1314
use crate::payment_store::PaymentStore;
1415
use crate::peer_store::PeerStore;
1516
use crate::sweep::OutputSweeper;
@@ -38,6 +39,7 @@ use lightning::util::persist::{
3839
};
3940
use lightning::util::ser::ReadableArgs;
4041

42+
use lightning_payjoin::scheduler::ChannelScheduler;
4143
use lightning_persister::fs_store::FilesystemStore;
4244

4345
use lightning_transaction_sync::EsploraSyncClient;
@@ -93,6 +95,12 @@ struct LiquiditySourceConfig {
9395
lsps2_service: Option<(SocketAddress, PublicKey, Option<String>)>,
9496
}
9597

98+
#[derive(Debug, Clone)]
99+
struct PayjoinConfig {
100+
payjoin_directory: lightning_payjoin::Url,
101+
payjoin_relay: lightning_payjoin::Url,
102+
}
103+
96104
impl Default for LiquiditySourceConfig {
97105
fn default() -> Self {
98106
Self { lsps2_service: None }
@@ -166,6 +174,7 @@ pub struct NodeBuilder {
166174
chain_data_source_config: Option<ChainDataSourceConfig>,
167175
gossip_source_config: Option<GossipSourceConfig>,
168176
liquidity_source_config: Option<LiquiditySourceConfig>,
177+
payjoin_config: Option<PayjoinConfig>,
169178
}
170179

171180
impl NodeBuilder {
@@ -182,12 +191,14 @@ impl NodeBuilder {
182191
let chain_data_source_config = None;
183192
let gossip_source_config = None;
184193
let liquidity_source_config = None;
194+
let payjoin_config = None;
185195
Self {
186196
config,
187197
entropy_source_config,
188198
chain_data_source_config,
189199
gossip_source_config,
190200
liquidity_source_config,
201+
payjoin_config,
191202
}
192203
}
193204

@@ -242,6 +253,15 @@ impl NodeBuilder {
242253
self
243254
}
244255

256+
/// Configures the [`Node`] instance to source its gossip data from the given RapidGossipSync
257+
/// server.
258+
pub fn set_payjoin_config(
259+
&mut self, payjoin_directory: lightning_payjoin::Url, payjoin_relay: lightning_payjoin::Url,
260+
) -> &mut Self {
261+
self.payjoin_config = Some(PayjoinConfig { payjoin_directory, payjoin_relay });
262+
self
263+
}
264+
245265
/// Configures the [`Node`] instance to source its inbound liquidity from the given
246266
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
247267
/// service.
@@ -362,6 +382,7 @@ impl NodeBuilder {
362382
self.chain_data_source_config.as_ref(),
363383
self.gossip_source_config.as_ref(),
364384
self.liquidity_source_config.as_ref(),
385+
self.payjoin_config.as_ref(),
365386
seed_bytes,
366387
logger,
367388
vss_store,
@@ -385,6 +406,7 @@ impl NodeBuilder {
385406
self.chain_data_source_config.as_ref(),
386407
self.gossip_source_config.as_ref(),
387408
self.liquidity_source_config.as_ref(),
409+
self.payjoin_config.as_ref(),
388410
seed_bytes,
389411
logger,
390412
kv_store,
@@ -458,6 +480,13 @@ impl ArcedNodeBuilder {
458480
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
459481
}
460482

483+
/// Configures the [`Node`] instance to use payjoin.
484+
pub fn set_payjoin_config(
485+
&self, payjoin_directory: lightning_payjoin::Url, payjoin_relay: lightning_payjoin::Url,
486+
) {
487+
self.inner.write().unwrap().set_payjoin_config(payjoin_directory, payjoin_relay);
488+
}
489+
461490
/// Configures the [`Node`] instance to source its inbound liquidity from the given
462491
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
463492
/// service.
@@ -522,8 +551,9 @@ impl ArcedNodeBuilder {
522551
fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
523552
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
524553
gossip_source_config: Option<&GossipSourceConfig>,
525-
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
526-
logger: Arc<FilesystemLogger>, kv_store: Arc<K>,
554+
liquidity_source_config: Option<&LiquiditySourceConfig>,
555+
payjoin_config: Option<&PayjoinConfig>, seed_bytes: [u8; 64], logger: Arc<FilesystemLogger>,
556+
kv_store: Arc<K>,
527557
) -> Result<Node<K>, BuildError> {
528558
// Initialize the on-chain wallet and chain access
529559
let xprv = bitcoin::bip32::ExtendedPrivKey::new_master(config.network.into(), &seed_bytes)
@@ -556,6 +586,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
556586
log_error!(logger, "Failed to set up wallet: {}", e);
557587
BuildError::WalletSetupFailed
558588
})?;
589+
let channel_scheduler = Arc::new(tokio::sync::Mutex::new(ChannelScheduler::new()));
559590

560591
let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config {
561592
Some(ChainDataSourceConfig::Esplora(server_url)) => {
@@ -566,6 +597,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
566597
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
567598
tx_sync.client().clone(),
568599
Arc::clone(&logger),
600+
Arc::clone(&channel_scheduler),
569601
));
570602
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
571603
tx_sync.client().clone(),
@@ -584,6 +616,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
584616
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
585617
tx_sync.client().clone(),
586618
Arc::clone(&logger),
619+
Arc::clone(&channel_scheduler),
587620
));
588621
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
589622
tx_sync.client().clone(),
@@ -945,6 +978,17 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
945978
};
946979

947980
let (stop_sender, _) = tokio::sync::watch::channel(());
981+
let payjoin_handler = if let Some(payjoin_config) = payjoin_config {
982+
Some(Arc::new(PayjoinHandler::new(
983+
Arc::clone(&wallet),
984+
Arc::clone(&channel_manager),
985+
Arc::clone(&channel_scheduler),
986+
payjoin_config.payjoin_directory.clone(),
987+
payjoin_config.payjoin_relay.clone(),
988+
)))
989+
} else {
990+
None
991+
};
948992

949993
let is_listening = Arc::new(AtomicBool::new(false));
950994
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
@@ -965,6 +1009,8 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
9651009
channel_manager,
9661010
chain_monitor,
9671011
output_sweeper,
1012+
payjoin_handler,
1013+
channel_scheduler,
9681014
peer_manager,
9691015
keys_manager,
9701016
network_graph,

src/event.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use bitcoin::blockdata::locktime::absolute::LockTime;
2929
use bitcoin::secp256k1::PublicKey;
3030
use bitcoin::OutPoint;
3131

32+
use lightning_payjoin::scheduler::ChannelScheduler;
3233
use rand::{thread_rng, Rng};
3334

3435
use core::future::Future;
@@ -302,6 +303,7 @@ where
302303
output_sweeper: Arc<Sweeper<K>>,
303304
network_graph: Arc<NetworkGraph>,
304305
payment_store: Arc<PaymentStore<K, L>>,
306+
channel_scheduler: Arc<tokio::sync::Mutex<ChannelScheduler>>,
305307
peer_store: Arc<PeerStore<K, L>>,
306308
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
307309
logger: L,
@@ -318,6 +320,7 @@ where
318320
network_graph: Arc<NetworkGraph>, payment_store: Arc<PaymentStore<K, L>>,
319321
peer_store: Arc<PeerStore<K, L>>, runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
320322
logger: L, config: Arc<Config>,
323+
channel_scheduler: Arc<tokio::sync::Mutex<ChannelScheduler>>,
321324
) -> Self {
322325
Self {
323326
event_queue,
@@ -326,6 +329,7 @@ where
326329
output_sweeper,
327330
network_graph,
328331
payment_store,
332+
channel_scheduler,
329333
peer_store,
330334
logger,
331335
runtime,
@@ -340,6 +344,7 @@ where
340344
counterparty_node_id,
341345
channel_value_satoshis,
342346
output_script,
347+
user_channel_id,
343348
..
344349
} => {
345350
// Construct the raw transaction with the output that is paid the amount of the
@@ -350,6 +355,16 @@ where
350355
let cur_height = self.channel_manager.current_best_block().height();
351356
let locktime = LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO);
352357

358+
let mut channel_scheduler = self.channel_scheduler.lock().await;
359+
if channel_scheduler.is_channel_created(user_channel_id) {
360+
channel_scheduler.set_channel_accepted(
361+
user_channel_id,
362+
output_script,
363+
temporary_channel_id,
364+
);
365+
return {};
366+
}
367+
353368
// Sign the final funding transaction and broadcast it.
354369
match self.wallet.create_funding_transaction(
355370
output_script,

src/lib.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ pub use error::Error as NodeError;
107107
use error::Error;
108108

109109
pub use event::Event;
110+
use lightning_payjoin::scheduler::{ChannelScheduler, ScheduledChannel};
111+
use payjoin_handler::PayjoinHandler;
110112
pub use types::{BestBlock, ChannelConfig};
111113
mod payjoin_handler;
112114

@@ -190,6 +192,8 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
190192
channel_manager: Arc<ChannelManager<K>>,
191193
chain_monitor: Arc<ChainMonitor<K>>,
192194
output_sweeper: Arc<Sweeper<K>>,
195+
payjoin_handler: Option<Arc<PayjoinHandler<K>>>,
196+
channel_scheduler: Arc<tokio::sync::Mutex<ChannelScheduler>>,
193197
peer_manager: Arc<PeerManager<K>>,
194198
keys_manager: Arc<KeysManager>,
195199
network_graph: Arc<NetworkGraph>,
@@ -501,6 +505,28 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
501505
});
502506
}
503507

508+
if let Some(payjoin_handler) = &self.payjoin_handler {
509+
let mut stop_payjoin_server = self.stop_sender.subscribe();
510+
let payjoin_handler = Arc::clone(&payjoin_handler);
511+
let payjoin_check_interval = 2;
512+
runtime.spawn(async move {
513+
let mut payjoin_interval =
514+
tokio::time::interval(Duration::from_secs(payjoin_check_interval));
515+
payjoin_interval.reset();
516+
payjoin_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
517+
loop {
518+
tokio::select! {
519+
_ = stop_payjoin_server.changed() => {
520+
return;
521+
}
522+
_ = payjoin_interval.tick() => {
523+
let _ = payjoin_handler.process_request().await;
524+
}
525+
}
526+
}
527+
});
528+
}
529+
504530
// Regularly reconnect to persisted peers.
505531
let connect_pm = Arc::clone(&self.peer_manager);
506532
let connect_logger = Arc::clone(&self.logger);
@@ -639,6 +665,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
639665
Arc::clone(&self.runtime),
640666
Arc::clone(&self.logger),
641667
Arc::clone(&self.config),
668+
Arc::clone(&self.channel_scheduler),
642669
));
643670

644671
// Setup background processing
@@ -708,6 +735,60 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
708735
Ok(())
709736
}
710737

738+
/// Open a channel with funds coming from a payjoin transaction.
739+
///
740+
/// This method will add a new channel to the scheduler and start the process of opening a
741+
/// channel with the give node until it reaches the `ChannelAccepted` state.
742+
///
743+
/// In order to finalise the channel opening process, the user must make a payjoin payment to
744+
/// the returned BIP21 URL.
745+
pub async fn open_payjoin_channel(
746+
&self, channel_amount_sats: u64, push_msat: Option<u64>, announce_channel: bool,
747+
node_id: PublicKey, address: SocketAddress,
748+
) -> Result<String, Error> {
749+
let rt_lock = self.runtime.read().unwrap();
750+
if rt_lock.is_none() {
751+
return Err(Error::NotRunning);
752+
}
753+
let runtime = rt_lock.as_ref().unwrap();
754+
let user_channel_id: u128 = rand::thread_rng().gen::<u128>();
755+
self.channel_scheduler.lock().await.schedule(
756+
bitcoin::Amount::from_sat(channel_amount_sats),
757+
node_id,
758+
user_channel_id,
759+
);
760+
let user_config = UserConfig {
761+
channel_handshake_limits: Default::default(),
762+
channel_handshake_config: ChannelHandshakeConfig {
763+
announced_channel: announce_channel,
764+
..Default::default()
765+
},
766+
..Default::default()
767+
};
768+
let push_msat = push_msat.unwrap_or(0);
769+
self.internal_connect_open_channel(
770+
node_id,
771+
channel_amount_sats,
772+
push_msat,
773+
user_channel_id,
774+
address,
775+
Some(user_config),
776+
runtime,
777+
)?;
778+
let payjoin_uri = self
779+
.payjoin_handler
780+
.as_ref()
781+
.ok_or(Error::WalletOperationFailed)?
782+
.payjoin_uri(bitcoin::Amount::from_sat(channel_amount_sats))
783+
.await;
784+
Ok(payjoin_uri)
785+
}
786+
787+
/// List all scheduled payjoin channels.
788+
pub async fn list_scheduled_channels(&self) -> Result<Vec<ScheduledChannel>, Error> {
789+
Ok(self.channel_scheduler.lock().await.list_channels().clone())
790+
}
791+
711792
/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
712793
///
713794
/// After this returns most API methods will return [`Error::NotRunning`].

0 commit comments

Comments
 (0)