Skip to content

Commit b517f20

Browse files
committed
Persist creation time when reset happens in redb store
1 parent f925bfe commit b517f20

2 files changed

Lines changed: 51 additions & 30 deletions

File tree

crates/hotfix/src/store/redb.rs

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::store::MessageStore;
2-
use anyhow::Result;
2+
use anyhow::{bail, Result};
33
use chrono::{DateTime, Utc};
44
use redb::{Database, ReadableTable, TableDefinition, TableError};
55
use std::path::Path;
@@ -28,26 +28,30 @@ impl RedbMessageStore {
2828
let meta = if let Some(stored_metadata) = Self::read_meta_data(&db)? {
2929
stored_metadata
3030
} else {
31-
let creation_timestamp = Utc::now().timestamp_micros() as u64;
32-
let sender_seq_number = 0;
33-
let target_seq_number = 0;
34-
35-
// if we have just set the creation time, we need to write it to redb
36-
let write_txn = db.begin_write()?;
37-
{
38-
let mut meta_table = write_txn.open_table(META_TABLE)?;
39-
meta_table.insert(CREATION_TIME_KEY, creation_timestamp)?;
40-
meta_table.insert(SENDER_KEY, sender_seq_number)?;
41-
meta_table.insert(TARGET_KEY, target_seq_number)?;
42-
}
43-
write_txn.commit()?;
44-
31+
Self::persist_default_meta_data(&db)?;
4532
Self::read_meta_data(&db)?.unwrap()
4633
};
4734

4835
Ok(Self { db, meta })
4936
}
5037

38+
fn persist_default_meta_data(db: &Database) -> Result<()> {
39+
let creation_timestamp = Utc::now().timestamp_micros() as u64;
40+
let sender_seq_number = 0;
41+
let target_seq_number = 0;
42+
43+
// if we have just set the creation time, we need to write it to redb
44+
let write_txn = db.begin_write()?;
45+
{
46+
let mut meta_table = write_txn.open_table(META_TABLE)?;
47+
meta_table.insert(CREATION_TIME_KEY, creation_timestamp)?;
48+
meta_table.insert(SENDER_KEY, sender_seq_number)?;
49+
meta_table.insert(TARGET_KEY, target_seq_number)?;
50+
}
51+
write_txn.commit()?;
52+
Ok(())
53+
}
54+
5155
fn read_meta_data(db: &Database) -> Result<Option<MetaData>> {
5256
let read_txn = db.begin_read()?;
5357
let metadata = match read_txn.open_table(META_TABLE) {
@@ -56,20 +60,20 @@ impl RedbMessageStore {
5660
if let Some(ts) = DateTime::from_timestamp_micros(v.value() as i64) {
5761
ts
5862
} else {
59-
anyhow::bail!("invalid creation timestamp found")
63+
bail!("invalid creation timestamp found")
6064
}
6165
} else {
62-
anyhow::bail!("no creation timestamp found")
66+
bail!("no creation timestamp found")
6367
};
6468
let sender_seq_number = if let Some(v) = table.get(SENDER_KEY)? {
6569
v.value()
6670
} else {
67-
anyhow::bail!("no sender seq number found")
71+
bail!("no sender seq number found")
6872
};
6973
let target_seq_number = if let Some(v) = table.get(TARGET_KEY)? {
7074
v.value()
7175
} else {
72-
anyhow::bail!("no target seq number found")
76+
bail!("no target seq number found")
7377
};
7478

7579
Some(MetaData {
@@ -160,18 +164,13 @@ impl MessageStore for RedbMessageStore {
160164
}
161165

162166
async fn reset(&mut self) -> Result<()> {
163-
self.meta.sender_seq_number = 0;
164-
self.meta.target_seq_number = 0;
165-
let write_txn = self.db.begin_write()?;
166-
{
167-
let mut seq_no_table = write_txn.open_table(META_TABLE)?;
168-
seq_no_table.insert(SENDER_KEY, self.meta.sender_seq_number)?;
169-
seq_no_table.insert(TARGET_KEY, self.meta.target_seq_number)?;
170-
let mut messages_table = write_txn.open_table(MESSAGES_TABLE)?;
171-
messages_table.drain::<u64>(..)?;
167+
Self::persist_default_meta_data(&self.db)?;
168+
if let Some(meta) = Self::read_meta_data(&self.db)? {
169+
self.meta = meta;
170+
Ok(())
171+
} else {
172+
bail!("meta unexpectedly not found")
172173
}
173-
write_txn.commit()?;
174-
Ok(())
175174
}
176175

177176
fn creation_time(&self) -> DateTime<Utc> {

crates/hotfix/tests/store_tests.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,28 @@ async fn test_creation_time_is_preserved() {
303303
}
304304
}
305305

306+
#[tokio::test]
307+
async fn test_creation_time_gets_reset_correctly() {
308+
for factory in create_test_store_factories().await {
309+
let mut store = factory.create_store().await;
310+
311+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
312+
let after_sleep = Utc::now();
313+
314+
store.reset().await.expect("failed to reset store");
315+
let reset_creation_time = store.creation_time();
316+
assert!(reset_creation_time >= after_sleep);
317+
318+
if !factory.is_persistent() {
319+
continue;
320+
}
321+
322+
drop(store);
323+
let store = factory.create_store().await;
324+
assert_eq!(reset_creation_time, store.creation_time());
325+
}
326+
}
327+
306328
#[async_trait::async_trait]
307329
pub trait TestStoreFactory {
308330
async fn create_store(&self) -> Box<dyn MessageStore>;

0 commit comments

Comments
 (0)