Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/heads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl AuthorHeads {
}

/// Create an iterator over the entries in this state.
pub fn iter(&self) -> std::collections::btree_map::Iter<AuthorId, Timestamp> {
pub fn iter(&self) -> std::collections::btree_map::Iter<'_, AuthorId, Timestamp> {
self.heads.iter()
}

Expand Down
73 changes: 42 additions & 31 deletions src/ranger.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Implementation of Set Reconcilliation based on
//! "Range-Based Set Reconciliation" by Aljoscha Meyer.

use std::{cmp::Ordering, fmt::Debug, pin::Pin};
use std::{fmt::Debug, pin::Pin};

use n0_future::StreamExt;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -62,36 +62,33 @@ pub trait RangeValue: Sized + Debug + Ord + PartialEq + Clone + 'static {}
///
/// This means that ranges are "wrap around" conceptually.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Default)]
pub struct Range<K> {
pub(crate) struct Range<K> {
x: K,
y: K,
}

impl<K> Range<K> {
pub fn x(&self) -> &K {
pub(crate) fn x(&self) -> &K {
&self.x
}

pub fn y(&self) -> &K {
pub(crate) fn y(&self) -> &K {
&self.y
}

pub fn new(x: K, y: K) -> Self {
Range { x, y }
}

pub fn map<X>(self, f: impl FnOnce(K, K) -> (X, X)) -> Range<X> {
let (x, y) = f(self.x, self.y);
pub(crate) fn new(x: K, y: K) -> Self {
Range { x, y }
}
}

impl<K: Ord> Range<K> {
pub fn is_all(&self) -> bool {
pub(crate) fn is_all(&self) -> bool {
self.x() == self.y()
}

pub fn contains(&self, t: &K) -> bool {
#[cfg(test)]
pub(crate) fn contains(&self, t: &K) -> bool {
use std::cmp::Ordering;
match self.x().cmp(self.y()) {
Ordering::Equal => true,
Ordering::Less => self.x() <= t && t < self.y(),
Expand All @@ -117,13 +114,9 @@ impl Debug for Fingerprint {

impl Fingerprint {
/// The fingerprint of the empty set
pub fn empty() -> Self {
pub(crate) fn empty() -> Self {
Fingerprint(*blake3::hash(&[]).as_bytes())
}

pub fn new<T: RangeEntry>(val: T) -> Self {
val.as_fingerprint()
}
}

impl std::ops::BitXorAssign for Fingerprint {
Expand All @@ -140,9 +133,9 @@ pub struct RangeFingerprint<K> {
serialize = "Range<K>: Serialize",
deserialize = "Range<K>: Deserialize<'de>"
))]
pub range: Range<K>,
pub(crate) range: Range<K>,
/// The fingerprint of `range`.
pub fingerprint: Fingerprint,
pub(crate) fingerprint: Fingerprint,
}

/// Transfers items inside a range to the other participant.
Expand All @@ -153,12 +146,12 @@ pub struct RangeItem<E: RangeEntry> {
serialize = "Range<E::Key>: Serialize",
deserialize = "Range<E::Key>: Deserialize<'de>"
))]
pub range: Range<E::Key>,
pub(crate) range: Range<E::Key>,
#[serde(bound(serialize = "E: Serialize", deserialize = "E: Deserialize<'de>"))]
pub values: Vec<(E, ContentStatus)>,
pub(crate) values: Vec<(E, ContentStatus)>,
/// If false, requests to send local items in the range.
/// Otherwise not.
pub have_local: bool,
pub(crate) have_local: bool,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
Expand All @@ -176,15 +169,17 @@ pub enum MessagePart<E: RangeEntry> {
}

impl<E: RangeEntry> MessagePart<E> {
pub fn is_range_fingerprint(&self) -> bool {
#[cfg(test)]
pub(crate) fn is_range_fingerprint(&self) -> bool {
matches!(self, MessagePart::RangeFingerprint(_))
}

pub fn is_range_item(&self) -> bool {
#[cfg(test)]
pub(crate) fn is_range_item(&self) -> bool {
matches!(self, MessagePart::RangeItem(_))
}

pub fn values(&self) -> Option<&[(E, ContentStatus)]> {
pub(crate) fn values(&self) -> Option<&[(E, ContentStatus)]> {
match self {
MessagePart::RangeFingerprint(_) => None,
MessagePart::RangeItem(RangeItem { values, .. }) => Some(values),
Expand All @@ -211,15 +206,15 @@ impl<E: RangeEntry> Message<E> {
Ok(Message { parts: vec![part] })
}

pub fn parts(&self) -> &[MessagePart<E>] {
pub(crate) fn parts(&self) -> &[MessagePart<E>] {
&self.parts
}

pub fn values(&self) -> impl Iterator<Item = &(E, ContentStatus)> {
pub(crate) fn values(&self) -> impl Iterator<Item = &(E, ContentStatus)> {
self.parts().iter().filter_map(|p| p.values()).flatten()
}

pub fn value_count(&self) -> usize {
pub(crate) fn value_count(&self) -> usize {
self.values().count()
}
}
Expand All @@ -241,12 +236,16 @@ pub trait Store<E: RangeEntry>: Sized {
fn get_first(&mut self) -> Result<E::Key, Self::Error>;

/// Get a single entry.
#[cfg(test)]
fn get(&mut self, key: &E::Key) -> Result<Option<E>, Self::Error>;

/// Get the number of entries in the store.
#[cfg(test)]
fn len(&mut self) -> Result<usize, Self::Error>;

/// Returns `true` if the vector contains no elements.
#[cfg(test)]
#[allow(unused)]
fn is_empty(&mut self) -> Result<bool, Self::Error>;

/// Calculate the fingerprint of the given range.
Expand Down Expand Up @@ -274,17 +273,21 @@ pub trait Store<E: RangeEntry>: Sized {
}

/// Returns all entries whose key starts with the given `prefix`.
#[cfg(test)]
#[allow(unused)]
fn prefixed_by(&mut self, prefix: &E::Key) -> Result<Self::RangeIterator<'_>, Self::Error>;

/// Returns all entries that share a prefix with `key`, including the entry for `key` itself.
fn prefixes_of(&mut self, key: &E::Key) -> Result<Self::ParentIterator<'_>, Self::Error>;

/// Get all entries in the store
#[cfg(test)]
fn all(&mut self) -> Result<Self::RangeIterator<'_>, Self::Error>;

/// Remove an entry from the store.
///
/// This will remove just the entry with the given key, but will not perform prefix deletion.
#[cfg(test)]
fn entry_remove(&mut self, key: &E::Key) -> Result<Option<E>, Self::Error>;

/// Remove all entries whose key start with a prefix and for which the `predicate` callback
Expand Down Expand Up @@ -602,14 +605,17 @@ impl<E: RangeEntry, S: Store<E>> Store<E> for &mut S {
(**self).get_first()
}

#[cfg(test)]
fn get(&mut self, key: &<E as RangeEntry>::Key) -> Result<Option<E>, Self::Error> {
(**self).get(key)
}

#[cfg(test)]
fn len(&mut self) -> Result<usize, Self::Error> {
(**self).len()
}

#[cfg(test)]
fn is_empty(&mut self) -> Result<bool, Self::Error> {
(**self).is_empty()
}
Expand All @@ -632,6 +638,7 @@ impl<E: RangeEntry, S: Store<E>> Store<E> for &mut S {
(**self).get_range(range)
}

#[cfg(test)]
fn prefixed_by(
&mut self,
prefix: &<E as RangeEntry>::Key,
Expand All @@ -646,10 +653,12 @@ impl<E: RangeEntry, S: Store<E>> Store<E> for &mut S {
(**self).prefixes_of(key)
}

#[cfg(test)]
fn all(&mut self) -> Result<Self::RangeIterator<'_>, Self::Error> {
(**self).all()
}

#[cfg(test)]
fn entry_remove(&mut self, key: &<E as RangeEntry>::Key) -> Result<Option<E>, Self::Error> {
(**self).entry_remove(key)
}
Expand All @@ -664,7 +673,7 @@ impl<E: RangeEntry, S: Store<E>> Store<E> for &mut S {
}

#[derive(Debug, Clone, Copy)]
pub struct SyncConfig {
pub(crate) struct SyncConfig {
/// Up to how many values to send immediately, before sending only a fingerprint.
max_set_size: usize,
/// `k` in the protocol, how many splits to generate. at least 2
Expand All @@ -682,7 +691,7 @@ impl Default for SyncConfig {

/// The outcome of a [`Store::put`] operation.
#[derive(Debug)]
pub enum InsertOutcome {
pub(crate) enum InsertOutcome {
/// The entry was not inserted because a newer entry for its key or a
/// prefix of its key exists.
NotInserted,
Expand Down Expand Up @@ -865,7 +874,7 @@ mod tests {
}

#[derive(Debug)]
pub struct SimpleRangeIterator<'a, K, V> {
pub(crate) struct SimpleRangeIterator<'a, K, V> {
iter: std::collections::btree_map::Iter<'a, K, V>,
filter: SimpleFilter<K>,
}
Expand All @@ -874,6 +883,8 @@ mod tests {
enum SimpleFilter<K> {
None,
Range(Range<K>),
// TODO(Frando): Add test for this.
#[allow(unused)]
Prefix(K),
}

Expand Down
21 changes: 16 additions & 5 deletions src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use anyhow::{anyhow, Result};
use ed25519_dalek::{SignatureError, VerifyingKey};
use iroh_blobs::Hash;
use rand_core::CryptoRngCore;
use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable, ReadableTableMetadata};
use redb::{Database, DatabaseError, ReadableMultimapTable, ReadableTable};
use tracing::warn;

use super::{
Expand Down Expand Up @@ -186,7 +186,7 @@ impl Store {
///
/// As such, there is also no guarantee that the data you see is
/// already persisted.
fn tables(&mut self) -> Result<&Tables> {
fn tables(&mut self) -> Result<&Tables<'_>> {
let guard = &mut self.transaction;
let tables = match std::mem::take(guard) {
CurrentTransaction::None => {
Expand Down Expand Up @@ -258,7 +258,7 @@ type PeersIter = std::vec::IntoIter<PeerIdBytes>;

impl Store {
/// Create a new replica for `namespace` and persist in this store.
pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica> {
pub fn new_replica(&mut self, namespace: NamespaceSecret) -> Result<Replica<'_>> {
let id = namespace.id();
self.import_namespace(namespace.into())?;
self.open_replica(&id).map_err(Into::into)
Expand Down Expand Up @@ -295,7 +295,7 @@ impl Store {
/// Open a replica from this store.
///
/// This just calls load_replica_info and then creates a new replica with the info.
pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica, OpenError> {
pub fn open_replica(&mut self, namespace_id: &NamespaceId) -> Result<Replica<'_>, OpenError> {
let info = self.load_replica_info(namespace_id)?;
let instance = StoreInstance::new(*namespace_id, self);
Ok(Replica::new(instance, Box::new(info)))
Expand Down Expand Up @@ -465,7 +465,10 @@ impl Store {
}

/// Get the latest entry for each author in a namespace.
pub fn get_latest_for_each_author(&mut self, namespace: NamespaceId) -> Result<LatestIterator> {
pub fn get_latest_for_each_author(
&mut self,
namespace: NamespaceId,
) -> Result<LatestIterator<'_>> {
LatestIterator::new(&self.tables()?.latest_per_author, namespace)
}

Expand Down Expand Up @@ -679,20 +682,24 @@ impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
Ok(id)
}

#[cfg(test)]
fn get(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
self.store
.as_mut()
.get_exact(id.namespace(), id.author(), id.key(), true)
}

#[cfg(test)]
fn len(&mut self) -> Result<usize> {
let tables = self.store.as_mut().tables()?;
let bounds = RecordsBounds::namespace(self.namespace);
let records = tables.records.range(bounds.as_ref())?;
Ok(records.count())
}

#[cfg(test)]
fn is_empty(&mut self) -> Result<bool> {
use redb::ReadableTableMetadata;
let tables = self.store.as_mut().tables()?;
Ok(tables.records.is_empty()?)
}
Expand Down Expand Up @@ -782,6 +789,7 @@ impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
Ok(iter)
}

#[cfg(test)]
fn entry_remove(&mut self, id: &RecordIdentifier) -> Result<Option<SignedEntry>> {
self.store.as_mut().modify(|tables| {
let entry = {
Expand All @@ -796,6 +804,7 @@ impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
})
}

#[cfg(test)]
fn all(&mut self) -> Result<Self::RangeIterator<'_>> {
let tables = self.store.as_mut().tables()?;
let bounds = RecordsBounds::namespace(self.namespace);
Expand All @@ -811,6 +820,7 @@ impl<'a> crate::ranger::Store<SignedEntry> for StoreInstance<'a> {
ParentIterator::new(tables, id.namespace(), id.author(), id.key().to_vec())
}

#[cfg(test)]
fn prefixed_by(&mut self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
let tables = self.store.as_mut().tables()?;
let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
Expand Down Expand Up @@ -1136,6 +1146,7 @@ mod tests {

#[test]
fn test_migration_004_populate_by_key_index() -> Result<()> {
use redb::ReadableTableMetadata;
let dbfile = tempfile::NamedTempFile::new()?;

let mut store = Store::persistent(dbfile.path())?;
Expand Down
8 changes: 4 additions & 4 deletions src/store/fs/bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ impl RecordsBounds {
Self::new(start, Self::namespace_end(ns))
}

pub fn as_ref(&self) -> (Bound<RecordsId>, Bound<RecordsId>) {
fn map(id: &RecordsIdOwned) -> RecordsId {
pub fn as_ref(&self) -> (Bound<RecordsId<'_>>, Bound<RecordsId<'_>>) {
fn map(id: &RecordsIdOwned) -> RecordsId<'_> {
(&id.0, &id.1, &id.2[..])
}
(map_bound(&self.0, map), map_bound(&self.1, map))
Expand Down Expand Up @@ -139,8 +139,8 @@ impl ByKeyBounds {
Self(start, end)
}

pub fn as_ref(&self) -> (Bound<RecordsByKeyId>, Bound<RecordsByKeyId>) {
fn map(id: &RecordsByKeyIdOwned) -> RecordsByKeyId {
pub fn as_ref(&self) -> (Bound<RecordsByKeyId<'_>>, Bound<RecordsByKeyId<'_>>) {
fn map(id: &RecordsByKeyIdOwned) -> RecordsByKeyId<'_> {
(&id.0, &id.1[..], &id.2)
}
(map_bound(&self.0, map), map_bound(&self.1, map))
Expand Down
2 changes: 1 addition & 1 deletion src/store/fs/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl TransactionAndTables {
})
}

pub fn tables(&self) -> &Tables {
pub fn tables(&self) -> &Tables<'_> {
self.inner.borrow_dependent()
}

Expand Down
Loading