diff --git a/README.md b/README.md index c6c212f1f..f78350a97 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ The project is split up into several crates in the `/crates` directory: - [`wallet`](./crates/wallet): Contains the central high level `Wallet` type that is built from the low-level mechanisms provided by the other components - [`chain`](./crates/chain): Tools for storing and indexing chain data - [`persist`](./crates/persist): Types that define data persistence of a BDK wallet -- [`file_store`](./crates/file_store): A (experimental) persistence backend for storing chain data in a single file. +- [`file_store`](./crates/file_store): Persistence backend for storing chain data in a single file. Intended for testing and development purposes, not for production. - [`esplora`](./crates/esplora): Extends the [`esplora-client`] crate with methods to fetch chain data from an esplora HTTP server in the form that [`bdk_chain`] and `Wallet` can consume. - [`electrum`](./crates/electrum): Extends the [`electrum-client`] crate with methods to fetch chain data from an electrum server in the form that [`bdk_chain`] and `Wallet` can consume. diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs index ad34c77de..8b284f181 100644 --- a/crates/file_store/src/entry_iter.rs +++ b/crates/file_store/src/entry_iter.rs @@ -1,3 +1,4 @@ +use crate::StoreError; use bincode::Options; use std::{ fs::File, @@ -37,7 +38,7 @@ impl Iterator for EntryIter<'_, T> where T: serde::de::DeserializeOwned, { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { if self.finished { @@ -63,7 +64,7 @@ where } } self.db_file.seek(io::SeekFrom::Start(pos_before_read))?; - Err(IterError::Bincode(*e)) + Err(StoreError::Bincode(*e)) } } })() @@ -80,29 +81,3 @@ impl Drop for EntryIter<'_, T> { } } } - -/// Error type for [`EntryIter`]. -#[derive(Debug)] -pub enum IterError { - /// Failure to read from the file. - Io(io::Error), - /// Failure to decode data from the file. - Bincode(bincode::ErrorKind), -} - -impl core::fmt::Display for IterError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - IterError::Io(e) => write!(f, "io error trying to read entry {}", e), - IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e), - } - } -} - -impl From for IterError { - fn from(value: io::Error) -> Self { - IterError::Io(value) - } -} - -impl std::error::Error for IterError {} diff --git a/crates/file_store/src/lib.rs b/crates/file_store/src/lib.rs index 7c943ca20..8703b1a4d 100644 --- a/crates/file_store/src/lib.rs +++ b/crates/file_store/src/lib.rs @@ -13,14 +13,16 @@ pub(crate) fn bincode_options() -> impl bincode::Options { /// Error that occurs due to problems encountered with the file. #[derive(Debug)] -pub enum FileError { +pub enum StoreError { /// IO error, this may mean that the file is too short. Io(io::Error), /// Magic bytes do not match what is expected. InvalidMagicBytes { got: Vec, expected: Vec }, + /// Failure to decode data from the file. + Bincode(bincode::ErrorKind), } -impl core::fmt::Display for FileError { +impl core::fmt::Display for StoreError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { Self::Io(e) => write!(f, "io error trying to read file: {}", e), @@ -29,14 +31,15 @@ impl core::fmt::Display for FileError { "file has invalid magic bytes: expected={:?} got={:?}", expected, got, ), + Self::Bincode(e) => write!(f, "bincode error while reading entry {}", e), } } } -impl From for FileError { +impl From for StoreError { fn from(value: io::Error) -> Self { Self::Io(value) } } -impl std::error::Error for FileError {} +impl std::error::Error for StoreError {} diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs index ec86d4e5e..d870e530a 100644 --- a/crates/file_store/src/store.rs +++ b/crates/file_store/src/store.rs @@ -1,10 +1,10 @@ -use crate::{bincode_options, EntryIter, FileError, IterError}; +use crate::{bincode_options, EntryIter, StoreError}; use bdk_core::Merge; use bincode::Options; use std::{ fmt::{self, Debug}, fs::{File, OpenOptions}, - io::{self, Read, Seek, Write}, + io::{self, Read, Write}, marker::PhantomData, path::Path, }; @@ -14,10 +14,7 @@ use std::{ /// > ⚠ This is a development/testing database. It does not natively support backwards compatible /// > BDK version upgrades so should not be used in production. #[derive(Debug)] -pub struct Store -where - C: Sync + Send, -{ +pub struct Store { magic_len: usize, db_file: File, marker: PhantomData, @@ -25,32 +22,20 @@ where impl Store where - C: Merge - + serde::Serialize - + serde::de::DeserializeOwned - + core::marker::Send - + core::marker::Sync, + C: Merge + serde::Serialize + serde::de::DeserializeOwned, { /// Create a new [`Store`] file in write-only mode; error if the file exists. /// - /// `magic` is the prefixed bytes to write to the new file. This will be checked when opening - /// the `Store` in the future with [`open`]. + /// `magic` is the prefixed bytes to write to the new file. This will be checked when loading + /// the [`Store`] in the future with [`load`]. /// - /// [`open`]: Store::open - pub fn create_new

