Skip to content

Commit 1b0bbac

Browse files
committed
Add ChannelScheduler and PayjoinHandler to Node
`ChannelSchedulder` is used by `PayjoinHandler` in order to manage scheduled channels for opening a channel through incoming payjoin requests. `PayjoinHandler` starts an http server in the background and allows the node to accept payjoin transactions on the new config arguemnt `payjoin_server_port` utilising `lightning_payjoin` crate.
1 parent 88bed87 commit 1b0bbac

File tree

7 files changed

+437
-50
lines changed

7 files changed

+437
-50
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ bitcoincore-rpc = { version = "0.17.0", default-features = false }
9090
proptest = "1.0.0"
9191
regex = "1.5.6"
9292
reqwest = { version = "0.11", default-features = false, features = ["blocking"] }
93+
payjoin = { git = "https://github.com/jbesraa/rust-payjoin.git", rev = "9e4f454", features = ["send"] }
9394

9495
[target.'cfg(not(no_download))'.dev-dependencies]
9596
electrsd = { version = "0.26.0", features = ["legacy", "esplora_a33e97e1", "bitcoind_25_0"] }

src/builder.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::io::sqlite_store::SqliteStore;
1111
use crate::liquidity::LiquiditySource;
1212
use crate::logger::{log_error, log_info, FilesystemLogger, Logger};
1313
use crate::message_handler::NodeCustomMessageHandler;
14+
use crate::payjoin_handler::PayjoinHandler;
1415
use crate::payment_store::PaymentStore;
1516
use crate::peer_store::PeerStore;
1617
use crate::tx_broadcaster::TransactionBroadcaster;
@@ -39,6 +40,7 @@ use lightning::util::persist::{
3940
use lightning::util::ser::ReadableArgs;
4041
use lightning::util::sweep::OutputSweeper;
4142

43+
use lightning_payjoin::scheduler::ChannelScheduler;
4244
use lightning_persister::fs_store::FilesystemStore;
4345

4446
use lightning_transaction_sync::EsploraSyncClient;
@@ -94,6 +96,12 @@ struct LiquiditySourceConfig {
9496
lsps2_service: Option<(SocketAddress, PublicKey, Option<String>)>,
9597
}
9698

99+
#[derive(Debug, Clone)]
100+
struct PayjoinConfig {
101+
payjoin_directory: lightning_payjoin::Url,
102+
payjoin_relay: lightning_payjoin::Url,
103+
}
104+
97105
impl Default for LiquiditySourceConfig {
98106
fn default() -> Self {
99107
Self { lsps2_service: None }
@@ -173,6 +181,7 @@ pub struct NodeBuilder {
173181
chain_data_source_config: Option<ChainDataSourceConfig>,
174182
gossip_source_config: Option<GossipSourceConfig>,
175183
liquidity_source_config: Option<LiquiditySourceConfig>,
184+
payjoin_config: Option<PayjoinConfig>,
176185
}
177186

178187
impl NodeBuilder {
@@ -189,12 +198,14 @@ impl NodeBuilder {
189198
let chain_data_source_config = None;
190199
let gossip_source_config = None;
191200
let liquidity_source_config = None;
201+
let payjoin_config = None;
192202
Self {
193203
config,
194204
entropy_source_config,
195205
chain_data_source_config,
196206
gossip_source_config,
197207
liquidity_source_config,
208+
payjoin_config,
198209
}
199210
}
200211

@@ -249,6 +260,15 @@ impl NodeBuilder {
249260
self
250261
}
251262

263+
/// Configures the [`Node`] instance to source its gossip data from the given RapidGossipSync
264+
/// server.
265+
pub fn set_payjoin_config(
266+
&mut self, payjoin_directory: lightning_payjoin::Url, payjoin_relay: lightning_payjoin::Url,
267+
) -> &mut Self {
268+
self.payjoin_config = Some(PayjoinConfig { payjoin_directory, payjoin_relay });
269+
self
270+
}
271+
252272
/// Configures the [`Node`] instance to source its inbound liquidity from the given
253273
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
254274
/// service.
@@ -367,6 +387,7 @@ impl NodeBuilder {
367387
self.chain_data_source_config.as_ref(),
368388
self.gossip_source_config.as_ref(),
369389
self.liquidity_source_config.as_ref(),
390+
self.payjoin_config.as_ref(),
370391
seed_bytes,
371392
logger,
372393
vss_store,
@@ -391,6 +412,7 @@ impl NodeBuilder {
391412
seed_bytes,
392413
logger,
393414
kv_store,
415+
self.payjoin_config.as_ref(),
394416
)
395417
}
396418
}
@@ -461,6 +483,13 @@ impl ArcedNodeBuilder {
461483
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
462484
}
463485

486+
/// Configures the [`Node`] instance to use payjoin.
487+
pub fn set_payjoin_config(
488+
&self, payjoin_directory: lightning_payjoin::Url, payjoin_relay: lightning_payjoin::Url,
489+
) {
490+
self.inner.write().unwrap().set_payjoin_config(payjoin_directory, payjoin_relay);
491+
}
492+
464493
/// Configures the [`Node`] instance to source its inbound liquidity from the given
465494
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
466495
/// service.
@@ -524,7 +553,7 @@ fn build_with_store_internal(
524553
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
525554
gossip_source_config: Option<&GossipSourceConfig>,
526555
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
527-
logger: Arc<FilesystemLogger>, kv_store: Arc<DynStore>,
556+
logger: Arc<FilesystemLogger>, kv_store: Arc<DynStore>, payjoin_config: Option<&PayjoinConfig>,
528557
) -> Result<Node, BuildError> {
529558
// Initialize the on-chain wallet and chain access
530559
let xprv = bitcoin::bip32::ExtendedPrivKey::new_master(config.network.into(), &seed_bytes)
@@ -557,6 +586,7 @@ fn build_with_store_internal(
557586
log_error!(logger, "Failed to set up wallet: {}", e);
558587
BuildError::WalletSetupFailed
559588
})?;
589+
let channel_scheduler = Arc::new(tokio::sync::Mutex::new(ChannelScheduler::new()));
560590

561591
let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config {
562592
Some(ChainDataSourceConfig::Esplora(server_url)) => {
@@ -567,6 +597,7 @@ fn build_with_store_internal(
567597
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
568598
tx_sync.client().clone(),
569599
Arc::clone(&logger),
600+
Arc::clone(&channel_scheduler),
570601
));
571602
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
572603
tx_sync.client().clone(),
@@ -585,6 +616,7 @@ fn build_with_store_internal(
585616
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
586617
tx_sync.client().clone(),
587618
Arc::clone(&logger),
619+
Arc::clone(&channel_scheduler),
588620
));
589621
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
590622
tx_sync.client().clone(),
@@ -974,6 +1006,17 @@ fn build_with_store_internal(
9741006
};
9751007

9761008
let (stop_sender, _) = tokio::sync::watch::channel(());
1009+
let payjoin_handler = if let Some(payjoin_config) = payjoin_config {
1010+
Some(Arc::new(PayjoinHandler::new(
1011+
Arc::clone(&wallet),
1012+
Arc::clone(&channel_manager),
1013+
Arc::clone(&channel_scheduler),
1014+
payjoin_config.payjoin_directory.clone(),
1015+
payjoin_config.payjoin_relay.clone(),
1016+
)))
1017+
} else {
1018+
None
1019+
};
9771020

9781021
let is_listening = Arc::new(AtomicBool::new(false));
9791022
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
@@ -994,6 +1037,8 @@ fn build_with_store_internal(
9941037
channel_manager,
9951038
chain_monitor,
9961039
output_sweeper,
1040+
payjoin_handler,
1041+
channel_scheduler,
9971042
peer_manager,
9981043
connection_manager,
9991044
keys_manager,

src/event.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use bitcoin::blockdata::locktime::absolute::LockTime;
2828
use bitcoin::secp256k1::PublicKey;
2929
use bitcoin::OutPoint;
3030

31+
use lightning_payjoin::scheduler::ChannelScheduler;
3132
use rand::{thread_rng, Rng};
3233

3334
use core::future::Future;
@@ -305,6 +306,7 @@ where
305306
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
306307
logger: L,
307308
config: Arc<Config>,
309+
channel_scheduler: Arc<tokio::sync::Mutex<ChannelScheduler>>,
308310
}
309311

310312
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
@@ -316,6 +318,7 @@ where
316318
output_sweeper: Arc<Sweeper>, network_graph: Arc<NetworkGraph>,
317319
payment_store: Arc<PaymentStore<L>>, peer_store: Arc<PeerStore<L>>,
318320
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>, logger: L, config: Arc<Config>,
321+
channel_scheduler: Arc<tokio::sync::Mutex<ChannelScheduler>>,
319322
) -> Self {
320323
Self {
321324
event_queue,
@@ -324,6 +327,7 @@ where
324327
output_sweeper,
325328
network_graph,
326329
payment_store,
330+
channel_scheduler,
327331
peer_store,
328332
logger,
329333
runtime,
@@ -338,6 +342,7 @@ where
338342
counterparty_node_id,
339343
channel_value_satoshis,
340344
output_script,
345+
user_channel_id,
341346
..
342347
} => {
343348
// Construct the raw transaction with the output that is paid the amount of the
@@ -348,6 +353,16 @@ where
348353
let cur_height = self.channel_manager.current_best_block().height;
349354
let locktime = LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO);
350355

356+
let mut channel_scheduler = self.channel_scheduler.lock().await;
357+
if channel_scheduler.is_channel_created(user_channel_id) {
358+
channel_scheduler.set_channel_accepted(
359+
user_channel_id,
360+
output_script,
361+
temporary_channel_id.0,
362+
);
363+
return {};
364+
}
365+
351366
// Sign the final funding transaction and broadcast it.
352367
match self.wallet.create_funding_transaction(
353368
output_script,

src/lib.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ pub use error::Error as NodeError;
108108
use error::Error;
109109

110110
pub use event::Event;
111+
use lightning_payjoin::scheduler::{ChannelScheduler, ScheduledChannel};
112+
use payjoin_handler::PayjoinHandler;
111113
pub use types::ChannelConfig;
112114
mod payjoin_handler;
113115

@@ -191,6 +193,8 @@ pub struct Node {
191193
output_sweeper: Arc<Sweeper>,
192194
peer_manager: Arc<PeerManager>,
193195
connection_manager: Arc<ConnectionManager<Arc<FilesystemLogger>>>,
196+
payjoin_handler: Option<Arc<PayjoinHandler>>,
197+
channel_scheduler: Arc<tokio::sync::Mutex<ChannelScheduler>>,
194198
keys_manager: Arc<KeysManager>,
195199
network_graph: Arc<NetworkGraph>,
196200
gossip_source: Arc<GossipSource>,
@@ -501,6 +505,28 @@ impl Node {
501505
});
502506
}
503507

508+
if let Some(payjoin_handler) = &self.payjoin_handler {
509+
let mut stop_payjoin_server = self.stop_sender.subscribe();
510+
let payjoin_handler = Arc::clone(&payjoin_handler);
511+
let payjoin_check_interval = 2;
512+
runtime.spawn(async move {
513+
let mut payjoin_interval =
514+
tokio::time::interval(Duration::from_secs(payjoin_check_interval));
515+
payjoin_interval.reset();
516+
payjoin_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
517+
loop {
518+
tokio::select! {
519+
_ = stop_payjoin_server.changed() => {
520+
return;
521+
}
522+
_ = payjoin_interval.tick() => {
523+
let _ = payjoin_handler.process_request().await;
524+
}
525+
}
526+
}
527+
});
528+
}
529+
504530
// Regularly reconnect to persisted peers.
505531
let connect_cm = Arc::clone(&self.connection_manager);
506532
let connect_pm = Arc::clone(&self.peer_manager);
@@ -638,6 +664,7 @@ impl Node {
638664
Arc::clone(&self.runtime),
639665
Arc::clone(&self.logger),
640666
Arc::clone(&self.config),
667+
Arc::clone(&self.channel_scheduler),
641668
));
642669

643670
// Setup background processing
@@ -707,6 +734,67 @@ impl Node {
707734
Ok(())
708735
}
709736

737+
/// Open a channel with funds coming from a payjoin transaction.
738+
///
739+
/// This method will add a new channel to the scheduler and start the process of opening a
740+
/// channel with the give node until it reaches the `ChannelAccepted` state.
741+
///
742+
/// In order to finalise the channel opening process, the user must make a payjoin payment to
743+
/// the returned BIP21 URL.
744+
pub async fn open_payjoin_channel(
745+
&self, channel_amount_sats: u64, push_msat: Option<u64>, announce_channel: bool,
746+
node_id: PublicKey, address: SocketAddress,
747+
) -> Result<String, Error> {
748+
let rt_lock = self.runtime.read().unwrap();
749+
if rt_lock.is_none() {
750+
return Err(Error::NotRunning);
751+
}
752+
let runtime = rt_lock.as_ref().unwrap();
753+
let user_channel_id: u128 = rand::thread_rng().gen::<u128>();
754+
let payjoin_handler = self.payjoin_handler.as_ref().ok_or(Error::WalletOperationFailed)?; // fix error
755+
payjoin_handler
756+
.schedule_channel(
757+
bitcoin::Amount::from_sat(channel_amount_sats),
758+
node_id,
759+
user_channel_id,
760+
)
761+
.await;
762+
let user_config = UserConfig {
763+
channel_handshake_limits: Default::default(),
764+
channel_handshake_config: ChannelHandshakeConfig {
765+
announced_channel: announce_channel,
766+
..Default::default()
767+
},
768+
..Default::default()
769+
};
770+
let push_msat = push_msat.unwrap_or(0);
771+
self.internal_connect_open_channel(
772+
node_id,
773+
channel_amount_sats,
774+
push_msat,
775+
user_channel_id,
776+
address,
777+
Some(user_config),
778+
runtime,
779+
)?;
780+
let payjoin_uri = self
781+
.payjoin_handler
782+
.as_ref()
783+
.ok_or(Error::WalletOperationFailed)?
784+
.payjoin_uri(bitcoin::Amount::from_sat(channel_amount_sats))
785+
.await;
786+
Ok(payjoin_uri)
787+
}
788+
789+
/// List all scheduled payjoin channels.
790+
pub async fn list_scheduled_channels(&self) -> Result<Vec<ScheduledChannel>, Error> {
791+
if let Some(payjoin_handler) = self.payjoin_handler.clone() {
792+
Ok(payjoin_handler.list_scheduled_channels().await)
793+
} else {
794+
Ok(Vec::new())
795+
}
796+
}
797+
710798
/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
711799
///
712800
/// After this returns most API methods will return [`Error::NotRunning`].

0 commit comments

Comments
 (0)