Skip to content

Commit 64b83af

Browse files
Unify mailbox TTL to a single value
Replace three separate TTL constants (read: 10 min, unread at capacity: 1 day, unread below capacity: 7 days) with a single DEFAULT_TTL of 24 hours based on creation time only. Removes read_order, read_mailbox_ids, mark_read(), and the read-TTL pruning pass. Capacity eviction remains as a storage pressure mechanism.
1 parent 117ce51 commit 64b83af

1 file changed

Lines changed: 23 additions & 104 deletions

File tree

payjoin-mailroom/src/db/files.rs

Lines changed: 23 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::{HashMap, HashSet, VecDeque};
1+
use std::collections::{HashMap, VecDeque};
22
use std::path::PathBuf;
33
use std::str::FromStr;
44
use std::sync::Arc;
@@ -21,13 +21,11 @@ use crate::db::{Db as DbTrait, Error as DbError};
2121
/// mailboxes/tx, ~4K txs/block, and ~144 blocks/24h.
2222
const DEFAULT_CAPACITY: usize = 1 << (1 + 12 + 8);
2323

24-
const DEFAULT_UNREAD_TTL_AT_CAPACITY: Duration = Duration::from_secs(60 * 60 * 24); // 1 day
25-
const DEFAULT_UNREAD_TTL_BELOW_CAPACITY: Duration = Duration::from_secs(60 * 60 * 24 * 7); // 1 week
26-
27-
/// How long read messages should be kept in mailboxes. Defaults to a 10 minute
28-
/// grace period from first read attempt, in case of intermittent network or
29-
/// relay errors.
30-
const DEFAULT_READ_TTL: Duration = Duration::from_secs(60 * 10); // 10 minutes
24+
/// How long a mailbox is retained before being pruned, regardless of
25+
/// whether it has been read.
26+
/// Matches the default receiver session lifetime
27+
/// (`TWENTY_FOUR_HOURS_DEFAULT_EXPIRATION` in `payjoin::receive::v2`)
28+
const DEFAULT_TTL: Duration = Duration::from_secs(60 * 60 * 24); // 24 hours
3129

3230
#[derive(Debug)]
3331
struct V2WaitMapEntry {
@@ -50,11 +48,7 @@ pub(crate) struct Mailboxes {
5048
pending_v1: HashMap<ShortId, V1WaitMapEntry>,
5149
pending_v2: HashMap<ShortId, V2WaitMapEntry>,
5250
insert_order: VecDeque<(SystemTime, ShortId)>,
53-
read_order: VecDeque<(SystemTime, ShortId)>,
54-
read_mailbox_ids: HashSet<ShortId>,
55-
unread_ttl_below_capacity: Duration,
56-
unread_ttl_at_capacity: Duration,
57-
read_ttl: Duration,
51+
ttl: Duration,
5852
early_removal_count: usize,
5953
}
6054

@@ -209,11 +203,7 @@ impl Mailboxes {
209203
capacity: DEFAULT_CAPACITY,
210204
pending_v1: HashMap::default(),
211205
pending_v2: HashMap::default(),
212-
read_order: VecDeque::default(),
213-
read_mailbox_ids: HashSet::default(),
214-
unread_ttl_below_capacity: DEFAULT_UNREAD_TTL_BELOW_CAPACITY,
215-
unread_ttl_at_capacity: DEFAULT_UNREAD_TTL_AT_CAPACITY,
216-
read_ttl: DEFAULT_READ_TTL,
206+
ttl: DEFAULT_TTL,
217207
early_removal_count: 0,
218208
})
219209
}
@@ -337,19 +327,12 @@ impl Mailboxes {
337327

338328
// V2 requests are stored on disk
339329
if let Some((_created, payload)) = self.persistent_storage.get(id).await? {
340-
self.mark_read(id);
341330
return Ok(Some(Arc::new(payload)));
342331
}
343332

344333
Ok(None)
345334
}
346335

