Skip to content

Commit c71e200

Browse files
committed
feat: add resume and history commands for payjoin sessions
- Implement resume_payjoins() to continue pending sessions - Add history() to display all session states - Add session status text helpers for UI display - Support filtering by session ID
1 parent 1d9a3ed commit c71e200

File tree

2 files changed

+325
-12
lines changed

2 files changed

+325
-12
lines changed

src/commands.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,20 @@ pub enum OnlineWalletSubCommand {
479479
)]
480480
fee_rate: u64,
481481
},
482+
/// Resume pending payjoin sessions.
483+
ResumePayjoin {
484+
/// Payjoin directory for the session
485+
#[arg(env = "PAYJOIN_DIRECTORY", long = "directory", required = true)]
486+
directory: String,
487+
/// URL of the Payjoin OHTTP relay. Can be repeated multiple times.
488+
#[arg(env = "PAYJOIN_OHTTP_RELAY", long = "ohttp_relay", required = true)]
489+
ohttp_relay: Vec<String>,
490+
/// Resume only a specific active session ID (sender and/or receiver).
491+
#[arg(env = "PAYJOIN_SESSION_ID", long = "session_id")]
492+
session_id: Option<i64>,
493+
},
494+
/// Show payjoin session history.
495+
PayjoinHistory,
482496
}
483497

484498
/// Subcommands for Key operations.

src/payjoin/mod.rs

Lines changed: 311 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,60 @@ pub(crate) struct PayjoinManager<'a> {
4242
relay_manager: Arc<Mutex<RelayManager>>,
4343
db: Arc<crate::payjoin::db::Database>,
4444
}
45+
46+
trait StatusText {
47+
fn status_text(&self) -> &'static str;
48+
}
49+
50+
impl StatusText for SendSession {
51+
fn status_text(&self) -> &'static str {
52+
match self {
53+
SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) => {
54+
"Waiting for proposal"
55+
}
56+
SendSession::Closed(session_outcome) => match session_outcome {
57+
SenderSessionOutcome::Failure => "Session failure",
58+
SenderSessionOutcome::Success(_) => "Session success",
59+
SenderSessionOutcome::Cancel => "Session cancelled",
60+
},
61+
}
62+
}
63+
}
64+
65+
impl StatusText for ReceiveSession {
66+
fn status_text(&self) -> &'static str {
67+
match self {
68+
ReceiveSession::Initialized(_) => "Waiting for original proposal",
69+
ReceiveSession::UncheckedOriginalPayload(_)
70+
| ReceiveSession::MaybeInputsOwned(_)
71+
| ReceiveSession::MaybeInputsSeen(_)
72+
| ReceiveSession::OutputsUnknown(_)
73+
| ReceiveSession::WantsOutputs(_)
74+
| ReceiveSession::WantsInputs(_)
75+
| ReceiveSession::WantsFeeRange(_)
76+
| ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
77+
ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
78+
ReceiveSession::HasReplyableError(_) => {
79+
"Session failure, waiting to post error response"
80+
}
81+
ReceiveSession::Monitor(_) => "Monitoring payjoin proposal",
82+
ReceiveSession::Closed(session_outcome) => match session_outcome {
83+
ReceiverSessionOutcome::Failure => "Session failure",
84+
ReceiverSessionOutcome::Success(_) => {
85+
"Session success, Payjoin proposal was broadcasted"
86+
}
87+
ReceiverSessionOutcome::Cancel => "Session cancelled",
88+
ReceiverSessionOutcome::FallbackBroadcasted => "Fallback broadcasted",
89+
},
90+
}
91+
}
92+
}
93+
94+
struct SessionHistoryRow {
95+
id: String,
96+
role: &'static str,
97+
status: String,
98+
completed_at: Option<String>,
4599
}
46100

47101
impl<'a> PayjoinManager<'a> {
@@ -230,14 +284,14 @@ impl<'a> PayjoinManager<'a> {
230284
SenderPersister::new(self.db.clone(), receiver_pubkey)?
231285
};
232286

233-
let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri)
234-
.build_recommended(fee_rate)?
235-
.save(&persister)
236-
.map_err(|e| {
237-
Error::Generic(format!(
238-
"Failed to save the Payjoin v2 sender in the persister: {e}"
239-
))
240-
})?;
287+
let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri)
288+
.build_recommended(fee_rate)?
289+
.save(&persister)
290+
.map_err(|e| {
291+
Error::Generic(format!(
292+
"Failed to save the Payjoin v2 sender in the persister: {e}"
293+
))
294+
})?;
241295

