Skip to content

Commit 8d063a0

Browse files
committed
Force cli to cancel and broadcast fallback when available mid loop
Previously the cli only checked for expiry at program start during long polling loops there was not an expiry check leading to the possiblity of an indefinite poll past expiry where the counterparty had already given up. This also adds an `--expire-in` flag for the receiver to both test this functionality and demonstrate that the receiver is not actually fixed to a 24hr expiration and is in controll of this expiry time.
1 parent 099fc58 commit 8d063a0

7 files changed

Lines changed: 363 additions & 39 deletions

File tree

payjoin-cli/src/app/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ pub struct Config {
6161
pub root_certificate: Option<PathBuf>,
6262
#[cfg(feature = "_manual-tls")]
6363
pub certificate_key: Option<PathBuf>,
64+
#[cfg(feature = "v2")]
65+
#[serde(skip)]
66+
pub expire_in_secs: Option<u64>,
6467
}
6568

6669
impl Config {
@@ -156,6 +159,8 @@ impl Config {
156159
max_fee_rate: built_config.get("max_fee_rate").ok(),
157160
bitcoind: built_config.get("bitcoind")?,
158161
version: None,
162+
#[cfg(feature = "v2")]
163+
expire_in_secs: None,
159164
#[cfg(feature = "_manual-tls")]
160165
root_certificate: built_config.get("root_certificate").ok(),
161166
#[cfg(feature = "_manual-tls")]
@@ -243,6 +248,11 @@ impl Config {
243248
));
244249
}
245250

251+
#[cfg(feature = "v2")]
252+
if let Commands::Receive { expire_in, .. } = &cli.command {
253+
config.expire_in_secs = *expire_in;
254+
}
255+
246256
tracing::trace!("App config: {config:?}");
247257
Ok(config)
248258
}

payjoin-cli/src/app/v2/mod.rs

