Skip to content

Commit abebc6d

Browse files
Unify mailbox TTL to a single value
Replace three separate TTL constants (read: 10 min, unread at capacity: 1 day, unread below capacity: 7 days) with a single DEFAULT_TTL of 24 hours based on creation time only. Removes read_order, read_mailbox_ids, mark_read(), and the read-TTL pruning pass. Capacity eviction remains as a storage pressure mechanism.
1 parent 8569be8 commit abebc6d

File tree

4 files changed

+72
-150
lines changed

4 files changed

+72
-150
lines changed

payjoin-mailroom/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ pub struct Config {
1212
pub storage_dir: PathBuf,
1313
#[serde(deserialize_with = "deserialize_duration_secs")]
1414
pub timeout: Duration,
15+
#[serde(deserialize_with = "deserialize_duration_secs")]
16+
pub mailbox_ttl: Duration,
1517
pub v1: Option<V1Config>,
1618
#[cfg(feature = "telemetry")]
1719
pub telemetry: Option<TelemetryConfig>,
@@ -85,6 +87,7 @@ impl Default for Config {
8587
listener: "[::]:8080".parse().expect("valid default listener address"),
8688
storage_dir: PathBuf::from("./data"),
8789
timeout: Duration::from_secs(30),
90+
mailbox_ttl: Duration::from_secs(60 * 60 * 24 * 7), // 1 week
8891
v1: None,
8992
#[cfg(feature = "telemetry")]
9093
telemetry: None,
@@ -115,6 +118,7 @@ impl Config {
115118
listener,
116119
storage_dir,
117120
timeout,
121+
mailbox_ttl: Duration::from_secs(60 * 60 * 24 * 7), // 1 week
118122
v1,
119123
#[cfg(feature = "telemetry")]
120124
telemetry: None,

payjoin-mailroom/src/db/files.rs

Lines changed: 57 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{HashMap, HashSet, VecDeque};
1+
use std::collections::{HashMap, VecDeque};
22
use std::path::PathBuf;
33
use std::str::FromStr;
44
use std::sync::Arc;
@@ -11,7 +11,7 @@ use rand::RngCore;
1111
use tokio::fs::{self, File};
1212
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
1313
use tokio::sync::{oneshot, Mutex};
14-
use tracing::trace;
14+
use tracing::{trace, warn};
1515

1616
use crate::db::{Db as DbTrait, Error as DbError};
1717

@@ -21,14 +21,6 @@ use crate::db::{Db as DbTrait, Error as DbError};
2121
/// mailboxes/tx, ~4K txs/block, and ~144 blocks/24h.
2222
const DEFAULT_CAPACITY: usize = 1 << (1 + 12 + 8);
2323

24-
const DEFAULT_UNREAD_TTL_AT_CAPACITY: Duration = Duration::from_secs(60 * 60 * 24); // 1 day
25-
const DEFAULT_UNREAD_TTL_BELOW_CAPACITY: Duration = Duration::from_secs(60 * 60 * 24 * 7); // 1 week
26-
27-
/// How long read messages should be kept in mailboxes. Defaults to a 10 minute
28-
/// grace period from first read attempt, in case of intermittent network or
29-
/// relay errors.
30-
const DEFAULT_READ_TTL: Duration = Duration::from_secs(60 * 10); // 10 minutes
31-
3224
#[derive(Debug)]
3325
struct V2WaitMapEntry {
3426
receiver: future::Shared<oneshot::Receiver<Arc<Vec<u8>>>>,
@@ -50,12 +42,7 @@ pub(crate) struct Mailboxes {
5042
pending_v1: HashMap<ShortId, V1WaitMapEntry>,
5143
pending_v2: HashMap<ShortId, V2WaitMapEntry>,
5244
insert_order: VecDeque<(SystemTime, ShortId)>,
53-
read_order: VecDeque<(SystemTime, ShortId)>,
54-
read_mailbox_ids: HashSet<ShortId>,
55-
unread_ttl_below_capacity: Duration,
56-
unread_ttl_at_capacity: Duration,
57-
read_ttl: Duration,
58-
early_removal_count: usize,
45+
ttl: Duration,
5946
}
6047

6148
#[derive(Debug)]
@@ -200,7 +187,7 @@ impl DiskStorage {
200187
}
201188

202189
impl Mailboxes {
203-
async fn init(dir: PathBuf) -> io::Result<Self> {
190+
async fn init(dir: PathBuf, ttl: Duration) -> io::Result<Self> {
204191
let storage = DiskStorage::init(dir).await?;
205192
let insert_order = storage.insert_order().await?.into();
206193
Ok(Self {
@@ -209,12 +196,7 @@ impl Mailboxes {
209196
capacity: DEFAULT_CAPACITY,
210197
pending_v1: HashMap::default(),
211198
pending_v2: HashMap::default(),
212-
read_order: VecDeque::default(),
213-
read_mailbox_ids: HashSet::default(),
214-
unread_ttl_below_capacity: DEFAULT_UNREAD_TTL_BELOW_CAPACITY,
215-
unread_ttl_at_capacity: DEFAULT_UNREAD_TTL_AT_CAPACITY,
216-
read_ttl: DEFAULT_READ_TTL,
217-
early_removal_count: 0,
199+
ttl,
218200
})
219201
}
220202
}
@@ -226,8 +208,8 @@ pub struct FilesDb {
226208
}
227209

228210
impl FilesDb {
229-
pub async fn init(timeout: Duration, path: PathBuf) -> io::Result<Self> {
230-
Ok(Self { timeout, mailboxes: Arc::new(Mutex::new(Mailboxes::init(path).await?)) })
211+
pub async fn init(timeout: Duration, path: PathBuf, ttl: Duration) -> io::Result<Self> {
212+
Ok(Self { timeout, mailboxes: Arc::new(Mutex::new(Mailboxes::init(path, ttl).await?)) })
231213
}
232214

233215
pub async fn prune(&self) -> io::Result<Duration> { self.mailboxes.lock().await.prune().await }
@@ -337,19 +319,12 @@ impl Mailboxes {
337319

338320
// V2 requests are stored on disk
339321
if let Some((_created, payload)) = self.persistent_storage.get(id).await? {
340-
self.mark_read(id);
341322
return Ok(Some(Arc::new(payload)));
342323
}
343324

344325
Ok(None)
345326
}
346327

347-
fn mark_read(&mut self, id: &ShortId) {
348-
if self.read_mailbox_ids.insert(*id) {
349-
self.read_order.push_back((SystemTime::now(), *id));
350-
}
351-
}
352-
353328
async fn has_capacity(&mut self) -> io::Result<bool> {
354329
self.maybe_prune().await?;
355330
Ok(self.len() < self.capacity)
@@ -399,12 +374,10 @@ impl Mailboxes {
399374

400375
self.insert_order.push_back((created, *id));
401376

402-
// If there are pending readers, satisfy them and mark the payload as read
377+
// If there are pending readers, satisfy them
403378
if let Some(pending) = self.pending_v2.remove(id) {
404379
trace!("notifying pending readers for {}", id);
405380

406-
self.mark_read(id);
407-
408381
pending
409382
.sender
410383
.send(Arc::new(payload))
@@ -444,7 +417,6 @@ impl Mailboxes {
444417
}
445418

446419
async fn remove(&mut self, id: &ShortId) -> io::Result<Option<()>> {
447-
self.read_mailbox_ids.remove(id);
448420
self.persistent_storage.remove(id).await
449421
}
450422

@@ -457,9 +429,7 @@ impl Mailboxes {
457429
}
458430

459431
fn len(&self) -> usize {
460-
(self.insert_order.len() - self.early_removal_count)
461-
+ self.pending_v1.len()
462-
+ self.pending_v2.len()
432+
self.insert_order.len() + self.pending_v1.len() + self.pending_v2.len()
463433
}
464434

465435
async fn maybe_prune(&mut self) -> io::Result<Duration> {
@@ -479,86 +449,37 @@ impl Mailboxes {
479449
trace!("pruning");
480450
let now = SystemTime::now();
481451

482-
debug_assert!(self.read_ttl < self.unread_ttl_at_capacity);
483-
debug_assert!(self.unread_ttl_at_capacity < self.unread_ttl_below_capacity);
484452
debug_assert!(self.pending_v1.iter().all(|(_, v)| !v.sender.is_closed()));
485453

486454
// Prune in flight requests, these can persist in the case of an incomplete session
487455
self.pending_v2.retain(|_, v| v.receiver.strong_count().unwrap_or(0) > 1);
488456

489-
// Prune any fully expired mailboxes, whether read or unread
457+
// Prune any expired mailboxes
490458
while let Some((created, id)) = self.insert_order.front().cloned() {
491-
if created + self.unread_ttl_below_capacity < now {
492-
debug_assert!(self.insert_order.len() >= self.early_removal_count);
459+
if created + self.ttl < now {
493460
_ = self.insert_order.pop_front();
494461
if self.remove(&id).await?.is_none() {
495-
self.early_removal_count = self
496-
.early_removal_count
497-
.checked_sub(1)
498-
.expect("early removal adjustment should never underflow");
499-
}
500-
debug_assert!(self.insert_order.len() >= self.early_removal_count);
501-
trace!("Pruned old mailbox {id}");
502-
} else {
503-
break;
504-
}
505-
}
506-
507-
// So long as there expired read mailboxes, prune those. Stop when a
508-
// mailbox within the TTL is encountered.
509-
while let Some((read, id)) = self.read_order.front().cloned() {
510-
if read + self.read_ttl < now {
511-
_ = self.read_order.pop_front();
512-
if self.remove(&id).await?.is_some() {
513-
self.early_removal_count += 1;
514-
debug_assert!(self.insert_order.len() >= self.early_removal_count);
462+
warn!("Mailbox file missing during prune; possible external deletion or disk error");
463+
} else {
464+
trace!("Pruned old mailbox {id}");
515465
}
516-
trace!("Pruned read mailbox {id}");
517466
} else {
518467
break;
519468
}
520469
}
521470

522-
// If no room was created, try to prune the oldest unread mailbox if
523-
// it's over the minimum TTL
524-
debug_assert!(self.len() <= self.capacity);
525-
if self.len() == self.capacity {
526-
if let Some((created, id)) = self.insert_order.front().cloned() {
527-
if created + self.unread_ttl_at_capacity < now {
528-
_ = self.insert_order.pop_front();
529-
self.remove(&id).await?;
530-
trace!("Pruned unread mailbox {id} to make room");
531-
} else {
532-
trace!("Nothing to prune, {} entries remain", self.len());
533-
}
534-
}
535-
}
536-
537471
Ok(self.next_prune())
538472
}
539473

540474
fn next_prune(&mut self) -> Duration {
541-
let earliest_read_prune_opportunity = self
542-
.read_order
543-
.front()
544-
.map(|(read, _id)| {
545-
self.read_ttl
546-
.checked_sub(read.elapsed().expect("system clock moved back"))
547-
.unwrap_or(self.read_ttl)
548-
})
549-
.unwrap_or_else(|| self.read_ttl);
550-
551-
let earliest_unread_prune_opportunity = self
552-
.insert_order
475+
self.insert_order
553476
.front()
554477
.map(|(created, _id)| {
555-
self.unread_ttl_at_capacity
478+
self.ttl
556479
.checked_sub(created.elapsed().expect("system clock moved back"))
557-
.unwrap_or(self.unread_ttl_at_capacity)
480+
.unwrap_or(self.ttl)
558481
})
559-
.unwrap_or_else(|| self.unread_ttl_at_capacity);
560-
561-
std::cmp::min(earliest_read_prune_opportunity, earliest_unread_prune_opportunity)
482+
.unwrap_or(self.ttl)
562483
}
563484
}
564485

@@ -754,9 +675,13 @@ async fn test_disk_storage_mailboxes() -> std::io::Result<()> {
754675
async fn test_mailbox_storage() -> std::io::Result<()> {
755676
let dir = tempfile::tempdir()?;
756677

757-
let db = FilesDb::init(Duration::from_millis(10), dir.path().to_owned())
758-
.await
759-
.expect("initializing mailbox database should succeed");
678+
let db = FilesDb::init(
679+
Duration::from_millis(10),
680+
dir.path().to_owned(),
681+
Duration::from_secs(60 * 60 * 24 * 7),
682+
)
683+
.await
684+
.expect("initializing mailbox database should succeed");
760685

761686
let id = ShortId([0u8; 8]);
762687
let contents = b"foo bar";
@@ -775,9 +700,13 @@ async fn test_mailbox_storage() -> std::io::Result<()> {
775700
async fn test_v2_wait() -> std::io::Result<()> {
776701
let dir = tempfile::tempdir()?;
777702

778-
let db = FilesDb::init(Duration::from_millis(1), dir.path().to_owned())
779-
.await
780-
.expect("initializing mailbox database should succeed");
703+
let db = FilesDb::init(
704+
Duration::from_millis(1),
705+
dir.path().to_owned(),
706+
Duration::from_secs(60 * 60 * 24 * 7),
707+
)
708+
.await
709+
.expect("initializing mailbox database should succeed");
781710

782711
let id = ShortId([0u8; 8]);
783712
let contents = b"foo bar";
@@ -832,9 +761,13 @@ async fn test_v1_wait() -> std::io::Result<()> {
832761
let dir = tempfile::tempdir()?;
833762

834763
let db = Arc::new(
835-
FilesDb::init(Duration::from_millis(1), dir.path().to_owned())
836-
.await
837-
.expect("initializing mailbox database should succeed"),
764+
FilesDb::init(
765+
Duration::from_millis(1),
766+
dir.path().to_owned(),
767+
Duration::from_secs(60 * 60 * 24 * 7),
768+
)
769+
.await
770+
.expect("initializing mailbox database should succeed"),
838771
);
839772

840773
let id = ShortId([0u8; 8]);
@@ -879,9 +812,13 @@ async fn test_v1_data_minimization() -> std::io::Result<()> {
879812
let dir = tempfile::tempdir()?;
880813

881814
let db = Arc::new(
882-
FilesDb::init(Duration::from_millis(500), dir.path().to_owned())
883-
.await
884-
.expect("initializing mailbox database should succeed"),
815+
FilesDb::init(
816+
Duration::from_millis(500),
817+
dir.path().to_owned(),
818+
Duration::from_secs(60 * 60 * 24 * 7),
819+
)
820+
.await
821+
.expect("initializing mailbox database should succeed"),
885822
);
886823

887824
let id = ShortId([0u8; 8]);
@@ -934,20 +871,20 @@ async fn test_v1_data_minimization() -> std::io::Result<()> {
934871
async fn test_prune() -> std::io::Result<()> {
935872
let dir = tempfile::tempdir()?;
936873

937-
let db = FilesDb::init(Duration::from_millis(2), dir.path().to_owned())
938-
.await
939-
.expect("initializing mailbox database should succeed");
874+
let db = FilesDb::init(
875+
Duration::from_millis(2),
876+
dir.path().to_owned(),
877+
Duration::from_secs(60 * 60 * 24 * 7),
878+
)
879+
.await
880+
.expect("initializing mailbox database should succeed");
940881

941-
let read_ttl = Duration::from_secs(60);
942-
let unread_ttl_at_capacity = Duration::from_secs(600);
943-
let unread_ttl_below_capacity = Duration::from_secs(3600);
882+
let ttl = Duration::from_secs(600);
944883

945884
{
946885
let mut guard = db.mailboxes.lock().await;
947886
guard.capacity = 2;
948-
guard.read_ttl = read_ttl;
949-
guard.unread_ttl_at_capacity = unread_ttl_at_capacity;
950-
guard.unread_ttl_below_capacity = unread_ttl_below_capacity;
887+
guard.ttl = ttl;
951888
}
952889

953890
assert_eq!(db.mailboxes.lock().await.len(), 0);
@@ -984,38 +921,11 @@ async fn test_prune() -> std::io::Result<()> {
984921
db.prune().await.expect("pruning should not fail");
985922
assert_eq!(db.mailboxes.lock().await.len(), 1);
986923

987-
// Shift insert timestamps past unread_ttl_below_capacity
924+
// Shift insert timestamps past ttl
988925
{
989926
let mut guard = db.mailboxes.lock().await;
990927
for (ts, _) in guard.insert_order.iter_mut() {
991-
*ts -= unread_ttl_below_capacity + Duration::from_secs(1);
992-
}
993-
}
994-
995-
assert_eq!(db.mailboxes.lock().await.len(), 1);
996-
db.prune().await.expect("pruning should not fail");
997-
assert_eq!(db.mailboxes.lock().await.len(), 0);
998-
999-
// Post again, read it, then verify read TTL pruning
1000-
db.post_v2_payload(&id, contents.to_vec())
1001-
.await
1002-
.expect("posting payload should succeed")
1003-
.expect("contents should be accepted");
1004-
1005-
assert_eq!(db.mailboxes.lock().await.len(), 1);
1006-
1007-
// Mark the mailbox as read
1008-
_ = db.wait_for_v2_payload(&id).await.expect("waiting for payload should succeed");
1009-
1010-
assert_eq!(db.mailboxes.lock().await.len(), 1);
1011-
db.prune().await.expect("pruning should not fail");
1012-
assert_eq!(db.mailboxes.lock().await.len(), 1);
1013-
1014-
// Shift read timestamps past read_ttl
1015-
{
1016-
let mut guard = db.mailboxes.lock().await;
1017-
for (ts, _) in guard.read_order.iter_mut() {
1018-
*ts -= read_ttl + Duration::from_secs(1);
928+
*ts -= ttl + Duration::from_secs(1);
1019929
}
1020930
}
1021931

0 commit comments

Comments
 (0)