(magic: &[u8], file_path: P) -> Result + /// [`load`]: Store::load + pub fn create

(magic: &[u8], file_path: P) -> Result where P: AsRef, { - if file_path.as_ref().exists() { - // `io::Error` is used instead of a variant on `FileError` because there is already a - // nightly-only `File::create_new` method - return Err(FileError::Io(io::Error::new( - io::ErrorKind::Other, - "file already exists", - ))); - } let mut f = OpenOptions::new() - .create(true) + .create_new(true) .read(true) .write(true) .truncate(true) @@ -63,17 +48,101 @@ where }) } - /// Open an existing [`Store`]. + /// Load an existing [`Store`]. /// - /// Use [`create_new`] to create a new `Store`. + /// Use [`create`] to create a new [`Store`]. /// /// # Errors /// - /// If the prefixed bytes of the opened file does not match the provided `magic`, the - /// [`FileError::InvalidMagicBytes`] error variant will be returned. + /// If the prefixed bytes of the loaded file do not match the provided `magic`, a + /// [`StoreErrorWithDump`] will be returned with the [`StoreError::InvalidMagicBytes`] error variant in + /// its error field and changeset field set to [`Option::None`] + /// + /// If there exist changesets in the file, [`load`] will try to aggregate them in + /// a single changeset to verify their integrity. If aggregation fails + /// [`StoreErrorWithDump`] will be returned with the [`StoreError::Bincode`] error variant in + /// its error field and the aggregated changeset so far in the changeset field. + /// + /// To get a new working file store from this error use [`Store::create`] and [`Store::append`] + /// to add the aggregated changeset obtained from [`StoreErrorWithDump`]. + /// + /// To analyze the causes of the problem in the original database do not recreate the [`Store`] + /// using the same file path. Not changing the file path will overwrite previous file without + /// being able to recover its original data. /// - /// [`create_new`]: Store::create_new - pub fn open

(magic: &[u8], file_path: P) -> Result + /// # Examples + /// ``` + /// use bdk_file_store::{Store, StoreErrorWithDump}; + /// # use std::fs::OpenOptions; + /// # use bdk_core::Merge; + /// # use std::collections::BTreeSet; + /// # use std::io; + /// # use std::io::SeekFrom; + /// # use std::io::{Seek, Write}; + /// # + /// # fn main() -> io::Result<()> { + /// # const MAGIC_BYTES_LEN: usize = 12; + /// # const MAGIC_BYTES: [u8; MAGIC_BYTES_LEN] = + /// # [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49]; + /// # + /// # type TestChangeSet = BTreeSet; + /// # + /// # let temp_dir = tempfile::tempdir()?; + /// # let file_path = temp_dir.path().join("db_file"); + /// # let mut store = Store::::create(&MAGIC_BYTES, &file_path).unwrap(); + /// # let changesets = [ + /// # TestChangeSet::from(["1".into()]), + /// # TestChangeSet::from(["2".into(), "3".into()]), + /// # TestChangeSet::from(["4".into(), "5".into(), "6".into()]), + /// # ]; + /// # + /// # for changeset in &changesets[..] { + /// # store.append(changeset)?; + /// # } + /// # + /// # drop(store); + /// # + /// # // Simulate the file is broken + /// # let mut data = [255_u8; 2000]; + /// # data[..MAGIC_BYTES_LEN].copy_from_slice(&MAGIC_BYTES); + /// # let mut file = OpenOptions::new().append(true).open(file_path.clone())?; + /// # let new_len = file.seek(SeekFrom::End(-2))?; + /// # file.set_len(new_len)?; + /// + /// let (mut new_store, _aggregate_changeset) = + /// match Store::::load(&MAGIC_BYTES, &file_path) { + /// # Ok(_) => panic!("should have errored"), + /// Ok((store, changeset)) => (store, changeset), + /// Err(StoreErrorWithDump { changeset, .. }) => { + /// let new_file_path = file_path.with_extension("backup"); + /// let mut new_store = + /// Store::create(&MAGIC_BYTES, &new_file_path).expect("must create new file"); + /// if let Some(aggregated_changeset) = changeset { + /// new_store.append(&aggregated_changeset)?; + /// } + /// // The following will overwrite the original file. You will loose the corrupted + /// // portion of the original file forever. + /// drop(new_store); + /// std::fs::rename(&new_file_path, &file_path)?; + /// Store::load(&MAGIC_BYTES, &file_path).expect("must load new file") + /// } + /// }; + /// # + /// # assert_eq!( + /// # new_store.dump().expect("should dump changeset: {1, 2, 3} "), + /// # changesets[..2].iter().cloned().reduce(|mut acc, cs| { + /// # Merge::merge(&mut acc, cs); + /// # acc + /// # }), + /// # "should recover all changesets", + /// # ); + /// # + /// # Ok(()) + /// # } + /// ``` + /// [`create`]: Store::create + /// [`load`]: Store::load + pub fn load

(magic: &[u8], file_path: P) -> Result<(Self, Option), StoreErrorWithDump> where P: AsRef, { @@ -82,87 +151,92 @@ where let mut magic_buf = vec![0_u8; magic.len()]; f.read_exact(&mut magic_buf)?; if magic_buf != magic { - return Err(FileError::InvalidMagicBytes { - got: magic_buf, - expected: magic.to_vec(), + return Err(StoreErrorWithDump { + changeset: Option::::None, + error: StoreError::InvalidMagicBytes { + got: magic_buf, + expected: magic.to_vec(), + }, }); } - Ok(Self { + let mut store = Self { magic_len: magic.len(), db_file: f, marker: Default::default(), - }) + }; + + // Get aggregated changeset + let aggregated_changeset = store.dump()?; + + Ok((store, aggregated_changeset)) + } + + /// Dump the aggregate of all changesets in [`Store`]. + /// + /// # Errors + /// + /// If there exist changesets in the file, [`dump`] will try to aggregate them in a single + /// changeset. If aggregation fails [`StoreErrorWithDump`] will be returned with the + /// [`StoreError::Bincode`] error variant in its error field and the aggregated changeset so + /// far in the changeset field. + /// + /// [`dump`]: Store::dump + pub fn dump(&mut self) -> Result, StoreErrorWithDump> { + EntryIter::new(self.magic_len as u64, &mut self.db_file).try_fold( + Option::::None, + |mut aggregated_changeset: Option, next_changeset| match next_changeset { + Ok(next_changeset) => { + match &mut aggregated_changeset { + Some(aggregated_changeset) => aggregated_changeset.merge(next_changeset), + aggregated_changeset => *aggregated_changeset = Some(next_changeset), + } + Ok(aggregated_changeset) + } + Err(iter_error) => Err(StoreErrorWithDump { + changeset: aggregated_changeset, + error: iter_error, + }), + }, + ) } - /// Attempt to open existing [`Store`] file; create it if the file is non-existent. + /// Attempt to load existing [`Store`] file; create it if the file does not exist. /// - /// Internally, this calls either [`open`] or [`create_new`]. + /// Internally, this calls either [`load`] or [`create`]. /// - /// [`open`]: Store::open - /// [`create_new`]: Store::create_new - pub fn open_or_create_new

(magic: &[u8], file_path: P) -> Result + /// [`load`]: Store::load + /// [`create`]: Store::create + pub fn load_or_create

( + magic: &[u8], + file_path: P, + ) -> Result<(Self, Option), StoreErrorWithDump> where P: AsRef, { if file_path.as_ref().exists() { - Self::open(magic, file_path) + Self::load(magic, file_path) } else { - Self::create_new(magic, file_path) + Self::create(magic, file_path) + .map(|store| (store, Option::::None)) + .map_err(|err: StoreError| StoreErrorWithDump { + changeset: Option::::None, + error: err, + }) } } - /// Iterates over the stored changeset from first to last, changing the seek position at each - /// iteration. - /// - /// The iterator may fail to read an entry and therefore return an error. However, the first time - /// it returns an error will be the last. After doing so, the iterator will always yield `None`. - /// - /// **WARNING**: This method changes the write position in the underlying file. You should - /// always iterate over all entries until `None` is returned if you want your next write to go - /// at the end; otherwise, you will write over existing entries. - pub fn iter_changesets(&mut self) -> EntryIter { - EntryIter::new(self.magic_len as u64, &mut self.db_file) - } - - /// Loads all the changesets that have been stored as one giant changeset. - /// - /// This function returns the aggregate changeset, or `None` if nothing was persisted. - /// If reading or deserializing any of the entries fails, an error is returned that - /// consists of all those it was able to read. + /// Append a new changeset to the file. Does nothing if the changeset is empty. Truncation is + /// not needed because file pointer is always moved to the end of the last decodable data from + /// beginning to end. /// - /// You should usually check the error. In many applications, it may make sense to do a full - /// wallet scan with a stop-gap after getting an error, since it is likely that one of the - /// changesets was unable to read changes of the derivation indices of a keychain. - /// - /// **WARNING**: This method changes the write position of the underlying file. The next - /// changeset will be written over the erroring entry (or the end of the file if none existed). - pub fn aggregate_changesets(&mut self) -> Result, AggregateChangesetsError> { - let mut changeset = Option::::None; - for next_changeset in self.iter_changesets() { - let next_changeset = match next_changeset { - Ok(next_changeset) => next_changeset, - Err(iter_error) => { - return Err(AggregateChangesetsError { - changeset, - iter_error, - }) - } - }; - match &mut changeset { - Some(changeset) => changeset.merge(next_changeset), - changeset => *changeset = Some(next_changeset), - } - } - Ok(changeset) - } - - /// Append a new changeset to the file and truncate the file to the end of the appended - /// changeset. + /// If multiple garbage writes are produced on the file, the next load will only retrieve the + /// first chunk of valid changesets. /// - /// The truncation is to avoid the possibility of having a valid but inconsistent changeset - /// directly after the appended changeset. - pub fn append_changeset(&mut self, changeset: &C) -> Result<(), io::Error> { + /// If garbage data is written and then valid changesets, the next load will still only + /// retrieve the first chunk of valid changesets. The recovery of those valid changesets after + /// the garbage data is responsibility of the user. + pub fn append(&mut self, changeset: &C) -> Result<(), io::Error> { // no need to write anything if changeset is empty if changeset.is_empty() { return Ok(()); @@ -175,45 +249,46 @@ where unexpected_err => panic!("unexpected bincode error: {}", unexpected_err), })?; - // truncate file after this changeset addition - // if this is not done, data after this changeset may represent valid changesets, however - // applying those changesets on top of this one may result in an inconsistent state - let pos = self.db_file.stream_position()?; - self.db_file.set_len(pos)?; - Ok(()) } } -/// Error type for [`Store::aggregate_changesets`]. +/// Error type for [`Store::dump`]. #[derive(Debug)] -pub struct AggregateChangesetsError { +pub struct StoreErrorWithDump { /// The partially-aggregated changeset. pub changeset: Option, - /// The error returned by [`EntryIter`]. - pub iter_error: IterError, + /// The [`StoreError`] + pub error: StoreError, +} + +impl From for StoreErrorWithDump { + fn from(value: io::Error) -> Self { + Self { + changeset: Option::::None, + error: StoreError::Io(value), + } + } } -impl std::fmt::Display for AggregateChangesetsError { +impl std::fmt::Display for StoreErrorWithDump { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.iter_error, f) + std::fmt::Display::fmt(&self.error, f) } } -impl std::error::Error for AggregateChangesetsError {} +impl std::error::Error for StoreErrorWithDump {} #[cfg(test)] mod test { use super::*; - use bincode::DefaultOptions; use std::{ collections::BTreeSet, - io::{Read, Write}, - vec::Vec, + fs, + io::{Seek, Write}, }; - use tempfile::NamedTempFile; const TEST_MAGIC_BYTES_LEN: usize = 12; const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] = @@ -221,65 +296,50 @@ mod test { type TestChangeSet = BTreeSet; - /// Check behavior of [`Store::create_new`] and [`Store::open`]. + /// Check behavior of [`Store::create`] and [`Store::load`]. #[test] fn construct_store() { let temp_dir = tempfile::tempdir().unwrap(); let file_path = temp_dir.path().join("db_file"); - let _ = Store::::open(&TEST_MAGIC_BYTES, &file_path) + let _ = Store::::load(&TEST_MAGIC_BYTES, &file_path) .expect_err("must not open as file does not exist yet"); - let _ = Store::::create_new(&TEST_MAGIC_BYTES, &file_path) + let _ = Store::::create(&TEST_MAGIC_BYTES, &file_path) .expect("must create file"); // cannot create new as file already exists - let _ = Store::::create_new(&TEST_MAGIC_BYTES, &file_path) + let _ = Store::::create(&TEST_MAGIC_BYTES, &file_path) .expect_err("must fail as file already exists now"); - let _ = Store::::open(&TEST_MAGIC_BYTES, &file_path) + let _ = Store::::load(&TEST_MAGIC_BYTES, &file_path) .expect("must open as file exists now"); } #[test] - fn open_or_create_new() { - let temp_dir = tempfile::tempdir().unwrap(); - let file_path = temp_dir.path().join("db_file"); - let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]); - - { - let mut db = Store::::open_or_create_new(&TEST_MAGIC_BYTES, &file_path) - .expect("must create"); - assert!(file_path.exists()); - db.append_changeset(&changeset).expect("must succeed"); - } - - { - let mut db = Store::::open_or_create_new(&TEST_MAGIC_BYTES, &file_path) - .expect("must recover"); - let recovered_changeset = db.aggregate_changesets().expect("must succeed"); - assert_eq!(recovered_changeset, Some(changeset)); - } - } - - #[test] - fn new_fails_if_file_is_too_short() { - let mut file = NamedTempFile::new().unwrap(); - file.write_all(&TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1]) - .expect("should write"); - - match Store::::open(&TEST_MAGIC_BYTES, file.path()) { - Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof), + fn load_fails_if_file_is_too_short() { + let tempdir = tempfile::tempdir().unwrap(); + let file_path = tempdir.path().join("db_file"); + fs::write(&file_path, &TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1]).expect("should write"); + + match Store::::load(&TEST_MAGIC_BYTES, &file_path) { + Err(StoreErrorWithDump { + error: StoreError::Io(e), + .. + }) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof), unexpected => panic!("unexpected result: {:?}", unexpected), }; } #[test] - fn new_fails_if_magic_bytes_are_invalid() { + fn load_fails_if_magic_bytes_are_invalid() { let invalid_magic_bytes = "ldkfs0000000"; - let mut file = NamedTempFile::new().unwrap(); - file.write_all(invalid_magic_bytes.as_bytes()) - .expect("should write"); + let tempdir = tempfile::tempdir().unwrap(); + let file_path = tempdir.path().join("db_file"); + fs::write(&file_path, invalid_magic_bytes.as_bytes()).expect("should write"); - match Store::::open(&TEST_MAGIC_BYTES, file.path()) { - Err(FileError::InvalidMagicBytes { got, .. }) => { + match Store::::load(&TEST_MAGIC_BYTES, &file_path) { + Err(StoreErrorWithDump { + error: StoreError::InvalidMagicBytes { got, .. }, + .. + }) => { assert_eq!(got, invalid_magic_bytes.as_bytes()) } unexpected => panic!("unexpected result: {:?}", unexpected), @@ -287,46 +347,120 @@ mod test { } #[test] - fn append_changeset_truncates_invalid_bytes() { + fn load_fails_if_undecodable_bytes() { // initial data to write to file (magic bytes + invalid data) let mut data = [255_u8; 2000]; data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES); - let changeset = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]); + let test_changesets = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]); - let mut file = NamedTempFile::new().unwrap(); - file.write_all(&data).expect("should write"); + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + let mut store = + Store::::create(&TEST_MAGIC_BYTES, &file_path).expect("should create"); + store.append(&test_changesets).expect("should append"); + + // Write garbage to file + store.db_file.write_all(&data).expect("should write"); + drop(store); + + match Store::::load(&TEST_MAGIC_BYTES, file_path) { + Err(StoreErrorWithDump { + changeset, + error: StoreError::Bincode(_), + }) => { + assert_eq!(changeset, Some(test_changesets)) + } + unexpected_res => panic!("unexpected result: {:?}", unexpected_res), + } + } + + #[test] + fn dump_fails_if_undecodable_bytes() { + // initial data to write to file (magic bytes + invalid data) + let mut data = [255_u8; 2000]; + data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES); + + let test_changesets = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]); + + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); let mut store = - Store::::open(&TEST_MAGIC_BYTES, file.path()).expect("should open"); - match store.iter_changesets().next() { - Some(Err(IterError::Bincode(_))) => {} + Store::::create(&TEST_MAGIC_BYTES, file_path).expect("should create"); + store.append(&test_changesets).expect("should append"); + + // Write garbage to file + store.db_file.write_all(&data).expect("should write"); + + match store.dump() { + Err(StoreErrorWithDump { + changeset, + error: StoreError::Bincode(_), + }) => { + assert_eq!(changeset, Some(test_changesets)) + } unexpected_res => panic!("unexpected result: {:?}", unexpected_res), } + } - store.append_changeset(&changeset).expect("should append"); + #[test] + fn append() { + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); - drop(store); + let not_empty_changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]); - let got_bytes = { - let mut buf = Vec::new(); - file.reopen() - .unwrap() - .read_to_end(&mut buf) - .expect("should read"); - buf - }; + let mut store = + Store::::create(&TEST_MAGIC_BYTES, file_path).expect("must create"); + + store + .append(¬_empty_changeset) + .expect("must append changeset"); + let aggregated_changeset = store + .dump() + .expect("should aggregate") + .expect("should not be empty"); + assert_eq!(not_empty_changeset, aggregated_changeset); + } - let expected_bytes = { - let mut buf = TEST_MAGIC_BYTES.to_vec(); - DefaultOptions::new() - .with_varint_encoding() - .serialize_into(&mut buf, &changeset) - .expect("should encode"); - buf - }; + #[test] + fn append_empty_changeset_does_nothing() { + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + + let empty_changeset = BTreeSet::new(); - assert_eq!(got_bytes, expected_bytes); + let mut store = + Store::::create(&TEST_MAGIC_BYTES, file_path).expect("must create"); + + store + .append(&empty_changeset) + .expect("must append changeset"); + let aggregated_changeset = store.dump().expect("should aggregate"); + assert_eq!(None, aggregated_changeset); + } + + #[test] + fn load_or_create() { + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]); + + { + let (mut store, _) = + Store::::load_or_create(&TEST_MAGIC_BYTES, &file_path) + .expect("must create"); + assert!(file_path.exists()); + store.append(&changeset).expect("must succeed"); + } + + { + let (_, recovered_changeset) = + Store::::load_or_create(&TEST_MAGIC_BYTES, &file_path) + .expect("must load"); + assert_eq!(recovered_changeset, Some(changeset)); + } } #[test] @@ -346,13 +480,14 @@ mod test { // simulate creating a file, writing data where the last write is incomplete { - let mut db = - Store::::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap(); + let mut store = + Store::::create(&TEST_MAGIC_BYTES, &file_path).unwrap(); for changeset in &changesets { - db.append_changeset(changeset).unwrap(); + store.append(changeset).unwrap(); } // this is the incomplete write - db.db_file + store + .db_file .write_all(&last_changeset_bytes[..short_write_len]) .unwrap(); } @@ -360,10 +495,8 @@ mod test { // load file again and aggregate changesets // write the last changeset again (this time it succeeds) { - let mut db = Store::::open(&TEST_MAGIC_BYTES, &file_path).unwrap(); - let err = db - .aggregate_changesets() - .expect_err("should return error as last read is short"); + let err = Store::::load(&TEST_MAGIC_BYTES, &file_path) + .expect_err("should fail to aggregate"); assert_eq!( err.changeset, changesets.iter().cloned().reduce(|mut acc, cs| { @@ -372,17 +505,26 @@ mod test { }), "should recover all changesets that are written in full", ); - db.db_file.write_all(&last_changeset_bytes).unwrap(); + // Remove file and start again + fs::remove_file(&file_path).expect("should remove file"); + let mut store = + Store::::create(&TEST_MAGIC_BYTES, &file_path).unwrap(); + for changeset in &changesets { + store.append(changeset).unwrap(); + } + // this is the complete write + store + .db_file + .write_all(&last_changeset_bytes) + .expect("should write last changeset in full"); } // load file again - this time we should successfully aggregate all changesets { - let mut db = Store::::open(&TEST_MAGIC_BYTES, &file_path).unwrap(); - let aggregated_changesets = db - .aggregate_changesets() - .expect("aggregating all changesets should succeed"); + let (_, aggregated_changeset) = + Store::::load(&TEST_MAGIC_BYTES, &file_path).unwrap(); assert_eq!( - aggregated_changesets, + aggregated_changeset, changesets .iter() .cloned() @@ -398,47 +540,58 @@ mod test { } #[test] - fn write_after_short_read() { + fn test_load_recovers_state_after_last_write() { let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + let changeset1 = BTreeSet::from(["hello".to_string(), "world".to_string()]); + let changeset2 = BTreeSet::from(["change after write".to_string()]); - let changesets = (0..20) - .map(|n| TestChangeSet::from([format!("{}", n)])) - .collect::>(); - let last_changeset = TestChangeSet::from(["last".into()]); + { + // create new store + let mut store = + Store::::create(&TEST_MAGIC_BYTES, &file_path).expect("must create"); - for read_count in 0..changesets.len() { - let file_path = temp_dir.path().join(format!("{}.dat", read_count)); + // append first changeset to store + store.append(&changeset1).expect("must succeed"); + } - // First, we create the file with all the changesets! - let mut db = Store::::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap(); - for changeset in &changesets { - db.append_changeset(changeset).unwrap(); - } - drop(db); - - // We re-open the file and read `read_count` number of changesets. - let mut db = Store::::open(&TEST_MAGIC_BYTES, &file_path).unwrap(); - let mut exp_aggregation = db - .iter_changesets() - .take(read_count) - .map(|r| r.expect("must read valid changeset")) - .fold(TestChangeSet::default(), |mut acc, v| { - Merge::merge(&mut acc, v); - acc - }); - // We write after a short read. - db.append_changeset(&last_changeset) - .expect("last write must succeed"); - Merge::merge(&mut exp_aggregation, last_changeset.clone()); - drop(db); - - // We open the file again and check whether aggregate changeset is expected. - let aggregation = Store::::open(&TEST_MAGIC_BYTES, &file_path) - .unwrap() - .aggregate_changesets() - .expect("must aggregate changesets") - .unwrap_or_default(); - assert_eq!(aggregation, exp_aggregation); + { + // open store + let (mut store, _) = Store::::load(&TEST_MAGIC_BYTES, &file_path) + .expect("failed to load store"); + + // now append the second changeset + store.append(&changeset2).expect("must succeed"); + + // Retrieve stored changesets from the database + let stored_changesets = store + .dump() + .expect("must succeed") + .expect("must be not empty"); + + // expected changeset must be changeset2 + changeset1 + let mut expected_changeset = changeset2.clone(); + expected_changeset.extend(changeset1); + + // Assert that stored_changesets matches expected_changeset but not changeset2 + assert_eq!(stored_changesets, expected_changeset); + assert_ne!(stored_changesets, changeset2); } + + // Open the store again to verify file pointer position at the end of the file + let (mut store, _) = Store::::load(&TEST_MAGIC_BYTES, &file_path) + .expect("should load correctly"); + + // get the current position of file pointer just after loading store + let current_pointer = store.db_file.stream_position().expect("must suceed"); + + // end pointer for the loaded store + let expected_pointer = store + .db_file + .seek(io::SeekFrom::End(0)) + .expect("must succeed"); + + // current position matches EOF + assert_eq!(current_pointer, expected_pointer); } } diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index 95c547967..83cb25f8a 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -169,7 +169,7 @@ fn main() -> anyhow::Result<()> { let db = &mut *db.lock().unwrap(); last_db_commit = Instant::now(); if let Some(changeset) = db_stage.take() { - db.append_changeset(&changeset)?; + db.append(&changeset)?; } println!( "[{:>10}s] committed to db (took {}s)", @@ -213,7 +213,7 @@ fn main() -> anyhow::Result<()> { ..Default::default() }); if let Some(changeset) = db_stage.take() { - db.append_changeset(&changeset)?; + db.append(&changeset)?; } } } @@ -307,7 +307,7 @@ fn main() -> anyhow::Result<()> { let db = &mut *db.lock().unwrap(); last_db_commit = Instant::now(); if let Some(changeset) = db_stage.take() { - db.append_changeset(&changeset)?; + db.append(&changeset)?; } println!( "[{:>10}s] committed to db (took {}s)", diff --git a/example-crates/example_cli/src/lib.rs b/example-crates/example_cli/src/lib.rs index 3a700db3a..e965495e0 100644 --- a/example-crates/example_cli/src/lib.rs +++ b/example-crates/example_cli/src/lib.rs @@ -466,7 +466,7 @@ pub fn handle_commands( let ((spk_i, spk), index_changeset) = spk_chooser(index, Keychain::External).expect("Must exist"); let db = &mut *db.lock().unwrap(); - db.append_changeset(&ChangeSet { + db.append(&ChangeSet { indexer: index_changeset, ..Default::default() })?; @@ -629,7 +629,7 @@ pub fn handle_commands( // If we're unable to persist this, then we don't want to broadcast. { let db = &mut *db.lock().unwrap(); - db.append_changeset(&ChangeSet { + db.append(&ChangeSet { indexer, ..Default::default() })?; @@ -719,7 +719,7 @@ pub fn handle_commands( // We know the tx is at least unconfirmed now. Note if persisting here fails, // it's not a big deal since we can always find it again from the // blockchain. - db.lock().unwrap().append_changeset(&ChangeSet { + db.lock().unwrap().append(&ChangeSet { tx_graph: changeset.tx_graph, indexer: changeset.indexer, ..Default::default() @@ -789,9 +789,10 @@ pub fn init_or_load( Commands::Generate { network } => generate_bip86_helper(network).map(|_| None), // try load _ => { - let mut db = - Store::::open(db_magic, db_path).context("could not open file store")?; - let changeset = db.aggregate_changesets()?.expect("db must not be empty"); + let (db, changeset) = + Store::::load(db_magic, db_path).context("could not open file store")?; + + let changeset = changeset.expect("should not be empty"); let network = changeset.network.expect("changeset network"); @@ -866,8 +867,8 @@ where LocalChain::from_genesis_hash(constants::genesis_block(network).block_hash()); changeset.network = Some(network); changeset.local_chain = chain_changeset; - let mut db = Store::::create_new(db_magic, db_path)?; - db.append_changeset(&changeset)?; + let mut db = Store::::create(db_magic, db_path)?; + db.append(&changeset)?; println!("New database {db_path}"); } diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index 9c705a3df..8e3110d68 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -278,6 +278,6 @@ fn main() -> anyhow::Result<()> { }; let mut db = db.lock().unwrap(); - db.append_changeset(&db_changeset)?; + db.append(&db_changeset)?; Ok(()) } diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index cba86b862..2c00751c2 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -278,7 +278,7 @@ fn main() -> anyhow::Result<()> { // We persist the changes let mut db = db.lock().unwrap(); - db.append_changeset(&ChangeSet { + db.append(&ChangeSet { local_chain: local_chain_changeset, tx_graph: indexed_tx_graph_changeset.tx_graph, indexer: indexed_tx_graph_changeset.indexer,