Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/adapter/src/coord/caught_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ impl Coordinator {
.get_entry(&replica_frontier_item_id)
.latest_global_id();

// `snapshot_latest` requires that the collection consolidates to a
// set. `mz_cluster_replica_frontiers` is a controller-managed builtin
// written with ±1 diffs, so it satisfies that invariant.
let live_frontiers = self
.controller
.storage_collections
Expand Down
22 changes: 18 additions & 4 deletions src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,20 @@ pub trait StorageCollections: Debug + Sync {
as_of: Timestamp,
) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>>;

/// Returns a snapshot of the contents of collection `id` at the largest
/// readable `as_of`.
/// Returns a snapshot of the contents of collection `id` at the largest readable `as_of`.
/// The collection must consolidate to a set, i.e., the multiplicity of every row must be 1!
///
/// # Errors
///
/// - Returns `StorageError::InvalidUsage` if the collection is closed.
/// - Propagates the error if the underlying `snapshot` call errors.
///
/// # Panics
///
/// Panics if the collection does not consolidate to a set at that `as_of`
/// (i.e., if any row survives with a multiplicity other than `+1`). Only
/// safe to call on collections whose producer guarantees set semantics;
/// not safe on arbitrary user collections.
async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError>;

/// Returns a snapshot of the contents of collection `id` at `as_of`.
Expand Down Expand Up @@ -1490,12 +1502,14 @@ impl StorageCollections for StorageCollectionsImpl {
let upper = self.recent_upper(id).await?;
let res = match upper.as_option() {
Some(f) if f > &Timestamp::MIN => {
let as_of = f.step_back().unwrap();
let as_of = f.step_back().expect("checked that f > &Timestamp::MIN");

let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
let snapshot = self.snapshot(id, as_of, &self.txns_read).await?;
Copy link
Copy Markdown
Contributor Author

@ggevay ggevay Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not so important for the main point of the PR, but I did it because why unwrap when you can just ?-propagate the same error?

snapshot
.into_iter()
.map(|(row, diff)| {
// See the trait doc: `snapshot_latest` is only meant for collections that
// consolidate to a set.
assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
row
})
Expand Down
Loading