Skip to content

Commit 55cbcdf

Browse files
committed
Address regression where messages are not drained on reset
1 parent b517f20 commit 55cbcdf

1 file changed

Lines changed: 62 additions & 62 deletions

File tree

crates/hotfix/src/store/redb.rs

Lines changed: 62 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::store::MessageStore;
22
use anyhow::{bail, Result};
33
use chrono::{DateTime, Utc};
4-
use redb::{Database, ReadableTable, TableDefinition, TableError};
4+
use redb::{Database, ReadOnlyTable, ReadableTable, TableDefinition, TableError};
55
use std::path::Path;
66

77
const MESSAGES_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("messages");
@@ -25,17 +25,18 @@ impl RedbMessageStore {
2525
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
2626
let db = Database::create(path)?;
2727

28-
let meta = if let Some(stored_metadata) = Self::read_meta_data(&db)? {
28+
let meta = if let Some(stored_metadata) = Self::load_metadata(&db)? {
2929
stored_metadata
3030
} else {
31-
Self::persist_default_meta_data(&db)?;
32-
Self::read_meta_data(&db)?.unwrap()
31+
Self::persist_default_metadata(&db)?;
32+
Self::load_metadata(&db)?
33+
.ok_or_else(|| anyhow::anyhow!("failed to read metadata after initialization"))?
3334
};
3435

3536
Ok(Self { db, meta })
3637
}
3738

38-
fn persist_default_meta_data(db: &Database) -> Result<()> {
39+
fn persist_default_metadata(db: &Database) -> Result<()> {
3940
let creation_timestamp = Utc::now().timestamp_micros() as u64;
4041
let sender_seq_number = 0;
4142
let target_seq_number = 0;
@@ -47,34 +48,23 @@ impl RedbMessageStore {
4748
meta_table.insert(CREATION_TIME_KEY, creation_timestamp)?;
4849
meta_table.insert(SENDER_KEY, sender_seq_number)?;
4950
meta_table.insert(TARGET_KEY, target_seq_number)?;
51+
let mut messages_table = write_txn.open_table(MESSAGES_TABLE)?;
52+
messages_table.drain::<u64>(..)?;
5053
}
5154
write_txn.commit()?;
5255
Ok(())
5356
}
5457

55-
fn read_meta_data(db: &Database) -> Result<Option<MetaData>> {
58+
fn load_metadata(db: &Database) -> Result<Option<MetaData>> {
5659
let read_txn = db.begin_read()?;
5760
let metadata = match read_txn.open_table(META_TABLE) {
5861
Ok(table) => {
59-
let creation_time = if let Some(v) = table.get(CREATION_TIME_KEY)? {
60-
if let Some(ts) = DateTime::from_timestamp_micros(v.value() as i64) {
61-
ts
62-
} else {
63-
bail!("invalid creation timestamp found")
64-
}
65-
} else {
66-
bail!("no creation timestamp found")
67-
};
68-
let sender_seq_number = if let Some(v) = table.get(SENDER_KEY)? {
69-
v.value()
70-
} else {
71-
bail!("no sender seq number found")
72-
};
73-
let target_seq_number = if let Some(v) = table.get(TARGET_KEY)? {
74-
v.value()
75-
} else {
76-
bail!("no target seq number found")
77-
};
62+
let creation_time = Self::parse_timestamp(Self::read_required_meta_field(
63+
&table,
64+
CREATION_TIME_KEY,
65+
)?)?;
66+
let sender_seq_number = Self::read_required_meta_field(&table, SENDER_KEY)?;
67+
let target_seq_number = Self::read_required_meta_field(&table, TARGET_KEY)?;
7868

7969
Some(MetaData {
8070
creation_time,
@@ -90,6 +80,28 @@ impl RedbMessageStore {
9080

9181
Ok(metadata)
9282
}
83+
84+
fn read_required_meta_field(table: &ReadOnlyTable<&str, u64>, key: &str) -> Result<u64> {
85+
table
86+
.get(key)?
87+
.map(|v| v.value())
88+
.ok_or_else(|| anyhow::anyhow!("missing required metadata field: {key}"))
89+
}
90+
91+
fn parse_timestamp(timestamp: u64) -> Result<DateTime<Utc>> {
92+
DateTime::from_timestamp_micros(timestamp as i64)
93+
.ok_or_else(|| anyhow::anyhow!("invalid timestamp: {timestamp}"))
94+
}
95+
96+
async fn update_sequence_number(&mut self, key: &str, value: u64) -> Result<()> {
97+
let write_txn = self.db.begin_write()?;
98+
{
99+
let mut table = write_txn.open_table(META_TABLE)?;
100+
table.insert(key, value)?;
101+
}
102+
write_txn.commit()?;
103+
Ok(())
104+
}
93105
}
94106

95107
#[async_trait::async_trait]
@@ -105,21 +117,23 @@ impl MessageStore for RedbMessageStore {
105117
}
106118

107119
async fn get_slice(&self, begin: usize, end: usize) -> Result<Vec<Vec<u8>>> {
108-
let read_txn = self.db.begin_read()?;
109-
{
110-
let res = match read_txn.open_table(MESSAGES_TABLE) {
111-
Ok(table) => {
112-
let messages: std::result::Result<Vec<Vec<u8>>, redb::StorageError> = table
113-
.range(begin as u64..=end as u64)?
114-
.map(|m| m.map(|v| v.1.value().to_vec()))
115-
.collect();
116-
Ok(messages?)
117-
}
118-
Err(TableError::TableDoesNotExist(_)) => Ok(vec![]),
119-
Err(err) => Err(err.into()),
120-
};
121-
res
120+
if begin > end {
121+
return Ok(vec![]);
122122
}
123+
124+
let read_txn = self.db.begin_read()?;
125+
let res = match read_txn.open_table(MESSAGES_TABLE) {
126+
Ok(table) => {
127+
let messages: std::result::Result<Vec<Vec<u8>>, redb::StorageError> = table
128+
.range(begin as u64..=end as u64)?
129+
.map(|m| m.map(|v| v.1.value().to_vec()))
130+
.collect();
131+
Ok(messages?)
132+
}
133+
Err(TableError::TableDoesNotExist(_)) => Ok(vec![]),
134+
Err(err) => Err(err.into()),
135+
};
136+
res
123137
}
124138

125139
fn next_sender_seq_number(&self) -> u64 {
@@ -131,41 +145,27 @@ impl MessageStore for RedbMessageStore {
131145
}
132146

133147
async fn increment_sender_seq_number(&mut self) -> Result<()> {
134-
self.meta.sender_seq_number += 1;
135-
let write_txn = self.db.begin_write()?;
136-
{
137-
let mut table = write_txn.open_table(META_TABLE)?;
138-
table.insert(SENDER_KEY, self.meta.sender_seq_number)?;
139-
}
140-
write_txn.commit()?;
148+
let sender_seq_number = self.meta.sender_seq_number + 1;
149+
self.update_sequence_number(SENDER_KEY, sender_seq_number)
150+
.await?;
151+
self.meta.sender_seq_number = sender_seq_number;
141152
Ok(())
142153
}
143154

144155
async fn increment_target_seq_number(&mut self) -> Result<()> {
145-
self.meta.target_seq_number += 1;
146-
let write_txn = self.db.begin_write()?;
147-
{
148-
let mut table = write_txn.open_table(META_TABLE)?;
149-
table.insert(TARGET_KEY, self.meta.target_seq_number)?;
150-
}
151-
write_txn.commit()?;
152-
Ok(())
156+
self.set_target_seq_number(self.meta.target_seq_number + 1)
157+
.await
153158
}
154159

155160
async fn set_target_seq_number(&mut self, seq_number: u64) -> Result<()> {
161+
self.update_sequence_number(TARGET_KEY, seq_number).await?;
156162
self.meta.target_seq_number = seq_number;
157-
let write_txn = self.db.begin_write()?;
158-
{
159-
let mut table = write_txn.open_table(META_TABLE)?;
160-
table.insert(TARGET_KEY, seq_number)?;
161-
}
162-
write_txn.commit()?;
163163
Ok(())
164164
}
165165

166166
async fn reset(&mut self) -> Result<()> {
167-
Self::persist_default_meta_data(&self.db)?;
168-
if let Some(meta) = Self::read_meta_data(&self.db)? {
167+
Self::persist_default_metadata(&self.db)?;
168+
if let Some(meta) = Self::load_metadata(&self.db)? {
169169
self.meta = meta;
170170
Ok(())
171171
} else {

0 commit comments

Comments
 (0)