Skip to content

Commit 5af8d24

Browse files
committed
Add comments around snapshot_latest, documenting its preconditions
1 parent 2bbb5eb commit 5af8d24

2 files changed

Lines changed: 22 additions & 3 deletions

File tree

src/adapter/src/coord/caught_up.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ impl Coordinator {
6363
.get_entry(&replica_frontier_item_id)
6464
.latest_global_id();
6565

66+
// `snapshot_latest` requires that the collection consolidates to a
67+
// set. `mz_cluster_replica_frontiers` is a controller-managed builtin
68+
// written with ±1 diffs, so it satisfies that invariant.
6669
let live_frontiers = self
6770
.controller
6871
.storage_collections

src/storage-client/src/storage_collections.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,22 @@ pub trait StorageCollections: Debug + Sync {
163163
as_of: Timestamp,
164164
) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>>;
165165

166-
/// Returns a snapshot of the contents of collection `id` at the largest
167-
/// readable `as_of`.
166+
/// Returns a snapshot of the contents of collection `id` at the largest readable `as_of`.
167+
/// The collection must consolidate to a set, i.e., the multiplicity of every row must be 1!
168+
///
169+
/// # Errors
170+
///
171+
/// - Returns `StorageError::InvalidUsage` if the collection is closed.
172+
/// - Propagates the error if the underlying `snapshot` call errors.
173+
///
174+
/// # Panics
175+
///
176+
/// - Panics if the collection does not consolidate to a set at that `as_of`
177+
/// (i.e., if any row survives with a multiplicity other than `+1`). Only
178+
/// safe to call on collections whose producer guarantees set semantics;
179+
/// not safe on arbitrary user collections.
180+
/// - Panics if the collection's upper is `0`, i.e., if it doesn't have data at
181+
/// any timestamp.
168182
async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError>;
169183

170184
/// Returns a snapshot of the contents of collection `id` at `as_of`.
@@ -1492,10 +1506,12 @@ impl StorageCollections for StorageCollectionsImpl {
14921506
Some(f) if f > &Timestamp::MIN => {
14931507
let as_of = f.step_back().unwrap();
14941508

1495-
let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1509+
let snapshot = self.snapshot(id, as_of, &self.txns_read).await?;
14961510
snapshot
14971511
.into_iter()
14981512
.map(|(row, diff)| {
1513+
// See the trait doc: `snapshot_latest` is only meant for collections that
1514+
// consolidate to a set.
14991515
assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
15001516
row
15011517
})

0 commit comments

Comments
 (0)