242296
(SendSession::WithReplyKey(sender), persister)
243297
};
@@ -450,7 +504,7 @@ impl<'a> PayjoinManager<'a> {
450504
) -> Result<(), Error> {
451505
let next_receiver_typestate = receiver
452506
.identify_receiver_outputs(&mut |output_script| {
453-
Ok(self.wallet.is_mine(output_script.to_owned()))
507+
Ok(self.wallet.is_mine(output_script.to_owned()))
454508
})
455509
.save(persister)?;
456510

@@ -582,7 +636,7 @@ impl<'a> PayjoinManager<'a> {
582636
.create_post_request(
583637
self.unwrap_relay_or_else_fetch(vec![], None::<&str>)
584638
.await?
585-
.as_str(),
639+
.as_str(),
586640
)
587641
.map_err(|e| {
588642
Error::Generic(format!("Error occurred when creating a post request for sending final Payjoin proposal: {e}"))
@@ -750,7 +804,7 @@ impl<'a> PayjoinManager<'a> {
750804
blockchain_client,
751805
relay.to_string(),
752806
)
753-
.await
807+
.await
754808
}
755809
SendSession::PollingForProposal(context) => {
756810
let relay = self
@@ -762,7 +816,7 @@ impl<'a> PayjoinManager<'a> {
762816
blockchain_client,
763817
relay.to_string(),
764818
)
765-
.await
819+
.await
766820
}
767821
SendSession::Closed(SenderSessionOutcome::Success(psbt)) => {
768822
self.process_payjoin_proposal(psbt, blockchain_client).await
@@ -871,4 +925,249 @@ impl<'a> PayjoinManager<'a> {
871925
.send()
872926
.await
873927
}
928+
929+
/// Resume pending payjoin sessions from the database
930+
pub async fn resume_payjoins(
931+
&mut self,
932+
directory: String,
933+
ohttp_relays: Vec<String>,
934+
session_id: Option<i64>,
935+
blockchain_client: &BlockchainClient,
936+
) -> Result<String, Error> {
937+
let db = self.db.clone();
938+
let mut recv_session_ids = db.get_recv_session_ids()?;
939+
let mut send_session_ids = db.get_send_session_ids()?;
940+
941+
if let Some(session_id) = session_id {
942+
recv_session_ids.retain(|id| id.as_i64() == session_id);
943+
send_session_ids.retain(|id| id.as_i64() == session_id);
944+
945+
if recv_session_ids.is_empty() && send_session_ids.is_empty() {
946+
return Ok(serde_json::to_string_pretty(&json!({
947+
"message": format!("No active session found for session_id {}.", session_id)
948+
}))?);
949+
}
950+
}
951+
952+
if recv_session_ids.is_empty() && send_session_ids.is_empty() {
953+
return Ok(serde_json::to_string_pretty(&json!({
954+
"message": "No sessions to resume."
955+
}))?);
956+
}
957+
958+
let ohttp_relays: Vec<url::Url> = ohttp_relays
959+
.into_iter()
960+
.map(|s| url::Url::parse(&s))
961+
.collect::<Result<_, _>>()
962+
.map_err(|e| Error::Generic(format!("Failed to parse OHTTP URLs: {e}")))?;
963+
964+
let relay = self
965+
.unwrap_relay_or_else_fetch(ohttp_relays, Some(&directory))
966+
.await?;
967+
968+
let max_fee_rate = FeeRate::BROADCAST_MIN;
969+
let total_sessions = recv_session_ids.len() + send_session_ids.len();
970+
let mut completed = 0usize;
971+
let mut timed_out = 0usize;
972+
let mut failed = 0usize;
973+
974+
println!("Resuming {} payjoin session(s)...\n", total_sessions);
975+
976+
// Resume receiver sessions
977+
for session_id in recv_session_ids {
978+
let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
979+
match replay_receiver_event_log(&persister) {
980+
Ok((receiver_state, _)) => {
981+
println!("Resuming receiver session {}", session_id);
982+
match tokio::time::timeout(
983+
std::time::Duration::from_secs(30),
984+
self.proceed_receiver_session(
985+
receiver_state,
986+
&persister,
987+
relay.as_str(),
988+
max_fee_rate,
989+
blockchain_client,
990+
),
991+
)
992+
.await
993+
{
994+
Ok(Ok(_)) => {
995+
completed += 1;
996+
}
997+
Ok(Err(e)) => {
998+
failed += 1;
999+
println!("Receiver session {} failed: {}", session_id, e);
1000+
}
1001+
Err(_) => {
1002+
timed_out += 1;
1003+
println!("Receiver session {} timed out", session_id);
1004+
}
1005+
}
1006+
}
1007+
Err(e) => {
1008+
failed += 1;
1009+
println!("Failed to replay receiver session {}: {:?}", session_id, e);
1010+
}
1011+
}
1012+
}
1013+
1014+
// Resume sender sessions
1015+
for session_id in send_session_ids {
1016+
let persister = SenderPersister::from_id(db.clone(), session_id.clone());
1017+
match replay_sender_event_log(&persister) {
1018+
Ok((sender_state, _)) => {
1019+
println!("Resuming sender session {}", session_id);
1020+
match tokio::time::timeout(
1021+
std::time::Duration::from_secs(30),
1022+
self.proceed_sender_session(
1023+
sender_state,
1024+
&persister,
1025+
vec![relay.clone()],
1026+
blockchain_client,
1027+
),
1028+
)
1029+
.await
1030+
{
1031+
Ok(Ok(_)) => {
1032+
completed += 1;
1033+
}
1034+
Ok(Err(e)) => {
1035+
failed += 1;
1036+
println!("Sender session {} failed: {}", session_id, e);
1037+
}
1038+
Err(_) => {
1039+
timed_out += 1;
1040+
println!("Sender session {} timed out", session_id);
1041+
}
1042+
}
1043+
}
1044+
Err(e) => {
1045+
failed += 1;
1046+
println!("Failed to replay sender session {}: {:?}", session_id, e);
1047+
}
1048+
}
1049+
}
1050+
1051+
Ok(serde_json::to_string_pretty(&json!({
1052+
"message": format!("Resumed polling for {} session(s).", total_sessions),
1053+
"outcome": format!(
1054+
"Completed: {}, timed out: {}, failed: {}.",
1055+
completed, timed_out, failed
1056+
)
1057+
}))?)
1058+
}
1059+
1060+
/// Show payjoin session history
1061+
pub fn history(datadir: Option<std::path::PathBuf>) -> Result<String, Error> {
1062+
let db = open_payjoin_db(datadir)?;
1063+
let mut send_rows: Vec<SessionHistoryRow> = Vec::new();
1064+
let mut recv_rows: Vec<SessionHistoryRow> = Vec::new();
1065+
1066+
// Active send sessions
1067+
for session_id in db
1068+
.get_send_session_ids()
1069+
.map_err(|e| Error::Generic(format!("{e}")))?
1070+
{
1071+
let persister = SenderPersister::from_id(db.clone(), session_id.clone());
1072+
let status = match replay_sender_event_log(&persister) {
1073+
Ok((state, _)) => state.status_text().to_string(),
1074+
Err(e) => e.to_string(),
1075+
};
1076+
send_rows.push(SessionHistoryRow {
1077+
id: session_id.to_string(),
1078+
role: "Sender",
1079+
status,
1080+
completed_at: None,
1081+
});
1082+
}
1083+
1084+
// Active receive sessions
1085+
for session_id in db
1086+
.get_recv_session_ids()
1087+
.map_err(|e| Error::Generic(format!("{e}")))?
1088+
{
1089+
let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
1090+
let status = match replay_receiver_event_log(&persister) {
1091+
Ok((state, _)) => state.status_text().to_string(),
1092+
Err(e) => e.to_string(),
1093+
};
1094+
recv_rows.push(SessionHistoryRow {
1095+
id: session_id.to_string(),
1096+
role: "Receiver",
1097+
status,
1098+
completed_at: None,
1099+
});
1100+
}
1101+
1102+
// Completed send sessions
1103+
for (session_id, completed_at) in db
1104+
.get_inactive_send_session_ids()
1105+
.map_err(|e| Error::Generic(format!("{e}")))?
1106+
{
1107+
let persister = SenderPersister::from_id(db.clone(), session_id.clone());
1108+
let status = match replay_sender_event_log(&persister) {
1109+
Ok((state, _)) => state.status_text().to_string(),
1110+
Err(e) => e.to_string(),
1111+
};
1112+
let completed_at = db
1113+
.format_unix_timestamp(completed_at)
1114+
.map_err(|e| Error::Generic(format!("{e}")))?;
1115+
send_rows.push(SessionHistoryRow {
1116+
id: session_id.to_string(),
1117+
role: "Sender",
1118+
status,
1119+
completed_at: Some(completed_at),
1120+
});
1121+
}
1122+
1123+
// Completed receive sessions
1124+
for (session_id, completed_at) in db
1125+
.get_inactive_recv_session_ids()
1126+
.map_err(|e| Error::Generic(format!("{e}")))?
1127+
{
1128+
let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
1129+
let status = match replay_receiver_event_log(&persister) {
1130+
Ok((state, _)) => state.status_text().to_string(),
1131+
Err(e) => e.to_string(),
1132+
};
1133+
let completed_at = db
1134+
.format_unix_timestamp(completed_at)
1135+
.map_err(|e| Error::Generic(format!("{e}")))?;
1136+
recv_rows.push(SessionHistoryRow {
1137+
id: session_id.to_string(),
1138+
role: "Receiver",
1139+
status,
1140+
completed_at: Some(completed_at),
1141+
});
1142+
}
1143+
1144+
let rows: Vec<Vec<CellStruct>> = send_rows
1145+
.iter()
1146+
.chain(recv_rows.iter())
1147+
.map(|row| {
1148+
vec![
1149+
row.id.as_str().cell(),
1150+
row.role.cell(),
1151+
row.completed_at
1152+
.clone()
1153+
.unwrap_or_else(|| "Not Completed".to_string())
1154+
.cell(),
1155+
row.status.as_str().cell(),
1156+
]
1157+
})
1158+
.collect();
1159+
1160+
let table = rows
1161+
.table()
1162+
.title(vec![
1163+
"Session ID".cell().bold(true),
1164+
"Sender/Receiver".cell().bold(true),
1165+
"Completed At".cell().bold(true),
1166+
"Status".cell().bold(true),
1167+
])
1168+
.display()
1169+
.map_err(|e| Error::Generic(e.to_string()))?;
1170+
1171+
Ok(format!("{table}"))
1172+
}
8741173
}

0 commit comments

Comments
 (0)