Skip to content

Commit b2a4785

Browse files
authored
feat: adding wal crash recovery optimizations (#1)
1 parent f58a06f commit b2a4785

3 files changed

Lines changed: 258 additions & 9 deletions

File tree

src/main.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,16 @@ async fn main() -> anyhow::Result<()> {
233233
let map_fn = Arc::new(vindex::JsonKeysMapFn::new(&args.vindex_key_field));
234234

235235
let vi = if let Some(wal_path) = &args.vindex_wal_path {
236-
tracing::info!("Vindex WAL path: {}", wal_path);
237-
vindex::VerifiableIndex::with_wal(map_fn, wal_path)?
236+
// Get the integrated_size from the database for WAL validation
237+
// This ensures we truncate the WAL to match the database state after a crash
238+
let log_state = db.get_log_state().await?;
239+
let expected_tree_size = log_state.integrated_size.value();
240+
tracing::info!(
241+
"Vindex WAL path: {}, expected tree size from DB: {}",
242+
wal_path,
243+
expected_tree_size
244+
);
245+
vindex::VerifiableIndex::with_wal(map_fn, wal_path, expected_tree_size)?
238246
} else {
239247
vindex::VerifiableIndex::new(map_fn)
240248
};

src/vindex/mod.rs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ use crate::error::Result;
2121
use crate::types::LogIndex;
2222
pub use prefix_tree::{LookupProof, PrefixTree, ProofNode};
2323
use sha2::{Digest, Sha256};
24-
use std::collections::HashMap;
24+
use std::collections::{HashMap, HashSet};
2525
use std::io::Write as _;
2626
use std::path::Path;
2727
use std::sync::{Arc, RwLock};
28-
pub use wal::{WalReader, WalWriter};
28+
pub use wal::{validate_and_truncate_wal, WalReader, WalWriter};
2929

3030
/// A 32-byte SHA256 hash used as a key in the index.
3131
pub type IndexKey = [u8; 32];
@@ -145,21 +145,49 @@ impl VerifiableIndex {
145145

146146
/// Create a new verifiable index with WAL persistence.
147147
///
148-
/// If the WAL file exists, it will be replayed to rebuild the index.
149-
pub fn with_wal(map_fn: Arc<dyn MapFn>, wal_path: impl AsRef<Path>) -> Result<Self> {
148+
/// If the WAL file exists, it will be validated against the expected tree size
149+
/// and truncated if necessary (to handle crash recovery), then replayed to
150+
/// rebuild the index.
151+
///
152+
/// # Arguments
153+
/// * `map_fn` - The function to extract keys from entry data
154+
/// * `wal_path` - Path to the WAL file
155+
/// * `expected_tree_size` - The integrated tree size from the database; used to
156+
/// truncate the WAL if it ran ahead before a crash
157+
pub fn with_wal(
158+
map_fn: Arc<dyn MapFn>,
159+
wal_path: impl AsRef<Path>,
160+
expected_tree_size: u64,
161+
) -> Result<Self> {
150162
let wal_path = wal_path.as_ref();
151163

164+
// Validate and truncate WAL to match expected tree size
165+
// This is critical for crash recovery: if the WAL was flushed but the database
166+
// wasn't updated before a crash, we truncate the WAL to avoid duplicates
167+
let actual_wal_size = validate_and_truncate_wal(wal_path, expected_tree_size)?;
168+
tracing::info!(
169+
"WAL validated: expected_tree_size={}, actual_wal_size={}",
170+
expected_tree_size,
171+
actual_wal_size
172+
);
173+
152174
// Create or open WAL
153175
let mut tree_size = 0u64;
154176
let mut index: HashMap<IndexKey, Vec<LogIndex>> = HashMap::new();
155177
let mut prefix_tree = PrefixTree::new();
156178

179+
// Track seen (idx, key) pairs to prevent duplicates (defense in depth)
180+
let mut seen: HashSet<(u64, IndexKey)> = HashSet::new();
181+
157182
// Replay existing WAL if it exists
158183
if wal_path.exists() {
159184
let mut reader = WalReader::open(wal_path)?;
160185
while let Some((idx, keys)) = reader.next_entry()? {
161186
for key in keys {
162-
index.entry(key).or_default().push(idx);
187+
// Deduplicate: only add if we haven't seen this (idx, key) pair
188+
if seen.insert((idx.value(), key)) {
189+
index.entry(key).or_default().push(idx);
190+
}
163191
}
164192
tree_size = tree_size.max(idx.value() + 1);
165193
}
@@ -169,6 +197,12 @@ impl VerifiableIndex {
169197
let value_hash = compute_value_hash(indices);
170198
prefix_tree.insert(key, value_hash);
171199
}
200+
201+
tracing::info!(
202+
"WAL replayed: {} unique keys, tree_size={}",
203+
index.len(),
204+
tree_size
205+
);
172206
}
173207

174208
let wal_writer = WalWriter::open(wal_path)?;

src/vindex/wal.rs

Lines changed: 209 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77
//!
88
//! Each line represents one log entry and the keys it maps to.
99
//! Empty lines (entries with no keys) are skipped.
10+
//!
11+
//! On startup, the WAL is validated and truncated to match the expected
12+
//! tree size from the database. This prevents duplicate entries after a crash.
1013
1114
use crate::error::{Error, Result};
1215
use crate::types::LogIndex;
1316
use std::fs::{File, OpenOptions};
14-
use std::io::{BufRead, BufReader, BufWriter, Write};
17+
use std::io::{BufRead, BufReader, BufWriter, Read, Write};
1518
use std::path::Path;
1619

1720
use super::IndexKey;
@@ -60,11 +63,18 @@ impl WalWriter {
6063
Ok(())
6164
}
6265

63-
/// Flush the WAL to disk.
66+
/// Flush the WAL to disk with fsync for durability.
6467
pub fn flush(&mut self) -> Result<()> {
6568
self.writer
6669
.flush()
6770
.map_err(|e| Error::Internal(format!("failed to flush WAL: {}", e)))?;
71+
72+
// fsync to ensure data is persisted to disk (not just OS buffer)
73+
self.writer
74+
.get_ref()
75+
.sync_data()
76+
.map_err(|e| Error::Internal(format!("failed to sync WAL to disk: {}", e)))?;
77+
6878
Ok(())
6979
}
7080
}
@@ -106,6 +116,106 @@ impl WalReader {
106116
}
107117
}
108118

119+
/// Validate and truncate the WAL file to match the expected tree size.
120+
///
121+
/// This function reads the WAL backwards to find the last complete entry,
122+
/// and truncates any entries with index >= expected_tree_size.
123+
/// This is critical for crash recovery: if the WAL was flushed but the database
124+
/// wasn't updated before a crash, we need to truncate the WAL to match the
125+
/// database state to avoid duplicate entries.
126+
///
127+
/// Returns the actual tree size found in the WAL (may be less than expected if WAL is behind).
128+
pub fn validate_and_truncate_wal(path: impl AsRef<Path>, expected_tree_size: u64) -> Result<u64> {
129+
let path = path.as_ref();
130+
131+
if !path.exists() {
132+
return Ok(0);
133+
}
134+
135+
let mut file = OpenOptions::new()
136+
.read(true)
137+
.write(true)
138+
.open(path)
139+
.map_err(|e| Error::Internal(format!("failed to open WAL for validation: {}", e)))?;
140+
141+
let file_size = file
142+
.metadata()
143+
.map_err(|e| Error::Internal(format!("failed to get WAL metadata: {}", e)))?
144+
.len();
145+
146+
if file_size == 0 {
147+
return Ok(0);
148+
}
149+
150+
// Read the entire file to find all entries and their positions
151+
let mut content = String::new();
152+
file.read_to_string(&mut content)
153+
.map_err(|e| Error::Internal(format!("failed to read WAL: {}", e)))?;
154+
155+
// Find all line boundaries and parse entries
156+
let mut last_valid_pos: u64 = 0;
157+
let mut max_valid_idx: Option<u64> = None;
158+
let mut current_pos: u64 = 0;
159+
160+
for line in content.lines() {
161+
let line_len = line.len() as u64 + 1; // +1 for newline
162+
163+
if line.trim().is_empty() {
164+
current_pos += line_len;
165+
continue;
166+
}
167+
168+
match parse_wal_line(line) {
169+
Ok((idx, _)) => {
170+
let idx_val = idx.value();
171+
if expected_tree_size == 0 || idx_val < expected_tree_size {
172+
// This entry is within bounds
173+
last_valid_pos = current_pos + line_len;
174+
max_valid_idx = Some(match max_valid_idx {
175+
Some(prev) => prev.max(idx_val),
176+
None => idx_val,
177+
});
178+
} else {
179+
// Entry is beyond expected tree size - stop here
180+
tracing::warn!(
181+
"WAL entry {} >= expected tree size {}, truncating",
182+
idx_val,
183+
expected_tree_size
184+
);
185+
break;
186+
}
187+
}
188+
Err(e) => {
189+
tracing::warn!(
190+
"Failed to parse WAL line, truncating at position {}: {}",
191+
current_pos,
192+
e
193+
);
194+
break;
195+
}
196+
}
197+
198+
current_pos += line_len;
199+
}
200+
201+
// Truncate file if needed
202+
if last_valid_pos < file_size {
203+
tracing::info!(
204+
"Truncating WAL from {} to {} bytes (removing {} bytes)",
205+
file_size,
206+
last_valid_pos,
207+
file_size - last_valid_pos
208+
);
209+
file.set_len(last_valid_pos)
210+
.map_err(|e| Error::Internal(format!("failed to truncate WAL: {}", e)))?;
211+
file.sync_all()
212+
.map_err(|e| Error::Internal(format!("failed to sync truncated WAL: {}", e)))?;
213+
}
214+
215+
// Return the tree size based on max index found + 1 (since indices are 0-based)
216+
Ok(max_valid_idx.map(|idx| idx + 1).unwrap_or(0))
217+
}
218+
109219
/// Parse a single WAL line.
110220
fn parse_wal_line(line: &str) -> Result<(LogIndex, Vec<IndexKey>)> {
111221
let line = line.trim();
@@ -205,4 +315,101 @@ mod tests {
205315
assert_eq!(idx.value(), 123);
206316
assert_eq!(keys.len(), 0);
207317
}
318+
319+
/// A WAL holding exactly the expected entries must be left untouched.
#[test]
fn test_validate_and_truncate_wal_no_truncation_needed() {
    let wal_file = NamedTempFile::new().unwrap();
    let wal_path = wal_file.path();

    // Populate the WAL with indices 0 through 4, all mapped to the same key.
    // The writer lives in its own scope so it is dropped before validation.
    {
        let key = [1u8; 32];
        let mut wal = WalWriter::open(wal_path).unwrap();
        for entry_idx in 0..5 {
            wal.append(LogIndex::new(entry_idx), &[key]).unwrap();
        }
        wal.flush().unwrap();
    }

    // Expected size equals the number of entries written, so nothing is cut.
    assert_eq!(validate_and_truncate_wal(wal_path, 5).unwrap(), 5);

    // Read everything back: indices must be 0, 1, 2, 3, 4 and then end of file.
    let mut reader = WalReader::open(wal_path).unwrap();
    let mut next_expected = 0;
    while let Some((idx, _)) = reader.next_entry().unwrap() {
        assert_eq!(idx.value(), next_expected);
        next_expected += 1;
    }
    assert_eq!(next_expected, 5);
}
346+
347+
/// Entries at or beyond the expected tree size must be removed from the WAL.
#[test]
fn test_validate_and_truncate_wal_truncates_excess() {
    let wal_file = NamedTempFile::new().unwrap();
    let wal_path = wal_file.path();

    // Populate the WAL with indices 0 through 9; the writer is dropped at the
    // end of this scope, before validation runs.
    {
        let key = [1u8; 32];
        let mut wal = WalWriter::open(wal_path).unwrap();
        for entry_idx in 0..10 {
            wal.append(LogIndex::new(entry_idx), &[key]).unwrap();
        }
        wal.flush().unwrap();
    }

    // With an expected size of 5, entries 5 through 9 are surplus.
    assert_eq!(validate_and_truncate_wal(wal_path, 5).unwrap(), 5);

    // Only indices 0 through 4 may remain, in order, followed by end of file.
    let mut reader = WalReader::open(wal_path).unwrap();
    let mut next_expected = 0;
    while let Some((idx, _)) = reader.next_entry().unwrap() {
        assert_eq!(idx.value(), next_expected);
        next_expected += 1;
    }
    assert_eq!(next_expected, 5);
}
374+
375+
/// A zero-length WAL reports a tree size of 0.
#[test]
fn test_validate_and_truncate_wal_empty_file() {
    let wal_file = NamedTempFile::new().unwrap();
    let wal_path = wal_file.path();

    // (Re)create the file so it is guaranteed to exist with no content.
    File::create(wal_path).unwrap();

    assert_eq!(validate_and_truncate_wal(wal_path, 5).unwrap(), 0);
}
386+
387+
/// A WAL path that does not exist reports a tree size of 0 instead of failing.
#[test]
fn test_validate_and_truncate_wal_nonexistent_file() {
    // Build a path inside a fresh temp directory; the file itself is never created.
    let scratch_dir = tempfile::tempdir().unwrap();
    let missing_path = scratch_dir.path().join("nonexistent.wal");

    assert_eq!(validate_and_truncate_wal(&missing_path, 5).unwrap(), 0);
}
395+
396+
/// When the WAL holds fewer entries than the database expects, the reported
/// size reflects the WAL's own contents and no entry is removed.
#[test]
fn test_validate_and_truncate_wal_wal_behind_expected() {
    let temp_file = NamedTempFile::new().unwrap();
    let path = temp_file.path();

    // Write only entries 0-2 (WAL is behind expected)
    {
        let mut writer = WalWriter::open(path).unwrap();
        let key = [1u8; 32];
        for i in 0..3 {
            writer.append(LogIndex::new(i), &[key]).unwrap();
        }
        writer.flush().unwrap();
    }

    // Validate with expected size 10 - WAL is behind, no truncation
    let actual_size = validate_and_truncate_wal(path, 10).unwrap();
    assert_eq!(actual_size, 3);

    // Confirm the "no truncation" claim: all three entries are still readable.
    let mut reader = WalReader::open(path).unwrap();
    for i in 0..3 {
        let (idx, _) = reader.next_entry().unwrap().unwrap();
        assert_eq!(idx.value(), i);
    }
    assert!(reader.next_entry().unwrap().is_none());
}
208415
}

0 commit comments

Comments
 (0)