Skip to content

Commit 4d37434

Browse files
committed
Guard concurrent sends with exclusive DB lock and URI/RK checks
Two concurrent send commands targeting the same URI could select different coins. Set PRAGMA locking_mode = EXCLUSIVE so only one process holds write access at a time. Check uniqueness of URI and recceiver pubkey with duplicate checks
1 parent 66492aa commit 4d37434

4 files changed

Lines changed: 137 additions & 11 deletions

File tree

payjoin-cli/src/app/v2/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ impl AppTrait for App {
229229
Some((sender_state, persister)) => (sender_state, persister),
230230
None => {
231231
let persister =
232-
SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
232+
SenderPersister::new(self.db.clone(), bip21, receiver_pubkey)?;
233233
let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
234234
let sender =
235235
SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
@@ -307,17 +307,17 @@ impl AppTrait for App {
307307

308308
// Process sender sessions
309309
for session_id in send_session_ids {
310-
let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id.clone());
311-
match replay_sender_event_log(&sender_persiter) {
310+
let sender_persister = SenderPersister::from_id(self.db.clone(), session_id.clone());
311+
match replay_sender_event_log(&sender_persister) {
312312
Ok((sender_state, _)) => {
313313
let self_clone = self.clone();
314314
tasks.push(tokio::spawn(async move {
315-
self_clone.process_sender_session(sender_state, &sender_persiter).await
315+
self_clone.process_sender_session(sender_state, &sender_persister).await
316316
}));
317317
}
318318
Err(e) => {
319319
tracing::error!("An error {:?} occurred while replaying Sender session", e);
320-
Self::close_failed_session(&sender_persiter, &session_id, "sender");
320+
Self::close_failed_session(&sender_persister, &session_id, "sender");
321321
}
322322
}
323323
}

payjoin-cli/src/db/error.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ use rusqlite::Error as RusqliteError;
66

77
pub(crate) type Result<T> = std::result::Result<T, Error>;
88

9+
#[cfg(feature = "v2")]
10+
#[derive(Debug)]
11+
pub(crate) enum DuplicateKind {
12+
Uri,
13+
ReceiverPubkey,
14+
}
15+
916
#[derive(Debug)]
1017
pub(crate) enum Error {
1118
Rusqlite(RusqliteError),
@@ -14,6 +21,8 @@ pub(crate) enum Error {
1421
Serialize(serde_json::Error),
1522
#[cfg(feature = "v2")]
1623
Deserialize(serde_json::Error),
24+
#[cfg(feature = "v2")]
25+
DuplicateSendSession(DuplicateKind),
1726
}
1827

1928
impl fmt::Display for Error {
@@ -25,6 +34,15 @@ impl fmt::Display for Error {
2534
Error::Serialize(e) => write!(f, "Serialization failed: {e}"),
2635
#[cfg(feature = "v2")]
2736
Error::Deserialize(e) => write!(f, "Deserialization failed: {e}"),
37+
#[cfg(feature = "v2")]
38+
Error::DuplicateSendSession(DuplicateKind::Uri) => {
39+
write!(f, "A send session for this URI is already active")
40+
}
41+
#[cfg(feature = "v2")]
42+
Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey) => write!(
43+
f,
44+
"A send session with this receiver pubkey is already active under a different URI"
45+
),
2846
}
2947
}
3048
}
@@ -38,6 +56,8 @@ impl std::error::Error for Error {
3856
Error::Serialize(e) => Some(e),
3957
#[cfg(feature = "v2")]
4058
Error::Deserialize(e) => Some(e),
59+
#[cfg(feature = "v2")]
60+
Error::DuplicateSendSession(_) => None,
4161
}
4262
}
4363
}

