Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,29 @@ Contributions should generally follow the [Rust API guidelines](https://rust-lan
unawaited futures or unhandled errors. `let _x =` should only be used for RAII guards.


## Hash-consed types and `Arc<T>`
## Hash-consed types and `linera_cache::Arc<T>`

Any content-addressed immutable data (also known as hash-consed data) — for
example `Block`, `Blob`, `ConfirmedBlockCertificate` — should be cached and
passed around as `Arc<T>`.
passed around as `linera_cache::Arc<T>` (re-exported as `linera_storage::Arc`).

**Never construct `Arc::new(value)` for a hash-consed type.** Always obtain
the canonical `Arc<T>` from the dedup cache. Concretely:
`linera_cache::Arc<T>` is a newtype over `std::sync::Arc<T>` with **no public
constructor**: the only way to obtain one is through `ValueCache::insert`,
`ValueCache::insert_hashed`, or `ValueCache::get`. This makes the
"one allocation per content" invariant a compile-time guarantee rather than a
convention. Concretely:

- For freshly-constructed `ConfirmedBlockCertificate`s (e.g. from network or
proposal flows), call `Storage::cache_certificate`.
- For freshly-constructed `ConfirmedBlock`s, call
`Storage::cache_confirmed_block`.
- For freshly-constructed `Blob`s, call `Storage::cache_blob`.
- For values being re-inserted from a borrowed reference, use
`ValueCache::insert_hashed` or `ValueCache::insert_arc`.
`ValueCache::insert_hashed`.

`Arc::new` for a hash-consed value bypasses the dedup index and creates a
duplicate allocation, breaking the "one allocation per content" invariant.
See the [`linera-cache` README](linera-cache/README.md) for how the
invariant is enforced.
`std::sync::Arc::new` for a hash-consed value bypasses the dedup index and
creates a duplicate allocation; the type system prevents this by design.
See the [`linera-cache` README](linera-cache/README.md) for details.


## Formatting and linting
Expand Down
1 change: 1 addition & 0 deletions linera-bridge/src/relay/linera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl<E: linera_core::environment::Environment> LineraClient<E> {
self.chain_client
.read_certificate(hash)
.await
.map(|c| c.into_std())
.map_err(|e| anyhow::anyhow!(e))
}

Expand Down
106 changes: 106 additions & 0 deletions linera-cache/src/arc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! A provenance-tracking wrapper around [`std::sync::Arc`].
//!
//! [`Arc<T>`] is identical to [`std::sync::Arc<T>`] at runtime but has no
//! public constructor. The only way to obtain one is through
//! [`ValueCache::insert`], [`ValueCache::insert_hashed`], or
//! [`ValueCache::get`]. This makes the "one allocation per content" invariant
//! structurally enforced: callers cannot bypass the cache by calling
//! `Arc::new` directly.

use std::{
fmt,
hash::{Hash, Hasher},
ops::Deref,
sync::Arc as StdArc,
};

/// A reference-counted pointer that can only be constructed through a
/// [`crate::ValueCache`].
///
/// `Arc<T>` wraps [`std::sync::Arc<T>`] and implements `Deref<Target = T>`,
/// `Clone`, `Debug`, `Display`, `PartialEq`, `Eq`, and `Hash` identically.
///
/// Use [`Arc::into_std`] or [`Arc::as_std`] to interoperate with APIs that
/// require [`std::sync::Arc<T>`] explicitly.
pub struct Arc<T>(pub(crate) StdArc<T>);

impl<T> Arc<T> {
/// Returns a reference to the underlying [`std::sync::Arc<T>`].
pub fn as_std(&self) -> &StdArc<T> {
&self.0
}

/// Converts into the underlying [`std::sync::Arc<T>`].
pub fn into_std(self) -> StdArc<T> {
self.0
}

/// Unwraps the inner value if this is the only strong reference;
/// otherwise clones it.
pub fn unwrap_or_clone(this: Self) -> T
where
T: Clone,
{
StdArc::unwrap_or_clone(this.0)
}

/// Returns `true` if two `Arc`s point to the same allocation.
pub fn ptr_eq(a: &Self, b: &Self) -> bool {
StdArc::ptr_eq(&a.0, &b.0)
}
}

impl<T> Deref for Arc<T> {
type Target = T;

fn deref(&self) -> &T {
&self.0
}
}

impl<T> Clone for Arc<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<T: fmt::Debug> fmt::Debug for Arc<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&*self.0, f)
}
}

impl<T: fmt::Display> fmt::Display for Arc<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&*self.0, f)
}
}

impl<T: PartialEq> PartialEq for Arc<T> {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}

impl<T: Eq> Eq for Arc<T> {}

impl<T: Hash> Hash for Arc<T> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}

impl<T> From<Arc<T>> for StdArc<T> {
fn from(arc: Arc<T>) -> Self {
arc.0
}
}

