Skip to content

Commit 989b5cf

Browse files
Add Receiver non-blocking integration test
1 parent 5e29024 commit 989b5cf

1 file changed

Lines changed: 334 additions & 1 deletion

File tree

payjoin/tests/integration.rs

Lines changed: 334 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ mod integration {
194194
use std::sync::Arc;
195195
use std::time::Duration;
196196

197-
use bitcoin::{Address, Transaction};
197+
use bitcoin::{Address, ScriptBuf, Transaction};
198198
use http::StatusCode;
199199
use payjoin::persist::{NoopSessionPersister, OptionalTransitionOutcome};
200200
use payjoin::receive::v2::{
@@ -775,6 +775,80 @@ mod integration {
775775
Ok(())
776776
}
777777

778+
#[tokio::test]
779+
async fn v2_to_v2_non_blocking_receiver_transitions() -> Result<(), BoxSendSyncError> {
780+
init_tracing();
781+
let mut services = TestServices::initialize().await?;
782+
let expected_weight = Weight::from_wu(
783+
TX_HEADER_WEIGHT + (P2WPKH_INPUT_WEIGHT * 2) + P2WPKH_OUTPUT_WEIGHT,
784+
);
785+
let expected_fee = expected_weight * FeeRate::BROADCAST_MIN;
786+
787+
let (_bitcoind, sender, receiver) =
788+
init_bitcoind_sender_receiver(Some(AddressType::Bech32), Some(AddressType::Bech32))
789+
.expect("should be able to initialize the sender and the receiver");
790+
let recv_persister = InMemoryTestPersister::default();
791+
let send_persister = InMemoryTestPersister::default();
792+
793+
let result = tokio::select!(
794+
err = services.take_ohttp_relay_handle() => panic!("Ohttp relay exited early: {:?}", err),
795+
err = services.take_directory_handle() => panic!("Directory server exited early: {:?}", err),
796+
res = do_v2_to_v2_non_blocking_receiver_transitions(&services, &receiver, &sender, &recv_persister, &send_persister, SenderFinalAction::SignAndBroadcastPayjoinProposal) => res
797+
);
798+
799+
assert!(result.is_ok(), "v2 p2wpkh send receive failed: {:#?}", result.unwrap_err());
800+
801+
let (broadcasted_transaction, monitoring_payment) = result.unwrap();
802+
803+
// Sender should have sent the entire value of their UTXO to receiver (minus fees).
804+
assert_eq!(broadcasted_transaction.input.len(), 2);
805+
assert_eq!(broadcasted_transaction.output.len(), 1);
806+
assert_eq!(
807+
receiver.get_balances()?.into_model()?.mine.untrusted_pending,
808+
Amount::from_btc(100.0)? - expected_fee
809+
);
810+
assert_eq!(
811+
sender.get_balances()?.into_model()?.mine.untrusted_pending,
812+
Amount::from_btc(0.0)?
813+
);
814+
815+
// Receiver should be able to validate that the sender has broadcasted the Payjoin proposal.
816+
monitoring_payment
817+
.check_payment(|txid| {
818+
let get_tx_result = receiver.get_raw_transaction(txid);
819+
match get_tx_result {
820+
Ok(tx) =>
821+
Ok(Some(tx.transaction().expect("transaction should be decodable"))),
822+
Err(_) => {
823+
panic!("should be able to find the payjoin proposal broadcasted")
824+
}
825+
}
826+
})
827+
.save(&recv_persister)
828+
.expect("receiver should successfully monitor for the payment");
829+
830+
// Receiver session should have completed with a Success, along with information on the
831+
// sender signatures on the Payjoin that was broadcasted.
832+
let (_session, session_history) = replay_receiver_event_log(&recv_persister)?;
833+
let sender_outpoint = session_history.fallback_tx().unwrap().input[0].previous_output;
834+
let sender_signatures = {
835+
let sender_txin = broadcasted_transaction
836+
.input
837+
.iter()
838+
.find(|txin| txin.previous_output == sender_outpoint)
839+
.expect("sender input must be present in payjoin_tx")
840+
.clone();
841+
vec![(sender_txin.clone().script_sig, sender_txin.clone().witness)]
842+
};
843+
assert_eq!(
844+
recv_persister.load().unwrap().last(),
845+
Some(payjoin::receive::v2::SessionEvent::Closed(payjoin::receive::v2::SessionOutcome::Success(sender_signatures))),
846+
"The last event of the persister should be a SessionOutcome::Success with the correct sender signature",
847+
);
848+
assert_eq!(session_history.status(), SessionStatus::Completed);
849+
Ok(())
850+
}
851+
778852
/// Helper function for running a Payjoin v2 session. Uses the `sender_final_action`
779853
/// parameter to determine what action the sender will take after they receive the Payjoin
780854
/// proposal from the receiver.
@@ -914,6 +988,146 @@ mod integration {
914988
Ok((broadcasted_transaction, monitoring_payment))
915989
}
916990

991+
/// Helper function for running a Payjoin v2 session with receiver non-blocking
992+
/// validation flow. Uses the `sender_final_action` parameter to determine what
993+
/// action the sender will take after they receive the Payjoin proposal from the receiver.
994+
///
995+
/// Returns the transaction which the sender broadcasts and the state of the Receiver
996+
/// before they begin monitoring ([`Receiver<Monitor>`]) so that different tests can modify
997+
/// how the receiver is going to validate the action the sender takes.
998+
async fn do_v2_to_v2_non_blocking_receiver_transitions<R, S>(
999+
services: &TestServices,
1000+
receiver: &corepc_node::Client,
1001+
sender: &corepc_node::Client,
1002+
recv_persister: &R,
1003+
send_persister: &S,
1004+
sender_final_action: SenderFinalAction,
1005+
) -> Result<(Transaction, Receiver<Monitor>), BoxError>
1006+
where
1007+
R: SessionPersister<SessionEvent = payjoin::receive::v2::SessionEvent> + Clone,
1008+
S: SessionPersister<SessionEvent = payjoin::send::v2::SessionEvent> + Clone,
1009+
{
1010+
let agent = services.http_agent();
1011+
services.wait_for_services_ready().await?;
1012+
let ohttp_keys = services.fetch_ohttp_keys().await?;
1013+
// **********************
1014+
// Inside the Receiver:
1015+
let address = receiver.new_address()?;
1016+
1017+
// test session with expiration in the future
1018+
let session =
1019+
ReceiverBuilder::new(address, services.directory_url().as_str(), ohttp_keys)?
1020+
.build()
1021+
.save(recv_persister)?;
1022+
println!("session: {:#?}", &session);
1023+
// Poll receive request
1024+
let (req, ctx) = session.create_poll_request(services.ohttp_relay_url().as_str())?;
1025+
let response = agent
1026+
.post(req.url)
1027+
.header("Content-Type", req.content_type)
1028+
.body(req.body)
1029+
.send()
1030+
.await?;
1031+
assert!(response.status().is_success(), "error response: {}", response.status());
1032+
let response_body = session
1033+
.process_response(response.bytes().await?.to_vec().as_slice(), ctx)
1034+
.save(recv_persister)?;
1035+
// No proposal yet since sender has not responded
1036+
let session = if let OptionalTransitionOutcome::Stasis(current_state) = response_body {
1037+
current_state
1038+
} else {
1039+
panic!("Should still be in initialized state")
1040+
};
1041+
1042+
// **********************
1043+
// Inside the Sender:
1044+
// Create a funded PSBT (not broadcasted) to address with amount given in the pj_uri
1045+
let pj_uri = Uri::from_str(&session.pj_uri().to_string())
1046+
.map_err(|e| e.to_string())?
1047+
.assume_checked()
1048+
.check_pj_supported()
1049+
.map_err(|e| e.to_string())?;
1050+
let psbt = build_sweep_psbt(sender, &pj_uri)?;
1051+
let req_ctx = SenderBuilder::new(psbt, pj_uri)
1052+
.build_recommended(FeeRate::BROADCAST_MIN)?
1053+
.save(send_persister)?;
1054+
let (Request { url, body, content_type, .. }, send_ctx) =
1055+
req_ctx.create_v2_post_request(services.ohttp_relay_url().as_str())?;
1056+
let response =
1057+
agent.post(url).header("Content-Type", content_type).body(body).send().await?;
1058+
tracing::info!("Response: {:#?}", &response);
1059+
assert!(response.status().is_success(), "error response: {}", response.status());
1060+
let send_ctx = req_ctx
1061+
.process_response(&response.bytes().await?, send_ctx)
1062+
.save(send_persister)?;
1063+
// POST Original PSBT
1064+
1065+
// **********************
1066+
// Inside the Receiver:
1067+
1068+
// GET fallback psbt
1069+
let (req, ctx) = session.create_poll_request(services.ohttp_relay_url().as_str())?;
1070+
let response = agent
1071+
.post(req.url)
1072+
.header("Content-Type", req.content_type)
1073+
.body(req.body)
1074+
.send()
1075+
.await?;
1076+
// POST payjoin
1077+
let outcome = session
1078+
.process_response(response.bytes().await?.to_vec().as_slice(), ctx)
1079+
.save(recv_persister)?;
1080+
let proposal = if let OptionalTransitionOutcome::Progress(psbt) = outcome {
1081+
psbt
1082+
} else {
1083+
panic!("proposal should exist");
1084+
};
1085+
let payjoin_proposal =
1086+
handle_directory_proposal_non_blocking(receiver, proposal, recv_persister, None)
1087+
.await?;
1088+
let (req, ctx) =
1089+
payjoin_proposal.create_post_request(services.ohttp_relay_url().as_str())?;
1090+
let response = agent
1091+
.post(req.url)
1092+
.header("Content-Type", req.content_type)
1093+
.body(req.body)
1094+
.send()
1095+
.await?;
1096+
let monitoring_payment = payjoin_proposal
1097+
.process_response(&response.bytes().await?, ctx)
1098+
.save(recv_persister)?;
1099+
1100+
// **********************
1101+
// Inside the Sender:
1102+
// Sender checks, signs, finalizes, constructs, and broadcasts
1103+
// Replay post fallback to get the response
1104+
let (Request { url, body, content_type, .. }, ohttp_ctx) =
1105+
send_ctx.create_poll_request(services.ohttp_relay_url().as_str())?;
1106+
let response =
1107+
agent.post(url).header("Content-Type", content_type).body(body).send().await?;
1108+
tracing::info!("Response: {:#?}", &response);
1109+
let response = send_ctx
1110+
.process_response(&response.bytes().await?, ohttp_ctx)
1111+
.save(send_persister)
1112+
.expect("psbt should exist");
1113+
1114+
let checked_payjoin_proposal_psbt =
1115+
if let OptionalTransitionOutcome::Progress(psbt) = response {
1116+
psbt
1117+
} else {
1118+
panic!("psbt should exist");
1119+
};
1120+
1121+
let broadcasted_transaction = match sender_final_action {
1122+
SenderFinalAction::SignAndBroadcastPayjoinProposal =>
1123+
extract_pj_tx(sender, checked_payjoin_proposal_psbt.clone())?,
1124+
SenderFinalAction::BroadcastFallbackTransaction =>
1125+
replay_sender_event_log(send_persister)?.1.fallback_tx(),
1126+
};
1127+
sender.send_raw_transaction(&broadcasted_transaction)?;
1128+
Ok((broadcasted_transaction, monitoring_payment))
1129+
}
1130+
9171131
#[test]
9181132
fn v2_to_v1() -> Result<(), BoxError> {
9191133
init_tracing();
@@ -1228,6 +1442,125 @@ mod integration {
12281442
Ok(payjoin)
12291443
}
12301444

1445+
async fn handle_directory_proposal_non_blocking(
1446+
receiver: &corepc_node::Client,
1447+
proposal: Receiver<UncheckedOriginalPayload>,
1448+
recv_persister: &impl SessionPersister<SessionEvent = payjoin::receive::v2::SessionEvent>,
1449+
custom_inputs: Option<Vec<InputPair>>,
1450+
) -> Result<Receiver<PayjoinProposal>, BoxError> {
1451+
// Receive Check 1: Can Broadcast
1452+
let tx = proposal.extract_tx_to_check_broadcast_suitability();
1453+
let is_broadcast_suitable = receiver
1454+
.test_mempool_accept(std::slice::from_ref(&tx))
1455+
.map_err(ImplementationError::new)?
1456+
.0
1457+
.first()
1458+
.ok_or(ImplementationError::from("testmempoolaccept should return a result"))?
1459+
.allowed;
1460+
let proposal = proposal
1461+
.process_broadcast_suitability_result(None, is_broadcast_suitable)
1462+
.save(recv_persister)?;
1463+
1464+
// in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
1465+
let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();
1466+
1467+
// Receive Check 2: receiver can't sign for proposal inputs
1468+
let inputs_owned_validator = proposal.get_inputs_owned_validator()?;
1469+
let mut input_owned_check = |input_script_buf: &ScriptBuf| {
1470+
let input_script_buf = input_script_buf.clone();
1471+
async move {
1472+
let input = input_script_buf.as_script();
1473+
let address = bitcoin::Address::from_script(input, bitcoin::Network::Regtest)
1474+
.map_err(ImplementationError::new)?;
1475+
receiver
1476+
.get_address_info(&address)
1477+
.map(|info| info.is_mine)
1478+
.map_err(ImplementationError::new)
1479+
}
1480+
};
1481+
let finalized_inputs_owned_validator =
1482+
inputs_owned_validator.run_async(&mut input_owned_check).await?;
1483+
let proposal = proposal
1484+
.process_inputs_owned_validator(finalized_inputs_owned_validator)
1485+
.save(recv_persister)?;
1486+
1487+
// Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
1488+
let inputs_seen_validator = proposal.get_inputs_seen_validator();
1489+
let mut input_seen_check = |_outpoint: &OutPoint| async move { Ok(false) };
1490+
let finalized_inputs_seen_validator =
1491+
inputs_seen_validator.run_async(&mut input_seen_check).await?;
1492+
let proposal = proposal
1493+
.process_inputs_seen_validator(finalized_inputs_seen_validator)
1494+
.save(recv_persister)?;
1495+
let outputs_owned_validator = proposal.get_outputs_owned_validator();
1496+
let mut output_owned_check = |output_script_buf: &ScriptBuf| {
1497+
let output_script_buf = output_script_buf.clone();
1498+
async move {
1499+
let output_script = output_script_buf.as_script();
1500+
let address =
1501+
bitcoin::Address::from_script(output_script, bitcoin::Network::Regtest)
1502+
.map_err(ImplementationError::new)?;
1503+
receiver
1504+
.get_address_info(&address)
1505+
.map(|info| info.is_mine)
1506+
.map_err(ImplementationError::new)
1507+
}
1508+
};
1509+
let finalized_outputs_owned_validator =
1510+
outputs_owned_validator.run_async(&mut output_owned_check).await?;
1511+
let payjoin = proposal
1512+
.process_outputs_owned_validator(finalized_outputs_owned_validator)
1513+
.save(recv_persister)?;
1514+
1515+
let payjoin = payjoin.commit_outputs().save(recv_persister)?;
1516+
1517+
let inputs = match custom_inputs {
1518+
Some(inputs) => inputs,
1519+
None => {
1520+
let candidate_inputs = receiver
1521+
.list_unspent()
1522+
.map_err(ImplementationError::new)?
1523+
.0
1524+
.into_iter()
1525+
.map(input_pair_from_list_unspent);
1526+
let selected_input =
1527+
payjoin.try_preserving_privacy(candidate_inputs).map_err(|e| {
1528+
format!("Failed to make privacy preserving selection: {e:?}")
1529+
})?;
1530+
vec![selected_input]
1531+
}
1532+
};
1533+
let payjoin = payjoin
1534+
.contribute_inputs(inputs)
1535+
.map_err(|e| format!("Failed to contribute inputs: {e:?}"))?
1536+
.commit_inputs()
1537+
.save(recv_persister)?;
1538+
1539+
let payjoin = payjoin
1540+
.apply_fee_range(
1541+
Some(FeeRate::BROADCAST_MIN),
1542+
Some(FeeRate::from_sat_per_vb_unchecked(2)),
1543+
)
1544+
.save(recv_persister)?;
1545+
1546+
// Sign and finalize the proposal PSBT
1547+
let psbt = payjoin.psbt_to_sign();
1548+
let signed_psbt = receiver
1549+
// call RPC manually to pass custom options
1550+
.call::<corepc_node::vtype::WalletProcessPsbt>(
1551+
"walletprocesspsbt",
1552+
&[
1553+
json!(psbt.to_string()),
1554+
json!(None as Option<bool>),
1555+
json!(None as Option<&str>),
1556+
json!(Some(true)), // check that the receiver properly clears keypaths
1557+
],
1558+
)
1559+
.map(|res| Psbt::from_str(&res.psbt).expect("psbt should be valid"))?;
1560+
let payjoin = payjoin.finalize_signed_proposal(&signed_psbt).save(recv_persister)?;
1561+
Ok(payjoin)
1562+
}
1563+
12311564
pub fn build_sweep_psbt(
12321565
sender: &corepc_node::Client,
12331566
pj_uri: &PjUri,

0 commit comments

Comments
 (0)