Skip to content

Commit 80a81d7

Browse files
committed
Add comments around snapshot_latest, documenting its preconditions
1 parent 2bbb5eb commit 80a81d7

2 files changed

Lines changed: 21 additions & 4 deletions

File tree

src/adapter/src/coord/caught_up.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ impl Coordinator {
6363
.get_entry(&replica_frontier_item_id)
6464
.latest_global_id();
6565

66+
// `snapshot_latest` requires that the collection consolidates to a
67+
// set. `mz_cluster_replica_frontiers` is a controller-managed builtin
68+
// written with ±1 diffs, so it satisfies that invariant.
6669
let live_frontiers = self
6770
.controller
6871
.storage_collections

src/storage-client/src/storage_collections.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,20 @@ pub trait StorageCollections: Debug + Sync {
163163
as_of: Timestamp,
164164
) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>>;
165165

166-
/// Returns a snapshot of the contents of collection `id` at the largest
167-
/// readable `as_of`.
166+
/// Returns a snapshot of the contents of collection `id` at the largest readable `as_of`.
167+
/// The collection must consolidate to a set, i.e., the multiplicity of every row must be 1!
168+
///
169+
/// # Errors
170+
///
171+
/// - Returns `StorageError::InvalidUsage` if the collection is closed.
172+
/// - Propagates the error if the underlying `snapshot` call errors.
173+
///
174+
/// # Panics
175+
///
176+
/// Panics if the collection does not consolidate to a set at that `as_of`
177+
/// (i.e., if any row survives with a multiplicity other than `+1`). Only
178+
/// safe to call on collections whose producer guarantees set semantics;
179+
/// not safe on arbitrary user collections.
168180
async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError>;
169181

170182
/// Returns a snapshot of the contents of collection `id` at `as_of`.
@@ -1490,12 +1502,14 @@ impl StorageCollections for StorageCollectionsImpl {
14901502
let upper = self.recent_upper(id).await?;
14911503
let res = match upper.as_option() {
14921504
Some(f) if f > &Timestamp::MIN => {
1493-
let as_of = f.step_back().unwrap();
1505+
let as_of = f.step_back().expect("checked that f > &Timestamp::MIN");
14941506

1495-
let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1507+
let snapshot = self.snapshot(id, as_of, &self.txns_read).await?;
14961508
snapshot
14971509
.into_iter()
14981510
.map(|(row, diff)| {
1511+
// See the trait doc: `snapshot_latest` is only meant for collections that
1512+
// consolidate to a set.
14991513
assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
15001514
row
15011515
})

0 commit comments

Comments
 (0)