From 80a81d724045cada52f1bc5b8e495c0026aae999 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Tue, 21 Apr 2026 20:12:26 +0200 Subject: [PATCH] Add comments around `snapshot_latest`, documenting its preconditions --- src/adapter/src/coord/caught_up.rs | 3 +++ src/storage-client/src/storage_collections.rs | 22 +++++++++++++++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/adapter/src/coord/caught_up.rs b/src/adapter/src/coord/caught_up.rs index 3837977216194..f05696bd8b3ce 100644 --- a/src/adapter/src/coord/caught_up.rs +++ b/src/adapter/src/coord/caught_up.rs @@ -63,6 +63,9 @@ impl Coordinator { .get_entry(&replica_frontier_item_id) .latest_global_id(); + // `snapshot_latest` requires that the collection consolidates to a + // set. `mz_cluster_replica_frontiers` is a controller-managed builtin + // written with ±1 diffs, so it satisfies that invariant. let live_frontiers = self .controller .storage_collections diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index 50646aa412590..e11b00ea06e5f 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -163,8 +163,20 @@ pub trait StorageCollections: Debug + Sync { as_of: Timestamp, ) -> BoxFuture<'static, Result, StorageError>>; - /// Returns a snapshot of the contents of collection `id` at the largest - /// readable `as_of`. + /// Returns a snapshot of the contents of collection `id` at the largest readable `as_of`. + /// The collection must consolidate to a set, i.e., the multiplicity of every row must be 1! + /// + /// # Errors + /// + /// - Returns `StorageError::InvalidUsage` if the collection is closed. + /// - Propagates the error if the underlying `snapshot` call errors. + /// + /// # Panics + /// + /// Panics if the collection does not consolidate to a set at that `as_of` + /// (i.e., if any row survives with a multiplicity other than `+1`). Only + /// safe to call on collections whose producer guarantees set semantics; + /// not safe on arbitrary user collections. async fn snapshot_latest(&self, id: GlobalId) -> Result, StorageError>; /// Returns a snapshot of the contents of collection `id` at `as_of`. @@ -1490,12 +1502,14 @@ impl StorageCollections for StorageCollectionsImpl { let upper = self.recent_upper(id).await?; let res = match upper.as_option() { Some(f) if f > &Timestamp::MIN => { - let as_of = f.step_back().unwrap(); + let as_of = f.step_back().expect("checked that f > &Timestamp::MIN"); - let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap(); + let snapshot = self.snapshot(id, as_of, &self.txns_read).await?; snapshot .into_iter() .map(|(row, diff)| { + // See the trait doc: `snapshot_latest` is only meant for collections that + // consolidate to a set. assert_eq!(diff, 1, "snapshot doesn't accumulate to set"); row })