Skip to content

Commit ae9dd4a

Browse files
committed
feat: add resume and history commands for payjoin sessions
- Implement resume_payjoins() to continue pending sessions - Add history() to display all session states - Add session status text helpers for UI display - Support filtering by session ID
1 parent 68af3e6 commit ae9dd4a

File tree

2 files changed

+325
-12
lines changed

2 files changed

+325
-12
lines changed

src/commands.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,20 @@ pub enum OnlineWalletSubCommand {
479479
)]
480480
fee_rate: u64,
481481
},
482+
/// Resume pending payjoin sessions.
483+
ResumePayjoin {
484+
/// Payjoin directory for the session
485+
#[arg(env = "PAYJOIN_DIRECTORY", long = "directory", required = true)]
486+
directory: String,
487+
/// URL of the Payjoin OHTTP relay. Can be repeated multiple times.
488+
#[arg(env = "PAYJOIN_OHTTP_RELAY", long = "ohttp_relay", required = true)]
489+
ohttp_relay: Vec<String>,
490+
/// Resume only a specific active session ID (sender and/or receiver).
491+
#[arg(env = "PAYJOIN_SESSION_ID", long = "session_id")]
492+
session_id: Option<i64>,
493+
},
494+
/// Show payjoin session history.
495+
PayjoinHistory,
482496
}
483497

484498
/// Subcommands for Key operations.

src/payjoin/mod.rs

Lines changed: 311 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,60 @@ pub(crate) struct PayjoinManager<'a> {
4242
relay_manager: Arc<Mutex<RelayManager>>,
4343
db: Arc<crate::payjoin::db::Database>,
4444
}
45+
46+
trait StatusText {
47+
fn status_text(&self) -> &'static str;
48+
}
49+
50+
impl StatusText for SendSession {
51+
fn status_text(&self) -> &'static str {
52+
match self {
53+
SendSession::WithReplyKey(_) | SendSession::PollingForProposal(_) => {
54+
"Waiting for proposal"
55+
}
56+
SendSession::Closed(session_outcome) => match session_outcome {
57+
SenderSessionOutcome::Failure => "Session failure",
58+
SenderSessionOutcome::Success(_) => "Session success",
59+
SenderSessionOutcome::Cancel => "Session cancelled",
60+
},
61+
}
62+
}
63+
}
64+
65+
impl StatusText for ReceiveSession {
66+
fn status_text(&self) -> &'static str {
67+
match self {
68+
ReceiveSession::Initialized(_) => "Waiting for original proposal",
69+
ReceiveSession::UncheckedOriginalPayload(_)
70+
| ReceiveSession::MaybeInputsOwned(_)
71+
| ReceiveSession::MaybeInputsSeen(_)
72+
| ReceiveSession::OutputsUnknown(_)
73+
| ReceiveSession::WantsOutputs(_)
74+
| ReceiveSession::WantsInputs(_)
75+
| ReceiveSession::WantsFeeRange(_)
76+
| ReceiveSession::ProvisionalProposal(_) => "Processing original proposal",
77+
ReceiveSession::PayjoinProposal(_) => "Payjoin proposal sent",
78+
ReceiveSession::HasReplyableError(_) => {
79+
"Session failure, waiting to post error response"
80+
}
81+
ReceiveSession::Monitor(_) => "Monitoring payjoin proposal",
82+
ReceiveSession::Closed(session_outcome) => match session_outcome {
83+
ReceiverSessionOutcome::Failure => "Session failure",
84+
ReceiverSessionOutcome::Success(_) => {
85+
"Session success, Payjoin proposal was broadcasted"
86+
}
87+
ReceiverSessionOutcome::Cancel => "Session cancelled",
88+
ReceiverSessionOutcome::FallbackBroadcasted => "Fallback broadcasted",
89+
},
90+
}
91+
}
92+
}
93+
94+
struct SessionHistoryRow {
95+
id: String,
96+
role: &'static str,
97+
status: String,
98+
completed_at: Option<String>,
4599
}
46100