Lines changed: 119 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,10 @@ impl AppTrait for App {
296296
if let Some(max_fee_rate) = self.config.max_fee_rate {
297297
receiver_builder = receiver_builder.with_max_fee_rate(max_fee_rate);
298298
}
299+
if let Some(expire_in_secs) = self.config.expire_in_secs {
300+
let expiration = std::time::Duration::from_secs(expire_in_secs);
301+
receiver_builder = receiver_builder.with_expiration(expiration);
302+
}
299303
let session = receiver_builder.build().save(&persister)?;
300304
println!("Receive session established: {}", persister.session_id());
301305

@@ -337,6 +341,12 @@ impl AppTrait for App {
337341
Err(e) => {
338342
if e.is_expired() {
339343
println!("Session {session_id} receiver expired.");
344+
match e.fallback_tx() {
345+
Some(tx) => self.broadcast_fallback_tx(tx, &session_id, "receiver"),
346+
None => println!(
347+
"No fallback transaction available for expired receiver session {session_id}."
348+
),
349+
}
340350
} else {
341351
tracing::error!(
342352
"An error {:?} occurred while replaying receiver session",
@@ -365,6 +375,12 @@ impl AppTrait for App {
365375
Err(e) => {
366376
if e.is_expired() {
367377
println!("Session {session_id} sender expired.");
378+
match e.fallback_tx() {
379+
Some(tx) => self.broadcast_fallback_tx(tx, &session_id, "sender"),
380+
None => println!(
381+
"No fallback transaction available for expired sender session {session_id}."
382+
),
383+
}
368384
} else {
369385
tracing::error!("An error {:?} occurred while replaying Sender session", e);
370386
println!("Session {session_id} sender failed to replay - {e}");
@@ -688,7 +704,31 @@ impl App {
688704
if let Err(close_err) = SessionPersister::close(persister) {
689705
tracing::error!("Failed to close {} session {}: {:?}", role, session_id, close_err);
690706
} else {
691-
tracing::error!("Closed failed {} session: {}", role, session_id);
707+
tracing::debug!("Closed failed {} session: {}", role, session_id);
708+
}
709+
}
710+
711+
/// Broadcast a fallback transaction with graceful error handling. Logs
712+
/// success/failure, never propagates the broadcast error.
713+
fn broadcast_fallback_tx(
714+
&self,
715+
tx: &payjoin::bitcoin::Transaction,
716+
session_id: &SessionId,
717+
role: &str,
718+
) {
719+
match self.wallet().broadcast_tx(tx) {
720+
Ok(txid) =>
721+
println!("Broadcasted fallback transaction for {role} session {session_id}: {txid}"),
722+
Err(err) => {
723+
println!(
724+
"Expired {role} session {session_id} has a fallback transaction \
725+
but it failed to broadcast: {err}\n\
726+
Recover it manually with `payjoin-cli history` / `cancel`."
727+
);
728+
tracing::error!(
729+
"Fallback broadcast failed for {role} session {session_id}: {err:?}"
730+
);
731+
}
692732
}
693733
}
694734

@@ -739,26 +779,53 @@ impl App {
739779
persister: &SenderPersister,
740780
) -> Result<()> {
741781
let mut session = sender.clone();
742-
// Long poll until we get a response
782+
// Long poll until we get a response. The session's expiration is enforced
783+
// here: once `create_poll_request` reports expiry, the session is driven to
784+
// a terminal `Closed(Aborted)` state and the fallback transaction is surfaced
785+
// so it can be broadcast manually. The persisted terminal outcome prevents
786+
// the session from being mistakenly resumed.
743787
loop {
744-
let (response, ctx) =
745-
self.post_via_relay(|relay| session.create_poll_request(relay)).await?;
746-
let res = session.process_response(&response.bytes().await?, ctx).save(persister);
747-
match res {
748-
Ok(OptionalTransitionOutcome::Progress(psbt)) => {
749-
println!("Proposal received. Processing...");
750-
self.process_pj_response(psbt)?;
751-
return Ok(());
788+
let relay = self.mailroom_manager.choose_relay()?;
789+
let (req, ctx) = match session.create_poll_request(relay.as_str()) {
790+
Ok(r) => r,
791+
Err(e) if e.is_expired() => {
792+
let pending = session.cancel().save(persister)?;
793+
self.broadcast_fallback_tx(
794+
pending.fallback_tx(),
795+
&persister.session_id(),
796+
"sender",
797+
);
798+
pending.close().save(persister)?;
799+
return Err(anyhow!("Sender session expired"));
752800
}
753-
Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
754-
println!("No response yet.");
755-
session = current_state;
756-
continue;
801+
Err(e) => return Err(e.into()),
802+
};
803+
match self.post_request(req).await {
804+
Ok(response) => {
805+
let bytes = response.bytes().await?;
806+
let res = session.process_response(&bytes, ctx).save(persister);
807+
match res {
808+
Ok(OptionalTransitionOutcome::Progress(psbt)) => {
809+
println!("Proposal received. Processing...");
810+
self.process_pj_response(psbt)?;
811+
return Ok(());
812+
}
813+
Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
814+
println!("No response yet.");
815+
session = current_state;
816+
continue;
817+
}
818+
Err(re) => {
819+
println!("{re}");
820+
tracing::debug!("{re:?}");
821+
return Err(anyhow!("Response error").context(re));
822+
}
823+
}
757824
}
758-
Err(re) => {
759-
println!("{re}");
760-
tracing::debug!("{re:?}");
761-
return Err(anyhow!("Response error").context(re));
825+
Err(e) => {
826+
tracing::debug!("Request to relay {relay} failed: {e:?}");
827+
self.mailroom_manager.add_failed_relay(relay);
828+
continue;
762829
}
763830
}
764831
}
@@ -771,22 +838,44 @@ impl App {
771838
) -> Result<Receiver<UncheckedOriginalPayload>> {
772839
let mut session = session;
773840
loop {
841+
let relay = self.mailroom_manager.choose_relay()?;
842+
let (req, context) = match session.create_poll_request(relay.as_str()) {
843+
Ok(r) => r,
844+
Err(e) if e.is_expired() => {
845+
session.cancel().save(persister)?;
846+
println!(
847+
"Receiver session expired before any proposal was received; \
848+
no fallback transaction to broadcast."
849+
);
850+
return Err(anyhow!("Receiver session expired"));
851+
}
852+
Err(e) => return Err(e.into()),
853+
};
774854
println!("Polling receive request...");
775-
let (ohttp_response, context) =
776-
self.post_via_relay(|relay| session.create_poll_request(relay)).await?;
777-
let state_transition = session
778-
.process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context)
779-
.save(persister);
780-
match state_transition {
781-
Ok(OptionalTransitionOutcome::Progress(next_state)) => {
782-
println!("Got a request from the sender. Responding with a Payjoin proposal.");
783-
return Ok(next_state);
855+
match self.post_request(req).await {
856+
Ok(ohttp_response) => {
857+
let bytes = ohttp_response.bytes().await?.to_vec();
858+
let state_transition =
859+
session.process_response(bytes.as_slice(), context).save(persister);
860+
match state_transition {
861+
Ok(OptionalTransitionOutcome::Progress(next_state)) => {
862+
println!(
863+
"Got a request from the sender. Responding with a Payjoin proposal."
864+
);
865+
return Ok(next_state);
866+
}
867+
Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
868+
session = current_state;
869+
continue;
870+
}
871+
Err(e) => return Err(e.into()),
872+
}
784873
}
785-
Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
786-
session = current_state;
874+
Err(e) => {
875+
tracing::debug!("Request to relay {relay} failed: {e:?}");
876+
self.mailroom_manager.add_failed_relay(relay);
787877
continue;
788878
}
789-
Err(e) => return Err(e.into()),
790879
}
791880
}
792881
}

payjoin-cli/src/cli/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@ pub enum Commands {
125125
/// The path to the ohttp keys file
126126
#[arg(long = "ohttp-keys", value_parser = value_parser!(PathBuf))]
127127
ohttp_keys: Option<PathBuf>,
128+
129+
#[cfg(feature = "v2")]
130+
/// Session expiration in seconds from now
131+
#[arg(long = "expire-in")]
132+
expire_in: Option<u64>,
128133
},
129134
/// Resume pending payjoins (BIP77/v2 only)
130135
#[cfg(feature = "v2")]

0 commit comments

Comments
 (0)