347-
fn mark_read(&mut self, id: &ShortId) {
348-
if self.read_mailbox_ids.insert(*id) {
349-
self.read_order.push_back((SystemTime::now(), *id));
350-
}
351-
}
352-
353336
async fn has_capacity(&mut self) -> io::Result<bool> {
354337
self.maybe_prune().await?;
355338
Ok(self.len() < self.capacity)
@@ -399,12 +382,10 @@ impl Mailboxes {
399382

400383
self.insert_order.push_back((created, *id));
401384

402-
// If there are pending readers, satisfy them and mark the payload as read
385+
// If there are pending readers, satisfy them
403386
if let Some(pending) = self.pending_v2.remove(id) {
404387
trace!("notifying pending readers for {}", id);
405388

406-
self.mark_read(id);
407-
408389
pending
409390
.sender
410391
.send(Arc::new(payload))
@@ -444,7 +425,6 @@ impl Mailboxes {
444425
}
445426

446427
async fn remove(&mut self, id: &ShortId) -> io::Result<Option<()>> {
447-
self.read_mailbox_ids.remove(id);
448428
self.persistent_storage.remove(id).await
449429
}
450430

@@ -479,16 +459,14 @@ impl Mailboxes {
479459
trace!("pruning");
480460
let now = SystemTime::now();
481461

482-
debug_assert!(self.read_ttl < self.unread_ttl_at_capacity);
483-
debug_assert!(self.unread_ttl_at_capacity < self.unread_ttl_below_capacity);
484462
debug_assert!(self.pending_v1.iter().all(|(_, v)| !v.sender.is_closed()));
485463

486464
// Prune in flight requests, these can persist in the case of an incomplete session
487465
self.pending_v2.retain(|_, v| v.receiver.strong_count().unwrap_or(0) > 1);
488466

489-
// Prune any fully expired mailboxes, whether read or unread
467+
// Prune any fully expired mailboxes
490468
while let Some((created, id)) = self.insert_order.front().cloned() {
491-
if created + self.unread_ttl_below_capacity < now {
469+
if created + self.ttl < now {
492470
debug_assert!(self.insert_order.len() >= self.early_removal_count);
493471
_ = self.insert_order.pop_front();
494472
if self.remove(&id).await?.is_none() {
@@ -504,30 +482,15 @@ impl Mailboxes {
504482
}
505483
}
506484

507-
// So long as there expired read mailboxes, prune those. Stop when a
508-
// mailbox within the TTL is encountered.
509-
while let Some((read, id)) = self.read_order.front().cloned() {
510-
if read + self.read_ttl < now {
511-
_ = self.read_order.pop_front();
512-
if self.remove(&id).await?.is_some() {
513-
self.early_removal_count += 1;
514-
debug_assert!(self.insert_order.len() >= self.early_removal_count);
515-
}
516-
trace!("Pruned read mailbox {id}");
517-
} else {
518-
break;
519-
}
520-
}
521-
522-
// If no room was created, try to prune the oldest unread mailbox if
523-
// it's over the minimum TTL
485+
// If no room was created, try to prune the oldest mailbox if
486+
// it's over the TTL
524487
debug_assert!(self.len() <= self.capacity);
525488
if self.len() == self.capacity {
526489
if let Some((created, id)) = self.insert_order.front().cloned() {
527-
if created + self.unread_ttl_at_capacity < now {
490+
if created + self.ttl < now {
528491
_ = self.insert_order.pop_front();
529492
self.remove(&id).await?;
530-
trace!("Pruned unread mailbox {id} to make room");
493+
trace!("Pruned mailbox {id} to make room");
531494
} else {
532495
trace!("Nothing to prune, {} entries remain", self.len());
533496
}
@@ -538,27 +501,14 @@ impl Mailboxes {
538501
}
539502

540503
fn next_prune(&mut self) -> Duration {
541-
let earliest_read_prune_opportunity = self
542-
.read_order
543-
.front()
544-
.map(|(read, _id)| {
545-
self.read_ttl
546-
.checked_sub(read.elapsed().expect("system clock moved back"))
547-
.unwrap_or(self.read_ttl)
548-
})
549-
.unwrap_or_else(|| self.read_ttl);
550-
551-
let earliest_unread_prune_opportunity = self
552-
.insert_order
504+
self.insert_order
553505
.front()
554506
.map(|(created, _id)| {
555-
self.unread_ttl_at_capacity
507+
self.ttl
556508
.checked_sub(created.elapsed().expect("system clock moved back"))
557-
.unwrap_or(self.unread_ttl_at_capacity)
509+
.unwrap_or(self.ttl)
558510
})
559-
.unwrap_or_else(|| self.unread_ttl_at_capacity);
560-
561-
std::cmp::min(earliest_read_prune_opportunity, earliest_unread_prune_opportunity)
511+
.unwrap_or(self.ttl)
562512
}
563513
}
564514

@@ -938,16 +888,12 @@ async fn test_prune() -> std::io::Result<()> {
938888
.await
939889
.expect("initializing mailbox database should succeed");
940890

941-
let read_ttl = Duration::from_secs(60);
942-
let unread_ttl_at_capacity = Duration::from_secs(600);
943-
let unread_ttl_below_capacity = Duration::from_secs(3600);
891+
let ttl = Duration::from_secs(600);
944892

945893
{
946894
let mut guard = db.mailboxes.lock().await;
947895
guard.capacity = 2;
948-
guard.read_ttl = read_ttl;
949-
guard.unread_ttl_at_capacity = unread_ttl_at_capacity;
950-
guard.unread_ttl_below_capacity = unread_ttl_below_capacity;
896+
guard.ttl = ttl;
951897
}
952898

953899
assert_eq!(db.mailboxes.lock().await.len(), 0);
@@ -984,38 +930,11 @@ async fn test_prune() -> std::io::Result<()> {
984930
db.prune().await.expect("pruning should not fail");
985931
assert_eq!(db.mailboxes.lock().await.len(), 1);
986932

987-
// Shift insert timestamps past unread_ttl_below_capacity
933+
// Shift insert timestamps past ttl
988934
{
989935
let mut guard = db.mailboxes.lock().await;
990936
for (ts, _) in guard.insert_order.iter_mut() {
991-
*ts -= unread_ttl_below_capacity + Duration::from_secs(1);
992-
}
993-
}
994-
995-
assert_eq!(db.mailboxes.lock().await.len(), 1);
996-
db.prune().await.expect("pruning should not fail");
997-
assert_eq!(db.mailboxes.lock().await.len(), 0);
998-
999-
// Post again, read it, then verify read TTL pruning
1000-
db.post_v2_payload(&id, contents.to_vec())
1001-
.await
1002-
.expect("posting payload should succeed")
1003-
.expect("contents should be accepted");
1004-
1005-
assert_eq!(db.mailboxes.lock().await.len(), 1);
1006-
1007-
// Mark the mailbox as read
1008-
_ = db.wait_for_v2_payload(&id).await.expect("waiting for payload should succeed");
1009-
1010-
assert_eq!(db.mailboxes.lock().await.len(), 1);
1011-
db.prune().await.expect("pruning should not fail");
1012-
assert_eq!(db.mailboxes.lock().await.len(), 1);
1013-
1014-
// Shift read timestamps past read_ttl
1015-
{
1016-
let mut guard = db.mailboxes.lock().await;
1017-
for (ts, _) in guard.read_order.iter_mut() {
1018-
*ts -= read_ttl + Duration::from_secs(1);
937+
*ts -= ttl + Duration::from_secs(1);
1019938
}
1020939
}
1021940

0 commit comments

Comments
 (0)