diff --git a/crates/file_store/src/lib.rs b/crates/file_store/src/lib.rs index 7c943ca20..9c2cef09c 100644 --- a/crates/file_store/src/lib.rs +++ b/crates/file_store/src/lib.rs @@ -40,3 +40,51 @@ impl From for FileError { } impl std::error::Error for FileError {} + +/// An error while opening or creating the file store +#[derive(Debug)] +pub enum StoreError { + /// Entry iter error + EntryIter { + /// Index that caused the error + index: usize, + /// Iter error + iter: IterError, + /// Amount of bytes read so far + bytes_read: u64, + }, + /// IO error, this may mean that the file is too short. + Io(io::Error), + /// Magic bytes do not match what is expected. + InvalidMagicBytes { got: Vec, expected: Vec }, +} + +impl core::fmt::Display for StoreError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::EntryIter { + index, + iter, + bytes_read, + } => write!( + f, + "{}: changeset index={}, bytes read={}", + iter, index, bytes_read + ), + Self::Io(e) => write!(f, "io error trying to read file: {}", e), + Self::InvalidMagicBytes { got, expected } => write!( + f, + "file has invalid magic bytes: expected={:?} got={:?}", + expected, got, + ), + } + } +} + +impl std::error::Error for StoreError {} + +impl From for StoreError { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs index 62c3d91b6..be92b14ed 100644 --- a/crates/file_store/src/store.rs +++ b/crates/file_store/src/store.rs @@ -1,4 +1,4 @@ -use crate::{bincode_options, EntryIter, FileError, IterError}; +use crate::{bincode_options, EntryIter, FileError, IterError, StoreError}; use bdk_chain::Merge; use bincode::Options; use std::{ @@ -92,6 +92,62 @@ where }) } + /// Open an existing [`Store`], read its content, and return it ready to start receiving new + /// changesets. + /// + /// Use [`create_new`] to create a new `Store`. + /// + /// # Errors + /// + /// If the prefixed bytes of the opened file does not match the provided `magic`, the + /// [`StoreError::InvalidMagicBytes`] error variant will be returned. + /// + /// If there is an error while decoding the changesets stored, the [`StoreError::EntryIter`] + /// error variant will be returned, with the index of the failing changeset and the error it + /// caused. + /// + /// [`create_new`]: Store::create_new + pub fn reopen