impl<T> AsRef<StdArc<T>> for Arc<T> {
fn as_ref(&self) -> &StdArc<T> {
&self.0
}
}
8 changes: 5 additions & 3 deletions linera-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
//! prevent unbounded growth.
//!
//! For the invariant to hold, all inserts of hash-consed values must go
//! through the cache (e.g. [`ValueCache::insert_hashed`] or
//! [`ValueCache::insert_arc`]). Constructing an `Arc::new(value)` off-path
//! creates a duplicate allocation that bypasses the dedup index.
//! through the cache (e.g. [`ValueCache::insert`] or [`ValueCache::insert_hashed`]).
//! The [`Arc`] newtype enforces this structurally: it has no public constructor,
//! so callers cannot bypass the cache by calling `std::sync::Arc::new` directly.

mod arc;
mod unique_value_cache;
mod value_cache;

pub use arc::Arc;
pub use unique_value_cache::UniqueValueCache;
pub use value_cache::{ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
60 changes: 22 additions & 38 deletions linera-cache/src/value_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub const DEFAULT_CLEANUP_INTERVAL_SECS: u64 = 30;
/// A background task periodically sweeps dead `Weak` entries from the index
/// to prevent unbounded memory growth.
pub struct ValueCache<K, V> {
cache: Cache<K, Arc<V>>,
cache: Cache<K, crate::Arc<V>>,
weak_index: Arc<papaya::HashMap<K, Weak<V>>>,
}

Expand Down Expand Up @@ -67,18 +67,13 @@ where
}
}

/// Inserts a value into the cache, returning the canonical `Arc`.
/// Inserts a value into the cache, returning the canonical [`crate::Arc`].
///
/// The value is wrapped in `Arc` internally. If a live `Arc` for this key
/// already exists (held by another consumer), the existing allocation is
/// reused and the new value is dropped.
pub fn insert(&self, key: &K, value: V) -> Arc<V> {
self.dedup_insert(key, Arc::new(value))
}