47101
impl<'a> PayjoinManager<'a> {
@@ -230,14 +284,14 @@ impl<'a> PayjoinManager<'a> {
230284
SenderPersister::new(self.db.clone(), receiver_pubkey)?
231285
};
232286

233-
let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri)
234-
.build_recommended(fee_rate)?
235-
.save(&persister)
236-
.map_err(|e| {
237-
Error::Generic(format!(
238-
"Failed to save the Payjoin v2 sender in the persister: {e}"
239-
))
240-
})?;
287+
let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri)
288+
.build_recommended(fee_rate)?
289+
.save(&persister)
290+
.map_err(|e| {
291+
Error::Generic(format!(
292+
"Failed to save the Payjoin v2 sender in the persister: {e}"
293+
))
294+
})?;
241295

242296
(SendSession::WithReplyKey(sender), persister)
243297
};
@@ -451,7 +505,7 @@ impl<'a> PayjoinManager<'a> {
451505
) -> Result<(), Error> {
452506
let next_receiver_typestate = receiver
453507
.identify_receiver_outputs(&mut |output_script| {
454-
Ok(self.wallet.is_mine(output_script.to_owned()))
508+
Ok(self.wallet.is_mine(output_script.to_owned()))
455509
})
456510
.save(persister)?;
457511

@@ -583,7 +637,7 @@ impl<'a> PayjoinManager<'a> {
583637
.create_post_request(
584638
self.unwrap_relay_or_else_fetch(vec![], None::<&str>)
585639
.await?
586-
.as_str(),
640+
.as_str(),
587641
)
588642
.map_err(|e| {
589643
Error::Generic(format!("Error occurred when creating a post request for sending final Payjoin proposal: {e}"))
@@ -755,7 +809,7 @@ impl<'a> PayjoinManager<'a> {
755809
blockchain_client,
756810
relay.to_string(),
757811
)
758-
.await
812+
.await
759813
}
760814
SendSession::PollingForProposal(context) => {
761815
let relay = self
@@ -767,7 +821,7 @@ impl<'a> PayjoinManager<'a> {
767821
blockchain_client,
768822
relay.to_string(),
769823
)
770-
.await
824+
.await
771825
}
772826
SendSession::Closed(SenderSessionOutcome::Success(psbt)) => {
773827
self.process_payjoin_proposal(psbt, blockchain_client).await
@@ -876,4 +930,249 @@ impl<'a> PayjoinManager<'a> {
876930
.send()
877931
.await
878932
}
933+
934+
/// Resume pending payjoin sessions from the database
935+
pub async fn resume_payjoins(
936+
&mut self,
937+
directory: String,
938+
ohttp_relays: Vec<String>,
939+
session_id: Option<i64>,
940+
blockchain_client: &BlockchainClient,
941+
) -> Result<String, Error> {
942+
let db = self.db.clone();
943+
let mut recv_session_ids = db.get_recv_session_ids()?;
944+
let mut send_session_ids = db.get_send_session_ids()?;
945+
946+
if let Some(session_id) = session_id {
947+
recv_session_ids.retain(|id| id.as_i64() == session_id);
948+
send_session_ids.retain(|id| id.as_i64() == session_id);
949+
950+
if recv_session_ids.is_empty() && send_session_ids.is_empty() {
951+
return Ok(serde_json::to_string_pretty(&json!({
952+
"message": format!("No active session found for session_id {}.", session_id)
953+
}))?);
954+
}
955+
}
956+
957+
if recv_session_ids.is_empty() && send_session_ids.is_empty() {
958+
return Ok(serde_json::to_string_pretty(&json!({
959+
"message": "No sessions to resume."
960+
}))?);
961+
}
962+
963+
let ohttp_relays: Vec<url::Url> = ohttp_relays
964+
.into_iter()
965+
.map(|s| url::Url::parse(&s))
966+
.collect::<Result<_, _>>()
967+
.map_err(|e| Error::Generic(format!("Failed to parse OHTTP URLs: {e}")))?;
968+
969+
let relay = self
970+
.unwrap_relay_or_else_fetch(ohttp_relays, Some(&directory))
971+
.await?;
972+
973+
let max_fee_rate = FeeRate::BROADCAST_MIN;
974+
let total_sessions = recv_session_ids.len() + send_session_ids.len();
975+
let mut completed = 0usize;
976+
let mut timed_out = 0usize;
977+
let mut failed = 0usize;
978+
979+
println!("Resuming {} payjoin session(s)...\n", total_sessions);
980+
981+
// Resume receiver sessions
982+
for session_id in recv_session_ids {
983+
let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
984+
match replay_receiver_event_log(&persister) {
985+
Ok((receiver_state, _)) => {
986+
println!("Resuming receiver session {}", session_id);
987+
match tokio::time::timeout(
988+
std::time::Duration::from_secs(30),
989+
self.proceed_receiver_session(
990+
receiver_state,
991+
&persister,
992+
relay.as_str(),
993+
max_fee_rate,
994+
blockchain_client,
995+
),
996+
)
997+
.await
998+
{
999+
Ok(Ok(_)) => {
1000+
completed += 1;
1001+
}
1002+
Ok(Err(e)) => {
1003+
failed += 1;
1004+
println!("Receiver session {} failed: {}", session_id, e);
1005+
}
1006+
Err(_) => {
1007+
timed_out += 1;
1008+
println!("Receiver session {} timed out", session_id);
1009+
}
1010+
}
1011+
}
1012+
Err(e) => {
1013+
failed += 1;
1014+
println!("Failed to replay receiver session {}: {:?}", session_id, e);
1015+
}
1016+
}
1017+
}
1018+
1019+
// Resume sender sessions
1020+
for session_id in send_session_ids {
1021+
let persister = SenderPersister::from_id(db.clone(), session_id.clone());
1022+
match replay_sender_event_log(&persister) {
1023+
Ok((sender_state, _)) => {
1024+
println!("Resuming sender session {}", session_id);
1025+
match tokio::time::timeout(
1026+
std::time::Duration::from_secs(30),
1027+
self.proceed_sender_session(
1028+
sender_state,
1029+
&persister,
1030+
vec![relay.clone()],
1031+
blockchain_client,
1032+
),
1033+
)
1034+
.await
1035+
{
1036+
Ok(Ok(_)) => {
1037+
completed += 1;
1038+
}
1039+
Ok(Err(e)) => {
1040+
failed += 1;
1041+
println!("Sender session {} failed: {}", session_id, e);
1042+
}
1043+
Err(_) => {
1044+
timed_out += 1;
1045+
println!("Sender session {} timed out", session_id);
1046+
}
1047+
}
1048+
}
1049+
Err(e) => {
1050+
failed += 1;
1051+
println!("Failed to replay sender session {}: {:?}", session_id, e);
1052+
}
1053+
}
1054+
}
1055+
1056+
Ok(serde_json::to_string_pretty(&json!({
1057+
"message": format!("Resumed polling for {} session(s).", total_sessions),
1058+
"outcome": format!(
1059+
"Completed: {}, timed out: {}, failed: {}.",
1060+
completed, timed_out, failed
1061+
)
1062+
}))?)
1063+
}
1064+
1065+
/// Show payjoin session history
1066+
pub fn history(datadir: Option<std::path::PathBuf>) -> Result<String, Error> {
1067+
let db = open_payjoin_db(datadir)?;
1068+
let mut send_rows: Vec<SessionHistoryRow> = Vec::new();
1069+
let mut recv_rows: Vec<SessionHistoryRow> = Vec::new();
1070+
1071+
// Active send sessions
1072+
for session_id in db
1073+
.get_send_session_ids()
1074+
.map_err(|e| Error::Generic(format!("{e}")))?
1075+
{
1076+
let persister = SenderPersister::from_id(db.clone(), session_id.clone());
1077+
let status = match replay_sender_event_log(&persister) {
1078+
Ok((state, _)) => state.status_text().to_string(),
1079+
Err(e) => e.to_string(),
1080+
};
1081+
send_rows.push(SessionHistoryRow {
1082+
id: session_id.to_string(),
1083+
role: "Sender",
1084+
status,
1085+
completed_at: None,
1086+
});
1087+
}
1088+
1089+
// Active receive sessions
1090+
for session_id in db
1091+
.get_recv_session_ids()
1092+
.map_err(|e| Error::Generic(format!("{e}")))?
1093+
{
1094+
let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
1095+
let status = match replay_receiver_event_log(&persister) {
1096+
Ok((state, _)) => state.status_text().to_string(),
1097+
Err(e) => e.to_string(),
1098+
};
1099+
recv_rows.push(SessionHistoryRow {
1100+
id: session_id.to_string(),
1101+
role: "Receiver",
1102+
status,
1103+
completed_at: None,
1104+
});
1105+
}
1106+
1107+
// Completed send sessions
1108+
for (session_id, completed_at) in db
1109+
.get_inactive_send_session_ids()
1110+
.map_err(|e| Error::Generic(format!("{e}")))?
1111+
{
1112+
let persister = SenderPersister::from_id(db.clone(), session_id.clone());
1113+
let status = match replay_sender_event_log(&persister) {
1114+
Ok((state, _)) => state.status_text().to_string(),
1115+
Err(e) => e.to_string(),
1116+
};
1117+
let completed_at = db
1118+
.format_unix_timestamp(completed_at)
1119+
.map_err(|e| Error::Generic(format!("{e}")))?;
1120+
send_rows.push(SessionHistoryRow {
1121+
id: session_id.to_string(),
1122+
role: "Sender",
1123+
status,
1124+
completed_at: Some(completed_at),
1125+
});
1126+
}
1127+
1128+
// Completed receive sessions
1129+
for (session_id, completed_at) in db
1130+
.get_inactive_recv_session_ids()
1131+
.map_err(|e| Error::Generic(format!("{e}")))?
1132+
{
1133+
let persister = ReceiverPersister::from_id(db.clone(), session_id.clone());
1134+
let status = match replay_receiver_event_log(&persister) {
1135+
Ok((state, _)) => state.status_text().to_string(),
1136+
Err(e) => e.to_string(),
1137+
};
1138+
let completed_at = db
1139+
.format_unix_timestamp(completed_at)
1140+
.map_err(|e| Error::Generic(format!("{e}")))?;
1141+
recv_rows.push(SessionHistoryRow {
1142+
id: session_id.to_string(),
1143+
role: "Receiver",
1144+
status,
1145+
completed_at: Some(completed_at),
1146+
});
1147+
}
1148+
1149+
let rows: Vec<Vec<CellStruct>> = send_rows
1150+
.iter()
1151+
.chain(recv_rows.iter())
1152+
.map(|row| {
1153+
vec![
1154+
row.id.as_str().cell(),
1155+
row.role.cell(),
1156+
row.completed_at
1157+
.clone()
1158+
.unwrap_or_else(|| "Not Completed".to_string())
1159+
.cell(),
1160+
row.status.as_str().cell(),
1161+
]
1162+
})
1163+
.collect();
1164+
1165+
let table = rows
1166+
.table()
1167+
.title(vec![
1168+
"Session ID".cell().bold(true),
1169+
"Sender/Receiver".cell().bold(true),
1170+
"Completed At".cell().bold(true),
1171+
"Status".cell().bold(true),
1172+
])
1173+
.display()
1174+
.map_err(|e| Error::Generic(e.to_string()))?;
1175+
1176+
Ok(format!("{table}"))
1177+
}
8791178
}

0 commit comments

Comments
 (0)