(magic: &[u8], file_path: P) -> Result + where + P: AsRef, + { + let mut f = OpenOptions::new().read(true).write(true).open(file_path)?; + + let mut magic_buf = vec![0_u8; magic.len()]; + f.read_exact(&mut magic_buf)?; + if magic_buf != magic { + return Err(StoreError::InvalidMagicBytes { + got: magic_buf, + expected: magic.to_vec(), + }); + } + + let mut store = Self { + magic_len: magic.len(), + db_file: f, + marker: Default::default(), + }; + + let mut index: usize = 0; + let mut error: Option = None; + for (idx, next_changeset) in store.iter_changesets().enumerate() { + if let Err(iter_error) = next_changeset { + index = idx; + error = Some(iter_error); + }; + } + + if let Some(iter) = error { + return Err(StoreError::EntryIter { + index, + iter, + bytes_read: store.db_file.stream_position()?, + }); + } + + Ok(store) + } + /// Attempt to open existing [`Store`] file; create it if the file is non-existent. /// /// Internally, this calls either [`open`] or [`create_new`]. @@ -438,4 +494,171 @@ mod test { assert_eq!(aggregation, exp_aggregation); } } + + #[test] + fn reopen_recovers_state_after_last_write() { + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + + let changeset1 = TestChangeSet::from(["1".into(), "2".into(), "3".into()]); + let changeset2 = TestChangeSet::from(["4".into(), "5".into(), "6".into()]); + + { + // create new db + let mut db = Store::::create_new(&TEST_MAGIC_BYTES, &file_path) + .expect("must create"); + + // append first changeset to db + db.append_changeset(&changeset1).expect("must succeed"); + } + + { + // open db + let mut db = Store::::reopen(&TEST_MAGIC_BYTES, &file_path) + .expect("failed to load db"); + + // now append the second changeset + db.append_changeset(&changeset2).expect("must succeed"); + + // Retrieve stored changesets from the database + let stored_changesets = db + .aggregate_changesets() + .expect("must succeed") + .expect("must succeed"); + + // expected changeset must be changeset2 + changeset1 + let mut expected_changeset = changeset2.clone(); + expected_changeset.extend(changeset1); + + // Assert that stored_changesets matches expected_changeset but not changeset2 + assert_eq!(stored_changesets, expected_changeset); + assert_ne!(stored_changesets, changeset2); + } + + // Open the store again to verify file pointer position at the EOF + let mut db = Store::::reopen(&TEST_MAGIC_BYTES, &file_path) + .expect("failed to load db"); + + // get the current position of file pointer just after loading store + let current_pointer = db.db_file.stream_position().expect("must suceed"); + + // get pointer to last position for the loaded db + let expected_pointer = db.db_file.seek(io::SeekFrom::End(0)).expect("must succeed"); + + // both should match + assert_eq!(current_pointer, expected_pointer); + } + + #[test] + fn fail_to_reopen_if_write_is_short() { + let temp_dir = tempfile::tempdir().unwrap(); + + let changesets = [ + TestChangeSet::from(["1".into()]), + TestChangeSet::from(["2".into(), "3".into()]), + TestChangeSet::from(["4".into(), "5".into(), "6".into()]), + ]; + let last_changeset = TestChangeSet::from(["7".into(), "8".into(), "9".into()]); + let last_changeset_bytes = bincode_options().serialize(&last_changeset).unwrap(); + + for short_write_len in 1..last_changeset_bytes.len() - 1 { + let file_path = temp_dir.path().join(format!("{}.dat", short_write_len)); + + // simulate creating a file, writing data where the last write is incomplete + { + let mut db = + Store::::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap(); + for changeset in &changesets { + db.append_changeset(changeset).unwrap(); + } + // this is the incomplete write + db.db_file + .write_all(&last_changeset_bytes[..short_write_len]) + .unwrap(); + } + + // Reopen file and fail, but recover once file is truncated to valid bytes + { + match Store::::reopen(&TEST_MAGIC_BYTES, &file_path) { + Err(StoreError::EntryIter { + index, bytes_read, .. + }) => { + // Open file again and truncate file to valid content + let mut f = OpenOptions::new() + .read(true) + .write(true) + .open(&file_path) + .expect("should open"); + f.set_len(bytes_read) + .expect("should truncate the file length to bytes_read"); + f.seek(io::SeekFrom::End(0)) + .expect("should position the file pointer to the new EOF"); + + // Once file is truncated reopen file again + let mut db = Store::::reopen(&TEST_MAGIC_BYTES, &file_path) + .expect("should not fail now"); + let exp_aggregation = db + .iter_changesets() + .take(index) + .map(|r| r.expect("must read valid changeset")) + .fold(TestChangeSet::default(), |mut acc, v| { + Merge::merge(&mut acc, v); + acc + }); + + assert_eq!( + exp_aggregation, + changesets + .iter() + .cloned() + .reduce(|mut acc, cs| { + Merge::merge(&mut acc, cs); + acc + }) + .expect("should merge normally"), + "should recover all changesets that are written in full", + ); + + db.db_file.write_all(&last_changeset_bytes).unwrap(); + } + _ => panic!("reopen must fail to read"), + } + } + + // load file again - this time we should successfully read all changesets + { + let mut db = Store::::reopen(&TEST_MAGIC_BYTES, &file_path).unwrap(); + let aggregated_changesets = db + .aggregate_changesets() + .expect("aggregating all changesets should succeed"); + assert_eq!( + aggregated_changesets, + changesets + .iter() + .cloned() + .chain(core::iter::once(last_changeset.clone())) + .reduce(|mut acc, cs| { + Merge::merge(&mut acc, cs); + acc + }), + "should recover all changesets", + ); + } + } + } + + #[test] + #[should_panic( + expected = "Byte 255 is treated as an extension point; it should not be encoding anything." + )] + fn reopen_fails_to_read_if_invalid_bytes() { + // initial data to write to file (magic bytes + invalid data) + let mut data = [255_u8; 2000]; + data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES); + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(&data).expect("should write"); + + Store::::reopen(&TEST_MAGIC_BYTES, file.path()).unwrap(); + } }