/// Inserts a pre-wrapped `Arc<V>` into the cache, returning the canonical `Arc`.
pub fn insert_arc(&self, key: &K, value: Arc<V>) -> Arc<V> {
self.dedup_insert(key, value)
pub fn insert(&self, key: &K, value: V) -> crate::Arc<V> {
self.dedup_insert(key, crate::Arc(Arc::new(value)))
}

/// Removes a value from the bounded cache.
Expand All @@ -87,17 +82,17 @@ where
/// still hold an `Arc` to this value, and the weak index must be able
/// to deduplicate against it. Dead weak entries are cleaned up by the
/// background task.
pub fn remove(&self, key: &K) -> Option<Arc<V>> {
pub fn remove(&self, key: &K) -> Option<crate::Arc<V>> {
let value = self.cache.peek(key);
if value.is_some() {
self.cache.remove(key);
}
Self::track_cache_usage(value)
}

/// Returns an `Arc` reference to the value, checking both the bounded
/// Returns an [`crate::Arc`] to the value, checking both the bounded
/// cache and the weak index.
pub fn get(&self, key: &K) -> Option<Arc<V>> {
pub fn get(&self, key: &K) -> Option<crate::Arc<V>> {
// Tier 1: bounded cache (hot path)
if let Some(arc) = self.cache.get(key) {
return Self::track_cache_usage(Some(arc));
Expand All @@ -107,6 +102,7 @@ where
let guard = self.weak_index.guard();
if let Some(weak) = self.weak_index.get(key, &guard) {
if let Some(arc) = weak.upgrade() {
let arc = crate::Arc(arc);
// Re-insert into bounded cache for future fast lookups
self.cache.insert(key.clone(), arc.clone());
return Self::track_cache_usage(Some(arc));
Expand Down Expand Up @@ -160,9 +156,9 @@ where
/// Core dedup logic: atomically checks the weak index for an existing
/// live allocation. If found, reuses it. Otherwise inserts the new Arc.
/// Returns the canonical `Arc`.
fn dedup_insert(&self, key: &K, new_arc: Arc<V>) -> Arc<V> {
fn dedup_insert(&self, key: &K, new_arc: crate::Arc<V>) -> crate::Arc<V> {
let guard = self.weak_index.guard();
let weak = Arc::downgrade(&new_arc);
let weak = Arc::downgrade(&new_arc.0);

let result = self.weak_index.compute(
key.clone(),
Expand All @@ -178,15 +174,15 @@ where

let canonical_arc = match result {
Compute::Inserted(..) | Compute::Updated { .. } => new_arc,
Compute::Aborted(existing_arc) => existing_arc,
Compute::Aborted(existing_arc) => crate::Arc(existing_arc),
_ => unreachable!(),
};

self.cache.insert(key.clone(), canonical_arc.clone());
canonical_arc
}

fn track_cache_usage(maybe_value: Option<Arc<V>>) -> Option<Arc<V>> {
fn track_cache_usage(maybe_value: Option<crate::Arc<V>>) -> Option<crate::Arc<V>> {
#[cfg(with_metrics)]
{
let metric = if maybe_value.is_some() {
Expand All @@ -209,7 +205,7 @@ impl<V: Clone + Send + Sync + 'static> ValueCache<CryptoHash, V> {
///
/// The `value` is wrapped in a [`Cow`] so that it is only cloned if it
/// needs to be inserted in the cache.
pub fn insert_hashed<T>(&self, value: Cow<Hashed<T>>) -> Arc<V>
pub fn insert_hashed<T>(&self, value: Cow<Hashed<T>>) -> crate::Arc<V>
where
T: Clone,
V: From<Hashed<T>>,
Expand All @@ -223,12 +219,13 @@ impl<V: Clone + Send + Sync + 'static> ValueCache<CryptoHash, V> {
let guard = self.weak_index.guard();
if let Some(weak) = self.weak_index.get(&hash, &guard) {
if let Some(arc) = weak.upgrade() {
let arc = crate::Arc(arc);
self.cache.insert(hash, arc.clone());
return arc;
}
}
drop(guard);
self.dedup_insert(&hash, Arc::new(value.into_owned().into()))
self.dedup_insert(&hash, crate::Arc(Arc::new(value.into_owned().into())))
}

/// Inserts multiple values constructed from [`Hashed<T>`]s into the cache.
Expand Down Expand Up @@ -270,12 +267,13 @@ mod metrics {

#[cfg(test)]
mod tests {
use std::{borrow::Cow, sync::Arc};
use std::borrow::Cow;

use linera_base::{crypto::CryptoHash, hashed::Hashed};
use serde::{Deserialize, Serialize};

use super::{ValueCache, DEFAULT_CLEANUP_INTERVAL_SECS};
use crate::Arc as CacheArc;

/// Test cache size for unit tests.
const TEST_CACHE_SIZE: usize = 10;
Expand Down Expand Up @@ -364,7 +362,7 @@ mod tests {
// Re-inserting should return the same Arc (dedup)
for (value, first_arc) in values.iter().zip(&first_arcs) {
let second_arc = cache.insert_hashed(Cow::Borrowed(value));
assert!(Arc::ptr_eq(&second_arc, first_arc));
assert!(CacheArc::ptr_eq(&second_arc, first_arc));
}
}

Expand Down Expand Up @@ -415,7 +413,7 @@ mod tests {

let first = cache.insert_hashed(Cow::Borrowed(&promoted));
let second = cache.insert_hashed(Cow::Borrowed(&promoted));
assert!(Arc::ptr_eq(&first, &second));
assert!(CacheArc::ptr_eq(&first, &second));

let extras = create_test_values(1..=TEST_CACHE_SIZE as u64 * 2);
for value in &extras {
Expand Down Expand Up @@ -445,13 +443,13 @@ mod tests {
.get(&1)
.expect("held Arc should keep entry findable via weak index");
assert!(
Arc::ptr_eq(&retrieved, &held),
CacheArc::ptr_eq(&retrieved, &held),
"must return same allocation, not a duplicate"
);

// Re-inserting should also return the same Arc
let reinserted = cache.insert(&1, "replacement".to_string());
assert!(Arc::ptr_eq(&reinserted, &held));
assert!(CacheArc::ptr_eq(&reinserted, &held));
assert_eq!(&*reinserted, "hello");
}

Expand All @@ -466,7 +464,7 @@ mod tests {

// Still findable via weak index since we hold an Arc
let retrieved = cache.get(&1).expect("weak index should find held Arc");
assert!(Arc::ptr_eq(&retrieved, &held));
assert!(CacheArc::ptr_eq(&retrieved, &held));
}

#[test]
Expand Down Expand Up @@ -501,18 +499,4 @@ mod tests {
// Key 1 still findable (we hold an Arc)
assert!(cache.contains(&1));
}

#[test]
fn test_insert_arc_dedup() {
let cache = new_string_cache(TEST_CACHE_SIZE);
let value = Arc::new("hello".to_string());

let first = cache.insert_arc(&1, value.clone());
assert!(Arc::ptr_eq(&first, &value));

let second = cache.insert_arc(&1, Arc::new("other".to_string()));
assert!(Arc::ptr_eq(&second, &value));

assert_eq!(&*cache.get(&1).expect("just inserted"), "hello");
}
}
4 changes: 2 additions & 2 deletions linera-client/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use linera_core::{
worker::{Notification, Reason},
Environment, Wallet,
};
use linera_storage::Storage as _;
use linera_storage::{Arc as CacheArc, Storage as _};
use tokio::sync::{mpsc::UnboundedReceiver, Notify};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument as _};
Expand Down Expand Up @@ -408,7 +408,7 @@ impl<C: ClientContext + 'static> ChainListener<C> {
/// add them to the wallet and start listening for notifications. (This is not done for
/// fallback owners, as those would have to monitor all chains anyway.)
async fn add_new_chains(&mut self, hash: CryptoHash) -> Result<(), Error> {
let block = Arc::unwrap_or_clone(
let block = CacheArc::unwrap_or_clone(
self.storage
.read_confirmed_block(hash)
.await?
Expand Down
Loading
Loading