Skip to content

Commit 5f292b1

Browse files
committed
Add v2 fallback subcommand
Co-authored-by: zealsham <shammahagwor@gmail.com> Co-authored-by: DanGould <d@ngould.dev> The command broadcasts the fallback tx for a specific sender session. If a session fails or is canceled, pj-cli outputs the fallback command for that particular session.
1 parent f6bbe89 commit 5f292b1

8 files changed

Lines changed: 231 additions & 21 deletions

File tree

payjoin-cli/src/app/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ fn handle_subcommands(config: Builder, cli: &Cli) -> Result<Builder, ConfigError
336336
Commands::Resume => Ok(config),
337337
#[cfg(feature = "v2")]
338338
Commands::History => Ok(config),
339+
#[cfg(feature = "v2")]
340+
Commands::Fallback { .. } => Ok(config),
339341
}
340342
}
341343

payjoin-cli/src/app/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub mod config;
1010
pub mod wallet;
1111
use crate::app::config::Config;
1212
use crate::app::wallet::BitcoindWallet;
13+
#[cfg(feature = "v2")]
14+
use crate::db::v2::SessionId;
1315

1416
#[cfg(feature = "v1")]
1517
pub(crate) mod v1;
@@ -28,6 +30,8 @@ pub trait App: Send + Sync {
2830
async fn resume_payjoins(&self) -> Result<()>;
2931
#[cfg(feature = "v2")]
3032
async fn history(&self) -> Result<()>;
33+
#[cfg(feature = "v2")]
34+
async fn fallback_sender(&self, session_id: SessionId) -> Result<()>;
3135

3236
fn create_original_psbt(
3337
&self,

payjoin-cli/src/app/v1.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ impl AppTrait for App {
116116
async fn history(&self) -> Result<()> {
117117
unimplemented!("history not implemented for v1");
118118
}
119+
120+
#[cfg(feature = "v2")]
121+
async fn fallback_sender(&self, _session_id: crate::db::v2::SessionId) -> Result<()> {
122+
anyhow::bail!("fallback is only supported for v2 (BIP77) sessions")
123+
}
119124
}
120125

121126
impl App {

payjoin-cli/src/app/v2/mod.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,21 @@ impl AppTrait for App {
241241
};
242242
let mut interrupt = self.interrupt.clone();
243243
tokio::select! {
244-
res = self.process_sender_session(sender_state, &persister) => return res,
244+
res = self.process_sender_session(sender_state, &persister) => {
245+
match res {
246+
Ok(()) => return Ok(()),
247+
Err(err) => {
248+
let id = persister.session_id();
249+
println!("Session {id} failed. Run `payjoin-cli fallback {id}` to broadcast the original transaction.");
250+
return Err(err);
251+
}
252+
}
253+
},
245254
_ = interrupt.changed() => {
246-
println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
255+
let id = persister.session_id();
256+
println!(
257+
"Session {id} interrupted. Call `send` again to resume, `resume` to resume all sessions, or `payjoin-cli fallback {id}` to broadcast the original transaction."
258+
);
247259
return Err(anyhow!("Interrupted"))
248260
}
249261
}
@@ -461,6 +473,32 @@ impl AppTrait for App {
461473

462474
Ok(())
463475
}
476+
477+
async fn fallback_sender(&self, session_id: SessionId) -> Result<()> {
478+
let persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
479+
let (session, history) = replay_sender_event_log(&persister)?;
480+
481+
if let SendSession::Closed(SenderSessionOutcome::Success(proposal)) = session {
482+
let txid = proposal.clone().extract_tx_unchecked_fee_rate().compute_txid();
483+
println!(
484+
"Session {session_id} already produced payjoin transaction {txid}. \
485+
Broadcasting the original now would double-spend against it. \
486+
If the payjoin tx needs re-broadcast, run \
487+
`bitcoin-cli gettransaction {txid}` to fetch the hex, then \
488+
`bitcoin-cli sendrawtransaction <hex>`."
489+
);
490+
return Ok(());
491+
}
492+
493+
let fallback_tx = history.fallback_tx();
494+
self.wallet().broadcast_tx(&fallback_tx)?;
495+
println!("Broadcasted fallback transaction txid: {}", fallback_tx.compute_txid());
496+
497+
if let Err(e) = SessionPersister::close(&persister) {
498+
tracing::warn!("Failed to close session {session_id} after fallback: {e}");
499+
}
500+
Ok(())
501+
}
464502
}
465503

466504
impl App {
@@ -489,7 +527,14 @@ impl App {
489527
self.process_pj_response(proposal)?;
490528
return Ok(());
491529
}
492-
_ => return Err(anyhow!("Unexpected sender state")),
530+
SendSession::Closed(SenderSessionOutcome::Failure)
531+
| SendSession::Closed(SenderSessionOutcome::Cancel) => {
532+
let id = persister.session_id();
533+
println!(
534+
"Session {id} ended without payjoin. Run `payjoin-cli fallback {id}` to broadcast the original transaction."
535+
);
536+
return Ok(());
537+
}
493538
}
494539
Ok(())
495540
}

payjoin-cli/src/cli/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ pub enum Commands {
133133
#[cfg(feature = "v2")]
134134
/// Show payjoin session history
135135
History,
136+
#[cfg(feature = "v2")]
137+
/// Broadcast the original transaction for a sender session (BIP77/v2 only)
138+
Fallback {
139+
/// The session ID to broadcast the fallback transaction for
140+
#[arg(required = true)]
141+
session_id: i64,
142+
},
136143
}
137144

138145
pub fn parse_amount_in_sat(s: &str) -> Result<Amount, ParseAmountError> {

payjoin-cli/src/db/v2.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use rusqlite::params;
99
use super::*;
1010

1111
#[derive(Debug, Clone)]
12-
pub(crate) struct SessionId(i64);
12+
pub(crate) struct SessionId(pub(crate) i64);
1313

1414
impl core::ops::Deref for SessionId {
1515
type Target = i64;
@@ -61,6 +61,8 @@ impl SenderPersister {
6161
}
6262

6363
pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
64+
65+
pub fn session_id(&self) -> SessionId { self.session_id.clone() }
6466
}
6567
impl SessionPersister for SenderPersister {
6668
type SessionEvent = SenderSessionEvent;

payjoin-cli/src/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ use cli::{Cli, Commands};
66
use tracing_subscriber::filter::LevelFilter;
77
use tracing_subscriber::EnvFilter;
88

9+
#[cfg(feature = "v2")]
10+
use crate::db::v2::SessionId;
11+
912
mod app;
1013
mod cli;
1114
mod db;
@@ -75,6 +78,10 @@ async fn main() -> Result<()> {
7578
Commands::History => {
7679
app.history().await?;
7780
}
81+
#[cfg(feature = "v2")]
82+
Commands::Fallback { session_id } => {
83+
app.fallback_sender(SessionId(*session_id)).await?;
84+
}
7885
};
7986

8087
Ok(())

payjoin-cli/tests/e2e.rs

Lines changed: 155 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
#[cfg(feature = "_manual-tls")]
22
mod e2e {
33
use std::process::{ExitStatus, Stdio};
4+
use std::str::FromStr;
45

56
use nix::sys::signal::{kill, Signal};
67
use nix::unistd::Pid;
8+
use payjoin::bitcoin::Txid;
79
use payjoin_test_utils::{init_bitcoind_sender_receiver, BoxError};
810
use tempfile::tempdir;
911
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10-
use tokio::process::Command;
12+
use tokio::process::{Child, Command};
1113

1214
async fn terminate(mut child: tokio::process::Child) -> tokio::io::Result<ExitStatus> {
1315
let pid = child.id().expect("Failed to get child PID");
@@ -64,6 +66,20 @@ mod e2e {
6466
res
6567
}
6668

69+
async fn send_until_request_timeout(mut cli_sender: Child) -> Result<(), BoxError> {
70+
let mut stdout = cli_sender.stdout.take().expect("failed to take stdout of child process");
71+
let timeout = tokio::time::Duration::from_secs(35);
72+
let res = tokio::time::timeout(
73+
timeout,
74+
wait_for_stdout_match(&mut stdout, |line| line.contains("No response yet.")),
75+
)
76+
.await?;
77+
78+
terminate(cli_sender).await.expect("Failed to kill payjoin-cli initial sender");
79+
assert!(res.is_some(), "Fallback send was not detected");
80+
Ok(())
81+
}
82+
6783
#[cfg(feature = "v1")]
6884
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
6985
async fn send_receive_payjoin_v1() -> Result<(), BoxError> {
@@ -201,7 +217,6 @@ mod e2e {
201217
async fn send_receive_payjoin_v2() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
202218
use payjoin_test_utils::{init_tracing, TestServices};
203219
use tempfile::TempDir;
204-
use tokio::process::Child;
205220

206221
type Result<T> = std::result::Result<T, BoxError>;
207222

@@ -385,21 +400,6 @@ mod e2e {
385400
Ok(())
386401
}
387402

388-
async fn send_until_request_timeout(mut cli_sender: Child) -> Result<()> {
389-
let mut stdout =
390-
cli_sender.stdout.take().expect("failed to take stdout of child process");
391-
let timeout = tokio::time::Duration::from_secs(35);
392-
let res = tokio::time::timeout(
393-
timeout,
394-
wait_for_stdout_match(&mut stdout, |line| line.contains("No response yet.")),
395-
)
396-
.await?;
397-
398-
terminate(cli_sender).await.expect("Failed to kill payjoin-cli initial sender");
399-
assert!(res.is_some(), "Fallback send was not detected");
400-
Ok(())
401-
}
402-
403403
async fn respond_with_payjoin(mut cli_receive_resumer: Child) -> Result<()> {
404404
let mut stdout =
405405
cli_receive_resumer.stdout.take().expect("Failed to take stdout of child process");
@@ -621,4 +621,142 @@ mod e2e {
621621

622622
Ok(())
623623
}
624+
625+
#[cfg(feature = "v2")]
626+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
627+
async fn sender_fallback_v2() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
628+
use payjoin_test_utils::{init_tracing, TestServices};
629+
use tempfile::TempDir;
630+
631+
type Result<T> = std::result::Result<T, BoxError>;
632+
633+
init_tracing();
634+
let mut services = TestServices::initialize().await?;
635+
let temp_dir = tempdir()?;
636+
637+
let result = tokio::select! {
638+
res = services.take_ohttp_relay_handle() => Err(format!("Ohttp relay is long running: {res:?}").into()),
639+
res = services.take_directory_handle() => Err(format!("Directory server is long running: {res:?}").into()),
640+
res = fallback_cli_async(&services, &temp_dir) => res,
641+
};
642+
643+
assert!(result.is_ok(), "sender_fallback_v2 failed: {:#?}", result.unwrap_err());
644+
645+
async fn fallback_cli_async(services: &TestServices, temp_dir: &TempDir) -> Result<()> {
646+
let sender_db_path = temp_dir.path().join("sender_db");
647+
let (bitcoind, sender, _receiver) = init_bitcoind_sender_receiver(None, None)?;
648+
let cert_path = &temp_dir.path().join("localhost.der");
649+
tokio::fs::write(cert_path, services.cert()).await?;
650+
services.wait_for_services_ready().await?;
651+
let ohttp_keys = services.fetch_ohttp_keys().await?;
652+
let ohttp_keys_path = temp_dir.path().join("ohttp_keys");
653+
tokio::fs::write(&ohttp_keys_path, ohttp_keys.encode()?).await?;
654+
655+
let receiver_db_path = temp_dir.path().join("receiver_db");
656+
let receiver_rpchost = format!("http://{}/wallet/receiver", bitcoind.params.rpc_socket);
657+
let sender_rpchost = format!("http://{}/wallet/sender", bitcoind.params.rpc_socket);
658+
let cookie_file = &bitcoind.params.cookie_file;
659+
let payjoin_cli = env!("CARGO_BIN_EXE_payjoin-cli");
660+
let directory = &services.directory_url();
661+
let ohttp_relay = &services.ohttp_relay_url();
662+
663+
// Get a BIP21 from a receiver then kill it so the sender can never complete payjoin
664+
let cli_receiver = Command::new(payjoin_cli)
665+
.arg("--root-certificate")
666+
.arg(cert_path)
667+
.arg("--rpchost")
668+
.arg(&receiver_rpchost)
669+
.arg("--cookie-file")
670+
.arg(cookie_file)
671+
.arg("--db-path")
672+
.arg(&receiver_db_path)
673+
.arg("--ohttp-relays")
674+
.arg(ohttp_relay)
675+
.arg("receive")
676+
.arg(RECEIVE_SATS)
677+
.arg("--pj-directory")
678+
.arg(directory)
679+
.arg("--ohttp-keys")
680+
.arg(&ohttp_keys_path)
681+
.stdout(Stdio::piped())
682+
.stderr(Stdio::inherit())
683+
.spawn()
684+
.expect("Failed to execute payjoin-cli receiver");
685+
let bip21 = get_bip21_from_receiver(cli_receiver).await;
686+
687+
// Start sender and capture the session-id from the hint line, then interrupt
688+
let cli_sender = Command::new(payjoin_cli)
689+
.arg("--root-certificate")
690+
.arg(cert_path)
691+
.arg("--rpchost")
692+
.arg(&sender_rpchost)
693+
.arg("--cookie-file")
694+
.arg(cookie_file)
695+
.arg("--db-path")
696+
.arg(&sender_db_path)
697+
.arg("--ohttp-relays")
698+
.arg(ohttp_relay)
699+
.arg("send")
700+
.arg(&bip21)
701+
.arg("--fee-rate")
702+
.arg("1")
703+
.stdout(Stdio::piped())
704+
.stderr(Stdio::inherit())
705+
.spawn()
706+
.expect("Failed to execute payjoin-cli sender");
707+
708+
send_until_request_timeout(cli_sender).await?;
709+
710+
// There is only one sender session in progress.
711+
let session_id = 1i64;
712+
// Ensure the fallback was not broadcast yet
713+
let mempool_size =
714+
sender.get_mempool_info().expect("should be able to get mempool").unbroadcast_count;
715+
assert_eq!(mempool_size, 0, "fallback should not be in mempool");
716+
717+
// Run `payjoin-cli fallback <session-id>` and assert broadcast
718+
let mut cli_fallback = Command::new(payjoin_cli)
719+
.arg("--root-certificate")
720+
.arg(cert_path)
721+
.arg("--rpchost")
722+
.arg(&sender_rpchost)
723+
.arg("--cookie-file")
724+
.arg(cookie_file)
725+
.arg("--db-path")
726+
.arg(&sender_db_path)
727+
.arg("--ohttp-relays")
728+
.arg(ohttp_relay)
729+
.arg("fallback")
730+
.arg(session_id.to_string())
731+
.stdout(Stdio::piped())
732+
.stderr(Stdio::inherit())
733+
.spawn()
734+
.expect("Failed to execute payjoin-cli fallback");
735+
736+
let mut fallback_stdout =
737+
cli_fallback.stdout.take().expect("failed to take stdout of fallback");
738+
let timeout = tokio::time::Duration::from_secs(10);
739+
let broadcast_line = tokio::time::timeout(
740+
timeout,
741+
wait_for_stdout_match(&mut fallback_stdout, |l| {
742+
l.contains("Broadcasted fallback transaction txid")
743+
}),
744+
)
745+
.await?;
746+
747+
terminate(cli_fallback).await.expect("Failed to kill payjoin-cli fallback");
748+
let subcommand_output = broadcast_line.expect("fallback should broadcast");
749+
let fallback_txid = subcommand_output.split_whitespace().nth(4).unwrap_or("");
750+
let fallback_txid = Txid::from_str(fallback_txid).expect("valid txid");
751+
752+
assert!(
753+
sender.get_raw_transaction(fallback_txid).is_ok(),
754+
"fallback tx should be in the mempool"
755+
);
756+
757+
Ok(())
758+
}
759+
760+
Ok(())
761+
}
624762
}

0 commit comments

Comments
 (0)