Skip to content

Commit f925bfe

Browse files
committed
Persist creation time correctly in redb store
1 parent a6bb107 commit f925bfe

2 files changed

Lines changed: 91 additions & 55 deletions

File tree

crates/hotfix/src/store/redb.rs

Lines changed: 71 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,65 +10,81 @@ const SENDER_KEY: &str = "sender";
1010
const TARGET_KEY: &str = "target";
1111
const CREATION_TIME_KEY: &str = "creation_time";
1212

13-
pub struct RedbMessageStore {
14-
db: Database,
13+
struct MetaData {
1514
creation_time: DateTime<Utc>,
1615
sender_seq_number: u64,
1716
target_seq_number: u64,
1817
}
1918

19+
pub struct RedbMessageStore {
20+
db: Database,
21+
meta: MetaData,
22+
}
23+
2024
impl RedbMessageStore {
2125
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
2226
let db = Database::create(path)?;
23-
let sender_seq_number;
24-
let target_seq_number;
25-
let mut stored_creation_timestamp: Option<u64> = None;
2627

27-
{
28-
let read_txn = db.begin_read()?;
29-
match read_txn.open_table(META_TABLE) {
30-
Ok(table) => {
31-
stored_creation_timestamp = table.get(CREATION_TIME_KEY)?.map(|g| g.value());
32-
sender_seq_number = table.get(SENDER_KEY)?.map_or(0, |g| g.value());
33-
target_seq_number = table.get(TARGET_KEY)?.map_or(0, |g| g.value());
34-
}
35-
Err(TableError::TableDoesNotExist(_)) => {
36-
// Tables don't exist yet, initialise to 0
37-
sender_seq_number = 0;
38-
target_seq_number = 0;
39-
}
40-
Err(err) => {
41-
return Err(err.into());
42-
}
43-
};
44-
}
45-
46-
let creation_time = if let Some(creation_timestamp) = stored_creation_timestamp {
47-
if let Some(creation_time) = DateTime::from_timestamp_micros(creation_timestamp as i64)
48-
{
49-
creation_time
50-
} else {
51-
anyhow::bail!("Invalid creation timestamp")
52-
}
28+
let meta = if let Some(stored_metadata) = Self::read_meta_data(&db)? {
29+
stored_metadata
5330
} else {
54-
let creation_time = Utc::now();
31+
let creation_timestamp = Utc::now().timestamp_micros() as u64;
32+
let sender_seq_number = 0;
33+
let target_seq_number = 0;
5534

5635
// if we have just set the creation time, we need to write it to redb
57-
let write_txn = &db.begin_write()?;
36+
let write_txn = db.begin_write()?;
5837
{
59-
let mut seq_no_table = write_txn.open_table(META_TABLE)?;
60-
let creation_timestamp = creation_time.timestamp_micros() as u64;
61-
seq_no_table.insert(CREATION_TIME_KEY, creation_timestamp)?;
38+
let mut meta_table = write_txn.open_table(META_TABLE)?;
39+
meta_table.insert(CREATION_TIME_KEY, creation_timestamp)?;
40+
meta_table.insert(SENDER_KEY, sender_seq_number)?;
41+
meta_table.insert(TARGET_KEY, target_seq_number)?;
42+
}
43+
write_txn.commit()?;
44+
45+
Self::read_meta_data(&db)?.unwrap()
46+
};
47+
48+
Ok(Self { db, meta })
49+
}
50+
51+
fn read_meta_data(db: &Database) -> Result<Option<MetaData>> {
52+
let read_txn = db.begin_read()?;
53+
let metadata = match read_txn.open_table(META_TABLE) {
54+
Ok(table) => {
55+
let creation_time = if let Some(v) = table.get(CREATION_TIME_KEY)? {
56+
if let Some(ts) = DateTime::from_timestamp_micros(v.value() as i64) {
57+
ts
58+
} else {
59+
anyhow::bail!("invalid creation timestamp found")
60+
}
61+
} else {
62+
anyhow::bail!("no creation timestamp found")
63+
};
64+
let sender_seq_number = if let Some(v) = table.get(SENDER_KEY)? {
65+
v.value()
66+
} else {
67+
anyhow::bail!("no sender seq number found")
68+
};
69+
let target_seq_number = if let Some(v) = table.get(TARGET_KEY)? {
70+
v.value()
71+
} else {
72+
anyhow::bail!("no target seq number found")
73+
};
74+
75+
Some(MetaData {
76+
creation_time,
77+
sender_seq_number,
78+
target_seq_number,
79+
})
80+
}
81+
Err(TableError::TableDoesNotExist(_)) => None,
82+
Err(err) => {
83+
return Err(err.into());
6284
}
63-
creation_time
6485
};
6586

66-
Ok(Self {
67-
db,
68-
creation_time,
69-
sender_seq_number,
70-
target_seq_number,
71-
})
87+
Ok(metadata)
7288
}
7389
}
7490

@@ -103,37 +119,37 @@ impl MessageStore for RedbMessageStore {
103119
}
104120

105121
fn next_sender_seq_number(&self) -> u64 {
106-
self.sender_seq_number + 1
122+
self.meta.sender_seq_number + 1
107123
}
108124

109125
fn next_target_seq_number(&self) -> u64 {
110-
self.target_seq_number + 1
126+
self.meta.target_seq_number + 1
111127
}
112128

113129
async fn increment_sender_seq_number(&mut self) -> Result<()> {
114-
self.sender_seq_number += 1;
130+
self.meta.sender_seq_number += 1;
115131
let write_txn = self.db.begin_write()?;
116132
{
117133
let mut table = write_txn.open_table(META_TABLE)?;
118-
table.insert(SENDER_KEY, self.sender_seq_number)?;
134+
table.insert(SENDER_KEY, self.meta.sender_seq_number)?;
119135
}
120136
write_txn.commit()?;
121137
Ok(())
122138
}
123139

124140
async fn increment_target_seq_number(&mut self) -> Result<()> {
125-
self.target_seq_number += 1;
141+
self.meta.target_seq_number += 1;
126142
let write_txn = self.db.begin_write()?;
127143
{
128144
let mut table = write_txn.open_table(META_TABLE)?;
129-
table.insert(TARGET_KEY, self.target_seq_number)?;
145+
table.insert(TARGET_KEY, self.meta.target_seq_number)?;
130146
}
131147
write_txn.commit()?;
132148
Ok(())
133149
}
134150

135151
async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> {
136-
self.target_seq_number = seq_number;
152+
self.meta.target_seq_number = seq_number;
137153
let write_txn = self.db.begin_write()?;
138154
{
139155
let mut table = write_txn.open_table(META_TABLE)?;
@@ -144,13 +160,13 @@ impl MessageStore for RedbMessageStore {
144160
}
145161

146162
async fn reset(&mut self) -> Result<()> {
147-
self.sender_seq_number = 0;
148-
self.target_seq_number = 0;
163+
self.meta.sender_seq_number = 0;
164+
self.meta.target_seq_number = 0;
149165
let write_txn = self.db.begin_write()?;
150166
{
151167
let mut seq_no_table = write_txn.open_table(META_TABLE)?;
152-
seq_no_table.insert(SENDER_KEY, self.sender_seq_number)?;
153-
seq_no_table.insert(TARGET_KEY, self.target_seq_number)?;
168+
seq_no_table.insert(SENDER_KEY, self.meta.sender_seq_number)?;
169+
seq_no_table.insert(TARGET_KEY, self.meta.target_seq_number)?;
154170
let mut messages_table = write_txn.open_table(MESSAGES_TABLE)?;
155171
messages_table.drain::<u64>(..)?;
156172
}
@@ -159,6 +175,6 @@ impl MessageStore for RedbMessageStore {
159175
}
160176

161177
fn creation_time(&self) -> DateTime<Utc> {
162-
self.creation_time
178+
self.meta.creation_time
163179
}
164180
}

crates/hotfix/tests/store_tests.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,26 @@ async fn test_creation_time_is_set() {
283283
}
284284
}
285285

286+
#[tokio::test]
287+
async fn test_creation_time_is_preserved() {
288+
for factory in create_test_store_factories().await {
289+
if !factory.is_persistent() {
290+
continue;
291+
}
292+
293+
let store = factory.create_store().await;
294+
let creation_time1 = store.creation_time();
295+
drop(store);
296+
297+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
298+
299+
let store = factory.create_store().await;
300+
let creation_time2 = store.creation_time();
301+
302+
assert_eq!(creation_time1, creation_time2);
303+
}
304+
}
305+
286306
#[async_trait::async_trait]
287307
pub trait TestStoreFactory {
288308
async fn create_store(&self) -> Box<dyn MessageStore>;

0 commit comments

Comments
 (0)