payjoin-cli/src/db/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,16 @@ pub(crate) fn now() -> i64 {
1515

1616
pub(crate) const DB_PATH: &str = "payjoin.sqlite";
1717

18+
#[derive(Debug)]
1819
pub(crate) struct Database(Pool<SqliteConnectionManager>);
1920

2021
impl Database {
2122
pub(crate) fn create(path: impl AsRef<Path>) -> Result<Self> {
22-
let manager = SqliteConnectionManager::file(path.as_ref());
23+
// locking_mode is a per-connection PRAGMA, so it must be set via
24+
// with_init to apply to every connection the pool creates, not only
25+
// the first one used during init_schema.
26+
let manager = SqliteConnectionManager::file(path.as_ref())
27+
.with_init(|conn| conn.execute_batch("PRAGMA locking_mode = EXCLUSIVE;"));
2328
let pool = Pool::new(manager)?;
2429

2530
// Initialize database schema
@@ -36,6 +41,7 @@ impl Database {
3641
conn.execute(
3742
"CREATE TABLE IF NOT EXISTS send_sessions (
3843
session_id INTEGER PRIMARY KEY AUTOINCREMENT,
44+
pj_uri TEXT NOT NULL,
3945
receiver_pubkey BLOB NOT NULL,
4046
completed_at INTEGER
4147
)",

payjoin-cli/src/db/v2.rs

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,49 @@ impl std::fmt::Display for SessionId {
2020
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) }
2121
}
2222

23-
#[derive(Clone)]
23+
#[derive(Clone, Debug)]
2424
pub(crate) struct SenderPersister {
2525
db: Arc<Database>,
2626
session_id: SessionId,
2727
}
2828

2929
impl SenderPersister {
30-
pub fn new(db: Arc<Database>, receiver_pubkey: HpkePublicKey) -> crate::db::Result<Self> {
30+
pub fn new(
31+
db: Arc<Database>,
32+
pj_uri: &str,
33+
receiver_pubkey: &HpkePublicKey,
34+
) -> crate::db::Result<Self> {
3135
let conn = db.get_connection()?;
36+
let receiver_pubkey_bytes = receiver_pubkey.to_compressed_bytes();
37+
38+
let (duplicate_uri, duplicate_rk): (bool, bool) = conn.query_row(
39+
"SELECT \
40+
EXISTS(SELECT 1 FROM send_sessions WHERE pj_uri = ?1), \
41+
EXISTS(SELECT 1 FROM send_sessions WHERE receiver_pubkey = ?2)",
42+
params![pj_uri, &receiver_pubkey_bytes],
43+
|row| Ok((row.get(0)?, row.get(1)?)),
44+
)?;
45+
46+
if duplicate_uri {
47+
return Err(Error::DuplicateSendSession(DuplicateKind::Uri));
48+
}
49+
if duplicate_rk {
50+
return Err(Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey));
51+
}
3252

3353
// Create a new session in send_sessions and get its ID
3454
let session_id: i64 = conn.query_row(
35-
"INSERT INTO send_sessions (session_id, receiver_pubkey) VALUES (NULL, ?1) RETURNING session_id",
36-
params![receiver_pubkey.to_compressed_bytes()],
55+
"INSERT INTO send_sessions (pj_uri, receiver_pubkey) VALUES (?1, ?2) RETURNING session_id",
56+
params![pj_uri, &receiver_pubkey_bytes],
3757
|row| row.get(0),
3858
)?;
3959

4060
Ok(Self { db, session_id: SessionId(session_id) })
4161
}
4262

4363
pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } }
64+
4465
}
45-
4666
impl SessionPersister for SenderPersister {
4767
type SessionEvent = SenderSessionEvent;
4868
type InternalStorageError = crate::db::error::Error;
@@ -268,3 +288,83 @@ impl Database {
268288
Ok(session_ids)
269289
}
270290
}
291+
292+
#[cfg(all(test, feature = "v2"))]
293+
mod tests {
294+
use std::sync::Arc;
295+
296+
use payjoin::HpkeKeyPair;
297+
298+
use super::*;
299+
300+
fn create_test_db() -> Arc<Database> {
301+
// Use an in-memory database for tests
302+
let manager = r2d2_sqlite::SqliteConnectionManager::memory()
303+
.with_init(|conn| conn.execute_batch("PRAGMA locking_mode = EXCLUSIVE;"));
304+
let pool = r2d2::Pool::new(manager).expect("pool creation should succeed");
305+
let conn = pool.get().expect("connection should succeed");
306+
Database::init_schema(&conn).expect("schema init should succeed");
307+
Arc::new(Database(pool))
308+
}
309+
310+
fn make_receiver_pubkey() -> payjoin::HpkePublicKey { HpkeKeyPair::gen_keypair().1 }
311+
312+
// Second call with the same URI (same active session) should return DuplicateSendSession(Uri).
313+
#[test]
314+
fn test_duplicate_uri_returns_error() {
315+
let db = create_test_db();
316+
let rk1 = make_receiver_pubkey();
317+
let rk2 = make_receiver_pubkey();
318+
let uri = "bitcoin:addr1?pj=https://example.com/BBBBBBBB";
319+
320+
SenderPersister::new(db.clone(), uri, &rk1).expect("first session should succeed");
321+
322+
let err = SenderPersister::new(db, uri, &rk2).expect_err("duplicate URI should fail");
323+
assert!(
324+
matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)),
325+
"expected DuplicateSendSession(Uri), got: {err:?}"
326+
);
327+
}
328+
329+
// Same receiver pubkey under a different URI should return DuplicateSendSession(ReceiverPubkey).
330+
#[test]
331+
fn test_duplicate_rk_returns_error() {
332+
let db = create_test_db();
333+
let rk = make_receiver_pubkey();
334+
let uri1 = "bitcoin:addr1?pj=https://example.com/CCCCCCCC";
335+
let uri2 = "bitcoin:addr1?pj=https://example.com/DDDDDDDD";
336+
337+
SenderPersister::new(db.clone(), uri1, &rk).expect("first session should succeed");
338+
339+
let err = SenderPersister::new(db, uri2, &rk).expect_err("duplicate RK should fail");
340+
assert!(
341+
matches!(err, Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey)),
342+
"expected DuplicateSendSession(ReceiverPubkey), got: {err:?}"
343+
);
344+
}
345+
346+
// After a session is marked completed, a new session with the same URI must still be rejected
347+
// to prevent address reuse, HPKE receiver-key reuse
348+
#[test]
349+
fn test_completed_session_blocks_reuse() {
350+
let db = create_test_db();
351+
let rk1 = make_receiver_pubkey();
352+
let rk2 = make_receiver_pubkey();
353+
let uri = "bitcoin:addr1?pj=https://example.com/EEEEEEEE";
354+
355+
let persister =
356+
SenderPersister::new(db.clone(), uri, &rk1).expect("first session should succeed");
357+
358+
// Mark the session as completed
359+
use payjoin::persist::SessionPersister;
360+
persister.close().expect("close should succeed");
361+
362+
// A new session with the same URI must be rejected even after completion
363+
let err = SenderPersister::new(db, uri, &rk2)
364+
.expect_err("reuse of a completed session URI must be rejected");
365+
assert!(
366+
matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)),
367+
"expected DuplicateSendSession(Uri), got: {err:?}"
368+
);
369+
}
370+
}

0 commit comments

Comments
 (0)